Skip to content

Commit f059ded

Browse files
authored
2.x: fix LambdaObserver not cancelling the upstream (ReactiveX#5170)
1 parent b5501c5 commit f059ded

File tree

6 files changed

+139
-13
lines changed

6 files changed

+139
-13
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10894,7 +10894,7 @@ public final <R> Observable<R> switchMap(Function<? super T, ? extends Observabl
1089410894
public final <R> Observable<R> switchMapSingle(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
1089510895
return ObservableInternalHelper.switchMapSingle(this, mapper);
1089610896
}
10897-
10897+
1089810898
/**
1089910899
* Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
1090010900
* ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted
@@ -10925,7 +10925,7 @@ public final <R> Observable<R> switchMapSingle(@NonNull Function<? super T, ? ex
1092510925
public final <R> Observable<R> switchMapSingleDelayError(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
1092610926
return ObservableInternalHelper.switchMapSingleDelayError(this, mapper);
1092710927
}
10928-
10928+
1092910929
/**
1093010930
* Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
1093110931
* ObservableSource that returns an ObservableSource, and then emitting the items emitted by the most recently emitted

src/main/java/io/reactivex/internal/observers/LambdaObserver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public void onSubscribe(Disposable s) {
4747
onSubscribe.accept(this);
4848
} catch (Throwable ex) {
4949
Exceptions.throwIfFatal(ex);
50+
s.dispose();
5051
onError(ex);
5152
}
5253
}
@@ -59,6 +60,7 @@ public void onNext(T t) {
5960
onNext.accept(t);
6061
} catch (Throwable e) {
6162
Exceptions.throwIfFatal(e);
63+
get().dispose();
6264
onError(e);
6365
}
6466
}

src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -326,27 +326,27 @@ public static <T,R> Observable<R> switchMapSingleDelayError(Observable<T> source
326326
Function<? super T, ? extends SingleSource<? extends R>> mapper) {
327327
return source.switchMapDelayError(convertSingleMapperToObservableMapper(mapper), 1);
328328
}
329-
329+
330330
private static <T, R> Function<T, Observable<R>> convertSingleMapperToObservableMapper(
331331
final Function<? super T, ? extends SingleSource<? extends R>> mapper) {
332332
ObjectHelper.requireNonNull(mapper, "mapper is null");
333333
return new ObservableMapper<T,R>(mapper);
334334
}
335-
335+
336336
static final class ObservableMapper<T,R> implements Function<T,Observable<R>> {
337-
337+
338338
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
339339

340340
ObservableMapper(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
341341
this.mapper = mapper;
342342
}
343-
343+
344344
@Override
345345
public Observable<R> apply(T t) throws Exception {
346346
return RxJavaPlugins.onAssembly(new SingleToObservable<R>(
347347
ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value")));
348348
}
349-
349+
350350
}
351351

352352
}

src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.reactivex.exceptions.*;
2727
import io.reactivex.functions.*;
2828
import io.reactivex.plugins.RxJavaPlugins;
29+
import io.reactivex.subjects.PublishSubject;
2930

3031
public class LambdaObserverTest {
3132

@@ -280,4 +281,65 @@ public void accept(Disposable s) throws Exception {
280281

281282
assertEquals(Arrays.asList(1, 100), received);
282283
}
284+
285+
@Test
286+
public void onNextThrowsCancelsUpstream() {
287+
PublishSubject<Integer> ps = PublishSubject.create();
288+
289+
final List<Throwable> errors = new ArrayList<Throwable>();
290+
291+
ps.subscribe(new Consumer<Integer>() {
292+
@Override
293+
public void accept(Integer v) throws Exception {
294+
throw new TestException();
295+
}
296+
}, new Consumer<Throwable>() {
297+
@Override
298+
public void accept(Throwable e) throws Exception {
299+
errors.add(e);
300+
}
301+
});
302+
303+
assertTrue("No observers?!", ps.hasObservers());
304+
assertTrue("Has errors already?!", errors.isEmpty());
305+
306+
ps.onNext(1);
307+
308+
assertFalse("Has observers?!", ps.hasObservers());
309+
assertFalse("No errors?!", errors.isEmpty());
310+
311+
assertTrue(errors.toString(), errors.get(0) instanceof TestException);
312+
}
313+
314+
@Test
315+
public void onSubscribeThrowsCancelsUpstream() {
316+
PublishSubject<Integer> ps = PublishSubject.create();
317+
318+
final List<Throwable> errors = new ArrayList<Throwable>();
319+
320+
ps.subscribe(new Consumer<Integer>() {
321+
@Override
322+
public void accept(Integer v) throws Exception {
323+
}
324+
}, new Consumer<Throwable>() {
325+
@Override
326+
public void accept(Throwable e) throws Exception {
327+
errors.add(e);
328+
}
329+
}, new Action() {
330+
@Override
331+
public void run() throws Exception {
332+
}
333+
}, new Consumer<Disposable>() {
334+
@Override
335+
public void accept(Disposable s) throws Exception {
336+
throw new TestException();
337+
}
338+
});
339+
340+
assertFalse("Has observers?!", ps.hasObservers());
341+
assertFalse("No errors?!", errors.isEmpty());
342+
343+
assertTrue(errors.toString(), errors.get(0) instanceof TestException);
344+
}
283345
}

src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ public void switchMapInnerCancelled() {
622622

623623
assertFalse(pp.hasObservers());
624624
}
625-
625+
626626
@Test
627627
public void switchMapSingleJustSource() {
628628
Observable.just(0)
@@ -635,7 +635,7 @@ public SingleSource<Integer> apply(Object v) throws Exception {
635635
.test()
636636
.assertResult(1);
637637
}
638-
638+
639639
@Test
640640
public void switchMapSingleMapperReturnsNull() {
641641
Observable.just(0)
@@ -648,13 +648,13 @@ public SingleSource<Integer> apply(Object v) throws Exception {
648648
.test()
649649
.assertError(NullPointerException.class);
650650
}
651-
652-
@Test(expected=NullPointerException.class)
651+
652+
@Test(expected = NullPointerException.class)
653653
public void switchMapSingleMapperIsNull() {
654654
Observable.just(0)
655655
.switchMapSingle(null);
656656
}
657-
657+
658658
@Test
659659
public void switchMapSingleFunctionDoesntReturnSingle() {
660660
Observable.just(0)
@@ -698,7 +698,7 @@ public void accept(Integer n) throws Exception {
698698
.assertError(RuntimeException.class);
699699
assertTrue(completed.get());
700700
}
701-
701+
702702
@Test
703703
public void scalarMap() {
704704
Observable.switchOnNext(Observable.just(Observable.just(1)))

src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.reactivex.functions.*;
2626
import io.reactivex.internal.subscriptions.BooleanSubscription;
2727
import io.reactivex.plugins.RxJavaPlugins;
28+
import io.reactivex.processors.PublishProcessor;
2829

2930
public class LambdaSubscriberTest {
3031

@@ -285,4 +286,65 @@ public void accept(Subscription s) throws Exception {
285286

286287
assertEquals(Arrays.asList(1, 100), received);
287288
}
289+
290+
@Test
291+
public void onNextThrowsCancelsUpstream() {
292+
PublishProcessor<Integer> ps = PublishProcessor.create();
293+
294+
final List<Throwable> errors = new ArrayList<Throwable>();
295+
296+
ps.subscribe(new Consumer<Integer>() {
297+
@Override
298+
public void accept(Integer v) throws Exception {
299+
throw new TestException();
300+
}
301+
}, new Consumer<Throwable>() {
302+
@Override
303+
public void accept(Throwable e) throws Exception {
304+
errors.add(e);
305+
}
306+
});
307+
308+
assertTrue("No observers?!", ps.hasSubscribers());
309+
assertTrue("Has errors already?!", errors.isEmpty());
310+
311+
ps.onNext(1);
312+
313+
assertFalse("Has observers?!", ps.hasSubscribers());
314+
assertFalse("No errors?!", errors.isEmpty());
315+
316+
assertTrue(errors.toString(), errors.get(0) instanceof TestException);
317+
}
318+
319+
@Test
320+
public void onSubscribeThrowsCancelsUpstream() {
321+
PublishProcessor<Integer> ps = PublishProcessor.create();
322+
323+
final List<Throwable> errors = new ArrayList<Throwable>();
324+
325+
ps.subscribe(new Consumer<Integer>() {
326+
@Override
327+
public void accept(Integer v) throws Exception {
328+
}
329+
}, new Consumer<Throwable>() {
330+
@Override
331+
public void accept(Throwable e) throws Exception {
332+
errors.add(e);
333+
}
334+
}, new Action() {
335+
@Override
336+
public void run() throws Exception {
337+
}
338+
}, new Consumer<Subscription>() {
339+
@Override
340+
public void accept(Subscription s) throws Exception {
341+
throw new TestException();
342+
}
343+
});
344+
345+
assertFalse("Has observers?!", ps.hasSubscribers());
346+
assertFalse("No errors?!", errors.isEmpty());
347+
348+
assertTrue(errors.toString(), errors.get(0) instanceof TestException);
349+
}
288350
}

0 commit comments

Comments
 (0)