Skip to content

Commit cc3c654

Browse files
Functional now ... it seems.
1 parent 8b3862e commit cc3c654

File tree

4 files changed

+76
-51
lines changed

4 files changed

+76
-51
lines changed

rxjava-core/src/main/java/rx/observers/SerializedObserver.java

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package rx.observers;
22

3+
import java.util.Collections;
4+
import java.util.HashSet;
5+
import java.util.Set;
36
import java.util.concurrent.atomic.AtomicInteger;
47
import java.util.concurrent.atomic.AtomicReference;
58

@@ -14,6 +17,11 @@ public SerializedObserver(Observer<T> s) {
1417
this.s = s;
1518
}
1619

20+
final AtomicInteger received = new AtomicInteger();
21+
final AtomicInteger counter = new AtomicInteger();
22+
final AtomicInteger offered = new AtomicInteger();
23+
static AtomicInteger decremented = new AtomicInteger();
24+
1725
@Override
1826
public void onCompleted() {
1927
State current = null;
@@ -26,7 +34,6 @@ public void onCompleted() {
2634
}
2735
newState = current.complete();
2836
} while (!state.compareAndSet(current, newState));
29-
System.out.println("********** onCompleted");
3037
terminateIfNecessary(newState);
3138
}
3239

@@ -42,43 +49,54 @@ public void onError(Throwable e) {
4249
}
4350
newState = current.error(e);
4451
} while (!state.compareAndSet(current, newState));
45-
System.out.println("********** onError");
4652
terminateIfNecessary(newState);
4753
}
4854

55+
AtomicInteger conc = new AtomicInteger();
56+
AtomicInteger lost = new AtomicInteger();
57+
Set<Object> items = Collections.synchronizedSet(new HashSet<Object>());
58+
4959
@SuppressWarnings("unchecked")
5060
@Override
5161
public void onNext(T t) {
5262
State current = null;
5363
State newState = null;
64+
65+
int contention = 0;
66+
State orig = null;
5467
do {
5568
current = state.get();
69+
if (orig == null) {
70+
orig = current;
71+
}
5672
if (current.isTerminated()) {
5773
// already received terminal state
5874
return;
5975
}
6076
newState = current.offerItem(t);
77+
contention++;
6178
} while (!state.compareAndSet(current, newState));
6279

80+
do {
81+
current = state.get();
82+
newState = current.startProcessing();
83+
} while (!state.compareAndSet(current, newState));
6384
if (newState.shouldProcess()) {
64-
// this thread wins and will emit then drain queue if it concurrently gets added to
65-
s.onNext(t);
66-
67-
// drain queue if exists
68-
// we do "if" instead of "while" so we don't starve one thread
85+
// drain queue
6986
Object[] items = newState.queue;
7087
for (int i = 0; i < items.length; i++) {
7188
s.onNext((T) items[i]);
89+
counter.incrementAndGet();
7290
}
7391

7492
// finish processing to let this thread move on
7593
do {
7694
current = state.get();
77-
newState = current.finishProcessing(items.length + 1); // the + 1 is for the first onNext of itself
95+
newState = current.finishProcessing(items.length);
7896
} while (!state.compareAndSet(current, newState));
79-
System.out.println("********** finishProcessing");
80-
terminateIfNecessary(newState);
97+
8198
}
99+
terminateIfNecessary(newState);
82100
}
83101

