Skip to content

Commit 071b894

Browse files
Fix concurrency bug in ScheduledObserver
This is a followup to ReactiveX@1fa6ae3 that fixed one issue (concurrency) and created another (broke Rx contract by allowing concurrent execution of onNext). I have reverted back to the previous implementatio and then attempted to fix the concurrency issue again. I think it ended up being a simple fix … just re-ordering the `enqueue` method to remove the race-condition between the logic protected by the AtomicInteger and adding to the queue. It's not an atomic operation (adding then processing) so we need to just add to the queue and treat it as an async data structure and keep the AtomicInteger portion to only protecting the "process or not process" logic. ```java // this must happen before 'counter' is used to provide synchronization between threads queue.offer(notification); ``` This may still have issues but it's now working in all of my concurrency tests (the ones that broken with the original and then my modified version). The tests are not easy to build unit tests out of as they require running for many seconds and non-deterministically causing a race condition so I have not yet spend the time to try and figure out a deterministic unit test hence them not being committed.
1 parent f69ce09 commit 071b894

File tree

1 file changed

+43
-26
lines changed

1 file changed

+43
-26
lines changed

rxjava-core/src/main/java/rx/operators/ScheduledObserver.java

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,21 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
1821
import rx.Notification;
1922
import rx.Observer;
2023
import rx.Scheduler;
21-
import rx.concurrency.Schedulers;
2224
import rx.util.functions.Action0;
2325

2426
/* package */class ScheduledObserver<T> implements Observer<T> {
2527
private final Observer<T> underlying;
2628
private final Scheduler scheduler;
2729

30+
private final ConcurrentLinkedQueue<Notification<T>> queue = new ConcurrentLinkedQueue<Notification<T>>();
31+
private final AtomicInteger counter = new AtomicInteger(0);
32+
2833
public ScheduledObserver(Observer<T> underlying, Scheduler scheduler) {
2934
this.underlying = underlying;
3035
this.scheduler = scheduler;
@@ -41,38 +46,50 @@ public void onError(final Exception e) {
4146
}
4247

4348
@Override
44-
public void onNext(final T v) {
45-
enqueue(new Notification<T>(v));
49+
public void onNext(final T args) {
50+
enqueue(new Notification<T>(args));
4651
}
4752

48-
private void enqueue(final Notification<T> notification) {
53+
private void enqueue(Notification<T> notification) {
54+
// this must happen before 'counter' is used to provide synchronization between threads
55+
queue.offer(notification);
4956

50-
Schedulers.currentThread().schedule(new Action0() {
57+
// we now use counter to atomically determine if we need to start processing or not
58+
// it will be 0 if it's the first notification or the scheduler has finished processing work
59+
// and we need to start doing it again
60+
if (counter.getAndIncrement() == 0) {
61+
processQueue();
62+
}
63+
}
64+
65+
private void processQueue() {
66+
scheduler.schedule(new Action0() {
5167
@Override
5268
public void call() {
69+
Notification<T> not = queue.poll();
5370

54-
scheduler.schedule(new Action0() {
55-
@Override
56-
public void call() {
57-
switch (notification.getKind()) {
58-
case OnNext:
59-
underlying.onNext(notification.getValue());
60-
break;
61-
case OnError:
62-
underlying.onError(notification.getException());
63-
break;
64-
case OnCompleted:
65-
underlying.onCompleted();
66-
break;
67-
default:
68-
throw new IllegalStateException("Unknown kind of notification " + notification);
71+
switch (not.getKind()) {
72+
case OnNext:
73+
underlying.onNext(not.getValue());
74+
break;
75+
case OnError:
76+
underlying.onError(not.getException());
77+
break;
78+
case OnCompleted:
79+
underlying.onCompleted();
80+
break;
81+
default:
82+
throw new IllegalStateException("Unknown kind of notification " + not);
6983

70-
}
71-
}
72-
});
73-
}
84+
}
7485

75-
});
76-
};
86+
// decrement count and if we still have work to do
87+
// recursively schedule ourselves to process again
88+
if (counter.decrementAndGet() > 0) {
89+
scheduler.schedule(this);
90+
}
7791

92+
}
93+
});
94+
}
7895
}

0 commit comments

Comments
 (0)