Skip to content

Commit b17bc49

Browse files
authored
2.x: more detailed no-multi-subscribe with std consumers error message (ReactiveX#5301)
* 2.x: more detailed no-multi-subscribe with std consumers error message * Improve coverage of the new content
1 parent bab3071 commit b17bc49

26 files changed

+830
-34
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.util;
15+
16+
import java.util.concurrent.atomic.*;
17+
18+
import org.reactivestreams.Subscription;
19+
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.exceptions.ProtocolViolationException;
22+
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.functions.ObjectHelper;
24+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
25+
import io.reactivex.plugins.RxJavaPlugins;
26+
27+
/**
28+
* Utility class to help report multiple subscriptions with the same
29+
* consumer type instead of the internal "Disposable already set!" message
30+
* that is practically reserved for internal operators and indicate bugs in them.
31+
*/
32+
public final class EndConsumerHelper {
33+
34+
/**
35+
* Utility class.
36+
*/
37+
private EndConsumerHelper() {
38+
throw new IllegalStateException("No instances!");
39+
}
40+
41+
/**
42+
* Ensures that the upstream Disposable is null and returns true, otherwise
43+
* disposes the next Disposable and if the upstream is not the shared
44+
* disposed instance, reports a ProtocolViolationException due to
45+
* multiple subscribe attempts.
46+
* @param upstream the upstream current value
47+
* @param next the Disposable to check for nullness and dispose if necessary
48+
* @param observer the class of the consumer to have a personalized
49+
* error message if the upstream already contains a non-cancelled Disposable.
50+
* @return true if successful, false if the upstream was non null
51+
*/
52+
public static boolean validate(Disposable upstream, Disposable next, Class<?> observer) {
53+
ObjectHelper.requireNonNull(next, "next is null");
54+
if (upstream != null) {
55+
next.dispose();
56+
if (upstream != DisposableHelper.DISPOSED) {
57+
reportDoubleSubscription(observer);
58+
}
59+
return false;
60+
}
61+
return true;
62+
}
63+
64+
/**
65+
* Atomically updates the target upstream AtomicReference from null to the non-null
66+
* next Disposable, otherwise disposes next and reports a ProtocolViolationException
67+
* if the AtomicReference doesn't contain the shared disposed indicator.
68+
* @param upstream the target AtomicReference to update
69+
* @param next the Disposable to set on it atomically
70+
* @param observer the class of the consumer to have a personalized
71+
* error message if the upstream already contains a non-cancelled Disposable.
72+
* @return true if successful, false if the content of the AtomicReference was non null
73+
*/
74+
public static boolean setOnce(AtomicReference<Disposable> upstream, Disposable next, Class<?> observer) {
75+
ObjectHelper.requireNonNull(next, "next is null");
76+
if (!upstream.compareAndSet(null, next)) {
77+
next.dispose();
78+
if (upstream.get() != DisposableHelper.DISPOSED) {
79+
reportDoubleSubscription(observer);
80+
}
81+
return false;
82+
}
83+
return true;
84+
}
85+
86+
/**
87+
* Ensures that the upstream Subscription is null and returns true, otherwise
88+
* cancels the next Subscription and if the upstream is not the shared
89+
* cancelled instance, reports a ProtocolViolationException due to
90+
* multiple subscribe attempts.
91+
* @param upstream the upstream current value
92+
* @param next the Subscription to check for nullness and cancel if necessary
93+
* @param subscriber the class of the consumer to have a personalized
94+
* error message if the upstream already contains a non-cancelled Subscription.
95+
* @return true if successful, false if the upstream was non null
96+
*/
97+
public static boolean validate(Subscription upstream, Subscription next, Class<?> subscriber) {
98+
ObjectHelper.requireNonNull(next, "next is null");
99+
if (upstream != null) {
100+
next.cancel();
101+
if (upstream != SubscriptionHelper.CANCELLED) {
102+
reportDoubleSubscription(subscriber);
103+
}
104+
return false;
105+
}
106+
return true;
107+
}
108+
109+
/**
110+
* Atomically updates the target upstream AtomicReference from null to the non-null
111+
* next Subscription, otherwise cancels next and reports a ProtocolViolationException
112+
* if the AtomicReference doesn't contain the shared cancelled indicator.
113+
* @param upstream the target AtomicReference to update
114+
* @param next the Subscription to set on it atomically
115+
* @param subscriber the class of the consumer to have a personalized
116+
* error message if the upstream already contains a non-cancelled Subscription.
117+
* @return true if successful, false if the content of the AtomicReference was non null
118+
*/
119+
public static boolean setOnce(AtomicReference<Subscription> upstream, Subscription next, Class<?> subscriber) {
120+
ObjectHelper.requireNonNull(next, "next is null");
121+
if (!upstream.compareAndSet(null, next)) {
122+
next.cancel();
123+
if (upstream.get() != SubscriptionHelper.CANCELLED) {
124+
reportDoubleSubscription(subscriber);
125+
}
126+
return false;
127+
}
128+
return true;
129+
}
130+
131+
/**
132+
* Builds the error message with the consumer class.
133+
* @param consumer the class of the consumer
134+
* @return the error message string
135+
*/
136+
public static String composeMessage(String consumer) {
137+
return "It is not allowed to subscribe with a(n) " + consumer + " multiple times. "
138+
+ "Please create a fresh instance of " + consumer + " and subscribe that to the target source instead.";
139+
}
140+
141+
/**
142+
* Report a ProtocolViolationException with a personalized message referencing
143+
* the simple type name of the consumer class and report it via
144+
* RxJavaPlugins.onError.
145+
* @param consumer the class of the consumer
146+
*/
147+
public static void reportDoubleSubscription(Class<?> consumer) {
148+
RxJavaPlugins.onError(new ProtocolViolationException(composeMessage(consumer.getName())));
149+
}
150+
}

src/main/java/io/reactivex/observers/DefaultObserver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.reactivex.annotations.NonNull;
1818
import io.reactivex.disposables.Disposable;
1919
import io.reactivex.internal.disposables.DisposableHelper;
20+
import io.reactivex.internal.util.EndConsumerHelper;
2021

2122
/**
2223
* Abstract base implementation of an {@link io.reactivex.Observer Observer} with support for cancelling a
@@ -30,7 +31,7 @@
3031
*
3132
* <p>Like all other consumers, {@code DefaultObserver} can be subscribed only once.
3233
* Any subsequent attempt to subscribe it to a new source will yield an
33-
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
34+
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
3435
*
3536
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
3637
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
@@ -67,7 +68,7 @@ public abstract class DefaultObserver<T> implements Observer<T> {
6768
private Disposable s;
6869
@Override
6970
public final void onSubscribe(@NonNull Disposable s) {
70-
if (DisposableHelper.validate(this.s, s)) {
71+
if (EndConsumerHelper.validate(this.s, s, getClass())) {
7172
this.s = s;
7273
onStart();
7374
}

src/main/java/io/reactivex/observers/DisposableCompletableObserver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.annotations.NonNull;
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.internal.util.EndConsumerHelper;
2223

2324
/**
2425
* An abstract {@link CompletableObserver} that allows asynchronous cancellation by implementing Disposable.
@@ -27,7 +28,7 @@
2728
*
2829
* <p>Like all other consumers, {@code DisposableCompletableObserver} can be subscribed only once.
2930
* Any subsequent attempt to subscribe it to a new source will yield an
30-
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
31+
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
3132
*
3233
* <p>Implementation of {@link #onStart()}, {@link #onError(Throwable)} and
3334
* {@link #onComplete()} are not allowed to throw any unchecked exceptions.
@@ -55,7 +56,7 @@ public abstract class DisposableCompletableObserver implements CompletableObserv
5556

5657
@Override
5758
public final void onSubscribe(@NonNull Disposable s) {
58-
if (DisposableHelper.setOnce(this.s, s)) {
59+
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
5960
onStart();
6061
}
6162
}

src/main/java/io/reactivex/observers/DisposableMaybeObserver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.annotations.NonNull;
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.internal.util.EndConsumerHelper;
2223

2324
/**
2425
* An abstract {@link MaybeObserver} that allows asynchronous cancellation by implementing Disposable.
@@ -31,7 +32,7 @@
3132
*
3233
* <p>Like all other consumers, {@code DisposableMaybeObserver} can be subscribed only once.
3334
* Any subsequent attempt to subscribe it to a new source will yield an
34-
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
35+
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
3536
*
3637
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)} and
3738
* {@link #onComplete()} are not allowed to throw any unchecked exceptions.
@@ -66,7 +67,7 @@ public abstract class DisposableMaybeObserver<T> implements MaybeObserver<T>, Di
6667

6768
@Override
6869
public final void onSubscribe(@NonNull Disposable s) {
69-
if (DisposableHelper.setOnce(this.s, s)) {
70+
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
7071
onStart();
7172
}
7273
}

src/main/java/io/reactivex/observers/DisposableObserver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.annotations.NonNull;
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.internal.util.EndConsumerHelper;
2223

2324
/**
2425
* An abstract {@link Observer} that allows asynchronous cancellation by implementing Disposable.
@@ -30,7 +31,7 @@
3031
*
3132
* <p>Like all other consumers, {@code DisposableObserver} can be subscribed only once.
3233
* Any subsequent attempt to subscribe it to a new source will yield an
33-
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
34+
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
3435
*
3536
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
3637
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
@@ -69,7 +70,7 @@ public abstract class DisposableObserver<T> implements Observer<T>, Disposable {
6970

7071
@Override
7172
public final void onSubscribe(@NonNull Disposable s) {
72-
if (DisposableHelper.setOnce(this.s, s)) {
73+
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
7374
onStart();
7475
}
7576
}

src/main/java/io/reactivex/observers/DisposableSingleObserver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.annotations.NonNull;
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.internal.util.EndConsumerHelper;
2223

2324
/**
2425
* An abstract {@link SingleObserver} that allows asynchronous cancellation by implementing Disposable.
@@ -27,7 +28,7 @@
2728
*
2829
* <p>Like all other consumers, {@code DisposableSingleObserver} can be subscribed only once.
2930
* Any subsequent attempt to subscribe it to a new source will yield an
30-
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
31+
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
3132
*
3233
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)} and {@link #onError(Throwable)}
3334
* are not allowed to throw any unchecked exceptions.
@@ -58,7 +59,7 @@ public abstract class DisposableSingleObserver<T> implements SingleObserver<T>,
5859

5960
@Override
6061
public final void onSubscribe(@NonNull Disposable s) {
61-
if (DisposableHelper.setOnce(this.s, s)) {
62+
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
6263
onStart();
6364
}
6465
}

src/main/java/io/reactivex/observers/ResourceCompletableObserver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.internal.disposables.*;
2222
import io.reactivex.internal.functions.ObjectHelper;
23+
import io.reactivex.internal.util.EndConsumerHelper;
2324

2425
/**
2526
* An abstract {@link CompletableObserver} that allows asynchronous cancellation of its subscription and associated resources.
@@ -44,7 +45,7 @@
4445
*
4546
* <p>Like all other consumers, {@code ResourceCompletableObserver} can be subscribed only once.
4647
* Any subsequent attempt to subscribe it to a new source will yield an
47-
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
48+
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
4849
*
4950
* <p>Implementation of {@link #onStart()}, {@link #onError(Throwable)}
5051
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
@@ -92,7 +93,7 @@ public final void add(@NonNull Disposable resource) {
9293

9394
@Override
9495
public final void onSubscribe(@NonNull Disposable s) {
95-
if (DisposableHelper.setOnce(this.s, s)) {
96+
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
9697
onStart();
9798
}
9899
}

src/main/java/io/reactivex/observers/ResourceMaybeObserver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.internal.disposables.*;
2222
import io.reactivex.internal.functions.ObjectHelper;
23+
import io.reactivex.internal.util.EndConsumerHelper;
2324

2425
/**
2526
* An abstract {@link MaybeObserver} that allows asynchronous cancellation of its subscription and associated resources.
@@ -48,7 +49,7 @@
4849
*
4950
* <p>Like all other consumers, {@code ResourceMaybeObserver} can be subscribed only once.
5051
* Any subsequent attempt to subscribe it to a new source will yield an
51-
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
52+
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
5253
*
5354
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)}
5455
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
@@ -102,7 +103,7 @@ public final void add(@NonNull Disposable resource) {
102103

103104
@Override
104105
public final void onSubscribe(@NonNull Disposable s) {
105-
if (DisposableHelper.setOnce(this.s, s)) {
106+
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
106107
onStart();
107108
}
108109
}

src/main/java/io/reactivex/observers/ResourceObserver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.internal.disposables.*;
2222
import io.reactivex.internal.functions.ObjectHelper;
23+
import io.reactivex.internal.util.EndConsumerHelper;
2324

2425
/**
2526
* An abstract {@link Observer} that allows asynchronous cancellation of its subscription and associated resources.
@@ -41,7 +42,7 @@
4142
*
4243
* <p>Like all other consumers, {@code ResourceObserver} can be subscribed only once.
4344
* Any subsequent attempt to subscribe it to a new source will yield an
44-
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
45+
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
4546
*
4647
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
4748
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
@@ -100,7 +101,7 @@ public final void add(@NonNull Disposable resource) {
100101

101102
@Override
102103
public final void onSubscribe(Disposable s) {
103-
if (DisposableHelper.setOnce(this.s, s)) {
104+
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
104105
onStart();
105106
}
106107
}

src/main/java/io/reactivex/observers/ResourceSingleObserver.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.disposables.Disposable;
2121
import io.reactivex.internal.disposables.*;
2222
import io.reactivex.internal.functions.ObjectHelper;
23+
import io.reactivex.internal.util.EndConsumerHelper;
2324

2425
/**
2526
* An abstract {@link SingleObserver} that allows asynchronous cancellation of its subscription
@@ -45,7 +46,7 @@
4546
*
4647
* <p>Like all other consumers, {@code ResourceSingleObserver} can be subscribed only once.
4748
* Any subsequent attempt to subscribe it to a new source will yield an
48-
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
49+
* {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) <class name> multiple times."}.
4950
*
5051
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)} and {@link #onError(Throwable)}
5152
* are not allowed to throw any unchecked exceptions.
@@ -95,7 +96,7 @@ public final void add(@NonNull Disposable resource) {
9596

9697
@Override
9798
public final void onSubscribe(@NonNull Disposable s) {
98-
if (DisposableHelper.setOnce(this.s, s)) {
99+
if (EndConsumerHelper.setOnce(this.s, s, getClass())) {
99100
onStart();
100101
}
101102
}

0 commit comments

Comments
 (0)