Skip to content

Commit 8b3862e

Browse files
not quite functional ... and slow
1 parent 4427d03 commit 8b3862e

File tree

5 files changed

+218
-101
lines changed

5 files changed

+218
-101
lines changed

rxjava-core/src/main/java/rx/observers/SerializedObserver.java

Lines changed: 83 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@ public void onCompleted() {
2626
}
2727
newState = current.complete();
2828
} while (!state.compareAndSet(current, newState));
29-
if (newState.count == 0) {
30-
s.onCompleted();
31-
}
29+
System.out.println("********** onCompleted");
30+
terminateIfNecessary(newState);
3231
}
3332

3433
@Override
@@ -43,9 +42,8 @@ public void onError(Throwable e) {
4342
}
4443
newState = current.error(e);
4544
} while (!state.compareAndSet(current, newState));
46-
if (newState.count == 0) {
47-
s.onError(e);
48-
}
45+
System.out.println("********** onError");
46+
terminateIfNecessary(newState);
4947
}
5048

5149
@SuppressWarnings("unchecked")
@@ -59,125 +57,147 @@ public void onNext(T t) {
5957
// already received terminal state
6058
return;
6159
}
62-
newState = current.increment(t);
60+
newState = current.offerItem(t);
6361
} while (!state.compareAndSet(current, newState));
6462

