Skip to content

Commit 176f8e7

Browse files
Merge pull request ReactiveX#712 from benjchristensen/scheduler-memory-leak
Fix Scheduler Memory Leaks
2 parents 77b2986 + 1d0d90c commit 176f8e7

15 files changed

+1591
-905
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public Subscription call(final Scheduler scheduler, final Func2 parentAction) {
165165
@Override
166166
public void call() {
167167
if (!parentSubscription.isUnsubscribed()) {
168-
childSubscription.setSubscription(scheduler.schedule(parentAction, parentAction));
168+
childSubscription.set(scheduler.schedule(parentAction, parentAction));
169169
}
170170
}
171171

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
1919
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.concurrent.atomic.AtomicLong;
2021

2122
import rx.Notification;
2223
import rx.Observable;
@@ -71,7 +72,7 @@ private class Observation {
7172
final CompositeSubscription compositeSubscription = new CompositeSubscription();
7273
final MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription();
7374
final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
74-
final AtomicInteger counter = new AtomicInteger(0);
75+
final AtomicLong counter = new AtomicLong(0);
7576
private volatile Scheduler recursiveScheduler;
7677

7778
public Observation(Observer<? super T> observer) {
@@ -108,7 +109,7 @@ public Subscription call(Scheduler innerScheduler, T state) {
108109
}
109110

110111
void processQueue() {
111-
recursiveSubscription.setSubscription(recursiveScheduler.schedule(new Action1<Action0>() {
112+
recursiveSubscription.set(recursiveScheduler.schedule(new Action1<Action0>() {
112113
@Override
113114
public void call(Action0 self) {
114115
Notification<? extends T> not = queue.poll();

rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java

Lines changed: 90 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,71 +17,135 @@
1717

1818
import java.util.PriorityQueue;
1919
import java.util.concurrent.TimeUnit;
20-
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.concurrent.atomic.AtomicLong;
2121

2222
import rx.Scheduler;
2323
import rx.Subscription;
24+
import rx.subscriptions.MultipleAssignmentSubscription;
25+
import rx.util.functions.Func1;
2426
import rx.util.functions.Func2;
2527

2628
/**
2729
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
2830
*/
2931
public class CurrentThreadScheduler extends Scheduler {
3032
private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler();
33+
private static final AtomicLong counter = new AtomicLong(0);
3134

3235
public static CurrentThreadScheduler getInstance() {
3336
return INSTANCE;
3437
}
3538

36-
private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>();
39+
private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>() {
40+
protected java.util.PriorityQueue<TimedAction> initialValue() {
41+
return new PriorityQueue<TimedAction>();
42+
};
43+
};
44+
45+
private static final ThreadLocal<Boolean> PROCESSING = new ThreadLocal<Boolean>() {
46+
protected Boolean initialValue() {
47+
return Boolean.FALSE;
48+
};
49+
};
3750

3851
/* package accessible for unit tests */CurrentThreadScheduler() {
3952
}
4053

41-
private final AtomicInteger counter = new AtomicInteger(0);
42-
4354
@Override
4455
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
45-
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
46-
enqueue(discardableAction, now());
47-
return discardableAction;
56+
// immediately move to the InnerCurrentThreadScheduler
57+
InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler();
58+
innerScheduler.schedule(state, action);
59+
enqueueFromOuter(innerScheduler, now());
60+
return innerScheduler;
4861
}
4962

5063
@Override
51-
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
52-
long execTime = now() + unit.toMillis(dueTime);
53-
54-
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, new SleepingAction<T>(action, this, execTime));
55-
enqueue(discardableAction, execTime);
56-
return discardableAction;
64+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
65+
long execTime = now() + unit.toMillis(delayTime);
66+
67+
// create an inner scheduler and queue it for execution
68+
InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler();
69+
innerScheduler.schedule(state, action, delayTime, unit);
70+
enqueueFromOuter(innerScheduler, execTime);
71+
return innerScheduler;
5772
}
5873

59-
private void enqueue(DiscardableAction<?> action, long execTime) {
74+
/*
75+
* This will accept InnerCurrentThreadScheduler instances and execute them in order they are received
76+
* and on each of them will loop internally until each is complete.
77+
*/
78+
private void enqueueFromOuter(final InnerCurrentThreadScheduler innerScheduler, long execTime) {
79+
// Note that everything here is single-threaded so we won't have race conditions
6080
PriorityQueue<TimedAction> queue = QUEUE.get();
61-
boolean exec = queue == null;
81+
queue.add(new TimedAction(new Func1<Scheduler, Subscription>() {
6282

63-
if (exec) {
64-
queue = new PriorityQueue<TimedAction>();
65-
QUEUE.set(queue);
83+
@Override
84+
public Subscription call(Scheduler _) {
85+
// when the InnerCurrentThreadScheduler gets scheduled we want to process its tasks
86+
return innerScheduler.startProcessing();
87+
}
88+
}, execTime, counter.incrementAndGet()));
89+
90+
// first time through starts the loop
91+
if (!PROCESSING.get()) {
92+
PROCESSING.set(Boolean.TRUE);
93+
while (!queue.isEmpty()) {
94+
queue.poll().action.call(innerScheduler);
95+
}
96+
PROCESSING.set(Boolean.FALSE);
6697
}
98+
}
6799

68-
queue.add(new TimedAction(action, execTime, counter.incrementAndGet()));
100+
private static class InnerCurrentThreadScheduler extends Scheduler implements Subscription {
101+
private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription();
102+
private final PriorityQueue<TimedAction> innerQueue = new PriorityQueue<TimedAction>();
69103

70-
if (exec) {
71-
while (!queue.isEmpty()) {
72-
queue.poll().action.call(this);
104+
@Override
105+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
106+
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
107+
childSubscription.set(discardableAction);
108+
enqueue(discardableAction, now());
109+
return childSubscription;
110+
}
111+
112+
@Override
113+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
114+
long execTime = now() + unit.toMillis(delayTime);
115+
116+
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, new SleepingAction<T>(action, this, execTime));
117+
childSubscription.set(discardableAction);
118+
enqueue(discardableAction, execTime);
119+
return childSubscription;
120+
}
121+
122+
private void enqueue(Func1<Scheduler, Subscription> action, long execTime) {
123+
innerQueue.add(new TimedAction(action, execTime, counter.incrementAndGet()));
124+
}
125+
126+
private Subscription startProcessing() {
127+
while (!innerQueue.isEmpty()) {
128+
innerQueue.poll().action.call(this);
73129
}
130+
return this;
131+
}
74132

75-
QUEUE.set(null);
133+
@Override
134+
public void unsubscribe() {
135+
childSubscription.unsubscribe();
76136
}
137+
77138
}
78139

140+
/**
141+
* Use time to sort items so delayed actions are sorted to their appropriate position in the queue.
142+
*/
79143
private static class TimedAction implements Comparable<TimedAction> {
80-
final DiscardableAction<?> action;
144+
final Func1<Scheduler, Subscription> action;
81145
final Long execTime;
82-
final Integer count; // In case if time between enqueueing took less than 1ms
146+
final Long count; // In case if time between enqueueing took less than 1ms
83147

84-
private TimedAction(DiscardableAction<?> action, Long execTime, Integer count) {
148+
private TimedAction(Func1<Scheduler, Subscription> action, Long execTime, Long count) {
85149
this.action = action;
86150
this.execTime = execTime;
87151
this.count = count;

0 commit comments

Comments
 (0)