Skip to content

Commit 28dd5fc

Browse files
Small ObserveOn Improvements
- use Long instead of Int so we don’t overflow - migrate from deprecated method
1 parent 9a94fd2 commit 28dd5fc

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
1919
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.concurrent.atomic.AtomicLong;
2021

2122
import rx.Notification;
2223
import rx.Observable;
@@ -71,7 +72,7 @@ private class Observation {
7172
final CompositeSubscription compositeSubscription = new CompositeSubscription();
7273
final MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription();
7374
final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
74-
final AtomicInteger counter = new AtomicInteger(0);
75+
final AtomicLong counter = new AtomicLong(0);
7576
private volatile Scheduler recursiveScheduler;
7677

7778
public Observation(Observer<? super T> observer) {
@@ -108,7 +109,7 @@ public Subscription call(Scheduler innerScheduler, T state) {
108109
}
109110

110111
void processQueue() {
111-
recursiveSubscription.setSubscription(recursiveScheduler.schedule(new Action1<Action0>() {
112+
recursiveSubscription.set(recursiveScheduler.schedule(new Action1<Action0>() {
112113
@Override
113114
public void call(Action0 self) {
114115
Notification<? extends T> not = queue.poll();

0 commit comments

Comments
 (0)