Skip to content

Commit 4427d03

Browse files
OperatorSerialize
1 parent 9a0f54f commit 4427d03

File tree

8 files changed

+1871
-2
lines changed

8 files changed

+1871
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import rx.operators.OnSubscribeFromIterable;
5353
import rx.operators.OnSubscribeRange;
5454
import rx.operators.OperationAll;
55-
import rx.operators.OperatorAmb;
5655
import rx.operators.OperationAny;
5756
import rx.operators.OperationAsObservable;
5857
import rx.operators.OperationAverage;
@@ -91,7 +90,6 @@
9190
import rx.operators.OperationSkip;
9291
import rx.operators.OperationSkipLast;
9392
import rx.operators.OperationSkipUntil;
94-
import rx.operators.OperatorSkipWhile;
9593
import rx.operators.OperationSum;
9694
import rx.operators.OperationSwitch;
9795
import rx.operators.OperationTakeLast;
@@ -106,6 +104,7 @@
106104
import rx.operators.OperationToObservableFuture;
107105
import rx.operators.OperationUsing;
108106
import rx.operators.OperationWindow;
107+
import rx.operators.OperatorAmb;
109108
import rx.operators.OperatorCast;
110109
import rx.operators.OperatorDoOnEach;
111110
import rx.operators.OperatorFilter;
@@ -119,7 +118,9 @@
119118
import rx.operators.OperatorRepeat;
120119
import rx.operators.OperatorRetry;
121120
import rx.operators.OperatorScan;
121+
import rx.operators.OperatorSerialize;
122122
import rx.operators.OperatorSkip;
123+
import rx.operators.OperatorSkipWhile;
123124
import rx.operators.OperatorSubscribeOn;
124125
import rx.operators.OperatorSynchronize;
125126
import rx.operators.OperatorTake;
@@ -6197,6 +6198,10 @@ public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accum
61976198
return lift(new OperatorScan<R, T>(initialValue, accumulator));
61986199
}
61996200

