From f753f4fcad5a07f052684bbe6f1850dd12c62e0b Mon Sep 17 00:00:00 2001 From: Sam Lancia Date: Tue, 5 Apr 2016 15:29:37 +0100 Subject: [PATCH] TestScheduler: Fix race condition in queue --- .../java/rx/schedulers/TestScheduler.java | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/main/java/rx/schedulers/TestScheduler.java b/src/main/java/rx/schedulers/TestScheduler.java index fec8bbcd75..e0ef4ec253 100644 --- a/src/main/java/rx/schedulers/TestScheduler.java +++ b/src/main/java/rx/schedulers/TestScheduler.java @@ -109,15 +109,20 @@ public void triggerActions() { } private void triggerActions(long targetTimeInNanos) { - while (!queue.isEmpty()) { - TimedAction current = queue.peek(); - if (current.time > targetTimeInNanos) { - break; + while (true) { + TimedAction current = null; + synchronized (queue) { + if (queue.isEmpty()) { + break; + } + current = queue.peek(); + if (current.time > targetTimeInNanos) { + break; + } + // if scheduled time is 0 (immediate) use current virtual time + time = current.time == 0 ? time : current.time; + queue.remove(); } - // if scheduled time is 0 (immediate) use current virtual time - time = current.time == 0 ? time : current.time; - queue.remove(); - // Only execute if not unsubscribed if (!current.scheduler.isUnsubscribed()) { current.action.call(); @@ -156,9 +161,10 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { @Override public void call() { - queue.remove(timedAction); + synchronized (queue) { + queue.remove(timedAction); + } } - }); } @@ -170,9 +176,10 @@ public Subscription schedule(Action0 action) { @Override public void call() { - queue.remove(timedAction); + synchronized (queue) { + queue.remove(timedAction); + } } - }); }