Skip to content

Commit c103796

Browse files
queue and lock implementation
1 parent b609607 commit c103796

File tree

4 files changed

+207
-6
lines changed

4 files changed

+207
-6
lines changed
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package rx.observers;
2+
3+
import java.util.ArrayList;
4+
5+
import rx.Observer;
6+
7+
public class SerializedObserverViaQueueAndLock<T> implements Observer<T> {
8+
private final Observer<? super T> actual;
9+
10+
private boolean emitting = false;
11+
private boolean terminated = false;
12+
private ArrayList<Object> queue = new ArrayList<Object>();
13+
14+
private static Sentinel NULL_SENTINEL = new Sentinel();
15+
private static Sentinel COMPLETE_SENTINEL = new Sentinel();
16+
17+
private static class Sentinel {
18+
19+
}
20+
21+
private static class ErrorSentinel extends Sentinel {
22+
final Throwable e;
23+
24+
ErrorSentinel(Throwable e) {
25+
this.e = e;
26+
}
27+
}
28+
29+
public SerializedObserverViaQueueAndLock(Observer<? super T> s) {
30+
this.actual = s;
31+
}
32+
33+
@Override
34+
public void onCompleted() {
35+
boolean canEmit = false;
36+
ArrayList<Object> list = null;
37+
synchronized (this) {
38+
if (terminated) {
39+
return;
40+
}
41+
terminated = true;
42+
if (!emitting) {
43+
// emit immediately
44+
emitting = true;
45+
canEmit = true;
46+
if (queue.size() > 0) {
47+
list = queue; // copy reference
48+
queue = new ArrayList<Object>(); // new version;
49+
}
50+
} else {
51+
// someone else is already emitting so just queue it
52+
queue.add(COMPLETE_SENTINEL);
53+
}
54+
}
55+
if (canEmit) {
56+
// we won the right to emit
57+
try {
58+
drainQueue(list);
59+
actual.onCompleted();
60+
} finally {
61+
synchronized (this) {
62+
emitting = false;
63+
}
64+
}
65+
}
66+
}
67+
68+
@Override
69+
public void onError(final Throwable e) {
70+
boolean canEmit = false;
71+
ArrayList<Object> list = null;
72+
synchronized (this) {
73+
if (terminated) {
74+
return;
75+
}
76+
terminated = true;
77+
if (!emitting) {
78+
// emit immediately
79+
emitting = true;
80+
canEmit = true;
81+
if (queue.size() > 0) {
82+
list = queue; // copy reference
83+
queue = new ArrayList<Object>(); // new version;
84+
}
85+
} else {
86+
// someone else is already emitting so just queue it ... after eliminating the queue to shortcut
87+
queue.clear();
88+
queue.add(new ErrorSentinel(e));
89+
}
90+
}
91+
if (canEmit) {
92+
// we won the right to emit
93+
try {
94+
drainQueue(list);
95+
actual.onError(e);
96+
} finally {
97+
synchronized (this) {
98+
emitting = false;
99+
}
100+
}
101+
}
102+
}
103+
104+
@Override
105+
public void onNext(T t) {
106+
boolean canEmit = false;
107+
ArrayList<Object> list = null;
108+
synchronized (this) {
109+
if (terminated) {
110+
return;
111+
}
112+
if (!emitting) {
113+
// emit immediately
114+
emitting = true;
115+
canEmit = true;
116+
if (queue.size() > 0) {
117+
list = queue; // copy reference
118+
queue = new ArrayList<Object>(); // new version;
119+
}
120+
} else {
121+
// someone else is already emitting so just queue it
122+
if (t == null) {
123+
queue.add(NULL_SENTINEL);
124+
} else {
125+
queue.add(t);
126+
}
127+
}
128+
}
129+
if (canEmit) {
130+
// we won the right to emit
131+
try {
132+
drainQueue(list);
133+
actual.onNext(t);
134+
} finally {
135+
synchronized (this) {
136+
if (terminated) {
137+
list = queue; // copy reference
138+
queue = new ArrayList<Object>(); // new version;
139+
} else {
140+
// release this thread
141+
emitting = false;
142+
canEmit = false;
143+
}
144+
}
145+
}
146+
}
147+
148+
// if terminated this will still be true so let's drain the rest of the queue
149+
if (canEmit) {
150+
drainQueue(list);
151+
}
152+
}
153+
154+
public void drainQueue(ArrayList<Object> list) {
155+
if (list == null || list.size() == 0) {
156+
return;
157+
}
158+
for (Object v : list) {
159+
if (v != null) {
160+
if (v instanceof Sentinel) {
161+
if (v == NULL_SENTINEL) {
162+
actual.onNext(null);
163+
} else if (v == COMPLETE_SENTINEL) {
164+
actual.onCompleted();
165+
} else if (v instanceof ErrorSentinel) {
166+
actual.onError(((ErrorSentinel) v).e);
167+
}
168+
} else {
169+
actual.onNext((T) v);
170+
}
171+
}
172+
}
173+
}
174+
}

