@@ -5367,17 +5367,6 @@ public final <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>
5367
5367
return lift (new OperatorParallel <T , R >(f , s ));
5368
5368
}
5369
5369
5370
- /**
5371
- * Protects against errors being thrown from Observer implementations and ensures
5372
- * onNext/onError/onCompleted contract compliance.
5373
- * <p>
5374
- * See https://github.com/Netflix/RxJava/issues/216 for a discussion on "Guideline 6.4: Protect calls to
5375
- * user code from within an Observer"
5376
- */
5377
- private Subscription protectivelyWrapAndSubscribe (Subscriber <? super T > o ) {
5378
- return subscribe (new SafeSubscriber <T >(o ));
5379
- }
5380
-
5381
5370
/**
5382
5371
* Returns a {@link ConnectableObservable}, which waits until its {@link ConnectableObservable#connect connect} method is called before it begins emitting items to those {@link Observer}s that
5383
5372
* have subscribed to it.
@@ -6704,7 +6693,7 @@ public final Observable<T> startWith(T[] values, Scheduler scheduler) {
6704
6693
* if the Observable tries to call {@code onError}
6705
6694
*/
6706
6695
public final Subscription subscribe () {
6707
- return protectivelyWrapAndSubscribe (new Subscriber <T >() {
6696
+ return subscribe (new Subscriber <T >() {
6708
6697
6709
6698
@ Override
6710
6699
public final void onCompleted () {
@@ -6743,13 +6732,7 @@ public final Subscription subscribe(final Action1<? super T> onNext) {
6743
6732
throw new IllegalArgumentException ("onNext can not be null" );
6744
6733
}
6745
6734
6746
- /**
6747
- * Wrapping since raw functions provided by the user are being invoked.
6748
- *
6749
- * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to
6750
- * user code from within an Observer"
6751
- */
6752
- return protectivelyWrapAndSubscribe (new Subscriber <T >() {
6735
+ return subscribe (new Subscriber <T >() {
6753
6736
6754
6737
@ Override
6755
6738
public final void onCompleted () {
@@ -6793,13 +6776,7 @@ public final Subscription subscribe(final Action1<? super T> onNext, final Actio
6793
6776
throw new IllegalArgumentException ("onError can not be null" );
6794
6777
}
6795
6778
6796
- /**
6797
- * Wrapping since raw functions provided by the user are being invoked.
6798
- *
6799
- * See https://github.com/Netflix/RxJava/issues/216 for discussion on
6800
- * "Guideline 6.4: Protect calls to user code from within an Observer"
6801
- */
6802
- return protectivelyWrapAndSubscribe (new Subscriber <T >() {
6779
+ return subscribe (new Subscriber <T >() {
6803
6780
6804
6781
@ Override
6805
6782
public final void onCompleted () {
@@ -6850,12 +6827,7 @@ public final Subscription subscribe(final Action1<? super T> onNext, final Actio
6850
6827
throw new IllegalArgumentException ("onComplete can not be null" );
6851
6828
}
6852
6829
6853
- /**
6854
- * Wrapping since raw functions provided by the user are being invoked.
6855
- *
6856
- * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an Observer"
6857
- */
6858
- return protectivelyWrapAndSubscribe (new Subscriber <T >() {
6830
+ return subscribe (new Subscriber <T >() {
6859
6831
6860
6832
@ Override
6861
6833
public final void onCompleted () {
@@ -7011,7 +6983,7 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
7011
6983
* For more information see the
7012
6984
* <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
7013
6985
*
7014
- * @param observer
6986
+ * @param subscriber
7015
6987
* the {@link Subscriber}
7016
6988
* @return a {@link Subscription} reference with which Subscribers that are {@link Observer}s can
7017
6989
* unsubscribe from the Observable
@@ -7024,11 +6996,11 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
7024
6996
* @throws RuntimeException
7025
6997
* if the {@link Subscriber}'s {@code onError} method itself threw a {@code Throwable}
7026
6998
*/
7027
- public final Subscription subscribe (Subscriber <? super T > observer ) {
6999
+ public final Subscription subscribe (Subscriber <? super T > subscriber ) {
7028
7000
// allow the hook to intercept and/or decorate
7029
7001
OnSubscribe <T > onSubscribeFunction = hook .onSubscribeStart (this , onSubscribe );
7030
7002
// validate and proceed
7031
- if (observer == null ) {
7003
+ if (subscriber == null ) {
7032
7004
throw new IllegalArgumentException ("observer can not be null" );
7033
7005
}
7034
7006
if (onSubscribeFunction == null ) {
@@ -7044,12 +7016,12 @@ public final Subscription subscribe(Subscriber<? super T> observer) {
7044
7016
* to user code from within an Observer"
7045
7017
*/
7046
7018
// if not already wrapped
7047
- if (!(observer instanceof SafeSubscriber )) {
7019
+ if (!(subscriber instanceof SafeSubscriber )) {
7048
7020
// assign to `observer` so we return the protected version
7049
- observer = new SafeSubscriber <T >(observer );
7021
+ subscriber = new SafeSubscriber <T >(subscriber );
7050
7022
}
7051
- onSubscribeFunction .call (observer );
7052
- final Subscription returnSubscription = hook .onSubscribeReturn (observer );
7023
+ onSubscribeFunction .call (subscriber );
7024
+ final Subscription returnSubscription = hook .onSubscribeReturn (subscriber );
7053
7025
// we return it inside a Subscription so it can't be cast back to Subscriber
7054
7026
return Subscriptions .create (new Action0 () {
7055
7027
@@ -7064,7 +7036,7 @@ public void call() {
7064
7036
Exceptions .throwIfFatal (e );
7065
7037
// if an unhandled error occurs executing the onSubscribe we will propagate it
7066
7038
try {
7067
- observer .onError (hook .onSubscribeError (e ));
7039
+ subscriber .onError (hook .onSubscribeError (e ));
7068
7040
} catch (OnErrorNotImplementedException e2 ) {
7069
7041
// special handling when onError is not implemented ... we just rethrow
7070
7042
throw e2 ;
0 commit comments