Skip to content

Commit 8e6bef3

Browse files
Improve Error Handling and Stacktraces
The stacktraces were a mess when onError failed or was not implemented and unsubscribe also failed. That is a far edge case and means code is broken and breaking the Rx contracts … but that’s just when we need clear stacktraces. The CompositeException and SafeObserver class now do a dance and wire together a causal chain to provide a stacktrace that can identity all the points of error. Also standardized and simplified the RxJavaPlugin.onErrorHandler while working in the vicinity. This came about after I was asked to help debug a problem and couldn’t do it by looking at the thrown exception, I had to use a debugger and step through.
1 parent 72f043e commit 8e6bef3

File tree

7 files changed

+433
-57
lines changed

7 files changed

+433
-57
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,6 @@ public void onCompleted() {
334334

335335
@Override
336336
public void onError(Throwable e) {
337-
handleError(e);
338337
throw new OnErrorNotImplementedException(e);
339338
}
340339

@@ -373,7 +372,6 @@ public void onCompleted() {
373372

374373
@Override
375374
public void onError(Throwable e) {
376-
handleError(e);
377375
throw new OnErrorNotImplementedException(e);
378376
}
379377

@@ -430,7 +428,6 @@ public void onCompleted() {
430428

431429
@Override
432430
public void onError(Throwable e) {
433-
handleError(e);
434431
onError.call(e);
435432
}
436433

@@ -491,7 +488,6 @@ public void onCompleted() {
491488

492489
@Override
493490
public void onError(Throwable e) {
494-
handleError(e);
495491
onError.call(e);
496492
}
497493

@@ -561,16 +557,6 @@ public <TIntermediate, TResult> Observable<TResult> multicast(
561557
final Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> selector) {
562558
return OperationMulticast.multicast(this, subjectFactory, selector);
563559
}
564-
/**
565-
* Allow the {@link RxJavaErrorHandler} to receive the exception from
566-
* onError.
567-
*
568-
* @param e
569-
*/
570-
private void handleError(Throwable e) {
571-
// onError should be rare so we'll only fetch when needed
572-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
573-
}
574560

575561
/**
576562
* An Observable that never sends any information to an {@link Observer}.

rxjava-core/src/main/java/rx/operators/SafeObserver.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,11 @@ public void onNext(T args) {
114114
*/
115115
protected void _onError(Throwable e) {
116116
try {
117+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
117118
actual.onError(e);
118119
} catch (Throwable e2) {
119120
if (e2 instanceof OnErrorNotImplementedException) {
120-
/**
121+
/*
121122
* onError isn't implemented so throw
122123
*
123124
* https://github.com/Netflix/RxJava/issues/198
@@ -128,19 +129,36 @@ protected void _onError(Throwable e) {
128129
* to rethrow the exception on the thread that the message comes out from the observable sequence.
129130
* The OnCompleted behavior in this case is to do nothing."
130131
*/
132+
try {
133+
subscription.unsubscribe();
134+
} catch (Throwable unsubscribeException) {
135+
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
136+
throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
137+
}
131138
throw (OnErrorNotImplementedException) e2;
132139
} else {
133-
// if the onError itself fails then pass to the plugin
134-
// see https://github.com/Netflix/RxJava/issues/216 for further discussion
135-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
140+
/*
141+
* throw since the Rx contract is broken if onError failed
142+
*
143+
* https://github.com/Netflix/RxJava/issues/198
144+
*/
136145
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
137-
// and throw exception despite that not being proper for Rx
138-
// https://github.com/Netflix/RxJava/issues/198
146+
try {
147+
subscription.unsubscribe();
148+
} catch (Throwable unsubscribeException) {
149+
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
150+
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
151+
}
152+
139153
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
140154
}
141-
} finally {
142-
// auto-unsubscribe
155+
}
156+
// if we did not throw about we will unsubscribe here, if onError failed then unsubscribe happens in the catch
157+
try {
143158
subscription.unsubscribe();
159+
} catch (RuntimeException unsubscribeException) {
160+
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
161+
throw unsubscribeException;
144162
}
145163
}
146164

rxjava-core/src/main/java/rx/plugins/RxJavaErrorHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public abstract class RxJavaErrorHandler {
3030

3131
/**
3232
* Receives all Exceptions from an {@link Observable} passed to {@link Observer#onError(Throwable)}.
33+
* <p>
34+
* This should NEVER throw an Exception. Make sure to try/catch(Throwable) all code inside this method implementation.
3335
*
3436
* @param e
3537
* Exception

rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,17 @@ public class RxJavaPlugins {
3232
private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
3333
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
3434

35+
public static RxJavaPlugins getInstance() {
36+
return INSTANCE;
37+
}
38+
3539
/* package accessible for unit tests */RxJavaPlugins() {
3640

3741
}
38-
39-
public static RxJavaPlugins getInstance() {
40-
return INSTANCE;
42+
43+
/* package accessible for ujnit tests */ void reset() {
44+
INSTANCE.errorHandler.set(null);
45+
INSTANCE.observableExecutionHook.set(null);
4146
}
4247

4348
/**
@@ -74,7 +79,7 @@ public RxJavaErrorHandler getErrorHandler() {
7479
*/
7580
public void registerErrorHandler(RxJavaErrorHandler impl) {
7681
if (!errorHandler.compareAndSet(null, impl)) {
77-
throw new IllegalStateException("Another strategy was already registered.");
82+
throw new IllegalStateException("Another strategy was already registered: " + errorHandler.get());
7883
}
7984
}
8085

@@ -112,7 +117,7 @@ public RxJavaObservableExecutionHook getObservableExecutionHook() {
112117
*/
113118
public void registerObservableExecutionHook(RxJavaObservableExecutionHook impl) {
114119
if (!observableExecutionHook.compareAndSet(null, impl)) {
115-
throw new IllegalStateException("Another strategy was already registered.");
120+
throw new IllegalStateException("Another strategy was already registered: " + observableExecutionHook.get());
116121
}
117122
}
118123

rxjava-core/src/main/java/rx/util/CompositeException.java

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,29 +25,26 @@
2525
* <p>
2626
* The <code>getMessage()</code> will return a concatenation of the composite exceptions.
2727
*/
28-
public class CompositeException extends RuntimeException {
28+
public final class CompositeException extends RuntimeException {
2929

3030
private static final long serialVersionUID = 3026362227162912146L;
3131

3232
private final List<Throwable> exceptions;
3333
private final String message;
34+
private final Throwable cause;
3435

3536
public CompositeException(String messagePrefix, Collection<Throwable> errors) {
36-
StringBuilder _message = new StringBuilder();
37-
if (messagePrefix != null) {
38-
_message.append(messagePrefix).append(" => ");
39-
}
40-
4137
List<Throwable> _exceptions = new ArrayList<Throwable>();
38+
CompositeExceptionCausalChain _cause = new CompositeExceptionCausalChain();
39+
int count = 0;
4240
for (Throwable e : errors) {
41+
count++;
42+
attachCallingThreadStack(_cause, e);
4343
_exceptions.add(e);
44-
if (_message.length() > 0) {
45-
_message.append(", ");
46-
}
47-
_message.append(e.getClass().getSimpleName()).append(":").append(e.getMessage());
4844
}
4945
this.exceptions = Collections.unmodifiableList(_exceptions);
50-
this.message = _message.toString();
46+
this.message = count + " exceptions occurred. See them in causal chain below.";
47+
this.cause = _cause;
5148
}
5249

5350
public CompositeException(Collection<Throwable> errors) {
@@ -62,4 +59,51 @@ public List<Throwable> getExceptions() {
6259
public String getMessage() {
6360
return message;
6461
}
62+
63+
@Override
64+
public synchronized Throwable getCause() {
65+
return cause;
66+
}
67+
68+
@SuppressWarnings("unused") // useful when debugging but don't want to make part of publicly supported API
69+
private static String getStackTraceAsString(StackTraceElement[] stack) {
70+
StringBuilder s = new StringBuilder();
71+
boolean firstLine = true;
72+
for (StackTraceElement e : stack) {
73+
if (e.toString().startsWith("java.lang.Thread.getStackTrace")) {
74+
// we'll ignore this one
75+
continue;
76+
}
77+
if (!firstLine) {
78+
s.append("\n\t");
79+
}
80+
s.append(e.toString());
81+
firstLine = false;
82+
}
83+
return s.toString();
84+
}
85+
86+
private static void attachCallingThreadStack(Throwable e, Throwable cause) {
87+
while (e.getCause() != null) {
88+
e = e.getCause();
89+
}
90+
// we now have 'e' as the last in the chain
91+
try {
92+
e.initCause(cause);
93+
} catch (Throwable t) {
94+
// ignore
95+
// the javadocs say that some Throwables (depending on how they're made) will never
96+
// let me call initCause without blowing up even if it returns null
97+
}
98+
}
99+
100+
private final static class CompositeExceptionCausalChain extends RuntimeException {
101+
private static final long serialVersionUID = 3875212506787802066L;
102+
103+
@Override
104+
public String getMessage() {
105+
return "Chain of Causes for CompositeException In Order Received =>";
106+
}
107+
}
108+
65109
}

0 commit comments

Comments
 (0)