Skip to content

Commit 76bbefc

Browse files
SerializedObserver and SerializedSubscriber
Using the "queue and lock" implementation which won our performance and production testing.
1 parent c103796 commit 76bbefc

9 files changed

+139
-30
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7264,6 +7264,7 @@ public final <R> Observable<R> switchMap(Func1<? super T, ? extends Observable<?
72647264
* @return an Observable that is a chronologically well-behaved version of the source Observable, and that
72657265
* synchronously notifies its {@link Observer}s
72667266
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-synchronize">RxJava Wiki: synchronize()</a>
7267+
* @deprecated Use {@link #serialize()} instead as it doesn't block threads while emitting notification.
72677268
*/
72687269
public final Observable<T> synchronize() {
72697270
return lift(new OperatorSynchronize<T>());
@@ -7288,6 +7289,7 @@ public final Observable<T> synchronize() {
72887289
* @return an Observable that is a chronologically well-behaved version of the source Observable, and that
72897290
* synchronously notifies its {@link Observer}s
72907291
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-synchronize">RxJava Wiki: synchronize()</a>
7292+
* @deprecated Use {@link #serialize()} instead as it doesn't block threads while emitting notification.
72917293
*/
72927294
public final Observable<T> synchronize(Object lock) {
72937295
return lift(new OperatorSynchronize<T>(lock));
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package rx.observers;
2+
3+
import rx.Observer;
4+
5+
/**
6+
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
7+
* <p>
8+
* When multiple threads are notifying they will be serialized by:
9+
* <p>
10+
* <li>Allowing only one thread at a time to emit</li>
11+
* <li>Adding notifications to a queue if another thread is already emitting</li>
12+
* <li>Not holding any locks or blocking any threads while emitting</li>
13+
* <p>
14+
*
15+
* @param <T>
16+
*/
17+
public class SerializedObserver<T> implements Observer<T> {
18+
/*
19+
* Facade to actual implementation until final decision is made
20+
* on the implementation.
21+
*/
22+
private final SerializedObserverViaQueueAndLock<T> actual;
23+
24+
public SerializedObserver(Observer<? super T> observer) {
25+
this.actual = new SerializedObserverViaQueueAndLock<T>(observer);
26+
}
27+
28+
@Override
29+
public void onCompleted() {
30+
actual.onCompleted();
31+
}
32+
33+
@Override
34+
public void onError(Throwable e) {
35+
actual.onError(e);
36+
}
37+
38+
@Override
39+
public void onNext(T t) {
40+
actual.onNext(t);
41+
}
42+
}

rxjava-core/src/main/java/rx/observers/SerializedObserverViaQueueAndCounter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import rx.Observer;
77

8-
public class SerializedObserverViaQueueAndCounter<T> implements Observer<T> {
8+
/* package */class SerializedObserverViaQueueAndCounter<T> implements Observer<T> {
99
private final Observer<? super T> actual;
1010
private final AtomicInteger count = new AtomicInteger();
1111
private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();

rxjava-core/src/main/java/rx/observers/SerializedObserverViaQueueAndLock.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import rx.Observer;
66

7-
public class SerializedObserverViaQueueAndLock<T> implements Observer<T> {
7+
/* package */ class SerializedObserverViaQueueAndLock<T> implements Observer<T> {
88
private final Observer<? super T> actual;
99

1010
private boolean emitting = false;

rxjava-core/src/main/java/rx/observers/SerializedObserverViaStateMachine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import rx.Observer;
66

7-
public class SerializedObserverViaStateMachine<T> implements Observer<T> {
7+
/* package */class SerializedObserverViaStateMachine<T> implements Observer<T> {
88

99
private final AtomicReference<State> state = new AtomicReference<State>(State.createNew());
1010
private final Observer<? super T> s;

rxjava-core/src/main/java/rx/observers/SerializedSubscriber.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,24 @@
33
import rx.Observer;
44
import rx.Subscriber;
55

6+
/**
7+
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
8+
* <p>
9+
* When multiple threads are notifying they will be serialized by:
10+
* <p>
11+
* <li>Allowing only one thread at a time to emit</li>
12+
* <li>Adding notifications to a queue if another thread is already emitting</li>
13+
* <li>Not holding any locks or blocking any threads while emitting</li>
14+
* <p>
15+
*
16+
* @param <T>
17+
*/
618
public class SerializedSubscriber<T> extends Subscriber<T> {
719

820
private final Observer<T> s;
921

1022
public SerializedSubscriber(Subscriber<? super T> s) {
11-
this.s = new SerializedObserverViaQueueAndLock<T>(s);
23+
this.s = new SerializedObserver<T>(s);
1224
}
1325

1426
@Override

rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
* This ONLY does synchronization. It does not involve itself in safety or subscriptions. See SafeSubscriber for that.
2424
*
2525
* @param <T>
26+
* @deprecated Use SerializedObserver instead as it doesn't block threads during event notification.
2627
*/
28+
@Deprecated
2729
public final class SynchronizedObserver<T> implements Observer<T> {
2830

2931
/**

rxjava-core/src/main/java/rx/observers/SynchronizedSubscriber.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
* </ul>
3030
*
3131
* @param <T>
32+
* @deprecated Use SerializedSubscriber instead as it doesn't block threads during event notification.
3233
*/
34+
@Deprecated
3335
public final class SynchronizedSubscriber<T> extends Subscriber<T> {
3436

3537
private final Observer<? super T> observer;

rxjava-core/src/perf/java/rx/operators/OperatorSerializePerformance.java

Lines changed: 75 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@
1414
import rx.schedulers.Schedulers;
1515

1616
public class OperatorSerializePerformance extends AbstractPerformanceTester {
17-
// static int reps = Integer.MAX_VALUE / 16384; // timeTwoStreams
18-
19-
static int reps = Integer.MAX_VALUE / 1024; // timeSingleStream
17+
static int reps = Integer.MAX_VALUE / 16384; // timeTwoStreams
2018

19+
// static int reps = Integer.MAX_VALUE / 1024; // timeSingleStream
2120
// static int reps = 1000; // interval streams
2221

2322
OperatorSerializePerformance() {
@@ -32,8 +31,9 @@ public static void main(String args[]) {
3231

3332
@Override
3433
public void call() {
35-
// spt.timeTwoStreams();
36-
spt.timeSingleStream();
34+
spt.timeTwoStreams();
35+
// spt.timeSingleStream();
36+
// spt.timeTwoStreamsIntervals();
3737
}
3838
});
3939
} catch (Exception e) {
@@ -43,24 +43,33 @@ public void call() {
4343
}
4444

4545
/**
46+
* 1 streams emitting in a tight loop. Testing for single-threaded overhead.
47+
*
48+
* -> blocking synchronization (SynchronizedObserver)
4649
*
47-
* -> state machine technique
50+
* Run: 10 - 58,186,310 ops/sec
51+
* Run: 11 - 60,592,037 ops/sec
52+
* Run: 12 - 58,099,263 ops/sec
53+
* Run: 13 - 59,034,765 ops/sec
54+
* Run: 14 - 58,231,548 ops/sec
55+
*
56+
* -> state machine technique (SerializedObserverViaStateMachine)
4857
*
4958
* Run: 10 - 34,668,810 ops/sec
5059
* Run: 11 - 32,874,312 ops/sec
5160
* Run: 12 - 33,389,339 ops/sec
5261
* Run: 13 - 35,269,946 ops/sec
5362
* Run: 14 - 34,165,013 ops/sec
5463
*
55-
* -> using queue and counter technique
64+
* -> using queue and counter technique (SerializedObserverViaQueueAndCounter)
5665
*
5766
* Run: 10 - 19,548,387 ops/sec
5867
* Run: 11 - 19,471,069 ops/sec
5968
* Run: 12 - 19,480,112 ops/sec
6069
* Run: 13 - 18,720,550 ops/sec
6170
* Run: 14 - 19,070,383 ops/sec
6271
*
63-
* -> using queue and lock technique
72+
* -> using queue and lock technique (SerializedObserverViaQueueAndLock)
6473
*
6574
* Run: 10 - 51,295,152 ops/sec
6675
* Run: 11 - 50,317,937 ops/sec
@@ -112,29 +121,41 @@ public void call(Integer t1) {
112121
}
113122

114123
/**
115-
* -> state machine technique
124+
* 2 streams emitting in tight loops so very high contention.
125+
*
126+
* -> blocking synchronization (SynchronizedObserver)
127+
*
128+
* Run: 10 - 8,361,252 ops/sec
129+
* Run: 11 - 7,184,728 ops/sec
130+
* Run: 12 - 8,249,685 ops/sec
131+
* Run: 13 - 6,831,595 ops/sec
132+
* Run: 14 - 8,003,358 ops/sec
133+
*
134+
* (faster because it allows each thread to be "single threaded" while blocking the other)
135+
*
136+
* -> state machine technique (SerializedObserverViaStateMachine)
116137
*
117-
* Run: 10 - 3,432,256 ops/sec
118-
* Run: 11 - 3,570,444 ops/sec
119-
* Run: 12 - 3,791,137 ops/sec
120-
* Run: 13 - 3,664,579 ops/sec
121-
* Run: 14 - 5,211,156 ops/sec
138+
* Run: 10 - 4,060,062 ops/sec
139+
* Run: 11 - 3,561,131 ops/sec
140+
* Run: 12 - 3,721,387 ops/sec
141+
* Run: 13 - 3,693,909 ops/sec
142+
* Run: 14 - 3,516,324 ops/sec
122143
*
123-
* -> using "observeOn" technique
144+
* -> using queue and counter technique (SerializedObserverViaQueueAndCounter)
124145
*
125-
* Run: 10 - 3,995,336 ops/sec
126-
* Run: 11 - 4,033,077 ops/sec
127-
* Run: 12 - 4,510,978 ops/sec
128-
* Run: 13 - 3,218,915 ops/sec
129-
* Run: 14 - 3,938,549 ops/sec
146+
* Run: 10 - 4,300,229 ops/sec
147+
* Run: 11 - 4,395,995 ops/sec
148+
* Run: 12 - 4,551,550 ops/sec
149+
* Run: 13 - 4,443,235 ops/sec
150+
* Run: 14 - 4,158,475 ops/sec
130151
*
131-
* -> using queue and lock technique
152+
* -> using queue and lock technique (SerializedObserverViaQueueAndLock)
132153
*
133-
* Run: 10 - 5,348,090 ops/sec
134-
* Run: 11 - 6,458,608 ops/sec
135-
* Run: 12 - 5,430,743 ops/sec
136-
* Run: 13 - 5,159,666 ops/sec
137-
* Run: 14 - 6,129,682 ops/sec
154+
* Run: 10 - 6,369,781 ops/sec
155+
* Run: 11 - 6,933,872 ops/sec
156+
* Run: 12 - 5,652,535 ops/sec
157+
* Run: 13 - 5,503,716 ops/sec
158+
* Run: 14 - 6,219,264 ops/sec
138159
*/
139160
public long timeTwoStreams() {
140161

@@ -198,11 +219,39 @@ public void call(Integer t1) {
198219
}
199220

200221
/**
222+
* 2 streams emitting once a millisecond. Slow emission so little to no contention.
223+
*
224+
* -> blocking synchronization (SynchronizedObserver)
225+
*
226+
* Run: 10 - 1,996 ops/sec
227+
* Run: 11 - 1,996 ops/sec
228+
* Run: 12 - 1,995 ops/sec
229+
* Run: 13 - 1,997 ops/sec
230+
* Run: 14 - 1,996 ops/sec
231+
*
232+
* -> state machine technique (SerializedObserverViaStateMachine)
233+
*
201234
* Run: 10 - 1,996 ops/sec
202235
* Run: 11 - 1,996 ops/sec
203236
* Run: 12 - 1,996 ops/sec
204237
* Run: 13 - 1,996 ops/sec
205238
* Run: 14 - 1,996 ops/sec
239+
*
240+
* -> using queue and counter technique (SerializedObserverViaQueueAndCounter)
241+
*
242+
* Run: 10 - 1,996 ops/sec
243+
* Run: 11 - 1,996 ops/sec
244+
* Run: 12 - 1,996 ops/sec
245+
* Run: 13 - 1,996 ops/sec
246+
* Run: 14 - 1,995 ops/sec
247+
*
248+
* -> using queue and lock technique (SerializedObserverViaQueueAndLock)
249+
*
250+
* Run: 10 - 1,996 ops/sec
251+
* Run: 11 - 1,996 ops/sec
252+
* Run: 12 - 1,997 ops/sec
253+
* Run: 13 - 1,996 ops/sec
254+
* Run: 14 - 1,995 ops/sec
206255
*/
207256
public long timeTwoStreamsIntervals() {
208257

0 commit comments

Comments
 (0)