-
Notifications
You must be signed in to change notification settings - Fork 0
/
13-DomainEventPublisherFactory.java
50 lines (43 loc) · 1.72 KB
/
13-DomainEventPublisherFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * Publishes a {@link DomainEvent} that concerns a given entity.
 *
 * <p>The single abstract method is the three-argument {@code publish}, so the
 * interface can be implemented with a lambda.
 */
@FunctionalInterface
public interface DomainEventPublisher {

    /**
     * Publishes {@code event} for {@code entity}.
     *
     * @param entity the entity the event relates to
     * @param event  the event to publish
     * @param userId identifier of the acting user; the two-argument overload
     *               passes {@code null} here
     * @return a {@code Mono} that signals the outcome of the publication
     */
    Mono<Void> publish(Persistable<UUID> entity, DomainEvent event, UUID userId);

    /**
     * Publishes {@code event} for {@code entity} without a user identifier.
     *
     * <p>Delegates to {@link #publish(Persistable, DomainEvent, UUID)} with a
     * {@code null} user id.
     *
     * @param entity the entity the event relates to
     * @param event  the event to publish
     * @return whatever the three-argument overload returns
     */
    default Mono<Void> publish(Persistable<UUID> entity, DomainEvent event) {
        final UUID noUser = null;
        return publish(entity, event, noUser);
    }
}
/**
 * Creates {@link DomainEventPublisher} instances that send domain events through
 * a {@link KafkaSender}.
 *
 * <p>Each published record is keyed by the entity id (as a string), carries the
 * event as its value, and gets headers for a freshly generated event id, the
 * entity's class name and, when provided, the user id. All header values are
 * encoded as UTF-8 so that the bytes do not depend on the JVM's default charset.
 */
@AllArgsConstructor
class DomainEventPublisherFactory {

    private final KafkaSender<String, Object> kafkaSender;
    private final DomainEventTopicResolver topicResolver;

    /**
     * Builds a publisher backed by this factory's sender and topic resolver.
     *
     * <p>The topic is resolved via {@code topicResolver.resolve(entity)} at the
     * moment {@code publish} is invoked on the returned publisher; the record
     * itself is produced by a supplier-backed {@code Mono}.
     *
     * @return a publisher that sends one record per {@code publish} call
     */
    public DomainEventPublisher build() {
        return (entity, event, userId) -> kafkaSender.createOutbound()
            .send(createProducerRecord(topicResolver.resolve(entity), entity, event, userId))
            .then();
    }

    /**
     * Wraps creation of the Kafka record for one event in a {@code Mono}.
     *
     * @param topic   destination topic
     * @param entity  entity the event relates to; its id becomes the record key
     *                and its class name the {@code ENTITY} header
     * @param payload record value
     * @param userId  optional user id; the {@code USER_ID} header is added only
     *                when this is non-null
     * @return a {@code Mono} supplying the fully populated record
     */
    private Mono<ProducerRecord<String, Object>> createProducerRecord(
        String topic,
        Persistable<UUID> entity,
        Object payload,
        UUID userId
    ) {
        return Mono.fromSupplier(() -> {
            var rec = new ProducerRecord<>(topic, getIdString(entity), payload);
            // TODO(MK): Consider using faster (but less secure) AlternativeJdkIdGenerator
            rec.headers().add(DomainEventHeaders.EVENT_ID, toBytes(UUID.randomUUID()));
            // NOTE(review): getIdString tolerates a null entity, but this line would
            // throw NullPointerException for one - confirm whether null is expected.
            rec.headers().add(DomainEventHeaders.ENTITY, utf8(entity.getClass().getName()));
            if (userId != null) {
                rec.headers().add(DomainEventHeaders.USER_ID, toBytes(userId));
            }
            return rec;
        });
    }

    /** Returns the UTF-8 bytes of the UUID's canonical string form. */
    private byte[] toBytes(UUID id) {
        return utf8(id.toString());
    }

    /**
     * Encodes a header value with an explicit charset; the no-argument
     * {@code String.getBytes()} would use the platform default instead.
     */
    private static byte[] utf8(String value) {
        return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Returns the entity's id as a string, or {@code null} when the entity or
     * its id is {@code null}.
     */
    private String getIdString(Persistable<UUID> entity) {
        return Optional.ofNullable(entity)
            .map(Persistable::getId)
            .map(UUID::toString)
            .orElse(null);
    }
}