Skip to content

Commit ccd9cf8

Browse files
Throw if no onError handler specified
ReactiveX#198 As per Rx Design Guidelines 5.2: "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be to rethrow the exception on the thread that the message comes out from the observable sequence. The OnCompleted behavior in this case is to do nothing." A new OnErrorNotImplementedException was created so it is explicit as to where the exception is coming from and why.
1 parent 22cb682 commit ccd9cf8

File tree

3 files changed

+133
-9
lines changed

3 files changed

+133
-9
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import rx.subjects.Subject;
8383
import rx.subscriptions.BooleanSubscription;
8484
import rx.subscriptions.Subscriptions;
85+
import rx.util.OnErrorNotImplementedException;
8586
import rx.util.Range;
8687
import rx.util.Timestamped;
8788
import rx.util.functions.Action0;
@@ -189,10 +190,16 @@ public Subscription subscribe(Observer<T> observer) {
189190
subscription.wrap(onSubscribeFunction.call(new AtomicObserver<T>(subscription, observer)));
190191
return hook.onSubscribeReturn(this, subscription);
191192
}
193+
} catch (OnErrorNotImplementedException e) {
194+
// special handling when onError is not implemented ... we just rethrow
195+
throw e;
192196
} catch (Exception e) {
193197
// if an unhandled error occurs executing the onSubscribe we will propagate it
194198
try {
195199
observer.onError(hook.onSubscribeError(this, e));
200+
} catch (OnErrorNotImplementedException e2) {
201+
// special handling when onError is not implemented ... we just rethrow
202+
throw e2;
196203
} catch (Exception e2) {
197204
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
198205
// so we are unable to propagate the error correctly and will just throw
@@ -279,6 +286,8 @@ public void onError(Exception e) {
279286
Object onError = callbacks.get("onError");
280287
if (onError != null) {
281288
Functions.from(onError).call(e);
289+
} else {
290+
throw new OnErrorNotImplementedException(e);
282291
}
283292
}
284293

@@ -323,7 +332,7 @@ public void onCompleted() {
323332
@Override
324333
public void onError(Exception e) {
325334
handleError(e);
326-
// no callback defined
335+
throw new OnErrorNotImplementedException(e);
327336
}
328337

329338
@Override
@@ -358,7 +367,7 @@ public void onCompleted() {
358367
@Override
359368
public void onError(Exception e) {
360369
handleError(e);
361-
// no callback defined
370+
throw new OnErrorNotImplementedException(e);
362371
}
363372

364373
@Override
@@ -3739,6 +3748,79 @@ public void call(String v) {
37393748
assertEquals(1, counter.get());
37403749
}
37413750

3751+
/**
3752+
* https://github.com/Netflix/RxJava/issues/198
3753+
*
3754+
* Rx Design Guidelines 5.2
3755+
*
3756+
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
3757+
* to rethrow the exception on the thread that the message comes out from the observable sequence.
3758+
* The OnCompleted behavior in this case is to do nothing."
3759+
*/
3760+
@Test
3761+
public void testErrorThrownWithoutErrorHandlerSynchronous() {
3762+
try {
3763+
error(new RuntimeException("failure")).subscribe(new Action1<Object>() {
3764+
3765+
@Override
3766+
public void call(Object t1) {
3767+
// won't get anything
3768+
}
3769+
3770+
});
3771+
fail("expected exception");
3772+
} catch (Exception e) {
3773+
assertEquals("failure", e.getMessage());
3774+
}
3775+
}
3776+
3777+
/**
3778+
* https://github.com/Netflix/RxJava/issues/198
3779+
*
3780+
* Rx Design Guidelines 5.2
3781+
*
3782+
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
3783+
* to rethrow the exception on the thread that the message comes out from the observable sequence.
3784+
* The OnCompleted behavior in this case is to do nothing."
3785+
*
3786+
* @throws InterruptedException
3787+
*/
3788+
@Test
3789+
public void testErrorThrownWithoutErrorHandlerAsynchronous() throws InterruptedException {
3790+
final CountDownLatch latch = new CountDownLatch(1);
3791+
final AtomicReference<Exception> exception = new AtomicReference<Exception>();
3792+
Observable.create(new Func1<Observer<String>, Subscription>() {
3793+
3794+
@Override
3795+
public Subscription call(final Observer<String> observer) {
3796+
new Thread(new Runnable() {
3797+
3798+
@Override
3799+
public void run() {
3800+
try {
3801+
observer.onError(new RuntimeException("failure"));
3802+
} catch (Exception e) {
3803+
// without an onError handler it has to just throw on whatever thread invokes it
3804+
exception.set(e);
3805+
}
3806+
latch.countDown();
3807+
}
3808+
}).start();
3809+
return Subscriptions.empty();
3810+
}
3811+
}).subscribe(new Action1<Object>() {
3812+
3813+
@Override
3814+
public void call(Object t1) {
3815+
3816+
}
3817+
3818+
});
3819+
// wait for exception
3820+
latch.await(3000, TimeUnit.MILLISECONDS);
3821+
assertNotNull(exception.get());
3822+
assertEquals("failure", exception.get().getMessage());
3823+
}
37423824
}
37433825

37443826
}

rxjava-core/src/main/java/rx/operators/AtomicObserver.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import rx.Observer;
77
import rx.plugins.RxJavaPlugins;
88
import rx.util.CompositeException;
9+
import rx.util.OnErrorNotImplementedException;
910

1011
/**
1112
* Wrapper around Observer to ensure compliance with Rx contract.
@@ -70,13 +71,28 @@ public void onError(Exception e) {
7071
try {
7172
actual.onError(e);
7273
} catch (Exception e2) {
73-
// if the onError itself fails then pass to the plugin
74-
// see https://github.com/Netflix/RxJava/issues/216 for further discussion
75-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
76-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
77-
// and throw exception despite that not being proper for Rx
78-
// https://github.com/Netflix/RxJava/issues/198
79-
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
74+
if (e2 instanceof OnErrorNotImplementedException) {
75+
/**
76+
* onError isn't implemented so throw
77+
*
78+
* https://github.com/Netflix/RxJava/issues/198
79+
*
80+
* Rx Design Guidelines 5.2
81+
*
82+
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
83+
* to rethrow the exception on the thread that the message comes out from the observable sequence.
84+
* The OnCompleted behavior in this case is to do nothing."
85+
*/
86+
throw (OnErrorNotImplementedException) e2;
87+
} else {
88+
// if the onError itself fails then pass to the plugin
89+
// see https://github.com/Netflix/RxJava/issues/216 for further discussion
90+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
91+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
92+
// and throw exception despite that not being proper for Rx
93+
// https://github.com/Netflix/RxJava/issues/198
94+
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
95+
}
8096
}
8197
// auto-unsubscribe
8298
subscription.unsubscribe();
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package rx.util;
2+
3+
import rx.Observer;
4+
5+
/**
6+
* Used for re-throwing {@link Observer#onError(Exception)} when an implementation doesn't exist.
7+
*
8+
* https://github.com/Netflix/RxJava/issues/198
9+
*
10+
* Rx Design Guidelines 5.2
11+
*
12+
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
13+
* to rethrow the exception on the thread that the message comes out from the observable sequence.
14+
* The OnCompleted behavior in this case is to do nothing."
15+
*/
16+
public class OnErrorNotImplementedException extends RuntimeException {
17+
private static final long serialVersionUID = -6298857009889503852L;
18+
19+
public OnErrorNotImplementedException(String message, Throwable e) {
20+
super(message, e);
21+
}
22+
23+
public OnErrorNotImplementedException(Throwable e) {
24+
super(e.getMessage(), e);
25+
}
26+
}

0 commit comments

Comments
 (0)