|
38 | 38 | * the result value type
|
39 | 39 | */
|
40 | 40 | public final class OperatorMulticast<T, R> extends ConnectableObservable<R> {
|
41 |
| - private final Observable<? extends T> source; |
42 |
| - private final Object guard; |
43 |
| - private final Func0<? extends Subject<? super T, ? extends R>> subjectFactory; |
44 |
| - private final AtomicReference<Subject<? super T, ? extends R>> connectedSubject; |
45 |
| - private final List<Subscriber<? super R>> waitingForConnect; |
| 41 | + final Observable<? extends T> source; |
| 42 | + final Object guard; |
| 43 | + final Func0<? extends Subject<? super T, ? extends R>> subjectFactory; |
| 44 | + final AtomicReference<Subject<? super T, ? extends R>> connectedSubject; |
| 45 | + final List<Subscriber<? super R>> waitingForConnect; |
46 | 46 |
|
47 | 47 | /** Guarded by guard. */
|
48 | 48 | private Subscriber<T> subscription;
|
@@ -109,21 +109,26 @@ public void onNext(T args) {
|
109 | 109 | subject.onNext(args);
|
110 | 110 | }
|
111 | 111 | };
|
112 |
| - guardedSubscription = Subscriptions.create(new Action0() { |
| 112 | + final AtomicReference<Subscription> gs = new AtomicReference<Subscription>(); |
| 113 | + gs.set(Subscriptions.create(new Action0() { |
113 | 114 | @Override
|
114 | 115 | public void call() {
|
115 | 116 | Subscription s;
|
116 | 117 | synchronized (guard) {
|
117 |
| - s = subscription; |
118 |
| - subscription = null; |
119 |
| - guardedSubscription = null; |
120 |
| - connectedSubject.set(null); |
| 118 | + if ( guardedSubscription == gs.get()) { |
| 119 | + s = subscription; |
| 120 | + subscription = null; |
| 121 | + guardedSubscription = null; |
| 122 | + connectedSubject.set(null); |
| 123 | + } else |
| 124 | + return; |
121 | 125 | }
|
122 | 126 | if (s != null) {
|
123 | 127 | s.unsubscribe();
|
124 | 128 | }
|
125 | 129 | }
|
126 |
| - }); |
| 130 | + })); |
| 131 | + guardedSubscription = gs.get(); |
127 | 132 |
|
128 | 133 | // register any subscribers that are waiting with this new subject
|
129 | 134 | for(Subscriber<? super R> s : waitingForConnect) {
|
|
0 commit comments