Skip to content

Commit 090f99e

Browse files
authored
2.x: Fix the error/race in Obs.repeatWhen due to flooding repeat signal (ReactiveX#6359)
1 parent 2e0c8b5 commit 090f99e

File tree

5 files changed

+229
-1
lines changed

5 files changed

+229
-1
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableRepeatWhen.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ public void onError(Throwable e) {
108108

109109
@Override
110110
public void onComplete() {
111-
active = false;
112111
DisposableHelper.replace(upstream, null);
112+
active = false;
113113
signaller.onNext(0);
114114
}
115115

src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.reactivex.exceptions.TestException;
3030
import io.reactivex.functions.*;
3131
import io.reactivex.internal.subscriptions.BooleanSubscription;
32+
import io.reactivex.plugins.RxJavaPlugins;
3233
import io.reactivex.processors.PublishProcessor;
3334
import io.reactivex.schedulers.Schedulers;
3435
import io.reactivex.subscribers.TestSubscriber;
@@ -441,4 +442,56 @@ public boolean test(Object v) throws Exception {
441442

442443
assertEquals(0, counter.get());
443444
}
445+
446+
@Test
447+
public void repeatFloodNoSubscriptionError() {
448+
List<Throwable> errors = TestHelper.trackPluginErrors();
449+
450+
try {
451+
final PublishProcessor<Integer> source = PublishProcessor.create();
452+
final PublishProcessor<Integer> signaller = PublishProcessor.create();
453+
454+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
455+
456+
TestSubscriber<Integer> ts = source.take(1)
457+
.repeatWhen(new Function<Flowable<Object>, Flowable<Integer>>() {
458+
@Override
459+
public Flowable<Integer> apply(Flowable<Object> v)
460+
throws Exception {
461+
return signaller;
462+
}
463+
}).test();
464+
465+
Runnable r1 = new Runnable() {
466+
@Override
467+
public void run() {
468+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
469+
source.onNext(1);
470+
}
471+
}
472+
};
473+
Runnable r2 = new Runnable() {
474+
@Override
475+
public void run() {
476+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
477+
signaller.offer(1);
478+
}
479+
}
480+
};
481+
482+
TestHelper.race(r1, r2);
483+
484+
ts.dispose();
485+
}
486+
487+
if (!errors.isEmpty()) {
488+
for (Throwable e : errors) {
489+
e.printStackTrace();
490+
}
491+
fail(errors + "");
492+
}
493+
} finally {
494+
RxJavaPlugins.reset();
495+
}
496+
}
444497
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableRetryTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.reactivex.functions.*;
3333
import io.reactivex.internal.functions.Functions;
3434
import io.reactivex.internal.subscriptions.BooleanSubscription;
35+
import io.reactivex.plugins.RxJavaPlugins;
3536
import io.reactivex.processors.PublishProcessor;
3637
import io.reactivex.schedulers.Schedulers;
3738
import io.reactivex.subscribers.*;
@@ -1221,4 +1222,64 @@ public boolean test(Object v) throws Exception {
12211222

12221223
assertEquals(0, counter.get());
12231224
}
1225+
1226+
@Test
1227+
public void repeatFloodNoSubscriptionError() {
1228+
List<Throwable> errors = TestHelper.trackPluginErrors();
1229+
1230+
final TestException error = new TestException();
1231+
1232+
try {
1233+
final PublishProcessor<Integer> source = PublishProcessor.create();
1234+
final PublishProcessor<Integer> signaller = PublishProcessor.create();
1235+
1236+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
1237+
1238+
TestSubscriber<Integer> ts = source.take(1)
1239+
.map(new Function<Integer, Integer>() {
1240+
@Override
1241+
public Integer apply(Integer v) throws Exception {
1242+
throw error;
1243+
}
1244+
})
1245+
.retryWhen(new Function<Flowable<Throwable>, Flowable<Integer>>() {
1246+
@Override
1247+
public Flowable<Integer> apply(Flowable<Throwable> v)
1248+
throws Exception {
1249+
return signaller;
1250+
}
1251+
}).test();
1252+
1253+
Runnable r1 = new Runnable() {
1254+
@Override
1255+
public void run() {
1256+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
1257+
source.onNext(1);
1258+
}
1259+
}
1260+
};
1261+
Runnable r2 = new Runnable() {
1262+
@Override
1263+
public void run() {
1264+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
1265+
signaller.offer(1);
1266+
}
1267+
}
1268+
};
1269+
1270+
TestHelper.race(r1, r2);
1271+
1272+
ts.dispose();
1273+
}
1274+
1275+
if (!errors.isEmpty()) {
1276+
for (Throwable e : errors) {
1277+
e.printStackTrace();
1278+
}
1279+
fail(errors + "");
1280+
}
1281+
} finally {
1282+
RxJavaPlugins.reset();
1283+
}
1284+
}
12241285
}

