Skip to content

Commit

Permalink
Merge pull request #23 from GroupHQ/GROUP-101-Fix-Broken-Consumer-For…
Browse files Browse the repository at this point in the history
…-Group-Sync

GROUP-101 Updated Event Consumer to Drop Errors
  • Loading branch information
makmn1 committed Mar 21, 2024
2 parents 1c59ae8 + f239d53 commit ad8a63c
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 35 deletions.
2 changes: 1 addition & 1 deletion config/pmd/design.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<rule ref="category/java/design.xml/TooManyFields" />
<rule ref="category/java/design.xml/TooManyMethods">
<properties>
<property name="maxmethods" value="20" />
<property name="maxmethods" value="25" />
</properties>
</rule>
<rule ref="category/java/design.xml/UselessOverridingMethod" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,23 @@ public GroupEventForwarder(GroupUpdateService groupUpdateService) {
@Bean
public Consumer<Flux<OutboxEvent>> processedEvents() {
return outboxEvents ->
outboxEvents.flatMap(this::forwardUpdate)
.doOnError(throwable -> log.error("Error while forwarding events. "
+ "Attempting to resume. Error: {}", throwable.getMessage()))
.onErrorResume(throwable -> Flux.empty())
outboxEvents.flatMap(outboxEvent ->
forwardUpdate(outboxEvent)
.doOnError(throwable -> log.error("Error while forwarding events. "
+ "Attempting to resume. Error: {}", throwable.getMessage()))
.onErrorResume(throwable -> Mono.empty())
)
.subscribe();
}

private Mono<Void> forwardUpdate(OutboxEvent outboxEvent) {
switch (outboxEvent.getEventStatus()) {
case SUCCESSFUL -> {
groupUpdateService.sendPublicOutboxEventToAll(
PublicOutboxEvent.convertOutboxEvent(outboxEvent));
groupUpdateService.sendOutboxEventToEventOwner(OutboxEvent.convertEventDataToPublic(outboxEvent));
}
return Mono.defer(() -> switch (outboxEvent.getEventStatus()) {
case SUCCESSFUL ->
groupUpdateService.sendPublicOutboxEventToAll(PublicOutboxEvent.convertOutboxEvent(outboxEvent))
.then(groupUpdateService.sendOutboxEventToEventOwner(
OutboxEvent.convertEventDataToPublic(outboxEvent)));
case FAILED -> groupUpdateService
.sendOutboxEventToEventOwner(OutboxEvent.convertEventDataToPublic(outboxEvent));
default -> log.error("Unknown event status: {}", outboxEvent.getEventStatus());
}

return Mono.empty();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.springframework.stereotype.Service;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
Expand Down Expand Up @@ -36,14 +37,20 @@ public Flux<OutboxEvent> eventOwnerUpdateStream() {
.onBackpressureBuffer(100, BufferOverflowStrategy.DROP_OLDEST);
}

public void sendPublicOutboxEventToAll(PublicOutboxEvent outboxEvent) {
final Sinks.EmitResult result = publicUpdatesSink.tryEmitNext(outboxEvent);
emitResultLogger("PUBLIC", outboxEvent, result);
public Mono<Void> sendPublicOutboxEventToAll(PublicOutboxEvent outboxEvent) {
return Mono.defer(() ->
Mono.just(publicUpdatesSink.tryEmitNext(outboxEvent))
.doOnNext(result -> emitResultLogger("PUBLIC", outboxEvent, result))
.then()
);
}

public void sendOutboxEventToEventOwner(OutboxEvent outboxEvent) {
final Sinks.EmitResult result = userUpdatesSink.tryEmitNext(outboxEvent);
emitResultLogger(outboxEvent.getEventStatus().toString(), outboxEvent, result);
public Mono<Void> sendOutboxEventToEventOwner(OutboxEvent outboxEvent) {
return Mono.defer(() ->
Mono.just(userUpdatesSink.tryEmitNext(outboxEvent))
.doOnNext(result -> emitResultLogger("USER", outboxEvent, result))
.then()
);
}

private void emitResultLogger(String eventName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public static OutboxEvent of(UUID eventId, Long aggregateId, AggregateType aggre
}

public static OutboxEvent convertEventDataToPublic(OutboxEvent outboxEvent) {
if (outboxEvent.getEventStatus() == EventStatus.FAILED) {
return outboxEvent;
}

return switch (outboxEvent.getEventType()) {
case MEMBER_JOINED, MEMBER_LEFT -> convertMember(outboxEvent);
default -> outboxEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import org.grouphq.groupsync.GroupTestUtility;
import org.grouphq.groupsync.groupservice.domain.members.Member;
import org.grouphq.groupsync.groupservice.domain.outbox.ErrorData;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventStatus;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.EventType;
import org.grouphq.groupsync.groupservice.web.objects.egress.PublicMember;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -35,4 +37,16 @@ void convertsOutboxEventMemberEventDataForMemberLeftToPublicType() {

assertThat(convertedOutboxEvent.getEventData()).isExactlyInstanceOf(PublicMember.class);
}

@Test
@DisplayName("Does not perform any conversion for failed events")
void doesNotPerformAnyConversionForFailedEvents() {
final ErrorData errorData = new ErrorData("Error message");
final OutboxEvent outboxEvent = GroupTestUtility.generateOutboxEvent(
1L, errorData, EventType.MEMBER_JOINED, EventStatus.FAILED);

final OutboxEvent nonConvertedEvent = OutboxEvent.convertEventDataToPublic(outboxEvent);

assertThat(nonConvertedEvent).isEqualTo(outboxEvent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,12 @@ void forwardsPrivateMemberEventsToUserWithPublicEventDataModels() {
@DisplayName("Forwards events to the outbox event update failed sink")
void forwardsEventsToTheOutboxEventUpdateFailedSink() {
final List<OutboxEvent> outboxEvents = List.of(
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED),
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED),
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED)
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.GROUP_CREATED),
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.GROUP_UPDATED),
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.GROUP_DISBANDED),
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.MEMBER_JOINED),
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.MEMBER_LEFT),
GroupTestUtility.generateOutboxEvent(USER, EventStatus.FAILED, EventType.NOTHING)
);

final Flux<OutboxEvent> groupUpdatesStream =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.grouphq.groupsync.group.event;

import static org.mockito.BDDMockito.willDoNothing;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.verify;

import org.grouphq.groupsync.GroupTestUtility;
Expand All @@ -16,6 +16,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Tag("UnitTest")
@ExtendWith(MockitoExtension.class)
Expand All @@ -35,8 +36,8 @@ void forwardsSuccessfulEventsToTheUpdatesSink() {
final PublicOutboxEvent publicOutboxEvent =
PublicOutboxEvent.convertOutboxEvent(outboxEvent);

willDoNothing().given(groupUpdateService).sendPublicOutboxEventToAll(publicOutboxEvent);
willDoNothing().given(groupUpdateService).sendOutboxEventToEventOwner(outboxEvent);
given(groupUpdateService.sendPublicOutboxEventToAll(publicOutboxEvent)).willReturn(Mono.empty());
given(groupUpdateService.sendOutboxEventToEventOwner(outboxEvent)).willReturn(Mono.empty());

groupEventForwarder.processedEvents().accept(Flux.just(outboxEvent));

Expand All @@ -50,7 +51,7 @@ void forwardsFailedEventsToTheUpdatesFailedSink() {
final OutboxEvent outboxEvent =
GroupTestUtility.generateOutboxEvent("ID", EventStatus.FAILED);

willDoNothing().given(groupUpdateService).sendOutboxEventToEventOwner(outboxEvent);
given(groupUpdateService.sendOutboxEventToEventOwner(outboxEvent)).willReturn(Mono.empty());

groupEventForwarder.processedEvents().accept(Flux.just(outboxEvent));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

@Tag("UnitTest")
Expand All @@ -36,8 +37,11 @@ void updatesSinkWithNewOutboxEventsAndEmitsThem() {
final PublicOutboxEvent publicOutboxEvent =
PublicOutboxEvent.convertOutboxEvent(outboxEvent);

StepVerifier.create(groupUpdateService.publicUpdatesStream())
.then(() -> groupUpdateService.sendPublicOutboxEventToAll(publicOutboxEvent))
StepVerifier.create(groupUpdateService.publicUpdatesStream()
.publishOn(Schedulers.boundedElastic())
.doOnSubscribe(subscription ->
groupUpdateService.sendPublicOutboxEventToAll(publicOutboxEvent).subscribe())
)
.expectNext(publicOutboxEvent)
.thenCancel()
.verify(Duration.ofSeconds(1));
Expand All @@ -56,8 +60,12 @@ void updatesSinkWithFailedOutboxEventsAndEmitsThem() {
final OutboxEvent outboxEvent =
GroupTestUtility.generateOutboxEvent("ID", EventStatus.FAILED);

StepVerifier.create(groupUpdateService.eventOwnerUpdateStream())
.then(() -> groupUpdateService.sendOutboxEventToEventOwner(outboxEvent))
StepVerifier.create(
groupUpdateService.eventOwnerUpdateStream()
.publishOn(Schedulers.boundedElastic())
.doOnSubscribe(subscription ->
groupUpdateService.sendOutboxEventToEventOwner(outboxEvent).subscribe())
)
.expectNext(outboxEvent)
.thenCancel()
.verify(Duration.ofSeconds(1));
Expand Down
68 changes: 65 additions & 3 deletions src/testFixtures/java/org/grouphq/groupsync/GroupTestUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.grouphq.groupsync.groupservice.domain.groups.GroupStatus;
import org.grouphq.groupsync.groupservice.domain.members.Member;
import org.grouphq.groupsync.groupservice.domain.members.MemberStatus;
import org.grouphq.groupsync.groupservice.domain.outbox.ErrorData;
import org.grouphq.groupsync.groupservice.domain.outbox.EventDataModel;
import org.grouphq.groupsync.groupservice.domain.outbox.OutboxEvent;
import org.grouphq.groupsync.groupservice.domain.outbox.enums.AggregateType;
Expand Down Expand Up @@ -134,7 +135,7 @@ public static Group generateFullGroupDetailsWithMembers(GroupStatus status) {

final Set<PublicMember> members = new HashSet<>();
for (int i = 0; i < maxCapacity / 2; i++) {
final Member member = generateFullMemberDetails(faker.name().username(), groupId);
final Member member = generateFullMemberDetails(faker.name().firstName(), groupId);
final PublicMember publicMember = Member.toPublicMember(member);
members.add(publicMember);
}
Expand Down Expand Up @@ -435,8 +436,9 @@ public static OutboxEvent generateOutboxEvent() {
}

/**
* Generates an outbox event with the given EventDataModel.
* Generates an outbox event with the given parameters.
*
* @param aggregateId the aggregate ID to use
* @param eventData the event data to use
* @param eventType the event type to use
*
Expand All @@ -457,11 +459,42 @@ public static OutboxEvent generateOutboxEvent(
);
}

/**
* Generates an outbox event with the given parameters.
*
* @param aggregateId the aggregate ID to use
* @param eventData the event data to use
* @param eventType the event type to use
* @param eventStatus the event status to use
*
* @return an OutboxEvent object with all details.
*/
public static OutboxEvent generateOutboxEvent(
Long aggregateId, EventDataModel eventData, EventType eventType, EventStatus eventStatus) {

return new OutboxEvent(
UUID.randomUUID(),
aggregateId,
UUID.randomUUID().toString(),
AggregateType.GROUP,
eventType,
eventData,
eventStatus,
Instant.now()
);
}

/**
* Overloaded method for {@link #generateOutboxEvent()} ()}.
*/
public static OutboxEvent generateOutboxEvent(String webSocketId, EventStatus eventStatus) {
final EventDataModel eventData = GroupTestUtility.generateFullGroupDetails(GroupStatus.ACTIVE);
EventDataModel eventData;

if (eventStatus == EventStatus.FAILED) {
eventData = new ErrorData("Error message");
} else {
eventData = GroupTestUtility.generateFullGroupDetails(GroupStatus.ACTIVE);
}

return new OutboxEvent(
UUID.randomUUID(),
Expand All @@ -474,4 +507,33 @@ public static OutboxEvent generateOutboxEvent(String webSocketId, EventStatus ev
Instant.now()
);
}

/**
* Overloaded method for {@link #generateOutboxEvent()} ()}.
*/
public static OutboxEvent generateOutboxEvent(String webSocketId, EventStatus eventStatus, EventType eventType) {
EventDataModel eventData;

if (eventStatus == EventStatus.FAILED) {
eventData = new ErrorData("Error message");
} else {
eventData = switch (eventType) {
case GROUP_CREATED -> GroupTestUtility.generateFullGroupDetails(GroupStatus.ACTIVE);
case GROUP_UPDATED, GROUP_DISBANDED -> GroupTestUtility.generateFullGroupDetails(GroupStatus.DISBANDED);
case MEMBER_JOINED, MEMBER_LEFT -> GroupTestUtility.generateFullMemberDetails();
case NOTHING -> null;
};
}

return new OutboxEvent(
UUID.randomUUID(),
FAKER.number().randomNumber(12, true),
webSocketId,
AggregateType.GROUP,
eventType,
eventData,
eventStatus,
Instant.now()
);
}
}

0 comments on commit ad8a63c

Please sign in to comment.