Skip to content

Commit 382d6df

Browse files
committed
OperatorMulticast.connect(connection) should return first subscription on multiple calls, address possible race condition provoking IAE
1 parent 8614754 commit 382d6df

File tree

2 files changed

+38
-28
lines changed

2 files changed

+38
-28
lines changed

src/main/java/rx/internal/operators/OperatorMulticast.java

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,16 @@
3838
* the result value type
3939
*/
4040
public final class OperatorMulticast<T, R> extends ConnectableObservable<R> {
41-
final Observable<? extends T> source;
42-
final Object guard;
43-
final Func0<? extends Subject<? super T, ? extends R>> subjectFactory;
41+
private final Observable<? extends T> source;
42+
private final Object guard;
43+
private final Func0<? extends Subject<? super T, ? extends R>> subjectFactory;
4444
private final AtomicReference<Subject<? super T, ? extends R>> connectedSubject;
4545
private final List<Subscriber<? super R>> waitingForConnect;
4646

4747
/** Guarded by guard. */
48-
Subscriber<T> subscription;
48+
private Subscriber<T> subscription;
49+
// wraps subscription above with for unsubscription using guard
50+
private Subscription guardedSubscription;
4951

5052
public OperatorMulticast(Observable<? extends T> source, final Func0<? extends Subject<? super T, ? extends R>> subjectFactory) {
5153
this(new Object(), new AtomicReference<Subject<? super T, ? extends R>>(), new ArrayList<Subscriber<? super R>>(), source, subjectFactory);
@@ -82,7 +84,8 @@ public void connect(Action1<? super Subscription> connection) {
8284
// subscription is the state of whether we are connected or not
8385
synchronized (guard) {
8486
if (subscription != null) {
85-
// already connected, return as there is nothing to do
87+
// already connected
88+
connection.call(guardedSubscription);
8689
return;
8790
} else {
8891
shouldSubscribe = true;
@@ -106,6 +109,21 @@ public void onNext(T args) {
106109
subject.onNext(args);
107110
}
108111
};
112+
guardedSubscription = Subscriptions.create(new Action0() {
113+
@Override
114+
public void call() {
115+
Subscription s;
116+
synchronized (guard) {
117+
s = subscription;
118+
subscription = null;
119+
guardedSubscription = null;
120+
connectedSubject.set(null);
121+
}
122+
if (s != null) {
123+
s.unsubscribe();
124+
}
125+
}
126+
});
109127

110128
// register any subscribers that are waiting with this new subject
111129
for(Subscriber<? super R> s : waitingForConnect) {
@@ -116,34 +134,22 @@ public void onNext(T args) {
116134
// record the Subject so OnSubscribe can see it
117135
connectedSubject.set(subject);
118136
}
137+
119138
}
120139

121140
// in the lock above we determined we should subscribe, do it now outside the lock
122141
if (shouldSubscribe) {
123142
// register a subscription that will shut this down
124-
connection.call(Subscriptions.create(new Action0() {
125-
@Override
126-
public void call() {
127-
Subscription s;
128-
synchronized (guard) {
129-
s = subscription;
130-
subscription = null;
131-
connectedSubject.set(null);
132-
}
133-
if (s != null) {
134-
s.unsubscribe();
135-
}
136-
}
137-
}));
143+
connection.call(guardedSubscription);
138144

139145
// now that everything is hooked up let's subscribe
140146
// as long as the subscription is not null (which can happen if already unsubscribed)
141-
boolean subscriptionIsNull;
142-
synchronized(guard) {
143-
subscriptionIsNull = subscription == null;
147+
Subscriber<T> sub;
148+
synchronized (guard) {
149+
sub = subscription;
144150
}
145-
if (!subscriptionIsNull)
146-
source.subscribe(subscription);
151+
if (sub != null)
152+
source.subscribe(sub);
147153
}
148154
}
149155
}

src/test/java/rx/internal/operators/OnSubscribeMulticastTest.java renamed to src/test/java/rx/internal/operators/OperatorMulticastTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Mockito.mock;
1920
import static org.mockito.Mockito.never;
2021
import static org.mockito.Mockito.times;
2122
import static org.mockito.Mockito.verify;
2223

24+
import org.junit.Assert;
2325
import org.junit.Test;
2426

2527
import rx.Observer;
@@ -29,7 +31,7 @@
2931
import rx.subjects.PublishSubject;
3032
import rx.subjects.Subject;
3133

32-
public class OnSubscribeMulticastTest {
34+
public class OperatorMulticastTest {
3335

3436
@Test
3537
public void testMulticast() {
@@ -70,15 +72,17 @@ public void testMulticastConnectTwice() {
7072

7173
source.onNext("one");
7274

73-
multicasted.connect();
74-
multicasted.connect();
75-
75+
Subscription sub = multicasted.connect();
76+
Subscription sub2 = multicasted.connect();
77+
7678
source.onNext("two");
7779
source.onCompleted();
7880

7981
verify(observer, never()).onNext("one");
8082
verify(observer, times(1)).onNext("two");
8183
verify(observer, times(1)).onCompleted();
84+
85+
assertEquals(sub, sub2);
8286

8387
}
8488

0 commit comments

Comments
 (0)