Skip to content

Commit 4081e3f

Browse files
committed
Merge pull request ReactiveX#2779 from davidmoten/connect-returns-null
OperatorMulticast.connect(connection) should not return null
2 parents 8614754 + 379f07d commit 4081e3f

File tree

2 files changed

+45
-35
lines changed

2 files changed

+45
-35
lines changed

src/main/java/rx/internal/operators/OperatorMulticast.java

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,13 @@ public final class OperatorMulticast<T, R> extends ConnectableObservable<R> {
4141
final Observable<? extends T> source;
4242
final Object guard;
4343
final Func0<? extends Subject<? super T, ? extends R>> subjectFactory;
44-
private final AtomicReference<Subject<? super T, ? extends R>> connectedSubject;
45-
private final List<Subscriber<? super R>> waitingForConnect;
44+
final AtomicReference<Subject<? super T, ? extends R>> connectedSubject;
45+
final List<Subscriber<? super R>> waitingForConnect;
4646

4747
/** Guarded by guard. */
48-
Subscriber<T> subscription;
48+
private Subscriber<T> subscription;
49+
// wraps subscription above for unsubscription using guard
50+
private Subscription guardedSubscription;
4951

5052
public OperatorMulticast(Observable<? extends T> source, final Func0<? extends Subject<? super T, ? extends R>> subjectFactory) {
5153
this(new Object(), new AtomicReference<Subject<? super T, ? extends R>>(), new ArrayList<Subscriber<? super R>>(), source, subjectFactory);
@@ -77,15 +79,13 @@ public void call(Subscriber<? super R> subscriber) {
7779
public void connect(Action1<? super Subscription> connection) {
7880
// each time we connect we create a new Subject and Subscription
7981

80-
boolean shouldSubscribe = false;
81-
8282
// subscription is the state of whether we are connected or not
8383
synchronized (guard) {
8484
if (subscription != null) {
85-
// already connected, return as there is nothing to do
85+
// already connected
86+
connection.call(guardedSubscription);
8687
return;
8788
} else {
88-
shouldSubscribe = true;
8989
// we aren't connected, so let's create a new Subject and connect
9090
final Subject<? super T, ? extends R> subject = subjectFactory.call();
9191
// create new Subscriber that will pass-thru to the subject we just created
@@ -106,6 +106,26 @@ public void onNext(T args) {
106106
subject.onNext(args);
107107
}
108108
};
109+
final AtomicReference<Subscription> gs = new AtomicReference<Subscription>();
110+
gs.set(Subscriptions.create(new Action0() {
111+
@Override
112+
public void call() {
113+
Subscription s;
114+
synchronized (guard) {
115+
if ( guardedSubscription == gs.get()) {
116+
s = subscription;
117+
subscription = null;
118+
guardedSubscription = null;
119+
connectedSubject.set(null);
120+
} else
121+
return;
122+
}
123+
if (s != null) {
124+
s.unsubscribe();
125+
}
126+
}
127+
}));
128+
guardedSubscription = gs.get();
109129

110130
// register any subscribers that are waiting with this new subject
111131
for(Subscriber<? super R> s : waitingForConnect) {
@@ -116,34 +136,20 @@ public void onNext(T args) {
116136
// record the Subject so OnSubscribe can see it
117137
connectedSubject.set(subject);
118138
}
139+
119140
}
120141

121142
// in the lock above we determined we should subscribe, do it now outside the lock
122-
if (shouldSubscribe) {
123-
// register a subscription that will shut this down
124-
connection.call(Subscriptions.create(new Action0() {
125-
@Override
126-
public void call() {
127-
Subscription s;
128-
synchronized (guard) {
129-
s = subscription;
130-
subscription = null;
131-
connectedSubject.set(null);
132-
}
133-
if (s != null) {
134-
s.unsubscribe();
135-
}
136-
}
137-
}));
143+
// register a subscription that will shut this down
144+
connection.call(guardedSubscription);
138145

139-
// now that everything is hooked up let's subscribe
140-
// as long as the subscription is not null (which can happen if already unsubscribed)
141-
boolean subscriptionIsNull;
142-
synchronized(guard) {
143-
subscriptionIsNull = subscription == null;
144-
}
145-
if (!subscriptionIsNull)
146-
source.subscribe(subscription);
146+
// now that everything is hooked up let's subscribe
147+
// as long as the subscription is not null (which can happen if already unsubscribed)
148+
Subscriber<T> sub;
149+
synchronized (guard) {
150+
sub = subscription;
147151
}
152+
if (sub != null)
153+
source.subscribe(sub);
148154
}
149155
}

src/test/java/rx/internal/operators/OnSubscribeMulticastTest.java renamed to src/test/java/rx/internal/operators/OperatorMulticastTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Mockito.mock;
1920
import static org.mockito.Mockito.never;
2021
import static org.mockito.Mockito.times;
2122
import static org.mockito.Mockito.verify;
2223

24+
import org.junit.Assert;
2325
import org.junit.Test;
2426

2527
import rx.Observer;
@@ -29,7 +31,7 @@
2931
import rx.subjects.PublishSubject;
3032
import rx.subjects.Subject;
3133

32-
public class OnSubscribeMulticastTest {
34+
public class OperatorMulticastTest {
3335

3436
@Test
3537
public void testMulticast() {
@@ -70,15 +72,17 @@ public void testMulticastConnectTwice() {
7072

7173
source.onNext("one");
7274

73-
multicasted.connect();
74-
multicasted.connect();
75-
75+
Subscription sub = multicasted.connect();
76+
Subscription sub2 = multicasted.connect();
77+
7678
source.onNext("two");
7779
source.onCompleted();
7880

7981
verify(observer, never()).onNext("one");
8082
verify(observer, times(1)).onNext("two");
8183
verify(observer, times(1)).onCompleted();
84+
85+
assertEquals(sub, sub2);
8286

8387
}
8488

0 commit comments

Comments
 (0)