Skip to content

Commit 43e3d77

Browse files
CurrentThreadScheduler Fixes
- outer/inner scheduling so nested order is correct while not deadlocking on certain nested use cases as found in previous testing
1 parent 28dd5fc commit 43e3d77

File tree

1 file changed

+52
-24
lines changed

1 file changed

+52
-24
lines changed

rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121

2222
import rx.Scheduler;
2323
import rx.Subscription;
24-
import rx.subscriptions.CompositeSubscription;
2524
import rx.subscriptions.MultipleAssignmentSubscription;
25+
import rx.util.functions.Func1;
2626
import rx.util.functions.Func2;
2727

2828
/**
@@ -36,7 +36,17 @@ public static CurrentThreadScheduler getInstance() {
3636
return INSTANCE;
3737
}
3838

39-
private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>();
39+
private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>() {
40+
protected java.util.PriorityQueue<TimedAction> initialValue() {
41+
return new PriorityQueue<TimedAction>();
42+
};
43+
};
44+
45+
private static final ThreadLocal<Boolean> PROCESSING = new ThreadLocal<Boolean>() {
46+
protected Boolean initialValue() {
47+
return Boolean.FALSE;
48+
};
49+
};
4050

4151
/* package accessible for unit tests */CurrentThreadScheduler() {
4252
}
@@ -45,50 +55,57 @@ public static CurrentThreadScheduler getInstance() {
4555
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
4656
// immediately move to the InnerCurrentThreadScheduler
4757
InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler();
48-
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
49-
enqueue(innerScheduler, discardableAction, now());
58+
innerScheduler.enqueue(new DiscardableAction<T>(state, action), now());
59+
enqueueFromOuter(innerScheduler, now());
5060
return innerScheduler;
5161
}
5262

5363
@Override
5464
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
5565
long execTime = now() + unit.toMillis(dueTime);
5666

57-
// immediately move to the InnerCurrentThreadScheduler
67+
// create an inner scheduler and queue it for execution
5868
InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler();
59-
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, new SleepingAction<T>(action, this, execTime));
60-
enqueue(innerScheduler, discardableAction, execTime);
61-
return discardableAction;
69+
innerScheduler.enqueue(new DiscardableAction<T>(state, new SleepingAction<T>(action, this, execTime)), execTime);
70+
enqueueFromOuter(innerScheduler, execTime);
71+
return innerScheduler;
6272
}
6373

64-
private static void enqueue(Scheduler scheduler, DiscardableAction<?> action, long execTime) {
74+
/*
75+
* This will accept InnerCurrentThreadScheduler instances and execute them in order they are received
76+
* and on each of them will loop internally until each is complete.
77+
*/
78+
private void enqueueFromOuter(final InnerCurrentThreadScheduler innerScheduler, long execTime) {
79+
// Note that everything here is single-threaded so we won't have race conditions
6580
PriorityQueue<TimedAction> queue = QUEUE.get();
66-
boolean exec = queue == null;
81+
queue.add(new TimedAction(new Func1<Scheduler, Subscription>() {
6782

68-
if (exec) {
69-
queue = new PriorityQueue<TimedAction>();
70-
QUEUE.set(queue);
71-
}
72-
73-
queue.add(new TimedAction(action, execTime, counter.incrementAndGet()));
83+
@Override
84+
public Subscription call(Scheduler _) {
85+
// when the InnerCurrentThreadScheduler gets scheduled we want to process its tasks
86+
return innerScheduler.startProcessing();
87+
}
88+
}, execTime, counter.incrementAndGet()));
7489

75-
if (exec) {
90+
// first time through starts the loop
91+
if (!PROCESSING.get()) {
92+
PROCESSING.set(Boolean.TRUE);
7693
while (!queue.isEmpty()) {
77-
queue.poll().action.call(scheduler);
94+
queue.poll().action.call(innerScheduler);
7895
}
79-
80-
QUEUE.set(null);
96+
PROCESSING.set(Boolean.FALSE);
8197
}
8298
}
8399

84100
private static class InnerCurrentThreadScheduler extends Scheduler implements Subscription {
85101
private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription();
102+
private final PriorityQueue<TimedAction> innerQueue = new PriorityQueue<TimedAction>();
86103

87104
@Override
88105
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
89106
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
90107
childSubscription.set(discardableAction);
91-
enqueue(this, discardableAction, now());
108+
enqueue(discardableAction, now());
92109
return childSubscription;
93110
}
94111

@@ -98,10 +115,21 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?
98115

99116
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
100117
childSubscription.set(discardableAction);
101-
enqueue(this, discardableAction, execTime);
118+
enqueue(discardableAction, execTime);
102119
return childSubscription;
103120
}
104121

122+
private void enqueue(Func1<Scheduler, Subscription> action, long execTime) {
123+
innerQueue.add(new TimedAction(action, execTime, counter.incrementAndGet()));
124+
}
125+
126+
private Subscription startProcessing() {
127+
while (!innerQueue.isEmpty()) {
128+
innerQueue.poll().action.call(this);
129+
}
130+
return this;
131+
}
132+
105133
@Override
106134
public void unsubscribe() {
107135
childSubscription.unsubscribe();
@@ -113,11 +141,11 @@ public void unsubscribe() {
113141
* Use time to sort items so delayed actions are sorted to their appropriate position in the queue.
114142
*/
115143
private static class TimedAction implements Comparable<TimedAction> {
116-
final DiscardableAction<?> action;
144+
final Func1<Scheduler, Subscription> action;
117145
final Long execTime;
118146
final Long count; // In case if time between enqueueing took less than 1ms
119147

120-
private TimedAction(DiscardableAction<?> action, Long execTime, Long count) {
148+
private TimedAction(Func1<Scheduler, Subscription> action, Long execTime, Long count) {
121149
this.action = action;
122150
this.execTime = execTime;
123151
this.count = count;

0 commit comments

Comments
 (0)