65-
if (newState.count == 1) {
63+
if (newState.shouldProcess()) {
6664
// this thread wins and will emit then drain queue if it concurrently gets added to
67-
try {
68-
s.onNext(t);
69-
} finally {
70-
// decrement after finishing
71-
do {
72-
current = state.get();
73-
newState = current.decrement();
74-
} while (!state.compareAndSet(current, newState));
75-
}
65+
s.onNext(t);
7666

7767
// drain queue if exists
7868
// we do "if" instead of "while" so we don't starve one thread
79-
if (newState.queue.length > 0) {
80-
Object[] items = newState.queue;
81-
for (int i = 0; i < items.length; i++) {
82-
s.onNext((T) items[i]);
83-
}
84-
// clear state of queue
85-
do {
86-
current = state.get();
87-
newState = current.drain(items.length);
88-
} while (!state.compareAndSet(current, newState));
89-
terminateIfNecessary(newState);
90-
} else {
91-
terminateIfNecessary(newState);
69+
Object[] items = newState.queue;
70+
for (int i = 0; i < items.length; i++) {
71+
s.onNext((T) items[i]);
9272
}
9373

74+
// finish processing to let this thread move on
75+
do {
76+
current = state.get();
77+
newState = current.finishProcessing(items.length + 1); // the + 1 is for the first onNext of itself
78+
} while (!state.compareAndSet(current, newState));
79+
System.out.println("********** finishProcessing");
80+
terminateIfNecessary(newState);
9481
}
9582
}
9683

97-
private void terminateIfNecessary(State state) {
98-
if (state.isTerminated()) {
99-
if (state.onComplete) {
100-
s.onCompleted();
101-
} else {
102-
s.onError(state.onError);
84+
@SuppressWarnings("unchecked")
85+
private void terminateIfNecessary(State current) {
86+
if (current.isTerminated()) {
87+
State newState = null;
88+
do {
89+
current = state.get();
90+
newState = current.startTermination();
91+
} while (!state.compareAndSet(current, newState));
92+
if (newState.shouldProcess()) {
93+
// drain any items left
94+
for (int i = 0; i < newState.queue.length; i++) {
95+
s.onNext((T) newState.queue[i]);
96+
}
97+
98+
// now terminate
99+
if (newState.onComplete) {
100+
s.onCompleted();
101+
} else {
102+
s.onError(newState.onError);
103+
}
103104
}
104105
}
105106
}
106107

107108
public static class State {
108-
final int count;
109+
final boolean shouldProcess;
110+
final boolean isSomeoneProcessing;
111+
final int queueSize;
109112
final Object[] queue;
110113
final boolean onComplete;
111114
final Throwable onError;
112115

113116
private final static Object[] EMPTY = new Object[0];
114117

115-
private final static State NON_TERMINATED_EMPTY = new State(0, false, null, EMPTY);
116-
private final static State NON_TERMINATED_SINGLE = new State(1, false, null, EMPTY);
118+
private final static State NON_TERMINATED_EMPTY = new State(false, false, 0, false, null, EMPTY);
119+
private final static State NON_TERMINATED_PROCESS_SELF = new State(true, true, 1, false, null, EMPTY);
117120

118-
public State(int count, boolean onComplete, Throwable onError, Object[] queue) {
119-
this.count = count;
121+
public State(boolean shouldProcess, boolean isSomeoneProcessing, int queueSize, boolean onComplete, Throwable onError, Object[] queue) {
122+
this.shouldProcess = shouldProcess;
123+
this.isSomeoneProcessing = isSomeoneProcessing;
124+
this.queueSize = queueSize;
120125
this.queue = queue;
121126
this.onComplete = onComplete;
122127
this.onError = onError;
123128
}
124129

125130
public static State createNew() {
126-
return new State(0, false, null, EMPTY);
131+
return new State(false, false, 0, false, null, EMPTY);
132+
}
133+
134+
public boolean shouldProcess() {
135+
return shouldProcess;
127136
}
128137

129138
public boolean isTerminated() {
130139
return onComplete || onError != null;
131140
}
132141

133142
public State complete() {
134-
return new State(count, true, onError, queue);
143+
return new State(false, isSomeoneProcessing, queueSize, true, onError, queue);
135144
}
136145

137146
public State error(Throwable e) {
138-
return new State(count, onComplete, e, queue);
147+
// immediately empty the queue and emit error as soon as possible
148+
return new State(false, isSomeoneProcessing, queueSize, onComplete, e, EMPTY);
149+
}
150+
151+
public State startTermination() {
152+
if (isSomeoneProcessing) {
153+
System.out.println("start terminate and DO NOT process => queue size: " + (queueSize + 1));
154+
return new State(false, isSomeoneProcessing, queueSize, onComplete, onError, queue);
155+
} else {
156+
System.out.println("start terminate and process => queue size: " + (queueSize + 1));
157+
return new State(true, isSomeoneProcessing, queueSize, onComplete, onError, queue);
158+
}
139159
}
140160

141161
AtomicInteger max = new AtomicInteger();
142162

143-
public State increment(Object item) {
144-
if (count == 0) {
163+
public State offerItem(Object item) {
164+
if (queueSize == 0) {
145165
// no concurrent requests so don't queue, we'll process immediately
146166
if (isTerminated()) {
147167
// return count of 0 meaning don't emit as we are terminated
148-
return new State(0, onComplete, onError, EMPTY);
168+
return new State(false, false, 0, onComplete, onError, EMPTY);
149169
} else {
150-
return NON_TERMINATED_SINGLE;
170+
return NON_TERMINATED_PROCESS_SELF;
151171
}
152172
} else {
153-
// concurrent requests so need to queue
173+
// there are items queued so we need to queue
154174
int idx = queue.length;
155175
Object[] newQueue = new Object[idx + 1];
156176
System.arraycopy(queue, 0, newQueue, 0, idx);
157177
newQueue[idx] = item;
158178

159-
if (max.get() < newQueue.length) {
160-
max.set(newQueue.length);
161-
System.out.println("max queue: " + newQueue.length);
179+
if (isSomeoneProcessing) {
180+
// we just add to queue
181+
return new State(false, isSomeoneProcessing, queueSize + 1, onComplete, onError, newQueue);
182+
} else {
183+
// we add to queue and claim work
184+
return new State(false, true, queueSize + 1, onComplete, onError, newQueue);
162185
}
163-
164-
return new State(count + 1, onComplete, onError, newQueue);
165186
}
166187
}
167188

168-
public State decrement() {
169-
if (count > 1 || isTerminated()) {
170-
return new State(count - 1, onComplete, onError, queue);
189+
public State finishProcessing(int numOnNextSent) {
190+
int numOnNextFromQueue = numOnNextSent - 1; // we remove the "self" onNext as it doesn't affect the queue
191+
int size = queueSize - numOnNextFromQueue;
192+
System.out.println("finishProcessing => queue size: " + size + " after processing: " + numOnNextSent);
193+
if (size > 1 || isTerminated()) {
194+
Object[] newQueue = new Object[queue.length - numOnNextFromQueue];
195+
System.arraycopy(queue, numOnNextFromQueue, newQueue, 0, newQueue.length);
196+
return new State(false, false, size, onComplete, onError, newQueue);
171197
} else {
172198
return NON_TERMINATED_EMPTY;
173199
}
174200
}
175201

176-
public State drain(int c) {
177-
Object[] newQueue = new Object[queue.length - c];
178-
System.arraycopy(queue, c, newQueue, 0, newQueue.length);
179-
return new State(count - c, onComplete, onError, newQueue);
180-
}
181-
182202
}
183203
}

rxjava-core/src/perf/java/rx/operators/OperatorSerializePerformance.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
import rx.schedulers.Schedulers;
1515

1616
public class OperatorSerializePerformance extends AbstractPerformanceTester {
17-
// static int reps = Integer.MAX_VALUE / 16384; // timeTwoStreams
17+
static int reps = Integer.MAX_VALUE / 16384; // timeTwoStreams
1818

1919
// static int reps = Integer.MAX_VALUE / 1024; // timeSingleStream
2020

21-
static int reps = 1000; // interval streams
21+
// static int reps = 1000; // interval streams
2222

2323
OperatorSerializePerformance() {
2424
super(reps);

rxjava-core/src/perf/java/rx/perf/IntegerSumObserver.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public void onCompleted() {
1313

1414
@Override
1515
public void onError(Throwable e) {
16+
e.printStackTrace();
1617
throw new RuntimeException(e);
1718
}
1819

0 commit comments

Comments
 (0)