6201+
public final Observable<T> serialize() {
6202+
return lift(new OperatorSerialize<T>());
6203+
}
6204+
62006205
/**
62016206
* If the source Observable completes after emitting a single item, return an Observable that emits that
62026207
* item. If the source Observable emits more than one item or no items, throw an
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package rx.observers;
2+
3+
import java.util.concurrent.atomic.AtomicInteger;
4+
import java.util.concurrent.atomic.AtomicReference;
5+
6+
import rx.Observer;
7+
8+
public class SerializedObserver<T> implements Observer<T> {
9+
10+
private final AtomicReference<State> state = new AtomicReference<State>(State.createNew());
11+
private final Observer<T> s;
12+
13+
public SerializedObserver(Observer<T> s) {
14+
this.s = s;
15+
}
16+
17+
@Override
18+
public void onCompleted() {
19+
State current = null;
20+
State newState = null;
21+
do {
22+
current = state.get();
23+
if (current.isTerminated()) {
24+
// already received terminal state
25+
return;
26+
}
27+
newState = current.complete();
28+
} while (!state.compareAndSet(current, newState));
29+
if (newState.count == 0) {
30+
s.onCompleted();
31+
}
32+
}
33+
34+
@Override
35+
public void onError(Throwable e) {
36+
State current = null;
37+
State newState = null;
38+
do {
39+
current = state.get();
40+
if (current.isTerminated()) {
41+
// already received terminal state
42+
return;
43+
}
44+
newState = current.error(e);
45+
} while (!state.compareAndSet(current, newState));
46+
if (newState.count == 0) {
47+
s.onError(e);
48+
}
49+
}
50+
51+
@SuppressWarnings("unchecked")
52+
@Override
53+
public void onNext(T t) {
54+
State current = null;
55+
State newState = null;
56+
do {
57+
current = state.get();
58+
if (current.isTerminated()) {
59+
// already received terminal state
60+
return;
61+
}
62+
newState = current.increment(t);
63+
} while (!state.compareAndSet(current, newState));
64+
65+
if (newState.count == 1) {
66+
// this thread wins and will emit then drain queue if it concurrently gets added to
67+
try {
68+
s.onNext(t);
69+
} finally {
70+
// decrement after finishing
71+
do {
72+
current = state.get();
73+
newState = current.decrement();
74+
} while (!state.compareAndSet(current, newState));
75+
}
76+
77+
// drain queue if exists
78+
// we do "if" instead of "while" so we don't starve one thread
79+
if (newState.queue.length > 0) {
80+
Object[] items = newState.queue;
81+
for (int i = 0; i < items.length; i++) {
82+
s.onNext((T) items[i]);
83+
}
84+
// clear state of queue
85+
do {
86+
current = state.get();
87+
newState = current.drain(items.length);
88+
} while (!state.compareAndSet(current, newState));
89+
terminateIfNecessary(newState);
90+
} else {
91+
terminateIfNecessary(newState);
92+
}
93+
94+
}
95+
}
96+
97+
private void terminateIfNecessary(State state) {
98+
if (state.isTerminated()) {
99+
if (state.onComplete) {
100+
s.onCompleted();
101+
} else {
102+
s.onError(state.onError);
103+
}
104+
}
105+
}
106+
107+
public static class State {
108+
final int count;
109+
final Object[] queue;
110+
final boolean onComplete;
111+
final Throwable onError;
112+
113+
private final static Object[] EMPTY = new Object[0];
114+
115+
private final static State NON_TERMINATED_EMPTY = new State(0, false, null, EMPTY);
116+
private final static State NON_TERMINATED_SINGLE = new State(1, false, null, EMPTY);
117+
118+
public State(int count, boolean onComplete, Throwable onError, Object[] queue) {
119+
this.count = count;
120+
this.queue = queue;
121+
this.onComplete = onComplete;
122+
this.onError = onError;
123+
}
124+
125+
public static State createNew() {
126+
return new State(0, false, null, EMPTY);
127+
}
128+
129+
public boolean isTerminated() {
130+
return onComplete || onError != null;
131+
}
132+
133+
public State complete() {
134+
return new State(count, true, onError, queue);
135+
}
136+
137+
public State error(Throwable e) {
138+
return new State(count, onComplete, e, queue);
139+
}
140+
141+
AtomicInteger max = new AtomicInteger();
142+
143+
public State increment(Object item) {
144+
if (count == 0) {
145+
// no concurrent requests so don't queue, we'll process immediately
146+
if (isTerminated()) {
147+
// return count of 0 meaning don't emit as we are terminated
148+
return new State(0, onComplete, onError, EMPTY);
149+
} else {
150+
return NON_TERMINATED_SINGLE;
151+
}
152+
} else {
153+
// concurrent requests so need to queue
154+
int idx = queue.length;
155+
Object[] newQueue = new Object[idx + 1];
156+
System.arraycopy(queue, 0, newQueue, 0, idx);
157+
newQueue[idx] = item;
158+
159+
if (max.get() < newQueue.length) {
160+
max.set(newQueue.length);
161+
System.out.println("max queue: " + newQueue.length);
162+
}
163+
164+
return new State(count + 1, onComplete, onError, newQueue);
165+
}
166+
}
167+
168+
public State decrement() {
169+
if (count > 1 || isTerminated()) {
170+
return new State(count - 1, onComplete, onError, queue);
171+
} else {
172+
return NON_TERMINATED_EMPTY;
173+
}
174+
}
175+
176+
public State drain(int c) {
177+
Object[] newQueue = new Object[queue.length - c];
178+
System.arraycopy(queue, c, newQueue, 0, newQueue.length);
179+
return new State(count - c, onComplete, onError, newQueue);
180+
}
181+
182+
}
183+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package rx.observers;
2+
3+
import rx.Observer;
4+
import rx.Subscriber;
5+
6+
public class SerializedSubscriber<T> extends Subscriber<T> {
7+
8+
private final Observer<T> s;
9+
10+
public SerializedSubscriber(Subscriber<T> s) {
11+
this.s = new SerializedObserver<T>(s);
12+
}
13+
14+
@Override
15+
public void onCompleted() {
16+
s.onCompleted();
17+
}
18+
19+
@Override
20+
public void onError(Throwable e) {
21+
s.onError(e);
22+
}
23+
24+
@Override
25+
public void onNext(T t) {
26+
s.onNext(t);
27+
}
28+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
import rx.observers.SerializedSubscriber;
21+
22+
public final class OperatorSerialize<T> implements Operator<T, T> {
23+
24+
@Override
25+
public Subscriber<? super T> call(final Subscriber<? super T> s) {
26+
return new SerializedSubscriber<T>(new Subscriber<T>(s) {
27+
28+
@Override
29+
public void onCompleted() {
30+
s.onCompleted();
31+
}
32+
33+
@Override
34+
public void onError(Throwable e) {
35+
s.onError(e);
36+
}
37+
38+
@Override
39+
public void onNext(T t) {
40+
s.onNext(t);
41+
}
42+
43+
});
44+
}
45+
46+
}

0 commit comments

Comments
 (0)