Examples with Interop #5
/cc @smaldini |
I’m currently looking into trying out some interop with Akka Streams, are artifacts published somewhere? |
You’ll have to hold off. It’s not spec compliant yet. Once it is I’ll get some artefacts published. |
Bummer; I guess I’ll look into ratpack and Reactor then (want to show something at conferences next week). |
I’m actively working on this right now. Should have it working in the next day or two. |
Cool, would you please ping this issue when I should try it out? Thanks! |
What I've just pushed should be compliant and I think is good enough for an initial release. There are probably bugs and there are certainly performance problems, but I think it can go out. @benjchristensen how do we get a binary out? |
I'll push the buttons (since we haven't yet got the fully self-serve Travis-based release process working like RxScala). |
0.2.0 is released on BinTray (https://bintray.com/reactivex/RxJava/RxJavaReactiveStreams/0.2.0/view) and is making its way to Maven Central. @alkemist Thank you for your work on this. Would you mind providing a section on the README or in the Wiki with basic usage examples? |
Looking forward to it for my next demos :D (sent by email in reply to Ben Christensen, Wed, Oct 29, 2014 at 5:23 PM) |
It can be seen on Maven Central now: http://repo1.maven.org/maven2/io/reactivex/rxjava-reactive-streams/0.2.0/ |
Thanks a lot, @benjchristensen and @alkemist! |
Pushed one example of interop with Ratpack: https://github.com/ReactiveX/RxJavaReactiveStreams/blob/0.x/examples/ratpack/src/test/java/rx/reactivestreams/example/ratpack/RatpackExamples.java#L46-46 Leaving this ticket open for more examples and some stuff in the README. |
Cool, nice to see that code, that's very helpful. |
@alkemist I’m trying to extract the sample code you linked to (since I cannot figure out how to run the tests you point to). The problem I am facing is that I cannot figure out how to publish ratpack 0.9.10-SNAPSHOT locally, what is the magic incantation? I tried adding the maven-publish plugin but that does not do anything when I say |
@rkuhn can you use this repo: maven { url "http://oss.jfrog.org/repo" }? I can't remember for sure, but I think this is where all the ratpack snapshots go. BTW if you can share your example I want to complete it with Reactor, as I keep pitching: Reactor for the backend access/data layer, Akka Streams to get into an Actor system and scale out, RxJava to bridge with some metrics and especially Hystrix right now, Ratpack to bridge with the HTTP client (WS/ESS) 💃 |
Got it working, thanks! You can find my sample project here. @benjchristensen this might be interesting for you as well. |
@rkuhn I'm having a hard time configuring the sample build as an IDEA project :( |
Thanks @rkuhn ... that code looks like a good example for my talks next week as well. Do you mind if I use it (and possibly tweak/enhance it)? |
@benjchristensen By all means: use it! @smaldini That is the reason why I created an sbt project, I cannot figure out this gradle thing ;-) You should be able to just add a gradle build if you know better how that works (I don’t use IDEA). |
Beautiful |
I am trying to convert the project to our own sample suite as I am having issues with both. Do you know where I can find 0.9 snapshots for Akka Streams, my build seems to complain about that. |
Ah, yes: I am about to publish a suitable version, should not take more than an hour. |
I'll also propose we all use our own schedulers/dispatchers to make it more obvious that we talk to each other while handling async back pressure 👯 E.g. rxjava.observeOn(Schedulers.computation()), reactorStream.dispatchOn(new Environment()) |
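A minimal sketch of that idea, under loud assumptions: it uses only the JDK's `java.util.concurrent.Flow` interfaces and `SubmissionPublisher` (Java 9+) as a stand-in for RxJava and Reactor, and the class names are hypothetical. The producer runs on the calling thread, the subscriber is signalled on its own executor, and the small bounded buffer between them is where backpressure becomes visible, because `submit` blocks while that buffer is full.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class SeparateExecutorsSketch {

    /** Subscriber that pulls one item at a time and records what it sees. */
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        volatile String consumerThread;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: exactly one item
        }

        @Override public void onNext(Integer item) {
            consumerThread = Thread.currentThread().getName();
            received.add(item);
            subscription.request(1); // ask for the next item only when ready
        }

        @Override public void onError(Throwable t) { done.countDown(); }

        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 1..count from the calling thread to a subscriber on its own thread. */
    static OneAtATime run(int count) {
        ExecutorService consumerPool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "consumer-side");
            t.setDaemon(true);
            return t;
        });
        OneAtATime subscriber = new OneAtATime();
        // 8 is the per-subscriber buffer capacity; closing the publisher signals onComplete.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(consumerPool, 8)) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
        }
        try {
            subscriber.done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            consumerPool.shutdown();
        }
        return subscriber;
    }

    public static void main(String... args) {
        OneAtATime s = run(100);
        System.out.println("received " + s.received.size() + " items on thread " + s.consumerThread);
    }
}
```

With real libraries the two sides would be an `observeOn` scheduler and a Reactor dispatcher rather than a single executor, but the shape is the same: demand flows upstream across the thread boundary, and the producer slows down to match it.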
Akka Streams 0.10-M1 is on its way to Maven Central; I’ll update my code as soon as it is there. |
updated the build and it still works :-) |
Using Akka Streams 0.10-M1 I was playing around and I've found an issue where the backpressure isn't propagating somewhere. Not sure yet where.

    package reactive_streams_interop;

    import java.util.concurrent.atomic.AtomicInteger;

    import org.reactivestreams.Publisher;

    import rx.Observable;
    import rx.RxReactiveStreams;
    import rx.observables.GroupedObservable;
    import rx.schedulers.Schedulers;
    import akka.actor.ActorSystem;
    import akka.stream.FlowMaterializer;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    public class RxAkka {

        public static void main(String... args) {
            final ActorSystem system = ActorSystem.create("InteropTest");
            final FlowMaterializer mat = FlowMaterializer.create(system);
            final AtomicInteger numEmitted = new AtomicInteger();

            // RxJava Observable
            Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                    .doOnNext(i -> numEmitted.incrementAndGet())
                    .groupBy(i -> i % 2 == 0)
                    .take(2);

            Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
                // schedule odd and even on different event loops
                Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation());

                /* using Akka Streams */
                // convert to Reactive Streams Publisher
                Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);
                // convert to Akka Streams Source
                Source<String> stringSource = Source.from(groupPublisher).map(i -> i + " " + group.getKey());
                // convert back from Akka to Rx Observable
                return RxReactiveStreams.toObservable(
                        stringSource.take(2000).runWith(Sink.<String> fanoutPublisher(1, 1), mat));

                /* using only Rx */
                // return asyncGroup.take(2000).map(i -> i + " " + group.getKey());
            });

            strings.toBlocking().forEach(System.out::println);
            system.shutdown();
            System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
        }
    }

This non-deterministically blows up with:
This means the It sometimes works however:
It works deterministically if I use just Rx without the conversion to/from. I have not yet spent time to hunt down where the issue is occurring. |
The above is done with:
|
Here is an example I'm considering using for a presentation on Tuesday. It's buggy right now (as shown above) but demonstrates the goals of interop while going through non-trivial operators (groupBy and flatMap) along with injected concurrency and thread-hopping. Any recommendations on what to do differently that would be better? Can someone provide me a more realistic example of going from or to Akka Streams? I'd prefer to have something that is not so contrived if possible. @smaldini I'll play more with yours next as shown in https://github.com/rkuhn/ReactiveStreamsInterop/blob/7124906fb50f9a91cee4e8d58c00853898eed239/src/main/java/com/rolandkuhn/rsinterop/JavaMain.java |
This example with Reactor and RxJava seems to be working deterministically:

    package reactive_streams_interop;

    import java.util.concurrent.atomic.AtomicInteger;

    import org.reactivestreams.Publisher;

    import reactor.rx.Stream;
    import reactor.rx.Streams;
    import rx.Observable;
    import rx.RxReactiveStreams;
    import rx.observables.GroupedObservable;
    import rx.schedulers.Schedulers;

    public class RxReactor {

        public static void main(String... args) {
            final AtomicInteger numEmitted = new AtomicInteger();

            // RxJava Observable
            Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                    .doOnNext(i -> numEmitted.incrementAndGet())
                    .groupBy(i -> i % 2 == 0)
                    .take(2);

            Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
                // schedule odd and even on different event loops
                Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation());
                // convert to Reactive Streams Publisher
                Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);
                // convert to Reactor Stream
                final Stream<String> linesStream = Streams.create(groupPublisher)
                        .map(i -> i + " " + group.getKey())
                        .take(2000);
                // convert back from Reactor Stream to Rx Observable
                return RxReactiveStreams.toObservable(linesStream);
            });

            strings.toBlocking().forEach(System.out::println);
            System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
        }
    }

Output ends with:
@smaldini Do you have a better example you'd like me to show? |
@rkuhn Are you okay with me using the following slide? Is there anything you'd like me to change? If you can provide me a more realistic example I'd happily change. For example an Akka Actor that acts as a source to RxJava for consumption. |
@smaldini I can't find a logo for Reactor, is there something I should use? |
@alkemist Is there an example you'd like me to show in my presentation at QCon? Are you close to publishing to Maven Central a ratpack-rx version that supports RxJava 1.0? I tried with 0.9.9 but that isn't working (only 17 more days until 1.0 Final and no more breaking changes!). |
@benjchristensen 0.9.10 was released today. It's in Jcenter but a successful sync to Central is proving elusive. Regarding what to demo, I just pushed examples for SSE and WebSocket streaming using publishers. Chunks, SSE and WebSocket are really the only useful ways we leverage RS streams ATM. Up to you what you want to show. |
@benjchristensen The Akka part of the sample is fine, you can make it arbitrarily complex with all the operators out there. The problem you are seeing very likely is that Akka streams buffer fewer elements, exposing the deadlock in your sample code: following a groupBy with a flatMap can only work under very specific circumstances; in general it is not a safe combination under bounded-memory processing (since the even numbers will need to be buffered for as long as it takes to finish all the odd ones). Concerning the exception itself: looking at the lines referenced in the stack trace it seems that data are just pushed too eagerly within the part leading up to |
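The buffering concern can be made concrete with a small single-threaded simulation. This is a sketch under stated assumptions, not a model of any library's actual operators: the class and method names are hypothetical, each group gets a bounded buffer, and the downstream consumer drains only the odd group, as a consumer that handles groups one after another would. A merge that drains both groups concurrently would not stall this way.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class GroupByBoundedBufferSketch {

    /**
     * Routes 1..total into an "odd" and an "even" buffer of the given capacity
     * while the consumer drains only the odd buffer.
     *
     * @return the number of source items routed before the source can make no
     *         further progress, or total if it never stalls
     */
    static int itemsBeforeStall(int total, int capacity) {
        Deque<Integer> odd = new ArrayDeque<>();
        Deque<Integer> even = new ArrayDeque<>();
        for (int i = 1; i <= total; i++) {
            Deque<Integer> target = (i % 2 == 0) ? even : odd;
            if (target.size() == capacity) {
                // The source is blocked: this group's buffer is full and nobody drains it.
                return i - 1;
            }
            target.add(i);
            odd.poll(); // the consumer keeps up with the odd group only
        }
        return total;
    }

    public static void main(String... args) {
        System.out.println("Items routed before stall: " + itemsBeforeStall(1_000_000, 16));
    }
}
```

With a capacity of 16 the source stalls after 33 items: the even buffer fills at item 32, item 33 (odd) still passes, and item 34 has nowhere to go. Only a buffer large enough for every even number avoids the stall, which is the unbounded-memory requirement described above.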
@alkemist Thanks, I'll try that. |
How much it buffers shouldn't matter, as
What do you mean by this? The
That's correct, hence the problem. It works with Rx+Reactor and with Rx by itself, which implies that the I have not yet spent the time to debug where too much is being requested or the request is being lost. I'll paste examples below using |
Example of RxJava + Reactor where the

    package reactive_streams_interop;

    import java.util.concurrent.atomic.AtomicInteger;

    import org.reactivestreams.Publisher;

    import reactor.rx.Stream;
    import reactor.rx.Streams;
    import rx.Observable;
    import rx.RxReactiveStreams;
    import rx.observables.GroupedObservable;
    import rx.schedulers.Schedulers;

    public class RxReactor {

        public static void main(String... args) {
            final AtomicInteger numEmitted = new AtomicInteger();

            // RxJava Observable
            Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                    .doOnNext(i -> numEmitted.incrementAndGet())
                    .groupBy(i -> i % 2 == 0)
                    .take(2);

            Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
                // schedule odd and even on different event loops
                Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation()).map(i -> {
                    if (group.getKey()) {
                        // make 'even' group be slow
                        try {
                            Thread.sleep(10);
                        } catch (Exception e) {
                        }
                    } else {
                        // make 'odd' group be faster but still have some "computational cost"
                        try {
                            Thread.sleep(1);
                        } catch (Exception e) {
                        }
                    }
                    return i;
                });
                // convert to Reactive Streams Publisher
                Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);
                // convert to Reactor Stream
                final Stream<String> linesStream = Streams.create(groupPublisher)
                        .map(i -> i + " " + group.getKey())
                        .take(2000);
                // convert back from Reactor Stream to Rx Observable
                return RxReactiveStreams.toObservable(linesStream);
            });

            strings.toBlocking().forEach(System.out::println);
            System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
        }
    }

You end up with output such as this:
|
RxJava by itself doing

    package reactive_streams_interop;

    import java.util.concurrent.atomic.AtomicInteger;

    import rx.Observable;
    import rx.observables.GroupedObservable;
    import rx.schedulers.Schedulers;

    public class RxJavaGroupByFlatMap {

        public static void main(String... args) {
            final AtomicInteger numEmitted = new AtomicInteger();

            // RxJava Observable
            Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                    .doOnNext(i -> numEmitted.incrementAndGet())
                    .groupBy(i -> i % 2 == 0)
                    .take(2);

            Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
                // schedule odd and even on different event loops
                return group.observeOn(Schedulers.computation()).map(i -> {
                    if (group.getKey()) {
                        // make 'even' group be slow
                        try {
                            Thread.sleep(10);
                        } catch (Exception e) {
                        }
                    } else {
                        // make 'odd' group be faster but still have some "computational cost"
                        try {
                            Thread.sleep(1);
                        } catch (Exception e) {
                        }
                    }
                    return i;
                }).map(i -> i + " " + group.getKey()).take(2000);
            });

            strings.toBlocking().forEach(System.out::println);
            System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
        }
    }

|
Here is one that multicasts a stream (use

    package reactive_streams_interop;

    import java.util.concurrent.atomic.AtomicInteger;

    import rx.Observable;
    import rx.schedulers.Schedulers;

    public class RxJavaPublishZip {

        public static void main(String... args) {
            final AtomicInteger numEmitted = new AtomicInteger();

            Observable<Object> strings = Observable.range(1, 1000000)
                    .doOnNext(i -> numEmitted.incrementAndGet())
                    .publish(oi -> {
                        // schedule it so we are async and need backpressure
                        Observable<String> odd = oi.observeOn(Schedulers.computation())
                                .filter(i -> i % 2 != 0).map(i -> i + "-odd").map(s -> {
                                    // make odd slow
                                    try {
                                        Thread.sleep(1);
                                    } catch (Exception e1) {
                                    }
                                    return s;
                                });
                        Observable<String> even = oi.observeOn(Schedulers.computation())
                                .filter(i -> i % 2 == 0).map(i -> i + "-even");
                        return Observable.zip(odd, even,
                                (o, e) -> o + " " + e + " Thread: " + Thread.currentThread());
                    }).take(2000);

            strings.toBlocking().forEach(System.out::println);
            System.out.println("Number emitted from source (should be ~4000): " + numEmitted.get());
        }
    }

Output would be like this:
You can see the backpressure happening because the number emitted is < 5000 in this case, rather than the source spinning and emitting all 1000000. It is not exactly 4000 because we are async and allow buffers in If I put |
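The effect described here, a source that emits only what downstream asks for, can be sketched without any library. The following uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) as a stand-in for Reactive Streams; all class names are hypothetical, and the subscription is deliberately simplified: it is single-threaded and skips the specification's handling of non-positive requests.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

final class DemandDrivenRangeSketch {

    /** Emits 1..end, but never more items than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        final AtomicInteger numEmitted = new AtomicInteger();
        private final int end;

        RangePublisher(int end) { this.end = end; }

        @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;
                private int next = 1;
                private boolean emitting;
                private boolean cancelled;

                @Override public void request(long n) {
                    demand += n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the outer loop picks up the demand
                    }
                    emitting = true;
                    while (demand > 0 && next <= end && !cancelled) {
                        demand--;
                        numEmitted.incrementAndGet();
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > end && !cancelled) {
                        cancelled = true;
                        subscriber.onComplete();
                    }
                }

                @Override public void cancel() { cancelled = true; }
            });
        }
    }

    /** Requests items in batches and cancels after taking `limit` of them. */
    static final class TakeSubscriber implements Flow.Subscriber<Integer> {
        private final int limit;
        private final int batch;
        private Flow.Subscription subscription;
        private int taken;

        TakeSubscriber(int limit, int batch) { this.limit = limit; this.batch = batch; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(batch);
        }

        @Override public void onNext(Integer item) {
            taken++;
            if (taken == limit) {
                subscription.cancel(); // enough: stop the source
            } else if (taken % batch == 0) {
                subscription.request(batch); // batch consumed: ask for another
            }
        }

        @Override public void onError(Throwable t) { }

        @Override public void onComplete() { }
    }

    /** Returns how many items the source emitted for the given take limit and batch size. */
    static int emittedFor(int end, int limit, int batch) {
        RangePublisher source = new RangePublisher(end);
        source.subscribe(new TakeSubscriber(limit, batch));
        return source.numEmitted.get();
    }

    public static void main(String... args) {
        System.out.println("Number emitted from source: " + emittedFor(1_000_000, 2000, 128));
    }
}
```

Because everything here runs synchronously on one thread, the emitted count equals the take limit exactly. Once asynchronous boundaries and intermediate buffers are involved, as in the RxJava example, the source overshoots by roughly the size of those buffers, which matches the "not exactly 4000" observation.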
@benjchristensen do you have in mind anything more than what we have now in terms of the general pattern? Where the proposed pattern is to have more projects in I'd like to encourage the respective library/framework owners to contribute examples. I don't intend to do it. |
Looking at the readme again, we probably do want at least one example of real interop there. |
@alkemist The examples above are sufficient, but we should get them into |
@rkuhn Any thoughts on the examples above? I'm interested in your perspective since you stated that |
Sorry for the delay, I’m traveling too much this month; I’ll try to elaborate later this week, otherwise we can talk it through next week at React. |
We need some examples for the README to demonstrate usage.
Perhaps we can add unit test examples with dependencies on other libraries to demonstrate interop?