src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.reactivex.exceptions.TestException;
3131
import io.reactivex.functions.*;
3232
import io.reactivex.observers.TestObserver;
33+
import io.reactivex.plugins.RxJavaPlugins;
3334
import io.reactivex.schedulers.Schedulers;
3435
import io.reactivex.subjects.PublishSubject;
3536

@@ -397,4 +398,56 @@ public boolean test(Object v) throws Exception {
397398

398399
assertEquals(0, counter.get());
399400
}
401+
402+
@Test
403+
public void repeatFloodNoSubscriptionError() {
404+
List<Throwable> errors = TestHelper.trackPluginErrors();
405+
406+
try {
407+
final PublishSubject<Integer> source = PublishSubject.create();
408+
final PublishSubject<Integer> signaller = PublishSubject.create();
409+
410+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
411+
412+
TestObserver<Integer> to = source.take(1)
413+
.repeatWhen(new Function<Observable<Object>, ObservableSource<Integer>>() {
414+
@Override
415+
public ObservableSource<Integer> apply(Observable<Object> v)
416+
throws Exception {
417+
return signaller;
418+
}
419+
}).test();
420+
421+
Runnable r1 = new Runnable() {
422+
@Override
423+
public void run() {
424+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
425+
source.onNext(1);
426+
}
427+
}
428+
};
429+
Runnable r2 = new Runnable() {
430+
@Override
431+
public void run() {
432+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
433+
signaller.onNext(1);
434+
}
435+
}
436+
};
437+
438+
TestHelper.race(r1, r2);
439+
440+
to.dispose();
441+
}
442+
443+
if (!errors.isEmpty()) {
444+
for (Throwable e : errors) {
445+
e.printStackTrace();
446+
}
447+
fail(errors + "");
448+
}
449+
} finally {
450+
RxJavaPlugins.reset();
451+
}
452+
}
400453
}

src/test/java/io/reactivex/internal/operators/observable/ObservableRetryTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.reactivex.internal.subscriptions.BooleanSubscription;
3535
import io.reactivex.observables.GroupedObservable;
3636
import io.reactivex.observers.*;
37+
import io.reactivex.plugins.RxJavaPlugins;
3738
import io.reactivex.schedulers.Schedulers;
3839
import io.reactivex.subjects.PublishSubject;
3940

@@ -1131,4 +1132,64 @@ public boolean test(Object v) throws Exception {
11311132

11321133
assertEquals(0, counter.get());
11331134
}
1135+
1136+
@Test
1137+
public void repeatFloodNoSubscriptionError() {
1138+
List<Throwable> errors = TestHelper.trackPluginErrors();
1139+
1140+
final TestException error = new TestException();
1141+
1142+
try {
1143+
final PublishSubject<Integer> source = PublishSubject.create();
1144+
final PublishSubject<Integer> signaller = PublishSubject.create();
1145+
1146+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
1147+
1148+
TestObserver<Integer> to = source.take(1)
1149+
.map(new Function<Integer, Integer>() {
1150+
@Override
1151+
public Integer apply(Integer v) throws Exception {
1152+
throw error;
1153+
}
1154+
})
1155+
.retryWhen(new Function<Observable<Throwable>, ObservableSource<Integer>>() {
1156+
@Override
1157+
public ObservableSource<Integer> apply(Observable<Throwable> v)
1158+
throws Exception {
1159+
return signaller;
1160+
}
1161+
}).test();
1162+
1163+
Runnable r1 = new Runnable() {
1164+
@Override
1165+
public void run() {
1166+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
1167+
source.onNext(1);
1168+
}
1169+
}
1170+
};
1171+
Runnable r2 = new Runnable() {
1172+
@Override
1173+
public void run() {
1174+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
1175+
signaller.onNext(1);
1176+
}
1177+
}
1178+
};
1179+
1180+
TestHelper.race(r1, r2);
1181+
1182+
to.dispose();
1183+
}
1184+
1185+
if (!errors.isEmpty()) {
1186+
for (Throwable e : errors) {
1187+
e.printStackTrace();
1188+
}
1189+
fail(errors + "");
1190+
}
1191+
} finally {
1192+
RxJavaPlugins.reset();
1193+
}
1194+
}
11341195
}

0 commit comments

Comments
 (0)