Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling for streaming reactive controllers #10058

Draft
wants to merge 2 commits into
base: 4.5.x
Choose a base branch
from

Conversation

jeremyg484
Copy link
Contributor

Error handling for streaming reactive controller methods, such as those that use a Flux as the return type, is
improved such that immediate error signals in the stream that occur before any data has been written will be routed
appropriately to any user-supplied @Error handler methods.

A new test specification is added to cover this scenario, and the TCK's ErrorHandlerFluxTest is improved to cover
the enhanced error handling.

This resolves micronaut-projects/micronaut-reactor#238

Error handling for streaming reactive controller methods, such as those
that use a `Flux` as the return type, is improved such that immediate
error signals in the stream that occur before any data has been written
will be routed appropriately to any user-supplied `@Error` handler
methods.

A new test specification is added to cover this scenario, and the TCK's
`ErrorHandlerFluxTest` is improved to cover the enhanced error handling.
@jeremyg484 jeremyg484 changed the base branch from 4.2.x to 4.1.x November 1, 2023 23:27
@jeremyg484 jeremyg484 self-assigned this Nov 1, 2023
@yawkat yawkat self-requested a review November 2, 2023 07:08
}
bufferWritten = true;
bufferedFirstValue = null;
subscriber.request(Long.MAX_VALUE);
Copy link
Member

@yawkat yawkat Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't do that, backpressure must be maintained

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this was a mistake. I've replace this line with an onRequest callback on the sink that will still respect demand.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can still lose requests when the subscription happens after the downstream request

}

@SuppressWarnings("unchecked")
private CorePublisher<MutableHttpResponse<?>> chunkedResponsePublisher(PropagatedContext propagatedContext,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs to be thread safe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yawkat Maybe I'm missing something...how is this method not thread-safe?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for example the second accept can be called concurrently with any of the methods of StreamSubscriber

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, you mean the publisher itself. That is a good point. I don't believe that will currently happen just because of the way the publisher is being consumed by the rest of the pipeline, but that is definitely too unsafe an assumption.

@yawkat
Copy link
Member

yawkat commented Nov 2, 2023

The general idea is correct but implementing a custom reactive publisher is complicated wrt concurrency, backpressure, nesting etc.. We regularly have bugs like #10017 even with simpler code than this. This PR needs a better reactive processor (without FluxSink etc bc of backpressure).

Even better would be if you could come up with a solution using the built-in reactor operators. I'm not sure if it's possible, but if this PR used only normal operators, it'd be much easier to be sure it works for all the edge cases.

@jeremyg484
Copy link
Contributor Author

The general idea is correct but implementing a custom reactive publisher is complicated wrt concurrency, backpressure, nesting etc.. We regularly have bugs like #10017 even with simpler code than this. This PR needs a better reactive processor (without FluxSink etc bc of backpressure).

Even better would be if you could come up with a solution using the built-in reactor operators. I'm not sure if it's possible, but if this PR used only normal operators, it'd be much easier to be sure it works for all the edge cases.

@yawkat FluxSink supports backpressure and I have made a quick fix to take advantage of that.

I agree in that I would prefer a simpler solution. I attempted several other approaches purely using the built-in operators before settling on this...they all failed to support this use case in a way that did not result in problems such as the first item in the stream needing to generated twice, etc.

Now that I have a working solution and understand the necessary flow completely, I can revisit it one more time to see if there is a way to reproduce this flow purely using the built-in operators, but I too am skeptical that it is possible.

@jeremyg484
Copy link
Contributor Author

Thinking through this some more, I agree that I need to try harder to simplify this implementation further. I've got some further ideas to try...I'm going to convert this to a draft until I have a chance to explore them.

@jeremyg484 jeremyg484 marked this pull request as draft November 2, 2023 18:33
@CLAassistant
Copy link

CLAassistant commented Feb 7, 2024

CLA assistant check
All committers have signed the CLA.

@jeremyg484 jeremyg484 changed the base branch from 4.1.x to 4.5.x May 3, 2024 15:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: No status
Development

Successfully merging this pull request may close these issues.

Global @Error doesn't work when Controller return is Flux
3 participants