Skip to content

Commit f753f4f

Browse files
committed
TestScheduler: Fix race condition in queue
1 parent c8e1b03 commit f753f4f

File tree

1 file changed

+19
-12
lines changed

1 file changed

+19
-12
lines changed

src/main/java/rx/schedulers/TestScheduler.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,20 @@ public void triggerActions() {
109109
}
110110

111111
private void triggerActions(long targetTimeInNanos) {
112-
while (!queue.isEmpty()) {
113-
TimedAction current = queue.peek();
114-
if (current.time > targetTimeInNanos) {
115-
break;
112+
while (true) {
113+
TimedAction current = null;
114+
synchronized (queue) {
115+
if (queue.isEmpty()) {
116+
break;
117+
}
118+
current = queue.peek();
119+
if (current.time > targetTimeInNanos) {
120+
break;
121+
}
122+
// if scheduled time is 0 (immediate) use current virtual time
123+
time = current.time == 0 ? time : current.time;
124+
queue.remove();
116125
}
117-
// if scheduled time is 0 (immediate) use current virtual time
118-
time = current.time == 0 ? time : current.time;
119-
queue.remove();
120-
121126
// Only execute if not unsubscribed
122127
if (!current.scheduler.isUnsubscribed()) {
123128
current.action.call();
@@ -156,9 +161,10 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
156161

157162
@Override
158163
public void call() {
159-
queue.remove(timedAction);
164+
synchronized (queue) {
165+
queue.remove(timedAction);
166+
}
160167
}
161-
162168
});
163169
}
164170

@@ -170,9 +176,10 @@ public Subscription schedule(Action0 action) {
170176

171177
@Override
172178
public void call() {
173-
queue.remove(timedAction);
179+
synchronized (queue) {
180+
queue.remove(timedAction);
181+
}
174182
}
175-
176183
});
177184
}
178185

0 commit comments

Comments
 (0)