Skip to content

Commit

Permalink
GROUP-89 Updated onRequest DormantState
Browse files Browse the repository at this point in the history
  • Loading branch information
makmn1 committed Mar 19, 2024
1 parent 30263d0 commit 87adc0d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import lombok.extern.slf4j.Slf4j;
import org.grouphq.groupsync.config.ClientProperties;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

/**
* State representing when {@link GroupInitialStateService} is waiting for a client to trigger the initialization.
Expand All @@ -30,21 +29,21 @@ public DormantState(GroupInitialStateService groupInitialStateService, ClientPro
*/
@Override
public Mono<Void> onRequest() {
initialRequest.compareAndSet(null,
groupInitialStateService.initializeGroupState()
.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
groupInitialStateService.setState(new ReadyState(groupInitialStateService));
} else if (signalType == SignalType.ON_ERROR) {
groupInitialStateService.setState(
new DormantState(groupInitialStateService, clientProperties));
}
}).cache()
);

final Mono<Void> cachedRequest = initialRequest.get();

groupInitialStateService.setState(new LoadingState(groupInitialStateService, cachedRequest));
return cachedRequest;
return Mono.defer(() -> {
initialRequest.compareAndSet(null,
groupInitialStateService.initializeGroupState()
.doOnSuccess(unused -> groupInitialStateService.setState(
new ReadyState(groupInitialStateService)))
.doOnError(error -> groupInitialStateService.setState(
new DormantState(groupInitialStateService, clientProperties)))
.cache());

final Mono<Void> cachedRequest = initialRequest.get();

groupInitialStateService.setState(new LoadingState(groupInitialStateService, cachedRequest));

return cachedRequest;
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ void whenInReadyStateThenRespondToEventUpdates() {
TODO: Non-deterministic and subject to flakiness. Need to find a better way to test this.
The issue here is that the sink emission may not cause the current state of events to update
before requesting them again if the update takes too long.
If you remove the delayElement call, this test will likely pass when run in isolation and with
If you remove the delaySubscription call, this test will likely pass when run in isolation and with
other tests in this file. But when running all unit tests, it usually fails.
*/
StepVerifier.create(groupInitialStateService.requestCurrentEvents()
.then(Mono.fromRunnable(() -> updateSink.tryEmitNext(publicOutboxEventDisbanded)))
.delayElement(Duration.of(2500, ChronoUnit.MILLIS))
.thenMany(groupInitialStateService.requestCurrentEvents())
.thenMany(groupInitialStateService.requestCurrentEvents()
.delaySubscription(Duration.of(1000, ChronoUnit.MILLIS)))
)
.verifyComplete();
}
Expand Down

0 comments on commit 87adc0d

Please sign in to comment.