Skip to content

Commit 8819cc9

Browse files
authored
2.x: wrap undeliverable errors (#5080)
* 2.x: wrap undeliverable errors * Add CompositeException to isBug, add test for isBug
1 parent 66fbf5a commit 8819cc9

File tree

154 files changed

+447
-255
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

154 files changed

+447
-255
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.exceptions;
15+
16+
import io.reactivex.annotations.Experimental;
17+
18+
/**
19+
* Explicitly named exception to indicate a Reactive-Streams
20+
* protocol violation.
21+
* @since 2.0.6 - experimental
22+
*/
23+
@Experimental
24+
public final class ProtocolViolationException extends IllegalStateException {
25+
26+
private static final long serialVersionUID = 1644750035281290266L;
27+
28+
/**
29+
* Creates an instance with the given message.
30+
* @param message the message
31+
*/
32+
public ProtocolViolationException(String message) {
33+
super(message);
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.exceptions;
15+
16+
import io.reactivex.annotations.Experimental;
17+
18+
/**
19+
* Wrapper for Throwable errors that are sent to `RxJavaPlugins.onError`.
20+
* @since 2.0.6 - experimental
21+
*/
22+
@Experimental
23+
public final class UndeliverableException extends IllegalStateException {
24+
25+
private static final long serialVersionUID = 1644750035281290266L;
26+
27+
/**
28+
* Construct an instance by wrapping the given, non-null
29+
* cause Throwable.
30+
* @param cause the cause, not null
31+
*/
32+
public UndeliverableException(Throwable cause) {
33+
super(cause);
34+
}
35+
}

src/main/java/io/reactivex/internal/disposables/DisposableHelper.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.concurrent.atomic.AtomicReference;
1717

1818
import io.reactivex.disposables.Disposable;
19+
import io.reactivex.exceptions.ProtocolViolationException;
1920
import io.reactivex.internal.functions.ObjectHelper;
2021
import io.reactivex.plugins.RxJavaPlugins;
2122

@@ -152,7 +153,7 @@ public static boolean validate(Disposable current, Disposable next) {
152153
* Reports that the disposable is already set to the RxJavaPlugins error handler.
153154
*/
154155
public static void reportDisposableSet() {
155-
RxJavaPlugins.onError(new IllegalStateException("Disposable already set!"));
156+
RxJavaPlugins.onError(new ProtocolViolationException("Disposable already set!"));
156157
}
157158

158159
/**

src/main/java/io/reactivex/internal/subscriptions/SubscriptionHelper.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import org.reactivestreams.Subscription;
1919

20+
import io.reactivex.exceptions.ProtocolViolationException;
2021
import io.reactivex.internal.functions.ObjectHelper;
2122
import io.reactivex.internal.util.BackpressureHelper;
2223
import io.reactivex.plugins.RxJavaPlugins;
@@ -67,7 +68,7 @@ public static boolean validate(Subscription current, Subscription next) {
6768
* which is an indication of a onSubscribe management bug.
6869
*/
6970
public static void reportSubscriptionSet() {
70-
RxJavaPlugins.onError(new IllegalStateException("Subscription already set!"));
71+
RxJavaPlugins.onError(new ProtocolViolationException("Subscription already set!"));
7172
}
7273

7374
/**
@@ -89,7 +90,7 @@ public static boolean validate(long n) {
8990
* @param n the overproduction amount
9091
*/
9192
public static void reportMoreProduced(long n) {
92-
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + n));
93+
RxJavaPlugins.onError(new ProtocolViolationException("More produced than requested: " + n));
9394
}
9495
/**
9596
* Check if the given subscription is the common cancelled subscription.

src/main/java/io/reactivex/plugins/RxJavaPlugins.java

+45
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.reactivex.annotations.Experimental;
2626
import io.reactivex.annotations.NonNull;
2727
import io.reactivex.annotations.Nullable;
28+
import io.reactivex.exceptions.*;
2829
import io.reactivex.flowables.ConnectableFlowable;
2930
import io.reactivex.functions.BiFunction;
3031
import io.reactivex.functions.BooleanSupplier;
@@ -360,6 +361,10 @@ public static void onError(@NonNull Throwable error) {
360361

361362
if (error == null) {
362363
error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
364+
} else {
365+
if (!isBug(error)) {
366+
error = new UndeliverableException(error);
367+
}
363368
}
364369

365370
if (f != null) {
@@ -377,6 +382,46 @@ public static void onError(@NonNull Throwable error) {
377382
uncaught(error);
378383
}
379384

385+
/**
386+
* Checks if the given error is one of the already named
387+
* bug cases that should pass through {@link #onError(Throwable)}
388+
* as is.
389+
* @param error the error to check
390+
* @return true if the error should pass throug, false if
391+
* it may be wrapped into an UndeliverableException
392+
*/
393+
static boolean isBug(Throwable error) {
394+
// user forgot to add the onError handler in subscribe
395+
if (error instanceof OnErrorNotImplementedException) {
396+
return true;
397+
}
398+
// the sender didn't honor the request amount
399+
// it's either due to an operator bug or concurrent onNext
400+
if (error instanceof MissingBackpressureException) {
401+
return true;
402+
}
403+
// general protocol violations
404+
// it's either due to an operator bug or concurrent onNext
405+
if (error instanceof IllegalStateException) {
406+
return true;
407+
}
408+
// nulls are generally not allowed
409+
// likely an operator bug or missing null-check
410+
if (error instanceof NullPointerException) {
411+
return true;
412+
}
413+
// bad arguments, likely invalid user input
414+
if (error instanceof IllegalArgumentException) {
415+
return true;
416+
}
417+
// Crash while handling an exception
418+
if (error instanceof CompositeException) {
419+
return true;
420+
}
421+
// everything else is probably due to lifecycle limits
422+
return false;
423+
}
424+
380425
static void uncaught(@NonNull Throwable error) {
381426
Thread currentThread = Thread.currentThread();
382427
UncaughtExceptionHandler handler = currentThread.getUncaughtExceptionHandler();

src/test/java/io/reactivex/TestHelper.java

+40-2
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,21 @@ public static void assertError(List<Throwable> list, int index, Class<? extends
154154
}
155155
}
156156

157+
public static void assertUndeliverable(List<Throwable> list, int index, Class<? extends Throwable> clazz) {
158+
Throwable ex = list.get(index);
159+
if (!(ex instanceof UndeliverableException)) {
160+
AssertionError err = new AssertionError("Outer exception UndeliverableException expected but got " + list.get(index));
161+
err.initCause(list.get(index));
162+
throw err;
163+
}
164+
ex = ex.getCause();
165+
if (!clazz.isInstance(ex)) {
166+
AssertionError err = new AssertionError("Inner exception " + clazz + " expected but got " + list.get(index));
167+
err.initCause(list.get(index));
168+
throw err;
169+
}
170+
}
171+
157172
public static void assertError(List<Throwable> list, int index, Class<? extends Throwable> clazz, String message) {
158173
Throwable ex = list.get(index);
159174
if (!clazz.isInstance(ex)) {
@@ -168,6 +183,26 @@ public static void assertError(List<Throwable> list, int index, Class<? extends
168183
}
169184
}
170185

186+
public static void assertUndeliverable(List<Throwable> list, int index, Class<? extends Throwable> clazz, String message) {
187+
Throwable ex = list.get(index);
188+
if (!(ex instanceof UndeliverableException)) {
189+
AssertionError err = new AssertionError("Outer exception UndeliverableException expected but got " + list.get(index));
190+
err.initCause(list.get(index));
191+
throw err;
192+
}
193+
ex = ex.getCause();
194+
if (!clazz.isInstance(ex)) {
195+
AssertionError err = new AssertionError("Inner exception " + clazz + " expected but got " + list.get(index));
196+
err.initCause(list.get(index));
197+
throw err;
198+
}
199+
if (!ObjectHelper.equals(message, ex.getMessage())) {
200+
AssertionError err = new AssertionError("Message " + message + " expected but got " + ex.getMessage());
201+
err.initCause(ex);
202+
throw err;
203+
}
204+
}
205+
171206
public static void assertError(TestObserver<?> ts, int index, Class<? extends Throwable> clazz) {
172207
Throwable ex = ts.errors().get(0);
173208
try {
@@ -386,6 +421,9 @@ public void run() {
386421
* @return the list of Throwables
387422
*/
388423
public static List<Throwable> compositeList(Throwable ex) {
424+
if (ex instanceof UndeliverableException) {
425+
ex = ex.getCause();
426+
}
389427
return ((CompositeException)ex).getExceptions();
390428
}
391429

@@ -2428,7 +2466,7 @@ protected void subscribeActual(Observer<? super T> observer) {
24282466
}
24292467
}
24302468

2431-
assertError(errors, 0, TestException.class, "second");
2469+
assertUndeliverable(errors, 0, TestException.class, "second");
24322470
} catch (AssertionError ex) {
24332471
throw ex;
24342472
} catch (Throwable ex) {
@@ -2587,7 +2625,7 @@ protected void subscribeActual(Subscriber<? super T> observer) {
25872625
}
25882626
}
25892627

2590-
assertError(errors, 0, TestException.class, "second");
2628+
assertUndeliverable(errors, 0, TestException.class, "second");
25912629
} catch (AssertionError ex) {
25922630
throw ex;
25932631
} catch (Throwable ex) {

src/test/java/io/reactivex/flowable/FlowableCollectTest.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ public void testCollectorFailureDoesNotResultInTwoErrorEmissionsFlowable() {
120120
.test() //
121121
.assertError(e1) //
122122
.assertNotComplete();
123-
assertEquals(Arrays.asList(e2), list);
123+
124+
assertEquals(1, list.size());
125+
assertEquals(e2, list.get(0).getCause());
124126
} finally {
125127
RxJavaPlugins.reset();
126128
}
@@ -272,7 +274,9 @@ public void testCollectorFailureDoesNotResultInTwoErrorEmissions() {
272274
.test() //
273275
.assertError(e1) //
274276
.assertNotComplete();
275-
assertEquals(Arrays.asList(e2), list);
277+
278+
assertEquals(1, list.size());
279+
assertEquals(e2, list.get(0).getCause());
276280
} finally {
277281
RxJavaPlugins.reset();
278282
}

src/test/java/io/reactivex/flowable/FlowableScanTests.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,16 @@
1313

1414
package io.reactivex.flowable;
1515

16-
import static org.junit.Assert.assertEquals;
16+
import static org.junit.Assert.*;
1717

1818
import java.util.*;
1919
import java.util.concurrent.*;
2020
import java.util.concurrent.atomic.AtomicInteger;
2121

2222
import org.junit.Test;
2323

24-
import io.reactivex.Flowable;
24+
import io.reactivex.*;
25+
import io.reactivex.exceptions.UndeliverableException;
2526
import io.reactivex.flowable.FlowableEventStream.Event;
2627
import io.reactivex.functions.*;
2728
import io.reactivex.plugins.RxJavaPlugins;
@@ -65,7 +66,10 @@ public void accept(Throwable t) throws Exception {
6566
.test()
6667
.assertNoValues()
6768
.assertError(e);
68-
assertEquals(Arrays.asList(e2), list);
69+
70+
assertEquals("" + list, 1, list.size());
71+
assertTrue("" + list, list.get(0) instanceof UndeliverableException);
72+
assertEquals(e2, list.get(0).getCause());
6973
} finally {
7074
RxJavaPlugins.reset();
7175
}
@@ -142,7 +146,10 @@ public void accept(Throwable t) throws Exception {
142146
.test()
143147
.assertValue(1)
144148
.assertError(e);
145-
assertEquals(Arrays.asList(e2), list);
149+
150+
assertEquals("" + list, 1, list.size());
151+
assertTrue("" + list, list.get(0) instanceof UndeliverableException);
152+
assertEquals(e2, list.get(0).getCause());
146153
} finally {
147154
RxJavaPlugins.reset();
148155
}

src/test/java/io/reactivex/flowable/FlowableSubscriberTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,7 @@ public void run() throws Exception {
722722

723723
s.onComplete();
724724

725-
TestHelper.assertError(list, 0, TestException.class, "Inner");
725+
TestHelper.assertUndeliverable(list, 0, TestException.class, "Inner");
726726
} finally {
727727
RxJavaPlugins.reset();
728728
}

src/test/java/io/reactivex/internal/disposables/CancellableDisposableTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void cancel() throws Exception {
7272
cd.dispose();
7373
cd.dispose();
7474

75-
TestHelper.assertError(list, 0, TestException.class);
75+
TestHelper.assertUndeliverable(list, 0, TestException.class);
7676
} finally {
7777
RxJavaPlugins.reset();
7878
}

src/test/java/io/reactivex/internal/disposables/ObserverFullArbiterTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void errorAfterCancel() {
7373
try {
7474
fa.onError(new TestException(), bs);
7575

76-
TestHelper.assertError(errors, 0, TestException.class);
76+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
7777
} finally {
7878
RxJavaPlugins.reset();
7979
}
@@ -90,7 +90,7 @@ public void cancelAfterError() {
9090
fa.dispose();
9191

9292
fa.drain();
93-
TestHelper.assertError(errors, 0, TestException.class);
93+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
9494
} finally {
9595
RxJavaPlugins.reset();
9696
}

src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public void accept(Disposable s) throws Exception {
185185

186186
assertTrue(o.isDisposed());
187187

188-
TestHelper.assertError(errors, 0, TestException.class);
188+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
189189
} finally {
190190
RxJavaPlugins.reset();
191191
}

src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void run() {
100100
to.assertFailure(TestException.class);
101101

102102
if (!errors.isEmpty()) {
103-
TestHelper.assertError(errors, 0, TestException.class);
103+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
104104
}
105105
} finally {
106106
RxJavaPlugins.reset();

src/test/java/io/reactivex/internal/operators/completable/CompletableConcatTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void run() {
104104
to.assertFailure(TestException.class);
105105

106106
if (!errors.isEmpty()) {
107-
TestHelper.assertError(errors, 0, TestException.class);
107+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
108108
}
109109
} finally {
110110
RxJavaPlugins.reset();

src/test/java/io/reactivex/internal/operators/completable/CompletableDisposeOnTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void errorAfterCancel() {
8888

8989
to.assertEmpty();
9090

91-
TestHelper.assertError(errors, 0, TestException.class);
91+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
9292
} finally {
9393
RxJavaPlugins.reset();
9494
}

0 commit comments

Comments
 (0)