Examples with Interop #5

Open
benjchristensen opened this issue Oct 1, 2014 · 47 comments
Comments

@benjchristensen
Member

We need some examples for the README to demonstrate usage.

Perhaps we can add unit test examples with dependencies on other libraries to demonstrate interop?

@benjchristensen
Member Author

/cc @smaldini

@rkuhn

rkuhn commented Oct 29, 2014

I’m currently looking into trying out some interop with Akka Streams, are artifacts published somewhere?

@ldaley
Collaborator

ldaley commented Oct 29, 2014

You’ll have to hold off. It’s not spec compliant yet. Once it is I’ll get some artefacts published.

@rkuhn

rkuhn commented Oct 29, 2014

Bummer; I guess I’ll look into ratpack and Reactor then (want to show something at conferences next week).

@ldaley
Collaborator

ldaley commented Oct 29, 2014

I’m actively working on this right now. Should have it working in the next day or two.

@rkuhn

rkuhn commented Oct 29, 2014

Cool, would you please ping this issue when I should try it out? Thanks!

@ldaley
Collaborator

ldaley commented Oct 29, 2014

What I've just pushed should be compliant and I think is good enough for an initial release. There are probably bugs and there are certainly performance problems, but I think it can go out.

@benjchristensen how do we get a binary out?

@benjchristensen
Member Author

I'll push the buttons (since we haven't yet got the fully self-serve Travis-based release process working like RxScala).

@benjchristensen
Member Author

0.2.0 is released on BinTray (https://bintray.com/reactivex/RxJava/RxJavaReactiveStreams/0.2.0/view) and is making its way to Maven Central.

@alkemist Thank you for your work on this. Would you mind providing a section on the README or in the Wiki with basic usage examples?

@smaldini

Looking forward to it for my next demos :D

Stéphane

@benjchristensen
Member Author

It can be seen on Maven Central now: http://repo1.maven.org/maven2/io/reactivex/rxjava-reactive-streams/0.2.0/

@rkuhn

rkuhn commented Oct 29, 2014

Thanks a lot, @benjchristensen and @alkemist!

@ldaley
Collaborator

ldaley commented Oct 30, 2014

Pushed one example of interop with Ratpack: https://github.com/ReactiveX/RxJavaReactiveStreams/blob/0.x/examples/ratpack/src/test/java/rx/reactivestreams/example/ratpack/RatpackExamples.java#L46-46

Leaving this ticket open for more examples and some stuff in the README.

@benjchristensen
Member Author

Cool, nice to see that code, that's very helpful.

@rkuhn

rkuhn commented Oct 30, 2014

@alkemist I’m trying to extract the sample code you linked to (since I cannot figure out how to run the tests you point to). The problem I am facing is that I cannot figure out how to publish ratpack 0.9.10-SNAPSHOT locally, what is the magic incantation? I tried adding the maven-publish plugin but that does not do anything when I say ./gradlew publishToMavenLocal (and ratpack-rx:publishToMavenLocal does not exist).

@smaldini

@rkuhn can you use this repo: maven { url "http://oss.jfrog.org/repo" }? I can't remember for sure, but I think this is where all the Ratpack snapshots go. BTW, if you can share your example I want to complete it with Reactor, as I keep pitching: Reactor for the backend access/data layer, Akka Streams to get into an Actor system and scale out, RxJava to bridge with some metrics and especially Hystrix right now, Ratpack to bridge with the HTTP client (WS/SSE) 💃

@rkuhn

rkuhn commented Oct 30, 2014

Got it working, thanks! You can find my sample project here. @benjchristensen this might be interesting for you as well.

@smaldini

@rkuhn Having a hard time configuring the sample build as an IDEA project :(

@benjchristensen
Member Author

Thanks @rkuhn ... that code looks like a good example for my talks next week as well. Do you mind if I use it (and possibly tweak/enhance it)?

@rkuhn

rkuhn commented Oct 30, 2014

@benjchristensen By all means: use it!

@smaldini That is the reason why I created an sbt project, I cannot figure out this gradle thing ;-) You should be able to just add a gradle build if you know better how that works (I don’t use IDEA).

@rkuhn

rkuhn commented Oct 30, 2014

@smaldini I added Reactor to my sample: Java and Scala. It works!

@smaldini

Beautiful

@smaldini

I am trying to convert the project to our own sample suite as I am having issues with both. Do you know where I can find 0.9 snapshots for Akka Streams? My build seems to complain about that.

@rkuhn

rkuhn commented Oct 30, 2014

Ah, yes: I am about to publish a suitable version, should not take more than an hour.

@smaldini

I'll also propose we all use our own schedulers/dispatchers to make it more obvious that we talk to each other while handling async backpressure 👯

E.g. rxjava.observeOn(Schedulers.computation()), reactorStream.dispatchOn(new Environment())

@rkuhn

rkuhn commented Oct 30, 2014

Akka Streams 0.10-M1 is on its way to Maven Central; I’ll update my code as soon as it is there.

@rkuhn

rkuhn commented Oct 30, 2014

updated the build and it still works :-)

@benjchristensen
Member Author

Using Akka Streams 0.10-M1 I was playing around and found an issue where the backpressure isn't propagating. Not sure yet where.

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;

import rx.Observable;
import rx.RxReactiveStreams;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;
import akka.actor.ActorSystem;
import akka.stream.FlowMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class RxAkka {

    public static void main(String... args) {
        final ActorSystem system = ActorSystem.create("InteropTest");
        final FlowMaterializer mat = FlowMaterializer.create(system);

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
            // schedule odd and even on different event loops
            Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation());
            /* using Akka Streams */
            // convert to Reactive Streams Publisher
            Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);
            // convert to Akka Streams Source
            Source<String> stringSource = Source.from(groupPublisher).map(i -> i + " " + group.getKey());
            // convert back from Akka to Rx Observable
            return RxReactiveStreams.toObservable(stringSource.take(2000).runWith(Sink.<String> fanoutPublisher(1, 1), mat));

            /* using only Rx */
            // return asyncGroup.take(2000).map(i -> i + " " + group.getKey());
        });

        strings.toBlocking().forEach(System.out::println);
        system.shutdown();
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}

This non-deterministically blows up with:

Exception in thread "main" java.lang.RuntimeException: rx.exceptions.MissingBackpressureException
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:138)
    at reactive_streams_interop.RxAkka.main(RxAkka.java:45)
Caused by: rx.exceptions.MissingBackpressureException
    at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:222)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$1$3.onNext(OperatorGroupBy.java:236)
    at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:278)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:182)
    at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
    at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:93)
    at rx.Subscriber.setProducer(Subscriber.java:143)
    at rx.Subscriber.setProducer(Subscriber.java:137)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:39)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable.subscribe(Observable.java:7463)
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:98)
    ... 1 more
[ERROR] [11/01/2014 22:45:52.646] [InteropTest-akka.actor.default-dispatcher-16] [akka://InteropTest/user/$a/flow-2-1-map] failure during processing
rx.exceptions.MissingBackpressureException
    at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:222)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber$1$3.onNext(OperatorGroupBy.java:236)
    at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:181)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.emitItem(OperatorGroupBy.java:278)
    at rx.internal.operators.OperatorGroupBy$GroupBySubscriber.onNext(OperatorGroupBy.java:182)
    at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
    at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:93)
    at rx.Subscriber.setProducer(Subscriber.java:143)
    at rx.Subscriber.setProducer(Subscriber.java:137)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:39)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable$1.call(Observable.java:145)
    at rx.Observable$1.call(Observable.java:1)
    at rx.Observable.subscribe(Observable.java:7463)
    at rx.observables.BlockingObservable.forEach(BlockingObservable.java:98)
    at reactive_streams_interop.RxAkka.main(RxAkka.java:45)

This means the request flow isn't working.

It sometimes works however:

3998 true
4000 true
Number emitted from source (should be < 6000): 7055
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-fanoutPublisher] Message [akka.stream.impl.Cancel] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-fanoutPublisher#-718931434] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-2-take#-1094054996] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-2-take#-1094054996] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-2-fanoutPublisher] Message [akka.stream.impl.Cancel] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-2-fanoutPublisher#51622231] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.696] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [7] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.actor.ActorSubscriberMessage$OnNext] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [8] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [11/01/2014 22:46:56.697] [InteropTest-akka.actor.default-dispatcher-23] [akka://InteropTest/user/$a/flow-1-2-take] Message [akka.stream.impl.RequestMore] from Actor[akka://InteropTest/deadLetters] to Actor[akka://InteropTest/user/$a/flow-1-2-take#-1970444328] was not delivered. [9] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

It works deterministically if I use just Rx without the conversion to/from. I have not yet spent time to hunt down where the issue is occurring.

@benjchristensen
Member Author

The above is done with:

compile 'io.reactivex:rxjava:1.0.0.rc.8'
compile 'io.reactivex:rxjava-reactive-streams:0.3.0'
compile 'com.typesafe.akka:akka-stream-experimental_2.11:0.10-M1'

@benjchristensen
Member Author

Here is an example I'm considering using for a presentation on Tuesday. It's buggy right now (as shown above) but demonstrates the goals of interop while going through non-trivial operators (groupBy and flatMap) along with injected concurrency and thread-hopping.

[Image: rxjava-akka-streams-interop-example]

Any recommendations on what to do differently that would be better? Can someone provide me a more realistic example of going from or to Akka Streams? I'd prefer to have something that is not so contrived if possible.

@smaldini I'll play more with yours next as shown in https://github.com/rkuhn/ReactiveStreamsInterop/blob/7124906fb50f9a91cee4e8d58c00853898eed239/src/main/java/com/rolandkuhn/rsinterop/JavaMain.java

@benjchristensen
Member Author

This example with Reactor and RxJava seems to be working deterministically:

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;

import reactor.rx.Stream;
import reactor.rx.Streams;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

public class RxReactor {

    public static void main(String... args) {

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
            // schedule odd and even on different event loops
            Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation());

            // convert to Reactive Streams Publisher
            Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);

            // Convert to Reactor Stream
            final Stream<String> linesStream = Streams.create(groupPublisher).map(i -> i + " " + group.getKey()).take(2000);

            // convert back from Reactor Stream to Rx Observable
            return RxReactiveStreams.toObservable(linesStream);
        });

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}

Output ends with:

3987 false
3989 false
3991 false
3993 false
3995 false
3997 false
3999 false
Number emitted from source (should be < 6000): 4098
// sometimes with this (which is okay ... concurrency races) ...
Number emitted from source (should be < 6000): 6147

@smaldini Do you have a better example you'd like me to show?

@benjchristensen
Member Author

[Image: rxjava-reactor-interop-example]

@benjchristensen
Member Author

@rkuhn Are you okay with me using the following slide? Is there anything you'd like me to change?

[Image: rxjava akka-streams]

If you can provide me a more realistic example I'd happily change. For example an Akka Actor that acts as a source to RxJava for consumption.

@benjchristensen
Member Author

@smaldini I can't find a logo for Reactor, is there something I should use?

@benjchristensen
Member Author

@alkemist Is there an example you'd like me to show in my presentation at QCon? Are you close to publishing to Maven Central a ratpack-rx version that supports RxJava 1.0? I tried with 0.9.9 but that isn't working (only 17 more days until 1.0 Final and no more breaking changes!).

@ldaley
Collaborator

ldaley commented Nov 2, 2014

@benjchristensen 0.9.10 was released today. It's in JCenter, but a successful sync to Central is proving elusive. Regarding what to demo, I just pushed examples for SSE and WebSocket streaming using publishers. Chunks, SSE and WebSocket are really the only useful ways we leverage RS streams ATM.

Up to you what you want to show.

@rkuhn

rkuhn commented Nov 3, 2014

@benjchristensen The Akka part of the sample is fine, you can make it arbitrarily complex with all the operators out there.

The problem you are seeing very likely is that Akka streams buffer less elements, exposing the deadlock in your sample code: following a groupBy with a flatMap can only work under very specific circumstances, in general it is not a safe combination under bounded memory processing (since the even numbers will need to be buffered for as long as it takes to finish all the odd ones).

Concerning the exception itself: looking at the lines referenced in the stack trace it seems that data are just pushed too eagerly within the part leading up to observeOn, either by not respecting the signaled demand or by observeOn signaling more than it can take in. Do the RxJava or Reactor versions also work if you insert a Thread.sleep(1) within the map operation?

@benjchristensen
Member Author

@alkemist Thanks, I'll try that.

@benjchristensen
Member Author

@rkuhn

very likely is that Akka streams buffer less elements

How much it buffers shouldn't matter, as request(n) will determine how much it requests. One impl should be able to request(1) and another request(Long.MAX_VALUE), and it should still work.
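
To make the request(n) point concrete, here is a minimal single-threaded sketch. It is not taken from RxJava, Reactor or Akka Streams: it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams ones, and RangePublisher, CollectingSubscriber and collect are names invented for this illustration. The same source is consumed once with request(1) per item and once with a single request(Long.MAX_VALUE); both subscribers should end up with the same items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandSketch {

    /** Hypothetical synchronous source for 1..count that emits only what has been requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long requested;   // outstanding demand
                private boolean draining; // true while the emit loop below is running
                private boolean done;

                @Override
                public void request(long n) {
                    if (done || n <= 0) {
                        return; // sketch only: a full implementation signals onError for n <= 0
                    }
                    long sum = requested + n;
                    requested = sum < 0 ? Long.MAX_VALUE : sum; // cap instead of overflowing
                    if (draining) {
                        return; // re-entrant call from onNext: the running loop sees the new demand
                    }
                    draining = true;
                    while (!done && requested > 0 && next <= count) {
                        requested--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Hypothetical subscriber that asks for `batch` items at a time and records what it receives. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private final long batch;
        private long remainingInBatch;
        private Flow.Subscription subscription;

        CollectingSubscriber(long batch) { this.batch = batch; }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            remainingInBatch = batch;
            s.request(batch);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) { // batch used up: signal demand for the next one
                remainingInBatch = batch;
                subscription.request(batch);
            }
        }

        @Override
        public void onError(Throwable t) { throw new IllegalStateException(t); }

        @Override
        public void onComplete() { completed = true; }
    }

    /** Feeds 1..count to a subscriber that requests `batch` items at a time. */
    static List<Integer> collect(long batch, int count) {
        CollectingSubscriber subscriber = new CollectingSubscriber(batch);
        new RangePublisher(count).subscribe(subscriber);
        if (!subscriber.completed) {
            throw new IllegalStateException("stream did not complete");
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        List<Integer> oneAtATime = collect(1, 1000);
        List<Integer> unbounded = collect(Long.MAX_VALUE, 1000);
        System.out.println(oneAtATime.size() + " " + unbounded.size() + " " + oneAtATime.equals(unbounded));
    }
}
```

The source never looks at how large a request is beyond adding it to the outstanding demand, which is why the two subscribers are interchangeable here.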

exposing the deadlock in your sample code: following a groupBy with a flatMap can only work under very specific circumstances

What do you mean by this? The groupBy and flatMap use cases work and correctly slow down (i.e. apply backpressure) to whatever the slowest group is. It would only "dead lock", as you call it, if you completely blocked one group, which is exactly what should happen in a backpressure case. The user is free to decouple fast and slow streams if they wish by using a multitude of different operators that will make it behave differently (such as sample, throttle, onBackpressureBuffer, onBackpressureDrop, debounce, buffer, window, etc).
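
As an illustration of the decoupling idea, the sketch below shows roughly what a drop-when-there-is-no-demand stage does. It is a toy under stated assumptions, not RxJava's onBackpressureDrop implementation: DroppingRelay, ManualSubscriber and demo are hypothetical names, it uses java.util.concurrent.Flow (Java 9+) instead of the Rx types, and it assumes a single subscriber and a single producer thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

final class DropSketch {

    /** Hypothetical relay: delivers pushed items while demand is outstanding, drops them otherwise. */
    static final class DroppingRelay<T> implements Flow.Publisher<T> {
        private final AtomicLong requested = new AtomicLong();
        private final AtomicLong dropped = new AtomicLong();
        private volatile Flow.Subscriber<? super T> downstream;

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            downstream = subscriber;
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) {
                    if (n > 0) {
                        // add to outstanding demand, capping at Long.MAX_VALUE on overflow
                        requested.getAndUpdate(r -> r + n < 0 ? Long.MAX_VALUE : r + n);
                    }
                }

                @Override
                public void cancel() { downstream = null; }
            });
        }

        /** Called by a source that cannot be slowed down (assumed: one producer thread). */
        void push(T item) {
            Flow.Subscriber<? super T> s = downstream;
            if (s == null) {
                dropped.incrementAndGet(); // nobody to deliver to
                return;
            }
            long r;
            do {
                r = requested.get();
                if (r == 0) {
                    dropped.incrementAndGet(); // no demand: drop instead of buffering
                    return;
                }
                // Long.MAX_VALUE is treated as "unbounded" and is not decremented
            } while (r != Long.MAX_VALUE && !requested.compareAndSet(r, r - 1));
            s.onNext(item);
        }

        long droppedCount() { return dropped.get(); }
    }

    /** Hypothetical subscriber whose demand is driven from outside, to keep the demo deterministic. */
    static final class ManualSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Returns {delivered, dropped} for a fixed scenario. */
    static long[] demo() {
        DroppingRelay<Integer> relay = new DroppingRelay<>();
        ManualSubscriber subscriber = new ManualSubscriber();
        relay.subscribe(subscriber);

        subscriber.subscription.request(2);
        for (int i = 1; i <= 5; i++) {
            relay.push(i);                  // 1 and 2 are delivered; 3, 4 and 5 are dropped
        }
        subscriber.subscription.request(1);
        relay.push(6);                      // delivered: demand is available again

        return new long[] { subscriber.received.size(), relay.droppedCount() };
    }

    public static void main(String[] args) {
        long[] result = demo();
        System.out.println("delivered=" + result[0] + " dropped=" + result[1]);
    }
}
```

The trade-off is the usual one: the fast source is never blocked and memory stays bounded, at the price of losing whatever arrives while the consumer has no outstanding demand.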

it seems that data are just pushed too eagerly within the part leading up to observeOn

That's correct, hence the problem. It works with Rx+Reactor and with Rx by itself, which implies that request(n) is not being composed through correctly when it is Rx+Akka. It may be the RxJavaReactiveStreams implementation, but Rx+Reactor works with the exact same use case.

I have not yet spent the time to debug where too much is being requested or the request is being lost.

I'll paste examples below using Thread.sleep to demonstrate that it works with RxJava by itself or with RxJava+Reactor.

@benjchristensen
Member Author

Example of RxJava + Reactor where the even group takes 10ms per item and the odd group takes 1ms per item

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Publisher;

import reactor.rx.Stream;
import reactor.rx.Streams;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

public class RxReactor {

    public static void main(String... args) {

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
            // schedule odd and even on different event loops
            Observable<Integer> asyncGroup = group.observeOn(Schedulers.computation()).map(i -> {
                if (group.getKey()) {
                    // make 'even' group be slow
                    try {
                        Thread.sleep(10);
                    } catch (Exception e) {
                    }
                } else {
                    // make 'odd' group be faster but still have some "computational cost"
                    try {
                        Thread.sleep(1);
                    } catch (Exception e) {
                    }
                }
                return i;
            });

            // convert to Reactive Streams Publisher
            Publisher<Integer> groupPublisher = RxReactiveStreams.toPublisher(asyncGroup);

            // Convert to Reactor Stream
            final Stream<String> linesStream = Streams.create(groupPublisher).map(i -> i + " " + group.getKey()).take(2000);

            // convert back from Reactor Stream to Rx Observable
            return RxReactiveStreams.toObservable(linesStream);
        });

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}

You end up with output such as this:

1 false
3 false
5 false
7 false
9 false
11 false
13 false
15 false
17 false
2 true
19 false
21 false
23 false
...
3990 true
3992 true
3994 true
3996 true
3998 true
4000 true
Number emitted from source (should be < 6000): 4097

@benjchristensen
Member Author

RxJava by itself doing groupBy + flatMap:

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

public class RxJavaGroupByFlatMap {

    public static void main(String... args) {

        final AtomicInteger numEmitted = new AtomicInteger();

        // RxJava Observable
        Observable<GroupedObservable<Boolean, Integer>> oddAndEvenGroups = Observable.range(1, 1000000)
                .doOnNext(i -> numEmitted.incrementAndGet())
                .groupBy(i -> i % 2 == 0)
                .take(2);

        Observable<String> strings = oddAndEvenGroups.<String> flatMap(group -> {
            // schedule odd and even on different event loops
            return group.observeOn(Schedulers.computation()).map(i -> {
                if (group.getKey()) {
                    // make 'even' group be slow
                    try {
                        Thread.sleep(10);
                    } catch (Exception e) {
                    }
                } else {
                    // make 'odd' group be faster but still have some "computational cost"
                    try {
                        Thread.sleep(1);
                    } catch (Exception e) {
                    }
                }
                return i;
            }).map(i -> i + " " + group.getKey()).take(2000);
        });

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be < 6000): " + numEmitted.get());
    }
}

@benjchristensen
Member Author

Here is one that multicasts a stream (using publish), creates 2 async/parallel streams off of it (odd and even, using filtering), then zips them together with one of the streams slow.

package reactive_streams_interop;

import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.schedulers.Schedulers;

public class RxJavaPublishZip {

    public static void main(String... args) {
        final AtomicInteger numEmitted = new AtomicInteger();

        Observable<Object> strings = Observable.range(1, 1000000).doOnNext(i -> numEmitted.incrementAndGet())
                .publish(oi -> {
                    // schedule it so we are async and need backpressure
                    Observable<String> odd = oi.observeOn(Schedulers.computation())
                            .filter(i -> i % 2 != 0).map(i -> i + "-odd").map(s -> {
                                // make odd slow
                                try {
                                    Thread.sleep(1);
                                } catch (Exception e1) {
                                }
                                return s;
                            });
                    Observable<String> even = oi.observeOn(Schedulers.computation())
                            .filter(i -> i % 2 == 0).map(i -> i + "-even");
                    return Observable.zip(odd, even, (o, e) -> o + " " + e + "   Thread: " + Thread.currentThread());
                }).take(2000);

        strings.toBlocking().forEach(System.out::println);
        System.out.println("Number emitted from source (should be ~4000): " + numEmitted.get());
    }
}

Output would be like this:

1-odd 2-even   Thread: Thread[RxComputationThreadPool-3,5,main]
3-odd 4-even   Thread: Thread[RxComputationThreadPool-3,5,main]
5-odd 6-even   Thread: Thread[RxComputationThreadPool-3,5,main]
7-odd 8-even   Thread: Thread[RxComputationThreadPool-3,5,main]
9-odd 10-even   Thread: Thread[RxComputationThreadPool-3,5,main]
11-odd 12-even   Thread: Thread[RxComputationThreadPool-4,5,main]
13-odd 14-even   Thread: Thread[RxComputationThreadPool-3,5,main]
... 
3995-odd 3996-even   Thread: Thread[RxComputationThreadPool-3,5,main]
3997-odd 3998-even   Thread: Thread[RxComputationThreadPool-3,5,main]
3999-odd 4000-even   Thread: Thread[RxComputationThreadPool-3,5,main]
Number emitted from source (should be ~4000): 4864

You can see the backpressure happening because the num emitted is < 5000 in this case rather than spinning and emitting all 1000000. It is not exactly 4000 because we are async and allow buffers in observeOn to fill.

If I put take on the individual streams it would be exact, as it would propagate through, but via zip it can't do that as the streams are split, so each stream maintains its own backpressure and zip also maintains its own.
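
The overshoot (a few more items emitted than taken, but nowhere near the whole range) can be reproduced with a simplified single-threaded model of a stage that refills a buffer in batches. This is only a sketch: PrefetchSketch and emittedFor are hypothetical names, the source is a bare java.util.concurrent.Flow.Subscription (Java 9+) with no real threads involved, and the prefetch size of 128 is an arbitrary value chosen for illustration, not a claim about the buffer size used by observeOn.

```java
import java.util.ArrayDeque;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

final class PrefetchSketch {

    /**
     * Takes `take` items through a buffer that is refilled `prefetch` items at a time
     * and returns how many items the source actually emitted.
     */
    static int emittedFor(int take, int prefetch, int sourceSize) {
        AtomicInteger numEmitted = new AtomicInteger();
        ArrayDeque<Integer> buffer = new ArrayDeque<>();

        // Source side: emits 1..sourceSize, but never more than was requested.
        Flow.Subscription source = new Flow.Subscription() {
            private int next = 1;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && next <= sourceSize; i++) {
                    numEmitted.incrementAndGet();
                    buffer.add(next++);
                }
            }

            @Override
            public void cancel() { }
        };

        // Consumer side: refill only when the buffer has run dry, then take one item.
        int taken = 0;
        while (taken < take) {
            if (buffer.isEmpty()) {
                source.request(prefetch);
                if (buffer.isEmpty()) {
                    break; // source exhausted
                }
            }
            buffer.poll();
            taken++;
        }
        source.cancel(); // whatever is still sitting in the buffer was emitted but never consumed
        return numEmitted.get();
    }

    public static void main(String[] args) {
        System.out.println("prefetch 128: " + emittedFor(2000, 128, 1_000_000));
        System.out.println("prefetch 1:   " + emittedFor(2000, 1, 1_000_000));
    }
}
```

With these numbers the model emits 2048 items for 2000 taken when prefetching 128 at a time, and exactly 2000 when requesting one at a time: the difference is whatever was prefetched into the buffer and never consumed.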

@ldaley
Collaborator

ldaley commented Nov 8, 2014

@benjchristensen do you have in mind anything more than what we have now in terms of the general pattern? Where the proposed pattern is to have more projects in examples/ integrating different libraries/frameworks.

I'd like to encourage the respective library/framework owners to contribute examples. I don't intend to do it.

@ldaley
Collaborator

ldaley commented Nov 8, 2014

Looking at the readme again, we probably do want at least one example of real interop there.

@benjchristensen
Member Author

@alkemist The examples above are sufficient, but we should get them into /examples or something like that as you suggest, and probably have the simple ones presented in the README or wiki similar to how my slides give a quick intro: https://speakerdeck.com/benjchristensen/reactive-programming-with-rx-at-qconsf-2014?slide=158

@benjchristensen
Member Author

@rkuhn Any thoughts on the examples above? I'm interested in your perspective since you stated that groupBy and flatMap are not safe.

@rkuhn

rkuhn commented Nov 11, 2014

Sorry for the delay, I’m traveling too much this month; I’ll try to elaborate later this week, otherwise we can talk it through next week at React.
