Skip to content

Commit 1fa6ae3

Browse files
fix concurrency bug in ScheduledObserver
- found a concurrency bug while working on Netflix/Hystrix#123 - the following code would lock up occasionally due to onCompleted not being delivered: ```java public class RunTest { public static void main(String[] args) { System.out.println("Starting test..."); final ArrayList<String> strings = new ArrayList<String>(200000); int num = 10000; while (true) { long start = System.currentTimeMillis(); final AtomicInteger count = new AtomicInteger(); for (int i = 0; i < num; i++) { new TestService1(2, 5).toObservable().forEach(new Action1<Integer>() { @OverRide public void call(Integer v) { count.addAndGet(v); } }); new TestService2("hello").toObservable().forEach(new Action1<String>() { @OverRide public void call(String v) { strings.add(v); } }); } long time = (System.currentTimeMillis() - start); long executions = num * 2; System.out.println("Time: " + time + "ms for " + executions + " executions (" + (time * 1000) / executions + " microseconds)"); System.out.println(" Count: " + count); System.out.println(" Strings: " + strings.size()); strings.clear(); } } } ``` - Also made OperationObserveOn not use ScheduledObserver if the `ImmediateScheduler` is chosen to allow an optimization. I believe this optimization is safe because ScheduledObserver does not require knowledge of a Scheduler (such as for now()) and all we do is emit data to the Observer on a scheduler and if we know it's Immediate we can go direct and skip the enqueuing step. This allows shaving off a noticable number of microseconds per execution in the loop above.
1 parent 18b1362 commit 1fa6ae3

File tree

2 files changed

+37
-45
lines changed

2 files changed

+37
-45
lines changed

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import rx.Observer;
3131
import rx.Scheduler;
3232
import rx.Subscription;
33+
import rx.concurrency.ImmediateScheduler;
3334
import rx.concurrency.Schedulers;
3435
import rx.util.functions.Func1;
3536

@@ -50,7 +51,12 @@ public ObserveOn(Observable<T> source, Scheduler scheduler) {
5051

5152
@Override
5253
public Subscription call(final Observer<T> observer) {
53-
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
54+
if (scheduler instanceof ImmediateScheduler) {
55+
// do nothing if we request ImmediateScheduler so we don't invoke overhead
56+
return source.subscribe(observer);
57+
} else {
58+
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
59+
}
5460
}
5561
}
5662

Lines changed: 30 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,18 +18,13 @@
1818
import rx.Notification;
1919
import rx.Observer;
2020
import rx.Scheduler;
21+
import rx.concurrency.Schedulers;
2122
import rx.util.functions.Action0;
2223

23-
import java.util.concurrent.ConcurrentLinkedQueue;
24-
import java.util.concurrent.atomic.AtomicInteger;
25-
2624
/* package */class ScheduledObserver<T> implements Observer<T> {
2725
private final Observer<T> underlying;
2826
private final Scheduler scheduler;
2927

30-
private final ConcurrentLinkedQueue<Notification<T>> queue = new ConcurrentLinkedQueue<Notification<T>>();
31-
private final AtomicInteger counter = new AtomicInteger(0);
32-
3328
public ScheduledObserver(Observer<T> underlying, Scheduler scheduler) {
3429
this.underlying = underlying;
3530
this.scheduler = scheduler;
@@ -46,47 +41,38 @@ public void onError(final Exception e) {
4641
}
4742

4843
@Override
49-
public void onNext(final T args) {
50-
enqueue(new Notification<T>(args));
44+
public void onNext(final T v) {
45+
enqueue(new Notification<T>(v));
5146
}
5247

53-
private void enqueue(Notification<T> notification) {
54-
int count = counter.getAndIncrement();
55-
56-
queue.offer(notification);
48+
private void enqueue(final Notification<T> notification) {
5749

58-
if (count == 0) {
59-
processQueue();
60-
}
61-
}
62-
63-
private void processQueue() {
64-
scheduler.schedule(new Action0() {
50+
Schedulers.currentThread().schedule(new Action0() {
6551
@Override
6652
public void call() {
67-
Notification<T> not = queue.poll();
68-
69-
switch (not.getKind()) {
70-
case OnNext:
71-
underlying.onNext(not.getValue());
72-
break;
73-
case OnError:
74-
underlying.onError(not.getException());
75-
break;
76-
case OnCompleted:
77-
underlying.onCompleted();
78-
break;
79-
default:
80-
throw new IllegalStateException("Unknown kind of notification " + not);
81-
82-
}
83-
84-
int count = counter.decrementAndGet();
85-
if (count > 0) {
86-
scheduler.schedule(this);
87-
}
8853

54+
scheduler.schedule(new Action0() {
55+
@Override
56+
public void call() {
57+
switch (notification.getKind()) {
58+
case OnNext:
59+
underlying.onNext(notification.getValue());
60+
break;
61+
case OnError:
62+
underlying.onError(notification.getException());
63+
break;
64+
case OnCompleted:
65+
underlying.onCompleted();
66+
break;
67+
default:
68+
throw new IllegalStateException("Unknown kind of notification " + notification);
69+
70+
}
71+
}
72+
});
8973
}
74+
9075
});
91-
}
76+
};
77+
9278
}

0 commit comments

Comments
 (0)