Skip to content

Commit 34a2561

Browse files
Experimenting with different implementations and performance
1 parent 423b470 commit 34a2561

File tree

5 files changed

+300
-203
lines changed

5 files changed

+300
-203
lines changed
Lines changed: 42 additions & 194 deletions
Original file line numberDiff line numberDiff line change
@@ -1,222 +1,70 @@
11
package rx.observers;
22

3-
import java.util.Collections;
4-
import java.util.HashSet;
5-
import java.util.Set;
3+
import java.util.concurrent.ConcurrentLinkedQueue;
64
import java.util.concurrent.atomic.AtomicInteger;
7-
import java.util.concurrent.atomic.AtomicReference;
85

96
import rx.Observer;
107

118
public class SerializedObserver<T> implements Observer<T> {
9+
private final Observer<? super T> actual;
10+
private final AtomicInteger count = new AtomicInteger();
11+
private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
1212

13-
private final AtomicReference<State> state = new AtomicReference<State>(State.createNew());
14-
private final Observer<? super T> s;
13+
private static Sentinel NULL_SENTINEL = new Sentinel();
14+
private static Sentinel COMPLETE_SENTINEL = new Sentinel();
1515

16-
public SerializedObserver(Observer<? super T> s) {
17-
this.s = s;
16+
private static class Sentinel {
17+
18+
}
19+
20+
private static class ErrorSentinel extends Sentinel {
21+
final Throwable e;
22+
23+
ErrorSentinel(Throwable e) {
24+
this.e = e;
25+
}
1826
}
1927

20-
final AtomicInteger received = new AtomicInteger();
21-
final AtomicInteger counter = new AtomicInteger();
22-
final AtomicInteger offered = new AtomicInteger();
23-
static AtomicInteger decremented = new AtomicInteger();
28+
public SerializedObserver(Observer<? super T> s) {
29+
this.actual = s;
30+
}
2431

2532
@Override
2633
public void onCompleted() {
27-
State current = null;
28-
State newState = null;
29-
do {
30-
current = state.get();
31-
if (current.isTerminated()) {
32-
// already received terminal state
33-
return;
34-
}
35-
newState = current.complete();
36-
} while (!state.compareAndSet(current, newState));
37-
terminateIfNecessary(newState);
34+
queue.add(COMPLETE_SENTINEL);
35+
doIt();
3836
}
3937

4038
@Override
41-
public void onError(Throwable e) {
42-
State current = null;
43-
State newState = null;
44-
do {
45-
current = state.get();
46-
if (current.isTerminated()) {
47-
// already received terminal state
48-
return;
49-
}
50-
newState = current.error(e);
51-
} while (!state.compareAndSet(current, newState));
52-
terminateIfNecessary(newState);
39+
public void onError(final Throwable e) {
40+
queue.add(new ErrorSentinel(e));
41+
doIt();
5342
}
5443

55-
AtomicInteger conc = new AtomicInteger();
56-
AtomicInteger lost = new AtomicInteger();
57-
Set<Object> items = Collections.synchronizedSet(new HashSet<Object>());
58-
59-
@SuppressWarnings("unchecked")
6044
@Override
6145
public void onNext(T t) {
62-
State current = null;
63-
State newState = null;
64-
65-
int contention = 0;
66-
State orig = null;
67-
do {
68-
current = state.get();
69-
if (orig == null) {
70-
orig = current;
71-
}
72-
if (current.isTerminated()) {
73-
// already received terminal state
74-
return;
75-
}
76-
newState = current.offerItem(t);
77-
contention++;
78-
} while (!state.compareAndSet(current, newState));
79-
80-
do {
81-
current = state.get();
82-
newState = current.startProcessing();
83-
} while (!state.compareAndSet(current, newState));
84-
if (newState.shouldProcess()) {
85-
// drain queue
86-
Object[] items = newState.queue;
87-
for (int i = 0; i < items.length; i++) {
88-
s.onNext((T) items[i]);
89-
counter.incrementAndGet();
90-
}
91-
92-
// finish processing to let this thread move on
93-
do {
94-
current = state.get();
95-
newState = current.finishProcessing(items.length);
96-
} while (!state.compareAndSet(current, newState));
97-
98-
}
99-
terminateIfNecessary(newState);
46+
queue.add(t);
47+
doIt();
10048
}
10149

102-
@SuppressWarnings("unchecked")
103-
private void terminateIfNecessary(State current) {
104-
if (current.isTerminated()) {
105-
State newState = null;
50+
public void doIt() {
51+
if (count.getAndIncrement() == 0) {
10652
do {
107-
current = state.get();
108-
newState = current.startTermination();
109-
} while (!state.compareAndSet(current, newState));
110-
111-
if (newState.shouldProcess()) {
112-
// drain any items left
113-
for (int i = 0; i < newState.queue.length; i++) {
114-
s.onNext((T) newState.queue[i]);
115-
}
116-
117-
// now terminate
118-
if (newState.onComplete) {
119-
s.onCompleted();
120-
} else {
121-
s.onError(newState.onError);
53+
Object v = queue.poll();
54+
if (v != null) {
55+
if (v instanceof Sentinel) {
56+
if (v == NULL_SENTINEL) {
57+
actual.onNext(null);
58+
} else if (v == COMPLETE_SENTINEL) {
59+
actual.onCompleted();
60+
} else if (v instanceof ErrorSentinel) {
61+
actual.onError(((ErrorSentinel) v).e);
62+
}
63+
} else {
64+
actual.onNext((T) v);
65+
}
12266
}
123-
}
124-
}
125-
}
126-
127-
public static class State {
128-
final boolean shouldProcess;
129-
final boolean isSomeoneProcessing;
130-
final int queueSize;
131-
final Object[] queue;
132-
final boolean onComplete;
133-
final Throwable onError;
134-
135-
private final static Object[] EMPTY = new Object[0];
136-
private final static Object[] PROCESS_SELF = new Object[1];
137-
138-
private final static State NON_TERMINATED_EMPTY = new State(false, false, 0, false, null, EMPTY);
139-
140-
public State(boolean shouldProcess, boolean isSomeoneProcessing, int queueSize, boolean onComplete, Throwable onError, Object[] queue) {
141-
this.shouldProcess = shouldProcess;
142-
this.isSomeoneProcessing = isSomeoneProcessing;
143-
this.queueSize = queueSize;
144-
this.queue = queue;
145-
this.onComplete = onComplete;
146-
this.onError = onError;
67+
} while (count.decrementAndGet() > 0);
14768
}
148-
149-
public static State createNew() {
150-
return new State(false, false, 0, false, null, EMPTY);
151-
}
152-
153-
public boolean shouldProcess() {
154-
return shouldProcess;
155-
}
156-
157-
public boolean isTerminated() {
158-
return onComplete || onError != null;
159-
}
160-
161-
public State complete() {
162-
return new State(false, isSomeoneProcessing, queueSize, true, onError, queue);
163-
}
164-
165-
public State error(Throwable e) {
166-
// immediately empty the queue and emit error as soon as possible
167-
return new State(false, isSomeoneProcessing, 0, onComplete, e, EMPTY);
168-
}
169-
170-
public State startTermination() {
171-
if (isSomeoneProcessing) {
172-
return new State(false, isSomeoneProcessing, queueSize, onComplete, onError, queue);
173-
} else {
174-
return new State(true, true, queueSize, onComplete, onError, queue);
175-
}
176-
}
177-
178-
public State offerItem(Object item) {
179-
if (isTerminated()) {
180-
// return count of 0 meaning don't emit as we are terminated
181-
return new State(false, isSomeoneProcessing, 0, onComplete, onError, EMPTY);
182-
} else {
183-
int idx = queue.length;
184-
Object[] newQueue = new Object[idx + 1];
185-
System.arraycopy(queue, 0, newQueue, 0, idx);
186-
newQueue[idx] = item;
187-
188-
// we just add to queue
189-
return new State(false, isSomeoneProcessing, queueSize + 1, onComplete, onError, newQueue);
190-
}
191-
}
192-
193-
public State startProcessing() {
194-
if (isSomeoneProcessing) {
195-
return new State(false, true, queueSize, onComplete, onError, queue);
196-
} else {
197-
return new State(true, true, queueSize, onComplete, onError, queue);
198-
}
199-
}
200-
201-
public State finishProcessing(int numOnNextSent) {
202-
int size = queueSize - numOnNextSent;
203-
if (size > 0 || isTerminated()) {
204-
// if size == 0 but we are terminated then it's an empty queue
205-
Object[] newQueue = EMPTY;
206-
if (size > 0) {
207-
newQueue = new Object[queue.length - numOnNextSent];
208-
System.arraycopy(queue, numOnNextSent, newQueue, 0, newQueue.length);
209-
}
210-
return new State(false, false, size, onComplete, onError, newQueue);
211-
} else {
212-
return NON_TERMINATED_EMPTY;
213-
}
214-
}
215-
216-
@Override
217-
public String toString() {
218-
return "State => shouldProcess: " + shouldProcess + " processing: " + isSomeoneProcessing + " queueSize: " + queueSize + " queue: " + queue.length + " terminated: " + isTerminated();
219-
}
220-
22169
}
22270
}

0 commit comments

Comments
 (0)