Skip to content

Commit 61e1c22

Browse files
committed
Merge pull request ReactiveX#3419 from artem-zinnatullin/single-do-on-error
Add Single.doOnError()
2 parents 9204154 + 2d832a4 commit 61e1c22

File tree

2 files changed

+106
-0
lines changed

2 files changed

+106
-0
lines changed

src/main/java/rx/Single.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import rx.functions.Func8;
3232
import rx.functions.Func9;
3333
import rx.internal.operators.OnSubscribeToObservableFuture;
34+
import rx.internal.operators.OperatorDoOnEach;
3435
import rx.internal.operators.OperatorMap;
3536
import rx.internal.operators.OperatorObserveOn;
3637
import rx.internal.operators.OperatorOnErrorReturn;
@@ -1789,4 +1790,40 @@ public final <T2, R> Single<R> zipWith(Single<? extends T2> other, Func2<? super
17891790
return zip(this, other, zipFunction);
17901791
}
17911792

1793+
/**
1794+
* Modifies the source {@link Single} so that it invokes an action if it calls {@code onError}.
1795+
* <p>
1796+
* In case the onError action throws, the downstream will receive a composite exception containing
1797+
* the original exception and the exception thrown by onError.
1798+
* <p>
1799+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnError.png" alt="">
1800+
* <dl>
1801+
* <dt><b>Scheduler:</b></dt>
1802+
* <dd>{@code doOnError} does not operate by default on a particular {@link Scheduler}.</dd>
1803+
* </dl>
1804+
*
1805+
* @param onError
1806+
* the action to invoke if the source {@link Single} calls {@code onError}
1807+
* @return the source {@link Single} with the side-effecting behavior applied
1808+
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
1809+
*/
1810+
@Experimental
1811+
public final Single<T> doOnError(final Action1<Throwable> onError) {
1812+
Observer<T> observer = new Observer<T>() {
1813+
@Override
1814+
public void onCompleted() {
1815+
}
1816+
1817+
@Override
1818+
public void onError(Throwable e) {
1819+
onError.call(e);
1820+
}
1821+
1822+
@Override
1823+
public void onNext(T t) {
1824+
}
1825+
};
1826+
1827+
return lift(new OperatorDoOnEach<T>(observer));
1828+
}
17921829
}

src/test/java/rx/SingleTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,13 @@
1313
package rx;
1414

1515
import static org.junit.Assert.assertEquals;
16+
import static org.junit.Assert.assertSame;
1617
import static org.junit.Assert.assertTrue;
1718
import static org.junit.Assert.fail;
19+
import static org.mockito.Mockito.doThrow;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Mockito.verifyZeroInteractions;
1823

1924
import java.util.Arrays;
2025
import java.util.concurrent.CountDownLatch;
@@ -26,7 +31,9 @@
2631
import org.junit.Test;
2732

2833
import rx.Single.OnSubscribe;
34+
import rx.exceptions.CompositeException;
2935
import rx.functions.Action0;
36+
import rx.functions.Action1;
3037
import rx.functions.Func1;
3138
import rx.functions.Func2;
3239
import rx.observers.TestSubscriber;
@@ -461,4 +468,66 @@ public void testToObservable() {
461468
ts.assertValue("a");
462469
ts.assertCompleted();
463470
}
471+
472+
@Test
473+
public void doOnErrorShouldNotCallActionIfNoErrorHasOccurred() {
474+
Action1<Throwable> action = mock(Action1.class);
475+
476+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
477+
478+
Single
479+
.just("value")
480+
.doOnError(action)
481+
.subscribe(testSubscriber);
482+
483+
testSubscriber.assertValue("value");
484+
testSubscriber.assertNoErrors();
485+
486+
verifyZeroInteractions(action);
487+
}
488+
489+
@Test
490+
public void doOnErrorShouldCallActionIfErrorHasOccurred() {
491+
Action1<Throwable> action = mock(Action1.class);
492+
493+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
494+
495+
Throwable error = new IllegalStateException();
496+
497+
Single
498+
.error(error)
499+
.doOnError(action)
500+
.subscribe(testSubscriber);
501+
502+
testSubscriber.assertNoValues();
503+
testSubscriber.assertError(error);
504+
505+
verify(action).call(error);
506+
}
507+
508+
@Test
509+
public void doOnErrorShouldThrowCompositeExceptionIfOnErrorActionThrows() {
510+
Action1<Throwable> action = mock(Action1.class);
511+
512+
513+
Throwable error = new RuntimeException();
514+
Throwable exceptionFromOnErrorAction = new IllegalStateException();
515+
doThrow(exceptionFromOnErrorAction).when(action).call(error);
516+
517+
TestSubscriber<Object> testSubscriber = new TestSubscriber<Object>();
518+
519+
Single
520+
.error(error)
521+
.doOnError(action)
522+
.subscribe(testSubscriber);
523+
524+
testSubscriber.assertNoValues();
525+
CompositeException compositeException = (CompositeException) testSubscriber.getOnErrorEvents().get(0);
526+
527+
assertEquals(2, compositeException.getExceptions().size());
528+
assertSame(error, compositeException.getExceptions().get(0));
529+
assertSame(exceptionFromOnErrorAction, compositeException.getExceptions().get(1));
530+
531+
verify(action).call(error);
532+
}
464533
}

0 commit comments

Comments
 (0)