-
Notifications
You must be signed in to change notification settings - Fork 531
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Happens-before relationship between Publisher#subscribe()
and Subscriber.onSubscribe()
?
#486
Comments
@NiteshKant Isn't this addressed by https://github.com/reactive-streams/reactive-streams-jvm#1.3 ? |
I do not think so. Take the below code as an example: public class UnsafePublisher implements Publisher<String> {
@Override
public void subscribe(final Subscriber<? super String> subscriber) {
// Assume unsafePublish does an unsafe publish of the Subscriber instance
unsafePublish(subscriber);
}
private void unsafePublish(final Subscriber<? super String> subscriber) {
// Assume we are here in a different thread to which subscriber instance was published unsafely.
subscriber.onSubscribe(new Subscription() {
private boolean done;
@Override
public void request(final long n) {
if (!done) {
done = true;
subscriber.onNext("foo");
subscriber.onComplete();
}
}
@Override
public void cancel() {
done = true;
}
});
}
}
However, due to the unsafe publishing of |
@NiteshKant Wouldn't https://github.com/reactive-streams/reactive-streams-jvm#1.9 prevent the onSubscribe to be executed on some other thread, as it would not be executed under |
https://github.com/reactive-streams/reactive-streams-jvm#1.9 is focused on the ordering of methods invoked on the <1.12, or existing rule(s)?>: rational: |
I don’t think 1.9 disallows The intent here is to see whether this aspect should be covered by the spec. |
So far the creation of protocol entities as well as their internal structure and communication is not mentioned in the spec. It is reasonable to assume that a Subscriber does not need to be thread-safe given the provisions in §1.9, the same as objects delivered to @NiteshKant are you aware of any implementations that would be affected by such a new rule? Side note: §1.9 is indeed ambiguous on whether |
@rkuhn I would agree that it will be reckless to use unsafe publication of a The objects delivered to
I do not know of any implementation that would be negatively affected but I can point you to this change in ServiceTalk that will be positively impacted with regards to clarity in memory visibility.
Actually I thought it is intentional for the spec to allow for delayed |
@rkuhn @NiteshKant You could also argue that since the spec doesn't mandate that creation of any of Publisher, Subscriber, Subscription, and Processor is safely published, the Publisher cannot assume that it can unsafely publish a Subscriber. |
Yes agreed! The question here is whether we should make this explicit in the spec that a |
@NiteshKant I guess we could clarify the intent of 1.9? |
Yes that sounds like something that will be beneficial. Should I take a stab at the clarification? |
@NiteshKant Yes, please do! :) |
Motivation: Publisher#flatMapSingle has the Subscription from onSubscribe saved to a volatile variable. This was due to the Subscription being in the context of the mapped Single's Subscriber termination callbacks and ambiguity in the Reactive Streams specification about visibility related to Publisher#subscribe(..) and Subscriber state. However recent [discussions](reactive-streams/reactive-streams-jvm#486) have provided more insight that the Publisher must provide visibility, and therefore the volatile state is not necessary. Modifications: - Publisher#flatMapSingle subscription member variable can be non-volatile Result: Less volatile state and more clear expecations.
Motivation: Publisher#flatMapSingle has the Subscription from onSubscribe saved to a volatile variable. This was due to the Subscription being in the context of the mapped Single's Subscriber termination callbacks and ambiguity in the Reactive Streams specification about visibility related to Publisher#subscribe(..) and Subscriber state. However recent [discussions](reactive-streams/reactive-streams-jvm#486) have provided more insight that the Publisher must provide visibility, and therefore the volatile state is not necessary. Modifications: - Publisher#flatMapSingle subscription member variable can be non-volatile Result: Less volatile state and more clear expecations.
I was just about to post a question regarding rule 1.9, but I saw this issue is very related. Is The rule currently says
The current language seems to only demand a synchronous callback if |
@devsr that is my understanding of the rule too that |
@NiteshKant I'm interested in understanding the offload use-case, could you point to some code or otherwise exemplify? |
Rule 1.9 defines that `onSubscribe()` on a `Subscriber` should be called before calling any other method on that `Subscriber`. However, this does not clarify that if `onSubscribe()` is signalled asynchronously then there should be a happens-before relationship between `subscribe()` and signalling of `onSubscribe()`. In absence of such a guarantee, non-final fields initialized in a constructor have no guarantee to be visible inside `onSubscribe()`. Considering a simple example: public class UnsafeSubscriber implements Subscriber<String> { private boolean duplicateOnSubscribe = false; @OverRide public void onSubscribe(final Subscription s) { if (duplicateOnSubscribe) { throw new IllegalStateException("Duplicate onSubscribe() calls."); } duplicateOnSubscribe = true; } @OverRide public void onNext(final String s) { } @OverRide public void onError(final Throwable t) { } @OverRide public void onComplete() { } } If an UnsafeSubscriber instance is created in a different thread than the one that invokes onSubscribe() (true for an asynchronous Publisher), according to the java memory model, this statement inside onSubscribe(): if (duplicateOnSubscribe) { is guaranteed to compute to false if and only if the instance is published safely between these threads. None of the rules in the specifications establish a happens-before relationship between Publisher#subscribe() and Subscriber#onSubscribe(). So, the usage above can be categorized as unsafe. In a more convoluted form, the assignment: private boolean duplicateOnSubscribe = false; can be interleaved with duplicateOnSubscribe = true; such that duplicateOnSubscribe is set to false later. Has this been considered before or am I missing something? Fixes reactive-streams#486
Coming back to this makes me think: should we perhaps add an overarching clarification that conformance to this spec does not imply race freedom as defined by the JMM? To my mind such a guarantee was never among our goals, and to me it is unnatural to assume that the spec of a communication scheme that is independent of the concrete message transport guarantees low-level properties like safe publication. If someone chooses to use unsafe publication, they live outside many specifications, including the JMM and Reactive Streams. In spec parlance: “Using unsafe publication for any communication between Publisher, Subscriber, and Subscription leads to undefined behavior under this specification.” My guess is that if every specification in the Java ecosystem were checked against this measure, then we’d see this disclaimer on all of them. |
@rkuhn I'm inclined to agree with your assessment. Relying on unsafe publication to yield safe operational semantics would be an awkward position to take. |
@rkuhn @viktorklang the unsafe publication bit maybe tripping y'all off more that what I desired. The suggestion here is in similar spirits as rule 2.11.
Would adding such an overarching clarification make this rule void?
I should have been more explicit about this to start, sorry about that. Let me start with a simple example and I can point to concrete features from some reactive libraries, if the usecase is not pursuasive enough. Lets assume a generic library is providing a feature to create a public class OffloadingPublisher<T> implements Publisher<T> {
private final Executor executor;
private final Callable<T> producer;
public OffloadingPublisher(final Executor executor, final Callable<T> producer) {
this.executor = executor;
this.producer = producer;
}
@Override
public void subscribe(final Subscriber<? super T> s) {
executor.execute(new Runnable() {
private T result; // Ignoring multi-threaded access for simplicity
@Override
public void run() {
s.onSubscribe(new Subscription() {
@Override
public void request(final long n) {
// atomically check and deliver result if not delivered
}
@Override
public void cancel() {
}
});
T result = producer.call();
// atomically check and deliver result if already requested and not delivered
}
});
}
} One could argue that instead of offloading |
@NiteshKant Thanks for clarifying, Nitesh! Would your problem be solved by instead moving the Thread-offloading to be initialized from within the Subscription instead? Based on the use-case you may not want to trigger it until For the case where you want to immediately (eagerly) get the value out of the callable, you can after calling onSubscribe(subscription) kick off the computation by invoking your own init-method on your subscriber implementation. |
@viktorklang Most definitely thats another way of implementing this The point I am demonstrating here is; spec enforcing that |
@NiteshKant The main benefit of the rule is to make sure of one thing: The reason for this being important is that you don't want to risk having code where associations get lost and there is no way of even starting to try to figure that out. More often than not, the caller is in control of the Subscriber, but the Publisher implementation is provided by third party, so it would be tricky to debug by using a different Subscriber. |
@viktorklang I think there is some disconnect here.
From your comment it looks like you are saying that the rule already ensures that |
I guess the intent here is to clarify that there should not be any possibility to offload Imaging a case, you offloading Thus, I will clarify the @viktorklang statement.
When the |
The real question is what callers expect to have happened when In either case, whether we want onSubscribe to be invoked synchronously during subscribe or not, it should be clarified in §1.9. |
My expectation has been |
@OlegDokuka For the sync case, I believe the rule could be clarified using something like: Publisher.subscribe MUST call onSubscribe on the provided Subscriber, and do so prior to any other signals to that Subscriber, and MUST return normally, except when the provided Subscriber is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe). |
sounds good to me. I guess we can enforce it saying synchronously explicitly (we have that in vocabulary, so it should be fine for those may endup thinking that offloading onSubscribe to eventloop may be legal as well) E.g.
|
Considering a simple example:
If an
UnsafeSubscriber
instance is created in a different thread than the one that invokesonSubscribe()
(true for an asynchronousPublisher
), according to the java memory model, this statement insideonSubscribe()
:if (duplicateOnSubscribe) {
is guaranteed to compute to
false
if and only if the instance is published safely between these threads. None of the rules in the specifications establish a happens-before relationship betweenPublisher#subscribe()
andSubscriber#onSubscribe()
. So, the usage above can be categorized as unsafe. In a more convoluted form, the assignment:private boolean duplicateOnSubscribe = false;
can be interleaved with
duplicateOnSubscribe = true;
such thatduplicateOnSubscribe
is set tofalse
later.Has this been considered before or am I missing something?
The text was updated successfully, but these errors were encountered: