Skip to content

Commit cafabff

Browse files
Merge pull request ReactiveX#1515 from benjchristensen/multicast-factory
Support Subject Factory with Multicast
2 parents 254cac1 + ed79e95 commit cafabff

File tree

4 files changed

+235
-54
lines changed

4 files changed

+235
-54
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 115 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5414,8 +5414,43 @@ public final <TIntermediate, TResult> Observable<TResult> multicast(
54145414
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: Observable.publish and Observable.multicast</a>
54155415
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
54165416
*/
5417-
public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends R> subject) {
5418-
return new OperatorMulticast<T, R>(this, subject);
5417+
public final <R> ConnectableObservable<R> multicast(final Subject<? super T, ? extends R> subject) {
5418+
return new OperatorMulticast<T, R>(this, new Func0<Subject<? super T, ? extends R>>() {
5419+
5420+
@Override
5421+
public Subject<? super T, ? extends R> call() {
5422+
// same one every time, no factory behavior
5423+
return subject;
5424+
}
5425+
5426+
});
5427+
}
5428+
5429+
/**
5430+
* Returns a {@link ConnectableObservable} that upon connection causes the source Observable to push results
5431+
* into the specified subject. A Connectable Observable resembles an ordinary Observable, except that it
5432+
* does not begin emitting items when it is subscribed to, but only when its {@code connect} method
5433+
* is called.
5434+
* <dl>
5435+
* <dt><b>Backpressure Support:</b></dt>
5436+
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5437+
* multiple subscribers. Each child will need to manage backpressure independently using operators such
5438+
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5439+
* <dt><b>Scheduler:</b></dt>
5440+
* <dd>{@code multicast} does not operate by default on a particular {@link Scheduler}.</dd>
5441+
* </dl>
5442+
*
5443+
* @param subjectFactory
5444+
* Func that creates a new {@link Subject} for the {@link ConnectableObservable} to push source items into
5445+
* @param <R>
5446+
* the type of items emitted by the resulting {@code ConnectableObservable}
5447+
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to push results
5448+
* into the specified {@link Subject}
5449+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: Observable.publish and Observable.multicast</a>
5450+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
5451+
*/
5452+
public final <R> ConnectableObservable<R> multicast(Func0<? extends Subject<? super T, ? extends R>> subjectFactory) {
5453+
return new OperatorMulticast<T, R>(this, subjectFactory);
54195454
}
54205455

54215456
/**
@@ -5724,7 +5759,14 @@ public final <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>
57245759
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
57255760
*/
57265761
public final ConnectableObservable<T> publish() {
5727-
return new OperatorMulticast<T, T>(this, PublishSubject.<T> create());
5762+
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
5763+
5764+
@Override
5765+
public Subject<? super T, ? extends T> call() {
5766+
return PublishSubject.<T> create();
5767+
}
5768+
5769+
});
57285770
}
57295771

57305772
/**
@@ -5819,8 +5861,15 @@ public final Subject<T, T> call() {
58195861
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: publish</a>
58205862
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
58215863
*/
5822-
public final ConnectableObservable<T> publish(T initialValue) {
5823-
return new OperatorMulticast<T, T>(this, BehaviorSubject.<T> create(initialValue));
5864+
public final ConnectableObservable<T> publish(final T initialValue) {
5865+
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
5866+
5867+
@Override
5868+
public Subject<? super T, ? extends T> call() {
5869+
return BehaviorSubject.<T> create(initialValue);
5870+
}
5871+
5872+
});
58245873
}
58255874

58265875
/**
@@ -5842,7 +5891,14 @@ public final ConnectableObservable<T> publish(T initialValue) {
58425891
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
58435892
*/
58445893
public final ConnectableObservable<T> publishLast() {
5845-
return new OperatorMulticast<T, T>(this, AsyncSubject.<T> create());
5894+
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
5895+
5896+
@Override
5897+
public Subject<? super T, ? extends T> call() {
5898+
return AsyncSubject.<T> create();
5899+
}
5900+
5901+
});
58465902
}
58475903

58485904
/**
@@ -6112,7 +6168,14 @@ public final Observable<T> repeatWhen(Func1<? super Observable<? extends Notific
61126168
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.replay.aspx">MSDN: Observable.Replay</a>
61136169
*/
61146170
public final ConnectableObservable<T> replay() {
6115-
return new OperatorMulticast<T, T>(this, ReplaySubject.<T> create());
6171+
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6172+
6173+
@Override
6174+
public Subject<? super T, ? extends T> call() {
6175+
return ReplaySubject.<T> create();
6176+
}
6177+
6178+
});
61166179
}
61176180

61186181
/**
@@ -6444,8 +6507,15 @@ public final Subject<T, T> call() {
64446507
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
64456508
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211976.aspx">MSDN: Observable.Replay</a>
64466509
*/
6447-
public final ConnectableObservable<T> replay(int bufferSize) {
6448-
return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithSize(bufferSize));
6510+
public final ConnectableObservable<T> replay(final int bufferSize) {
6511+
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6512+
6513+
@Override
6514+
public Subject<? super T, ? extends T> call() {
6515+
return ReplaySubject.<T>createWithSize(bufferSize);
6516+
}
6517+
6518+
});
64496519
}
64506520

64516521
/**
@@ -6512,11 +6582,18 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
65126582
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
65136583
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211759.aspx">MSDN: Observable.Replay</a>
65146584
*/
6515-
public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
6585+
public final ConnectableObservable<T> replay(final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
65166586
if (bufferSize < 0) {
65176587
throw new IllegalArgumentException("bufferSize < 0");
65186588
}
6519-
return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithTimeAndSize(time, unit, bufferSize, scheduler));
6589+
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6590+
6591+
@Override
6592+
public Subject<? super T, ? extends T> call() {
6593+
return ReplaySubject.<T>createWithTimeAndSize(time, unit, bufferSize, scheduler);
6594+
}
6595+
6596+
});
65206597
}
65216598

65226599
/**
@@ -6544,10 +6621,15 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
65446621
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
65456622
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229814.aspx">MSDN: Observable.Replay</a>
65466623
*/
6547-
public final ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler) {
6548-
return new OperatorMulticast<T, T>(this,
6549-
OperatorReplay.createScheduledSubject(
6550-
ReplaySubject.<T>createWithSize(bufferSize), scheduler));
6624+
public final ConnectableObservable<T> replay(final int bufferSize, final Scheduler scheduler) {
6625+
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6626+
6627+
@Override
6628+
public Subject<? super T, ? extends T> call() {
6629+
return OperatorReplay.createScheduledSubject(ReplaySubject.<T>createWithSize(bufferSize), scheduler);
6630+
}
6631+
6632+
});
65516633
}
65526634

65536635
/**
@@ -6606,8 +6688,15 @@ public final ConnectableObservable<T> replay(long time, TimeUnit unit) {
66066688
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
66076689
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211811.aspx">MSDN: Observable.Replay</a>
66086690
*/
6609-
public final ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler scheduler) {
6610-
return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithTime(time, unit, scheduler));
6691+
public final ConnectableObservable<T> replay(final long time, final TimeUnit unit, final Scheduler scheduler) {
6692+
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6693+
6694+
@Override
6695+
public Subject<? super T, ? extends T> call() {
6696+
return ReplaySubject.<T>createWithTime(time, unit, scheduler);
6697+
}
6698+
6699+
});
66116700
}
66126701

66136702
/**
@@ -6634,8 +6723,15 @@ public final ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler
66346723
* @see <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators#observablereplay">RxJava wiki: replay</a>
66356724
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211699.aspx">MSDN: Observable.Replay</a>
66366725
*/
6637-
public final ConnectableObservable<T> replay(Scheduler scheduler) {
6638-
return new OperatorMulticast<T, T>(this, OperatorReplay.createScheduledSubject(ReplaySubject.<T> create(), scheduler));
6726+
public final ConnectableObservable<T> replay(final Scheduler scheduler) {
6727+
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
6728+
6729+
@Override
6730+
public Subject<? super T, ? extends T> call() {
6731+
return OperatorReplay.createScheduledSubject(ReplaySubject.<T> create(), scheduler);
6732+
}
6733+
6734+
});
66396735
}
66406736

