Skip to content

Commit c2709b3

Browse files
Merge pull request ReactiveX#586 from akarnokd/ConcatFix
Fix Concat to allow multiple observers
2 parents d378ef2 + 21f7d52 commit c2709b3

File tree

2 files changed

+66
-8
lines changed

2 files changed

+66
-8
lines changed

rxjava-core/src/main/java/rx/operators/OperationConcat.java

+23-7
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ public static <T> OnSubscribeFunc<T> concat(final Iterable<? extends Observable<
5151
}
5252

5353
public static <T> OnSubscribeFunc<T> concat(final Observable<? extends Observable<? extends T>> sequences) {
54-
return new Concat<T>(sequences);
54+
return new OnSubscribeFunc<T>() {
55+
@Override
56+
public Subscription onSubscribe(Observer<? super T> t1) {
57+
return new Concat<T>(sequences).onSubscribe(t1);
58+
}
59+
};
5560
}
5661

5762
private static class Concat<T> implements OnSubscribeFunc<T> {
@@ -121,8 +126,12 @@ public void onNext(Observable<? extends T> nextSequence) {
121126
@Override
122127
public void onError(Throwable e) {
123128
if (completedOrErred.compareAndSet(false, true)) {
124-
if (innerSubscription != null) {
125-
innerSubscription.unsubscribe();
129+
Subscription q;
130+
synchronized (nextSequences) {
131+
q = innerSubscription;
132+
}
133+
if (q != null) {
134+
q.unsubscribe();
126135
}
127136
observer.onError(e);
128137
}
@@ -131,7 +140,11 @@ public void onError(Throwable e) {
131140
@Override
132141
public void onCompleted() {
133142
allSequencesReceived.set(true);
134-
if (innerSubscription == null) {
143+
Subscription q;
144+
synchronized (nextSequences) {
145+
q = innerSubscription;
146+
}
147+
if (q == null) {
135148
// We are not subscribed to any sequence, and none are coming anymore
136149
if (completedOrErred.compareAndSet(false, true)) {
137150
observer.onCompleted();
@@ -143,11 +156,14 @@ public void onCompleted() {
143156
return new Subscription() {
144157
@Override
145158
public void unsubscribe() {
159+
Subscription q;
146160
synchronized (nextSequences) {
147-
if (innerSubscription != null)
148-
innerSubscription.unsubscribe();
149-
outerSubscription.unsubscribe();
161+
q = innerSubscription;
162+
}
163+
if (q != null) {
164+
q.unsubscribe();
150165
}
166+
outerSubscription.unsubscribe();
151167
}
152168
};
153169
}

rxjava-core/src/test/java/rx/operators/OperationConcatTest.java

+43-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package rx.operators;
1717

1818
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
2019
import static org.mockito.Mockito.*;
2120
import static rx.operators.OperationConcat.*;
2221

@@ -33,6 +32,7 @@
3332
import rx.Observable;
3433
import rx.Observer;
3534
import rx.Subscription;
35+
import rx.concurrency.TestScheduler;
3636
import rx.subscriptions.BooleanSubscription;
3737

3838
public class OperationConcatTest {
@@ -556,4 +556,46 @@ public void run() {
556556
return s;
557557
}
558558
}
559+
@Test
560+
public void testMultipleObservers() {
561+
Observer<Object> o1 = mock(Observer.class);
562+
Observer<Object> o2 = mock(Observer.class);
563+
564+
TestScheduler s = new TestScheduler();
565+
566+
Observable<Long> timer = Observable.interval(500, TimeUnit.MILLISECONDS, s).take(2);
567+
Observable<Long> o = Observable.concat(timer, timer);
568+
569+
o.subscribe(o1);
570+
o.subscribe(o2);
571+
572+
InOrder inOrder1 = inOrder(o1);
573+
InOrder inOrder2 = inOrder(o2);
574+
575+
s.advanceTimeBy(500, TimeUnit.MILLISECONDS);
576+
577+
inOrder1.verify(o1, times(1)).onNext(0L);
578+
inOrder2.verify(o2, times(1)).onNext(0L);
579+
580+
s.advanceTimeBy(500, TimeUnit.MILLISECONDS);
581+
582+
inOrder1.verify(o1, times(1)).onNext(1L);
583+
inOrder2.verify(o2, times(1)).onNext(1L);
584+
585+
s.advanceTimeBy(500, TimeUnit.MILLISECONDS);
586+
587+
inOrder1.verify(o1, times(1)).onNext(0L);
588+
inOrder2.verify(o2, times(1)).onNext(0L);
589+
590+
s.advanceTimeBy(500, TimeUnit.MILLISECONDS);
591+
592+
inOrder1.verify(o1, times(1)).onNext(1L);
593+
inOrder2.verify(o2, times(1)).onNext(1L);
594+
595+
inOrder1.verify(o1, times(1)).onCompleted();
596+
inOrder2.verify(o2, times(1)).onCompleted();
597+
598+
verify(o1, never()).onError(any(Throwable.class));
599+
verify(o2, never()).onError(any(Throwable.class));
600+
}
559601
}

0 commit comments

Comments
 (0)