Skip to content

Commit b34e5f6

Browse files
authored
3.x: Fix switchMap not canceling properly during onNext-cancel races (ReactiveX#6917)
1 parent 042dee3 commit b34e5f6

File tree

3 files changed

+82
-3
lines changed

3 files changed

+82
-3
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchMap.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ void drain() {
198198
for (;;) {
199199

200200
if (cancelled) {
201-
active.lazySet(null);
202201
return;
203202
}
204203

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchTest.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import java.util.*;
2121
import java.util.concurrent.TimeUnit;
22-
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.*;
2323

2424
import org.junit.*;
2525
import org.mockito.InOrder;
@@ -1337,4 +1337,44 @@ public void innerCompletedBackpressureBoundaryDelayError() {
13371337

13381338
ts.assertValuesOnly(1);
13391339
}
1340+
1341+
@Test
1342+
public void cancellationShouldTriggerInnerCancellationRace() throws Throwable {
1343+
AtomicInteger outer = new AtomicInteger();
1344+
AtomicInteger inner = new AtomicInteger();
1345+
1346+
int n = 10_000;
1347+
for (int i = 0; i < n; i++) {
1348+
Flowable.<Integer>create(it -> {
1349+
it.onNext(0);
1350+
}, BackpressureStrategy.MISSING)
1351+
.switchMap(v -> createFlowable(inner))
1352+
.observeOn(Schedulers.computation())
1353+
.doFinally(() -> {
1354+
outer.incrementAndGet();
1355+
})
1356+
.take(1)
1357+
.blockingSubscribe(v -> { }, Throwable::printStackTrace);
1358+
}
1359+
1360+
Thread.sleep(100);
1361+
assertEquals(inner.get(), outer.get());
1362+
assertEquals(n, inner.get());
1363+
}
1364+
1365+
Flowable<Integer> createFlowable(AtomicInteger inner) {
1366+
return Flowable.<Integer>unsafeCreate(s -> {
1367+
SerializedSubscriber<Integer> it = new SerializedSubscriber<>(s);
1368+
it.onSubscribe(new BooleanSubscription());
1369+
Schedulers.io().scheduleDirect(() -> {
1370+
it.onNext(1);
1371+
}, 0, TimeUnit.MILLISECONDS);
1372+
Schedulers.io().scheduleDirect(() -> {
1373+
it.onNext(2);
1374+
}, 0, TimeUnit.MILLISECONDS);
1375+
})
1376+
.doFinally(() -> {
1377+
inner.incrementAndGet();
1378+
});
1379+
}
13401380
}

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchTest.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import io.reactivex.rxjava3.internal.functions.Functions;
3535
import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler;
3636
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
37-
import io.reactivex.rxjava3.observers.TestObserver;
37+
import io.reactivex.rxjava3.observers.*;
3838
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
3939
import io.reactivex.rxjava3.schedulers.*;
4040
import io.reactivex.rxjava3.subjects.PublishSubject;
@@ -1398,4 +1398,44 @@ public void innerIgnoresCancelAndErrors() throws Throwable {
13981398
TestHelper.assertUndeliverable(errors, 0, TestException.class);
13991399
});
14001400
}
1401+
1402+
@Test
1403+
public void cancellationShouldTriggerInnerCancellationRace() throws Throwable {
1404+
AtomicInteger outer = new AtomicInteger();
1405+
AtomicInteger inner = new AtomicInteger();
1406+
1407+
int n = 10_000;
1408+
for (int i = 0; i < n; i++) {
1409+
Observable.<Integer>create(it -> {
1410+
it.onNext(0);
1411+
})
1412+
.switchMap(v -> createObservable(inner))
1413+
.observeOn(Schedulers.computation())
1414+
.doFinally(() -> {
1415+
outer.incrementAndGet();
1416+
})
1417+
.take(1)
1418+
.blockingSubscribe(v -> { }, Throwable::printStackTrace);
1419+
}
1420+
1421+
Thread.sleep(100);
1422+
assertEquals(inner.get(), outer.get());
1423+
assertEquals(n, inner.get());
1424+
}
1425+
1426+
Observable<Integer> createObservable(AtomicInteger inner) {
1427+
return Observable.<Integer>unsafeCreate(s -> {
1428+
SerializedObserver<Integer> it = new SerializedObserver<>(s);
1429+
it.onSubscribe(Disposable.empty());
1430+
Schedulers.io().scheduleDirect(() -> {
1431+
it.onNext(1);
1432+
}, 0, TimeUnit.MILLISECONDS);
1433+
Schedulers.io().scheduleDirect(() -> {
1434+
it.onNext(2);
1435+
}, 0, TimeUnit.MILLISECONDS);
1436+
})
1437+
.doFinally(() -> {
1438+
inner.incrementAndGet();
1439+
});
1440+
}
14011441
}

0 commit comments

Comments
 (0)