66416737
/**

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeMulticastSelector.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import rx.functions.Func1;
2525
import rx.observables.ConnectableObservable;
2626
import rx.observers.SafeSubscriber;
27+
import rx.subjects.ReplaySubject;
2728
import rx.subjects.Subject;
2829

2930
/**
@@ -54,9 +55,7 @@ public void call(Subscriber<? super TResult> child) {
5455
Observable<TResult> observable;
5556
ConnectableObservable<TIntermediate> connectable;
5657
try {
57-
Subject<? super TInput, ? extends TIntermediate> subject = subjectFactory.call();
58-
59-
connectable = new OperatorMulticast<TInput, TIntermediate>(source, subject);
58+
connectable = new OperatorMulticast<TInput, TIntermediate>(source, subjectFactory);
6059

6160
observable = resultSelector.call(connectable);
6261
} catch (Throwable t) {

rxjava-core/src/main/java/rx/internal/operators/OperatorMulticast.java

Lines changed: 81 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,58 +15,82 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
1822
import rx.Observable;
1923
import rx.Subscriber;
2024
import rx.Subscription;
2125
import rx.functions.Action0;
2226
import rx.functions.Action1;
27+
import rx.functions.Func0;
2328
import rx.observables.ConnectableObservable;
2429
import rx.subjects.Subject;
2530
import rx.subscriptions.Subscriptions;
2631

2732
/**
2833
* Shares a single subscription to a source through a Subject.
2934
*
30-
* @param <T> the source value type
31-
* @param <R> the result value type
35+
* @param <T>
36+
* the source value type
37+
* @param <R>
38+
* the result value type
3239
*/
3340
public final class OperatorMulticast<T, R> extends ConnectableObservable<R> {
3441
final Observable<? extends T> source;
35-
final Subject<? super T, ? extends R> subject;
36-
final Object guard = new Object();
42+
final Object guard;
43+
final Func0<? extends Subject<? super T, ? extends R>> subjectFactory;
44+
private final AtomicReference<Subject<? super T, ? extends R>> connectedSubject;
45+
private final List<Subscriber<? super R>> waitingForConnect;
46+
3747
/** Guarded by guard. */
38-
Subscription subscription;
48+
Subscriber<T> subscription;
49+
50+
public OperatorMulticast(Observable<? extends T> source, final Func0<? extends Subject<? super T, ? extends R>> subjectFactory) {
51+
this(new Object(), new AtomicReference<Subject<? super T, ? extends R>>(), new ArrayList<Subscriber<? super R>>(), source, subjectFactory);
52+
}
3953

40-
public OperatorMulticast(Observable<? extends T> source, final Subject<? super T, ? extends R> subject) {
54+
private OperatorMulticast(final Object guard, final AtomicReference<Subject<? super T, ? extends R>> connectedSubject, final List<Subscriber<? super R>> waitingForConnect, Observable<? extends T> source, final Func0<? extends Subject<? super T, ? extends R>> subjectFactory) {
4155
super(new OnSubscribe<R>() {
42-
@Override
43-
public void call(Subscriber<? super R> subscriber) {
44-
subject.unsafeSubscribe(subscriber);
56+
@Override
57+
public void call(Subscriber<? super R> subscriber) {
58+
synchronized (guard) {
59+
if (connectedSubject.get() == null) {
60+
// not connected yet, so register
61+
waitingForConnect.add(subscriber);
62+
} else {
63+
// we are already connected so subscribe directly
64+
connectedSubject.get().unsafeSubscribe(subscriber);
65+
}
4566
}
46-
});
67+
}
68+
});
69+
this.guard = guard;
70+
this.connectedSubject = connectedSubject;
71+
this.waitingForConnect = waitingForConnect;
4772
this.source = source;
48-
this.subject = subject;
73+
this.subjectFactory = subjectFactory;
4974
}
5075

5176
@Override
5277
public void connect(Action1<? super Subscription> connection) {
53-
connection.call(Subscriptions.create(new Action0() {
54-
@Override
55-
public void call() {
56-
Subscription s;
57-
synchronized (guard) {
58-
s = subscription;
59-
subscription = null;
60-
}
61-
if (s != null) {
62-
s.unsubscribe();
63-
}
64-
}
65-
}));
66-
Subscriber<T> s = null;
78+
// each time we connect we create a new Subject and Subscription
79+
80+
boolean shouldSubscribe = false;
81+
82+
// subscription is the state of whether we are connected or not
6783
synchronized (guard) {
68-
if (subscription == null) {
69-
s = new Subscriber<T>() {
84+
if (subscription != null) {
85+
// already connected, return as there is nothing to do
86+
return;
87+
} else {
88+
shouldSubscribe = true;
89+
// we aren't connected, so let's create a new Subject and connect
90+
final Subject<? super T, ? extends R> subject = subjectFactory.call();
91+
// create new Subscriber that will pass-thru to the subject we just created
92+
// we do this since it is also a Subscription whereas the Subject is not
93+
subscription = new Subscriber<T>() {
7094
@Override
7195
public void onCompleted() {
7296
subject.onCompleted();
@@ -82,11 +106,38 @@ public void onNext(T args) {
82106
subject.onNext(args);
83107
}
84108
};
85-
subscription = s;
109+
110+
// register any subscribers that are waiting with this new subject
111+
for(Subscriber<? super R> s : waitingForConnect) {
112+
subject.unsafeSubscribe(s);
113+
}
114+
// clear the waiting list as any new ones that come in after leaving this synchronized block will go direct to the Subject
115+
waitingForConnect.clear();
116+
// record the Subject so OnSubscribe can see it
117+
connectedSubject.set(subject);
86118
}
87119
}
88-
if (s != null) {
89-
source.unsafeSubscribe(s);
120+
121+
// in the lock above we determined we should subscribe, do it now outside the lock
122+
if (shouldSubscribe) {
123+
// register a subscription that will shut this down
124+
connection.call(Subscriptions.create(new Action0() {
125+
@Override
126+
public void call() {
127+
Subscription s;
128+
synchronized (guard) {
129+
s = subscription;
130+
subscription = null;
131+
connectedSubject.set(null);
132+
}
133+
if (s != null) {
134+
s.unsubscribe();
135+
}
136+
}
137+
}));
138+
139+
// now that everything is hooked up let's subscribe
140+
source.unsafeSubscribe(subscription);
90141
}
91142
}
92143
}

0 commit comments

Comments
 (0)