84102
@SuppressWarnings("unchecked")
@@ -89,6 +107,7 @@ private void terminateIfNecessary(State current) {
89107
current = state.get();
90108
newState = current.startTermination();
91109
} while (!state.compareAndSet(current, newState));
110+
92111
if (newState.shouldProcess()) {
93112
// drain any items left
94113
for (int i = 0; i < newState.queue.length; i++) {
@@ -114,9 +133,9 @@ public static class State {
114133
final Throwable onError;
115134

116135
private final static Object[] EMPTY = new Object[0];
136+
private final static Object[] PROCESS_SELF = new Object[1];
117137

118138
private final static State NON_TERMINATED_EMPTY = new State(false, false, 0, false, null, EMPTY);
119-
private final static State NON_TERMINATED_PROCESS_SELF = new State(true, true, 1, false, null, EMPTY);
120139

121140
public State(boolean shouldProcess, boolean isSomeoneProcessing, int queueSize, boolean onComplete, Throwable onError, Object[] queue) {
122141
this.shouldProcess = shouldProcess;
@@ -145,59 +164,59 @@ public State complete() {
145164

146165
public State error(Throwable e) {
147166
// immediately empty the queue and emit error as soon as possible
148-
return new State(false, isSomeoneProcessing, queueSize, onComplete, e, EMPTY);
167+
return new State(false, isSomeoneProcessing, 0, onComplete, e, EMPTY);
149168
}
150169

151170
public State startTermination() {
152171
if (isSomeoneProcessing) {
153-
System.out.println("start terminate and DO NOT process => queue size: " + (queueSize + 1));
154172
return new State(false, isSomeoneProcessing, queueSize, onComplete, onError, queue);
155173
} else {
156-
System.out.println("start terminate and process => queue size: " + (queueSize + 1));
157-
return new State(true, isSomeoneProcessing, queueSize, onComplete, onError, queue);
174+
return new State(true, true, queueSize, onComplete, onError, queue);
158175
}
159176
}
160177

161-
AtomicInteger max = new AtomicInteger();
162-
163178
public State offerItem(Object item) {
164-
if (queueSize == 0) {
165-
// no concurrent requests so don't queue, we'll process immediately
166-
if (isTerminated()) {
167-
// return count of 0 meaning don't emit as we are terminated
168-
return new State(false, false, 0, onComplete, onError, EMPTY);
169-
} else {
170-
return NON_TERMINATED_PROCESS_SELF;
171-
}
179+
if (isTerminated()) {
180+
// return count of 0 meaning don't emit as we are terminated
181+
return new State(false, isSomeoneProcessing, 0, onComplete, onError, EMPTY);
172182
} else {
173-
// there are items queued so we need to queue
174183
int idx = queue.length;
175184
Object[] newQueue = new Object[idx + 1];
176185
System.arraycopy(queue, 0, newQueue, 0, idx);
177186
newQueue[idx] = item;
178187

179-
if (isSomeoneProcessing) {
180-
// we just add to queue
181-
return new State(false, isSomeoneProcessing, queueSize + 1, onComplete, onError, newQueue);
182-
} else {
183-
// we add to queue and claim work
184-
return new State(false, true, queueSize + 1, onComplete, onError, newQueue);
185-
}
188+
// we just add to queue
189+
return new State(false, isSomeoneProcessing, queueSize + 1, onComplete, onError, newQueue);
190+
}
191+
}
192+
193+
public State startProcessing() {
194+
if (isSomeoneProcessing) {
195+
return new State(false, true, queueSize, onComplete, onError, queue);
196+
} else {
197+
return new State(true, true, queueSize, onComplete, onError, queue);
186198
}
187199
}
188200

189201
public State finishProcessing(int numOnNextSent) {
190-
int numOnNextFromQueue = numOnNextSent - 1; // we remove the "self" onNext as it doesn't affect the queue
191-
int size = queueSize - numOnNextFromQueue;
192-
System.out.println("finishProcessing => queue size: " + size + " after processing: " + numOnNextSent);
193-
if (size > 1 || isTerminated()) {
194-
Object[] newQueue = new Object[queue.length - numOnNextFromQueue];
195-
System.arraycopy(queue, numOnNextFromQueue, newQueue, 0, newQueue.length);
202+
int size = queueSize - numOnNextSent;
203+
if (size > 0 || isTerminated()) {
204+
// if size == 0 but we are terminated then it's an empty queue
205+
Object[] newQueue = EMPTY;
206+
if (size > 0) {
207+
newQueue = new Object[queue.length - numOnNextSent];
208+
System.arraycopy(queue, numOnNextSent, newQueue, 0, newQueue.length);
209+
}
196210
return new State(false, false, size, onComplete, onError, newQueue);
197211
} else {
198212
return NON_TERMINATED_EMPTY;
199213
}
200214
}
201215

216+
@Override
217+
public String toString() {
218+
return "State => shouldProcess: " + shouldProcess + " processing: " + isSomeoneProcessing + " queueSize: " + queueSize + " queue: " + queue.length + " terminated: " + isTerminated();
219+
}
220+
202221
}
203222
}

rxjava-core/src/perf/java/rx/operators/OperatorSerializePerformance.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public void call(Integer t1) {
147147

148148
IntegerSumObserver o = new IntegerSumObserver();
149149
s.subscribe(o);
150-
System.out.println("sum : " + o.sum);
150+
// System.out.println("sum : " + o.sum);
151151

152152
return o.sum;
153153
}

rxjava-core/src/perf/java/rx/operators/OperatorSynchronizePerformance.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
import rx.schedulers.Schedulers;
1515

1616
public class OperatorSynchronizePerformance extends AbstractPerformanceTester {
17-
// static int reps = Integer.MAX_VALUE / 1024;
17+
static int reps = Integer.MAX_VALUE / 1024;
1818

19-
static int reps = 1000; // timeTwoStreamsIntervals
19+
// static int reps = 1000; // timeTwoStreamsIntervals
2020

2121
OperatorSynchronizePerformance() {
2222
super(reps);
@@ -30,7 +30,7 @@ public static void main(String args[]) {
3030

3131
@Override
3232
public void call() {
33-
spt.timeTwoStreamsIntervals();
33+
spt.timeTwoStreams();
3434
}
3535
});
3636
} catch (Exception e) {

rxjava-core/src/test/java/rx/observers/SerializedObserverTest.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Matchers.*;
2020
import static org.mockito.Mockito.*;
2121

22+
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.ExecutorService;
2324
import java.util.concurrent.Executors;
2425
import java.util.concurrent.Future;
@@ -92,8 +93,8 @@ public void testMultiThreadedBasic() {
9293
assertEquals(1, busyObserver.maxConcurrentThreads.get());
9394
}
9495

95-
@Test
96-
public void testMultiThreadedWithNPE() {
96+
@Test(timeout=1000)
97+
public void testMultiThreadedWithNPE() throws InterruptedException {
9798
Subscription s = mock(Subscription.class);
9899
TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null);
99100
Observable<String> w = Observable.create(onSubscribe);
@@ -103,8 +104,9 @@ public void testMultiThreadedWithNPE() {
103104

104105
w.subscribe(aw);
105106
onSubscribe.waitToFinish();
107+
busyObserver.terminalEvent.await();
106108

107-
System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
109+
System.out.println("OnSubscribe maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get() + " Observer maxConcurrentThreads: " + busyObserver.maxConcurrentThreads.get());
108110

109111
// we can't know how many onNext calls will occur since they each run on a separate thread
110112
// that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
@@ -286,7 +288,7 @@ public static class OnNextThread implements Runnable {
286288
@Override
287289
public void run() {
288290
for (int i = 0; i < numStringsToSend; i++) {
289-
Observer.onNext("aString");
291+
Observer.onNext(Thread.currentThread().getId() + "-" + i);
290292
}
291293
}
292294
}
@@ -296,12 +298,12 @@ public void run() {
296298
*/
297299
public static class CompletionThread implements Runnable {
298300

299-
private final Observer<String> Observer;
301+
private final Observer<String> observer;
300302
private final TestConcurrencyObserverEvent event;
301303
private final Future<?>[] waitOnThese;
302304

303305
CompletionThread(Observer<String> Observer, TestConcurrencyObserverEvent event, Future<?>... waitOnThese) {
304-
this.Observer = Observer;
306+
this.observer = Observer;
305307
this.event = event;
306308
this.waitOnThese = waitOnThese;
307309
}
@@ -321,9 +323,9 @@ public void run() {
321323

322324
/* send the event */
323325
if (event == TestConcurrencyObserverEvent.onError) {
324-
Observer.onError(new RuntimeException("mocked exception"));
326+
observer.onError(new RuntimeException("mocked exception"));
325327
} else if (event == TestConcurrencyObserverEvent.onCompleted) {
326-
Observer.onCompleted();
328+
observer.onCompleted();
327329

328330
} else {
329331
throw new IllegalArgumentException("Expecting either onError or onCompleted");
@@ -566,6 +568,7 @@ private static class BusyObserver extends Subscriber<String> {
566568
AtomicInteger onNextCount = new AtomicInteger();
567569
AtomicInteger threadsRunning = new AtomicInteger();
568570
AtomicInteger maxConcurrentThreads = new AtomicInteger();
571+
final CountDownLatch terminalEvent = new CountDownLatch(1);
569572

570573
@Override
571574
public void onCompleted() {
@@ -575,17 +578,20 @@ public void onCompleted() {
575578
} finally {
576579
captureMaxThreads();
577580
threadsRunning.decrementAndGet();
581+
terminalEvent.countDown();
578582
}
579583
}
580584

581585
@Override
582586
public void onError(Throwable e) {
587+
System.out.println(">>>>>>>>>>>>>>>>>>>> onError received: " + e);
583588
threadsRunning.incrementAndGet();
584589
try {
585590
onError = true;
586591
} finally {
587592
captureMaxThreads();
588593
threadsRunning.decrementAndGet();
594+
terminalEvent.countDown();
589595
}
590596
}
591597

0 commit comments

Comments
 (0)