Flux.mergeSequential does not subscribe to last Producer in specific circumstances #3802

Open
jmmerz opened this issue Apr 30, 2024 · 1 comment

jmmerz commented Apr 30, 2024

Problem Description

I'm upgrading an application to take Reactor from 3.4.x to the latest (3.6.5) and have run into this problem:

I invoke Flux.mergeSequential(Publisher<? extends I>... sources) like this:

Flux<String> merged = Flux.mergeSequential(producer1, producer2, ..., producerN, monoFinal);

where producer1, producer2, ..., producerN each contain code that prepares an operation and then awaits a signal, and monoFinal is a Mono.fromCallable() whose code sends the signal that allows the first N Producers to continue and complete.

It seems the requirements to trigger the unexpected behavior are:

  1. The first N Producers must begin upon subscription and wait for a signal from the last one.
  2. The last Producer must be a Mono.fromCallable(). I've found that if I use a Mono.fromRunnable() instead, the contained Runnable is executed, while in the former case, the Callable is not executed.
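
For illustration, the two requirements above could be sketched roughly as follows. This is a minimal stand-in, not the code from the linked project: the class name, the run() helper, and the use of a CompletableFuture as the "signal" are all assumptions made for the example. The first two producers only complete once the future completes, and the final Mono.fromCallable() is what completes it. What the delayed case returns depends on the Reactor version on the classpath, so no output is claimed for it here.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical sketch class; names are illustrative only.
public class MergeSequentialSketch {

    /**
     * Merges two signal-dependent producers with a final Mono.fromCallable().
     * Returns the collected values, or null if the merge did not complete
     * within the timeout.
     */
    static List<String> run(boolean delayed, Duration timeout) {
        // Assumption: a CompletableFuture stands in for "the signal".
        CompletableFuture<Void> signal = new CompletableFuture<>();
        if (!delayed) {
            // Not delayed: the producers no longer depend on monoFinal.
            signal.complete(null);
        }

        // Stand-ins for producer1..producerN: they complete after the signal.
        Mono<String> producer1 = Mono.fromFuture(signal.thenApply(v -> "one"));
        Mono<String> producer2 = Mono.fromFuture(signal.thenApply(v -> "two"));

        // The last producer sends the signal from inside a Callable.
        Mono<String> monoFinal = Mono.fromCallable(() -> {
            signal.complete(null);
            return "final";
        });

        try {
            return Flux.mergeSequential(producer1, producer2, monoFinal)
                       .collectList()
                       .block(timeout);
        } catch (IllegalStateException timedOut) {
            // block(Duration) throws IllegalStateException on timeout.
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("not delayed: " + run(false, Duration.ofSeconds(2)));
        System.out.println("delayed:     " + run(true, Duration.ofSeconds(2)));
    }
}
```

The timeout is there only so that the sketch terminates if the Callable is never executed; a null result in the delayed case would correspond to the behavior described in this report.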

Background (my specific use case for the above)

I have some code structured as described above where producer1, producer2, ..., producerN send commands to a Redis client that is configured not to flush commands over the wire immediately, and monoFinal is a Mono.fromCallable() that tells the client to flush the commands so that they are sent over the wire as a batch after the last Mono runs.

When invoked like this, producer1, producer2, ..., producerN are all subscribed by the merged Flux, and their code waits for the signal sent by monoFinal, but monoFinal is not subscribed - at least its Callable is never executed.

I've developed a simplified test case not requiring external services/libraries to demonstrate what's happening (link below in Steps to Reproduce).

Expected Behavior

The merged Flux should subscribe to all Producers eagerly and their emitted values should be produced in order.

Actual Behavior

The first N Producers are subscribed, but the last Mono appears never to be subscribed (or at least its Callable is never executed).

Steps to Reproduce

I've pushed a small project with a test case to GitHub. If it is preferable to deliver test code in another way, please let me know; I'm happy to oblige. See the project in test-reactor-flux-mergeSequential:

  • MergeSequentialUnitTest contains three tests:
    1. test_mergeSequential_failsWhenDelayed: Demonstrates the error case described above
    2. test_mergeSequential_succeedsWhenNotDelayed: Demonstrates that all Producers complete if the initial producers are not dependent on the last producer to complete.
    3. test_mergeSequential_succeedsWhenDelayedWithDoOnSubscribeWorkaround: Demonstrates the workaround described below of applying Mono.doOnSubscribe(Consumer<? super Subscription> onSubscribe) to monoFinal. (Mono.log() works as well.)
  • build.gradle.kts#L16-L29 contains the current non-working version, plus commented-out entries for the first non-working version and the last working version.

Possible Solution

I don't have a solution, but I've found a workaround (explained below) in case it helps.

Workaround 1

If I redefine monoFinal using Mono.fromRunnable() in place of the existing Mono.fromCallable(), things work as expected.

Workaround 2

(This workaround seems less good, but I'm leaving it documented in case it helps narrow down the cause.)

If I invoke Mono.doOnSubscribe(Consumer<? super Subscription> onSubscribe) on monoFinal even with a No-Op Consumer, it causes things to work.

The same thing happens with Mono.log(), which I suspect is because it also adds signal handling for onSubscribe. Other signal handlers (onCancel, onError, onTerminate) have no effect.
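
To make the two workarounds concrete, here is a rough sketch of how the final producer could be built in each case. As before, this is not the code from the linked project: the class, the helper names (viaRunnable, viaDoOnSubscribe, run), and the CompletableFuture used as the signal are assumptions for illustration. Note that Mono.fromRunnable() completes without emitting a value, so with Workaround 1 the merged Flux carries no element for the final producer.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical sketch class; names are illustrative only.
public class MergeSequentialWorkaroundsSketch {

    /** Workaround 1: Mono.fromRunnable() instead of Mono.fromCallable(). */
    static Mono<String> viaRunnable(Runnable sendSignal) {
        // Completes empty once the Runnable has run.
        return Mono.fromRunnable(sendSignal);
    }

    /** Workaround 2: keep Mono.fromCallable(), add a no-op doOnSubscribe. */
    static Mono<String> viaDoOnSubscribe(Runnable sendSignal) {
        return Mono.fromCallable(() -> {
            sendSignal.run();
            return "final";
        }).doOnSubscribe(subscription -> { });
    }

    /**
     * Returns the collected values, or null if the merge did not complete
     * within the timeout.
     */
    static List<String> run(Function<Runnable, Mono<String>> finalFactory,
                            boolean delayed, Duration timeout) {
        // Assumption: a CompletableFuture stands in for "the signal".
        CompletableFuture<Void> signal = new CompletableFuture<>();
        if (!delayed) {
            signal.complete(null);
        }

        Mono<String> producer1 = Mono.fromFuture(signal.thenApply(v -> "one"));
        Mono<String> producer2 = Mono.fromFuture(signal.thenApply(v -> "two"));
        Mono<String> monoFinal = finalFactory.apply(() -> signal.complete(null));

        try {
            return Flux.mergeSequential(producer1, producer2, monoFinal)
                       .collectList()
                       .block(timeout);
        } catch (IllegalStateException timedOut) {
            // block(Duration) throws IllegalStateException on timeout.
            return null;
        }
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(2);
        System.out.println("fromRunnable:  "
                + run(MergeSequentialWorkaroundsSketch::viaRunnable, true, timeout));
        System.out.println("doOnSubscribe: "
                + run(MergeSequentialWorkaroundsSketch::viaDoOnSubscribe, true, timeout));
    }
}
```

Passing the final producer in as a factory keeps the rest of the pipeline identical between the two variants, so any difference in behavior can be attributed to how monoFinal is constructed.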

Your Environment

  • Reactor version(s) used:
    • Reactor version in which I found the issue: 3.6.5
    • First Reactor version showing the issue: 3.5.0
    • Last working Reactor version: 3.4.37
  • Other relevant libraries versions (eg. netty, ...):
    • Able to reproduce without including other libraries
  • JVM version (java -version):
    • Java 21 (also does not work in Java 17)
  • OS and version (eg uname -a):
    • Linux [Hostname omitted] 3.10.0-1160.114.2.el7.x86_64 #1 SMP Wed Mar 20 15:54:52 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux
    • Windows 10/WSL: Linux [Hostname omitted] 5.15.146.1-microsoft-standard-WSL2 #1 SMP Thu Jan 11 04:09:03 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux
jmmerz changed the title from "Flux.mergeSequential appears not subscribe to last Producer in specific circumstances" to "Flux.mergeSequential does not subscribe to last Producer in specific circumstances" on May 1, 2024

jmmerz commented May 1, 2024

Update: I've discovered that the problem appears related specifically to using a Mono.fromCallable() as the final producer for the Flux.mergeSequential(). If I instead use a Mono.fromRunnable() the code works as expected.

I've updated the description and workarounds in the original ticket accordingly.
