Skip to content

Commit df73d03

Browse files
authored
3.x: Fix ReplaySubject termination-subscription race emitting wrongly (#7879)
1 parent be0bd27 commit df73d03

File tree

6 files changed

+171
-52
lines changed

6 files changed

+171
-52
lines changed

src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java

Lines changed: 33 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -652,8 +652,6 @@ static final class UnboundedReplayBuffer<T>
652652

653653
final List<Object> buffer;
654654

655-
volatile boolean done;
656-
657655
volatile int size;
658656

659657
UnboundedReplayBuffer(int capacityHint) {
@@ -671,7 +669,6 @@ public void addFinal(Object notificationLite) {
671669
buffer.add(notificationLite);
672670
trimHead();
673671
size++;
674-
done = true;
675672
}
676673

677674
@Override
@@ -772,20 +769,17 @@ public void replay(ReplayDisposable<T> rs) {
772769

773770
Object o = b.get(index);
774771

775-
if (done) {
776-
if (index + 1 == s) {
777-
s = size;
778-
if (index + 1 == s) {
779-
if (NotificationLite.isComplete(o)) {
780-
a.onComplete();
781-
} else {
782-
a.onError(NotificationLite.getError(o));
783-
}
784-
rs.index = null;
785-
rs.cancelled = true;
786-
return;
787-
}
788-
}
772+
if (NotificationLite.isComplete(o)) {
773+
a.onComplete();
774+
rs.index = null;
775+
rs.cancelled = true;
776+
return;
777+
} else
778+
if (NotificationLite.isError(o)) {
779+
a.onError(NotificationLite.getError(o));
780+
rs.index = null;
781+
rs.cancelled = true;
782+
return;
789783
}
790784

791785
a.onNext((T)o);
@@ -856,8 +850,6 @@ static final class SizeBoundReplayBuffer<T>
856850

857851
Node<Object> tail;
858852

859-
volatile boolean done;
860-
861853
SizeBoundReplayBuffer(int maxSize) {
862854
this.maxSize = maxSize;
863855
Node<Object> h = new Node<>(null);
@@ -895,7 +887,6 @@ public void addFinal(Object notificationLite) {
895887
t.lazySet(n); // releases both the tail and size
896888

897889
trimHead();
898-
done = true;
899890
}
900891

901892
/**
@@ -1000,18 +991,17 @@ public void replay(ReplayDisposable<T> rs) {
1000991

1001992
Object o = n.value;
1002993

1003-
if (done) {
1004-
if (n.get() == null) {
1005-
1006-
if (NotificationLite.isComplete(o)) {
1007-
a.onComplete();
1008-
} else {
1009-
a.onError(NotificationLite.getError(o));
1010-
}
1011-
rs.index = null;
1012-
rs.cancelled = true;
1013-
return;
1014-
}
994+
if (NotificationLite.isComplete(o)) {
995+
a.onComplete();
996+
rs.index = null;
997+
rs.cancelled = true;
998+
return;
999+
} else
1000+
if (NotificationLite.isError(o)) {
1001+
a.onError(NotificationLite.getError(o));
1002+
rs.index = null;
1003+
rs.cancelled = true;
1004+
return;
10151005
}
10161006

10171007
a.onNext((T)o);
@@ -1069,8 +1059,6 @@ static final class SizeAndTimeBoundReplayBuffer<T>
10691059

10701060
TimedNode<Object> tail;
10711061

1072-
volatile boolean done;
1073-
10741062
SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Scheduler scheduler) {
10751063
this.maxSize = maxSize;
10761064
this.maxAge = maxAge;
@@ -1163,8 +1151,6 @@ public void addFinal(Object notificationLite) {
11631151
size++;
11641152
t.lazySet(n); // releases both the tail and size
11651153
trimFinal();
1166-
1167-
done = true;
11681154
}
11691155

11701156
/**
@@ -1290,18 +1276,17 @@ public void replay(ReplayDisposable<T> rs) {
12901276

12911277
Object o = n.value;
12921278

1293-
if (done) {
1294-
if (n.get() == null) {
1295-
1296-
if (NotificationLite.isComplete(o)) {
1297-
a.onComplete();
1298-
} else {
1299-
a.onError(NotificationLite.getError(o));
1300-
}
1301-
rs.index = null;
1302-
rs.cancelled = true;
1303-
return;
1304-
}
1279+
if (NotificationLite.isComplete(o)) {
1280+
a.onComplete();
1281+
rs.index = null;
1282+
rs.cancelled = true;
1283+
return;
1284+
} else
1285+
if (NotificationLite.isError(o)) {
1286+
a.onError(NotificationLite.getError(o));
1287+
rs.index = null;
1288+
rs.cancelled = true;
1289+
return;
13051290
}
13061291

13071292
a.onNext((T)o);

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,8 @@ public void disposed() {
402402

403403
@Test
404404
public void manySources() {
405-
Flowable<?>[] a = new Flowable[32];
405+
@SuppressWarnings("unchecked")
406+
Flowable<Object>[] a = new Flowable[32];
406407
Arrays.fill(a, Flowable.never());
407408
a[31] = Flowable.just(1);
408409

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,8 @@ public void ambArraySingleElement() {
235235

236236
@Test
237237
public void manySources() {
238-
Observable<?>[] a = new Observable[32];
238+
@SuppressWarnings("unchecked")
239+
Observable<Object>[] a = new Observable[32];
239240
Arrays.fill(a, Observable.never());
240241
a[31] = Observable.just(1);
241242

src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,8 @@ public void run() {
249249

250250
@Test
251251
public void manySources() {
252-
Single<?>[] sources = new Single[32];
252+
@SuppressWarnings("unchecked")
253+
Single<Object>[] sources = new Single[32];
253254
Arrays.fill(sources, Single.never());
254255
sources[31] = Single.just(31);
255256

src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1832,4 +1832,69 @@ public void timeAndSizeRemoveCorrectNumberOfOld() {
18321832

18331833
rp.test().assertValuesOnly(4, 5);
18341834
}
1835-
}
1835+
1836+
@Test
1837+
public void terminationSubscriptionRaceUnbounded() throws Throwable {
1838+
for (int i = 1; i <= 10000; i++) {
1839+
ReplayProcessor<String> source = ReplayProcessor.create();
1840+
PublishProcessor<String> sink = PublishProcessor.create();
1841+
TestSubscriber<String> subscriber = sink.test();
1842+
Schedulers.computation().scheduleDirect(() -> {
1843+
// issue signals to the source in adherence to the reactive streams specification
1844+
source.onSubscribe(new BooleanSubscription());
1845+
source.onNext("hello");
1846+
source.onNext("world");
1847+
source.onComplete();
1848+
});
1849+
Schedulers.computation().scheduleDirect(() -> {
1850+
// connect the source to the sink in parallel with the signals issued to the source
1851+
// note the cast() operator, which is here to detect non-String escapees
1852+
source.cast(String.class).subscribe(sink);
1853+
});
1854+
subscriber.await().assertValues("hello", "world").assertComplete();
1855+
}
1856+
}
1857+
1858+
@Test
1859+
public void terminationSubscriptionRaceSizeBound() throws Throwable {
1860+
for (int i = 1; i <= 10000; i++) {
1861+
ReplayProcessor<String> source = ReplayProcessor.createWithSize(20);
1862+
PublishProcessor<String> sink = PublishProcessor.create();
1863+
TestSubscriber<String> subscriber = sink.test();
1864+
Schedulers.computation().scheduleDirect(() -> {
1865+
// issue signals to the source in adherence to the reactive streams specification
1866+
source.onSubscribe(new BooleanSubscription());
1867+
source.onNext("hello");
1868+
source.onNext("world");
1869+
source.onComplete();
1870+
});
1871+
Schedulers.computation().scheduleDirect(() -> {
1872+
// connect the source to the sink in parallel with the signals issued to the source
1873+
// note the cast() operator, which is here to detect non-String escapees
1874+
source.cast(String.class).subscribe(sink);
1875+
});
1876+
subscriber.await().assertValues("hello", "world").assertComplete();
1877+
}
1878+
}
1879+
1880+
@Test
1881+
public void terminationSubscriptionRaceTimeBound() throws Throwable {
1882+
for (int i = 1; i <= 10000; i++) {
1883+
ReplayProcessor<String> source = ReplayProcessor.createWithTime(20, TimeUnit.MINUTES, Schedulers.computation());
1884+
PublishProcessor<String> sink = PublishProcessor.create();
1885+
TestSubscriber<String> subscriber = sink.test();
1886+
Schedulers.computation().scheduleDirect(() -> {
1887+
// issue signals to the source in adherence to the reactive streams specification
1888+
source.onSubscribe(new BooleanSubscription());
1889+
source.onNext("hello");
1890+
source.onNext("world");
1891+
source.onComplete();
1892+
});
1893+
Schedulers.computation().scheduleDirect(() -> {
1894+
// connect the source to the sink in parallel with the signals issued to the source
1895+
// note the cast() operator, which is here to detect non-String escapees
1896+
source.cast(String.class).subscribe(sink);
1897+
});
1898+
subscriber.await().assertValues("hello", "world").assertComplete();
1899+
}
1900+
}}

src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1378,4 +1378,70 @@ public void timeAndSizeRemoveCorrectNumberOfOld() {
13781378

13791379
rs.test().assertValuesOnly(4, 5);
13801380
}
1381+
1382+
@Test
1383+
public void terminationSubscriptionRaceUnbounded() throws Throwable {
1384+
for (int i = 1; i <= 10000; i++) {
1385+
Subject<String> source = ReplaySubject.create();
1386+
Subject<String> sink = PublishSubject.create();
1387+
TestObserver<String> observer = sink.test();
1388+
Schedulers.computation().scheduleDirect(() -> {
1389+
// issue signals to the source in adherence to the reactive streams specification
1390+
source.onSubscribe(Disposable.empty());
1391+
source.onNext("hello");
1392+
source.onNext("world");
1393+
source.onComplete();
1394+
});
1395+
Schedulers.computation().scheduleDirect(() -> {
1396+
// connect the source to the sink in parallel with the signals issued to the source
1397+
// note the cast() operator, which is here to detect non-String escapees
1398+
source.cast(String.class).subscribe(sink);
1399+
});
1400+
observer.await().assertValues("hello", "world").assertComplete();
1401+
}
1402+
}
1403+
1404+
@Test
1405+
public void terminationSubscriptionRaceSizeBound() throws Throwable {
1406+
for (int i = 1; i <= 10000; i++) {
1407+
Subject<String> source = ReplaySubject.createWithSize(20);
1408+
Subject<String> sink = PublishSubject.create();
1409+
TestObserver<String> observer = sink.test();
1410+
Schedulers.computation().scheduleDirect(() -> {
1411+
// issue signals to the source in adherence to the reactive streams specification
1412+
source.onSubscribe(Disposable.empty());
1413+
source.onNext("hello");
1414+
source.onNext("world");
1415+
source.onComplete();
1416+
});
1417+
Schedulers.computation().scheduleDirect(() -> {
1418+
// connect the source to the sink in parallel with the signals issued to the source
1419+
// note the cast() operator, which is here to detect non-String escapees
1420+
source.cast(String.class).subscribe(sink);
1421+
});
1422+
observer.await().assertValues("hello", "world").assertComplete();
1423+
}
1424+
}
1425+
1426+
@Test
1427+
public void terminationSubscriptionRaceTimeBound() throws Throwable {
1428+
for (int i = 1; i <= 10000; i++) {
1429+
Subject<String> source = ReplaySubject.createWithTime(20, TimeUnit.MINUTES, Schedulers.computation());
1430+
Subject<String> sink = PublishSubject.create();
1431+
TestObserver<String> observer = sink.test();
1432+
Schedulers.computation().scheduleDirect(() -> {
1433+
// issue signals to the source in adherence to the reactive streams specification
1434+
source.onSubscribe(Disposable.empty());
1435+
source.onNext("hello");
1436+
source.onNext("world");
1437+
source.onComplete();
1438+
});
1439+
Schedulers.computation().scheduleDirect(() -> {
1440+
// connect the source to the sink in parallel with the signals issued to the source
1441+
// note the cast() operator, which is here to detect non-String escapees
1442+
source.cast(String.class).subscribe(sink);
1443+
});
1444+
observer.await().assertValues("hello", "world").assertComplete();
1445+
}
1446+
}
13811447
}

0 commit comments

Comments
 (0)