Skip to content

Commit 5ac4d2a

Browse files
davidmotenakarnokd
authored andcommitted
fromAsync - handle post-terminal events (ReactiveX#4427)
1 parent 743f164 commit 5ac4d2a

File tree

2 files changed

+110
-1
lines changed

2 files changed

+110
-1
lines changed

src/main/java/rx/internal/operators/OnSubscribeFromAsync.java

+34-1
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ public NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
223223
}
224224

225225
@Override
226-
public final void onNext(T t) {
226+
public void onNext(T t) {
227227
if (actual.isUnsubscribed()) {
228228
return;
229229
}
@@ -259,10 +259,43 @@ static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
259259

260260
/** */
261261
private static final long serialVersionUID = 338953216916120960L;
262+
263+
private boolean done;
262264

263265
public ErrorAsyncEmitter(Subscriber<? super T> actual) {
264266
super(actual);
265267
}
268+
269+
270+
@Override
271+
public void onNext(T t) {
272+
if (done) {
273+
return;
274+
}
275+
super.onNext(t);
276+
}
277+
278+
279+
@Override
280+
public void onCompleted() {
281+
if (done) {
282+
return;
283+
}
284+
done = true;
285+
super.onCompleted();
286+
}
287+
288+
289+
@Override
290+
public void onError(Throwable e) {
291+
if (done) {
292+
RxJavaHooks.onError(e);
293+
return;
294+
}
295+
done = true;
296+
super.onError(e);
297+
}
298+
266299

267300
@Override
268301
void onOverflow() {

src/test/java/rx/internal/operators/OnSubscribeFromAsyncTest.java

+76
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,19 @@
1616

1717
package rx.internal.operators;
1818

19+
import static org.junit.Assert.assertEquals;
20+
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import java.util.concurrent.CopyOnWriteArrayList;
24+
1925
import org.junit.*;
2026

2127
import rx.*;
2228
import rx.exceptions.*;
2329
import rx.functions.Action1;
2430
import rx.observers.TestSubscriber;
31+
import rx.plugins.RxJavaHooks;
2532
import rx.subjects.PublishSubject;
2633

2734
public class OnSubscribeFromAsyncTest {
@@ -134,6 +141,75 @@ public void normalError() {
134141

135142
Assert.assertEquals("fromAsync: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage());
136143
}
144+
145+
@Test
146+
public void overflowErrorIsNotFollowedByAnotherErrorDueToOnNextFromUpstream() {
147+
Action1<AsyncEmitter<Integer>> source = new Action1<AsyncEmitter<Integer>>(){
148+
149+
@Override
150+
public void call(AsyncEmitter<Integer> emitter) {
151+
emitter.onNext(1);
152+
//don't check for unsubscription
153+
emitter.onNext(2);
154+
}};
155+
Observable.fromAsync(source, AsyncEmitter.BackpressureMode.ERROR).unsafeSubscribe(ts);
156+
157+
ts.assertNoValues();
158+
ts.assertError(MissingBackpressureException.class);
159+
ts.assertNotCompleted();
160+
161+
Assert.assertEquals("fromAsync: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage());
162+
}
163+
164+
@Test
165+
public void overflowErrorIsNotFollowedByAnotherCompletedDueToCompletedFromUpstream() {
166+
Action1<AsyncEmitter<Integer>> source = new Action1<AsyncEmitter<Integer>>(){
167+
168+
@Override
169+
public void call(AsyncEmitter<Integer> emitter) {
170+
emitter.onNext(1);
171+
//don't check for unsubscription
172+
emitter.onCompleted();
173+
}};
174+
Observable.fromAsync(source, AsyncEmitter.BackpressureMode.ERROR).unsafeSubscribe(ts);
175+
176+
ts.assertNoValues();
177+
ts.assertError(MissingBackpressureException.class);
178+
ts.assertNotCompleted();
179+
180+
Assert.assertEquals("fromAsync: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage());
181+
}
182+
183+
@Test
184+
public void overflowErrorIsNotFollowedByAnotherErrorDueToOnErrorFromUpstreamAndSecondErrorIsReportedToHook() {
185+
try {
186+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
187+
RxJavaHooks.setOnError(new Action1<Throwable>() {
188+
@Override
189+
public void call(Throwable t) {
190+
list.add(t);
191+
}});
192+
final RuntimeException e = new RuntimeException();
193+
Action1<AsyncEmitter<Integer>> source = new Action1<AsyncEmitter<Integer>>(){
194+
195+
@Override
196+
public void call(AsyncEmitter<Integer> emitter) {
197+
emitter.onNext(1);
198+
//don't check for unsubscription
199+
emitter.onError(e);
200+
}};
201+
Observable.fromAsync(source, AsyncEmitter.BackpressureMode.ERROR).unsafeSubscribe(ts);
202+
203+
ts.assertNoValues();
204+
ts.assertError(MissingBackpressureException.class);
205+
ts.assertNotCompleted();
206+
207+
Assert.assertEquals("fromAsync: could not emit value due to lack of requests", ts.getOnErrorEvents().get(0).getMessage());
208+
assertEquals(Arrays.asList(e), list);
209+
} finally {
210+
RxJavaHooks.reset();
211+
}
212+
}
137213

138214
@Test
139215
public void errorBuffered() {

0 commit comments

Comments
 (0)