Skip to content

Commit 3687530

Browse files
committed
Wraps DoOnEach in a SafeObserver
This commit leverages the SafeObserver facility to get the desired behavior in the face of exceptions. Specifically, if any of the operations performed within the doOnEach handler raises an exception, that exception will propagate through the observable chain.
1 parent 205a16b commit 3687530

File tree

2 files changed

+29
-8
lines changed

2 files changed

+29
-8
lines changed

rxjava-core/src/main/java/rx/operators/OperationDoOnEach.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,24 @@
2424
* Converts the elements of an observable sequence to the specified type.
2525
*/
2626
public class OperationDoOnEach {
27-
public static <T> OnSubscribeFunc<T> doOnEach(Observable<? extends T> source, Observer<? super T> observer) {
28-
return new DoOnEachObservable<T>(source, observer);
27+
public static <T> OnSubscribeFunc<T> doOnEach(Observable<? extends T> sequence, Observer<? super T> observer) {
28+
return new DoOnEachObservable<T>(sequence, observer);
2929
}
3030

3131
private static class DoOnEachObservable<T> implements OnSubscribeFunc<T> {
3232

33-
private final Observable<? extends T> source;
33+
private final Observable<? extends T> sequence;
3434
private final Observer<? super T> doOnEachObserver;
3535

36-
public DoOnEachObservable(Observable<? extends T> source, Observer<? super T> doOnEachObserver) {
37-
this.source = source;
36+
public DoOnEachObservable(Observable<? extends T> sequence, Observer<? super T> doOnEachObserver) {
37+
this.sequence = sequence;
3838
this.doOnEachObserver = doOnEachObserver;
3939
}
4040

4141
@Override
4242
public Subscription onSubscribe(final Observer<? super T> observer) {
43-
return source.subscribe(new Observer<T>() {
43+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
44+
return subscription.wrap(sequence.subscribe(new SafeObserver<T>(subscription, new Observer<T>() {
4445
@Override
4546
public void onCompleted() {
4647
doOnEachObserver.onCompleted();
@@ -58,8 +59,7 @@ public void onNext(T value) {
5859
doOnEachObserver.onNext(value);
5960
observer.onNext(value);
6061
}
61-
62-
});
62+
})));
6363
}
6464

6565
}

rxjava-core/src/test/java/rx/operators/OperationDoOnEachTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import rx.concurrency.Schedulers;
3737
import rx.util.functions.Func1;
3838
import rx.util.functions.Func2;
39+
import rx.util.functions.Action1;
3940

4041
public class OperationDoOnEachTest {
4142

@@ -104,5 +105,25 @@ public String call(String s) {
104105
verify(sideEffectObserver, times(1)).onError(any(Throwable.class));
105106
}
106107

108+
@Test
109+
public void testDoOnEachWithErrorInCallback() {
110+
Observable<String> base = Observable.from("one", "two", "fail", "three");
111+
Observable<String> doOnEach = base.doOnEach(new Action1<String>() {
112+
@Override
113+
public void call(String s) {
114+
if ("fail".equals(s)) {
115+
throw new RuntimeException("Forced Failure");
116+
}
117+
}
118+
});
119+
120+
doOnEach.subscribe(subscribedObserver);
121+
verify(subscribedObserver, times(1)).onNext("one");
122+
verify(subscribedObserver, times(1)).onNext("two");
123+
verify(subscribedObserver, never()).onNext("three");
124+
verify(subscribedObserver, never()).onCompleted();
125+
verify(subscribedObserver, times(1)).onError(any(Throwable.class));
126+
127+
}
107128

108129
}

0 commit comments

Comments
 (0)