rxjava-core/src/main/java/rx/observers/SerializedSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public class SerializedSubscriber<T> extends Subscriber<T> {
88
private final Observer<T> s;
99

1010
public SerializedSubscriber(Subscriber<? super T> s) {
11-
this.s = new SerializedObserverViaStateMachine<T>(s);
11+
this.s = new SerializedObserverViaQueueAndLock<T>(s);
1212
}
1313

1414
@Override

rxjava-core/src/perf/java/rx/operators/OperatorSerializePerformance.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
import rx.schedulers.Schedulers;
1515

1616
public class OperatorSerializePerformance extends AbstractPerformanceTester {
17-
static int reps = Integer.MAX_VALUE / 16384; // timeTwoStreams
17+
// static int reps = Integer.MAX_VALUE / 16384; // timeTwoStreams
1818

19-
// static int reps = Integer.MAX_VALUE / 1024; // timeSingleStream
19+
static int reps = Integer.MAX_VALUE / 1024; // timeSingleStream
2020

2121
// static int reps = 1000; // interval streams
2222

@@ -32,8 +32,8 @@ public static void main(String args[]) {
3232

3333
@Override
3434
public void call() {
35-
spt.timeTwoStreams();
36-
// spt.timeSingleStream();
35+
// spt.timeTwoStreams();
36+
spt.timeSingleStream();
3737
}
3838
});
3939
} catch (Exception e) {
@@ -52,13 +52,21 @@ public void call() {
5252
* Run: 13 - 35,269,946 ops/sec
5353
* Run: 14 - 34,165,013 ops/sec
5454
*
55-
* -> using "observeOn" technique
55+
* -> using queue and counter technique
5656
*
5757
* Run: 10 - 19,548,387 ops/sec
5858
* Run: 11 - 19,471,069 ops/sec
5959
* Run: 12 - 19,480,112 ops/sec
6060
* Run: 13 - 18,720,550 ops/sec
6161
* Run: 14 - 19,070,383 ops/sec
62+
*
63+
* -> using queue and lock technique
64+
*
65+
* Run: 10 - 51,295,152 ops/sec
66+
* Run: 11 - 50,317,937 ops/sec
67+
* Run: 12 - 51,126,331 ops/sec
68+
* Run: 13 - 52,418,291 ops/sec
69+
* Run: 14 - 51,694,710 ops/sec
6270
*/
6371
public long timeSingleStream() {
6472

@@ -119,6 +127,14 @@ public void call(Integer t1) {
119127
* Run: 12 - 4,510,978 ops/sec
120128
* Run: 13 - 3,218,915 ops/sec
121129
* Run: 14 - 3,938,549 ops/sec
130+
*
131+
* -> using queue and lock technique
132+
*
133+
* Run: 10 - 5,348,090 ops/sec
134+
* Run: 11 - 6,458,608 ops/sec
135+
* Run: 12 - 5,430,743 ops/sec
136+
* Run: 13 - 5,159,666 ops/sec
137+
* Run: 14 - 6,129,682 ops/sec
122138
*/
123139
public long timeTwoStreams() {
124140

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package rx.observers;
2+
3+
import rx.Observer;
4+
5+
public class SerializedObserverViaQueueAndLockTest extends SerializedObserverTest {
6+
@Override
7+
protected Observer<String> serializedObserver(Observer<String> o) {
8+
return new SerializedObserverViaQueueAndLock<String>(o);
9+
}
10+
11+
}

0 commit comments

Comments
 (0)