Skip to content

Commit 36742c9

Browse files
committed
Merge pull request ReactiveX#3408 from akarnokd/DoOnErrorReportBoth
DoOnEach: report both original exception and callback exception.
2 parents dc00d14 + 89390e3 commit 36742c9

File tree

3 files changed

+45
-17
lines changed

3 files changed

+45
-17
lines changed

src/main/java/rx/Observable.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4362,6 +4362,10 @@ public final void onNext(T v) {
43624362
/**
43634363
* Modifies the source Observable so that it notifies an Observer for each item it emits.
43644364
* <p>
4365+
* In case the onError of the supplied observer throws, the downstream will receive a composite exception containing
4366+
* the original exception and the exception thrown by onError. If the onNext or the onCompleted methods
4367+
* of the supplied observer throws, the downstream will be terminated and wil receive this thrown exception.
4368+
* <p>
43654369
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnEach.png" alt="">
43664370
* <dl>
43674371
* <dt><b>Scheduler:</b></dt>
@@ -4380,6 +4384,9 @@ public final Observable<T> doOnEach(Observer<? super T> observer) {
43804384
/**
43814385
* Modifies the source Observable so that it invokes an action if it calls {@code onError}.
43824386
* <p>
4387+
* In case the onError action throws, the downstream will receive a composite exception containing
4388+
* the original exception and the exception thrown by onError.
4389+
* <p>
43834390
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnError.png" alt="">
43844391
* <dl>
43854392
* <dt><b>Scheduler:</b></dt>

src/main/java/rx/internal/operators/OperatorDoOnEach.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.Arrays;
19+
1820
import rx.*;
1921
import rx.Observable.Operator;
20-
import rx.exceptions.Exceptions;
22+
import rx.exceptions.*;
2123

2224
/**
2325
* Converts the elements of an observable sequence to the specified type.
@@ -62,7 +64,8 @@ public void onError(Throwable e) {
6264
try {
6365
doOnEachObserver.onError(e);
6466
} catch (Throwable e2) {
65-
Exceptions.throwOrReport(e2, observer);
67+
Exceptions.throwIfFatal(e2);
68+
observer.onError(new CompositeException(Arrays.asList(e, e2)));
6669
return;
6770
}
6871
observer.onError(e);

src/test/java/rx/internal/operators/OperatorDoOnEachTest.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,19 @@
1717

1818
import static org.junit.Assert.*;
1919
import static org.mockito.Matchers.any;
20-
import static org.mockito.Mockito.never;
21-
import static org.mockito.Mockito.times;
22-
import static org.mockito.Mockito.verify;
23-
24-
import org.junit.Before;
25-
import org.junit.Test;
26-
import org.mockito.Mock;
27-
import org.mockito.MockitoAnnotations;
28-
29-
import rx.Observable;
30-
import rx.Observer;
31-
import rx.Subscriber;
32-
import rx.exceptions.OnErrorNotImplementedException;
33-
import rx.functions.Action1;
34-
import rx.functions.Func1;
20+
import static org.mockito.Mockito.*;
3521

3622
import java.util.List;
3723
import java.util.concurrent.atomic.AtomicInteger;
3824

25+
import org.junit.*;
26+
import org.mockito.*;
27+
28+
import rx.*;
29+
import rx.exceptions.*;
30+
import rx.functions.*;
31+
import rx.observers.TestSubscriber;
32+
3933
public class OperatorDoOnEachTest {
4034

4135
@Mock
@@ -201,4 +195,28 @@ public void call(Object o) {
201195
System.out.println("Received exception: " + e);
202196
}
203197
}
198+
199+
@Test
200+
public void testOnErrorThrows() {
201+
TestSubscriber<Object> ts = TestSubscriber.create();
202+
203+
Observable.error(new TestException())
204+
.doOnError(new Action1<Throwable>() {
205+
@Override
206+
public void call(Throwable e) {
207+
throw new TestException();
208+
}
209+
}).subscribe(ts);
210+
211+
ts.assertNoValues();
212+
ts.assertNotCompleted();
213+
ts.assertError(CompositeException.class);
214+
215+
CompositeException ex = (CompositeException)ts.getOnErrorEvents().get(0);
216+
217+
List<Throwable> exceptions = ex.getExceptions();
218+
assertEquals(2, exceptions.size());
219+
assertTrue(exceptions.get(0) instanceof TestException);
220+
assertTrue(exceptions.get(1) instanceof TestException);
221+
}
204222
}

0 commit comments

Comments
 (0)