Skip to content

Commit a606a26

Browse files
authored
1.x: enable TestScheduler with nanosecond periodic scheduling (ReactiveX#4884)
1 parent 8fff67d commit a606a26

File tree

5 files changed

+147
-70
lines changed

5 files changed

+147
-70
lines changed

src/main/java/rx/Scheduler.java

Lines changed: 3 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919

2020
import rx.annotations.Experimental;
2121
import rx.functions.*;
22-
import rx.internal.schedulers.SchedulerWhen;
23-
import rx.internal.subscriptions.SequentialSubscription;
22+
import rx.internal.schedulers.*;
2423
import rx.schedulers.Schedulers;
2524

2625
/**
@@ -44,18 +43,6 @@ public abstract class Scheduler {
4443
* : Without virtual extension methods even additive changes are breaking and thus severely impede library
4544
* maintenance.
4645
*/
47-
48-
/**
49-
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
50-
* <p>
51-
* The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
52-
*/
53-
static final long CLOCK_DRIFT_TOLERANCE_NANOS;
54-
static {
55-
CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos(
56-
Long.getLong("rx.scheduler.drift-tolerance", 15));
57-
}
58-
5946
/**
6047
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
6148
* <p>
@@ -121,47 +108,8 @@ public abstract static class Worker implements Subscription {
121108
* @return a subscription to be able to prevent or cancel the execution of the action
122109
*/
123110
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
124-
final long periodInNanos = unit.toNanos(period);
125-
final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now());
126-
final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);
127-
128-
final SequentialSubscription first = new SequentialSubscription();
129-
final SequentialSubscription mas = new SequentialSubscription(first);
130-
131-
final Action0 recursiveAction = new Action0() {
132-
long count;
133-
long lastNowNanos = firstNowNanos;
134-
long startInNanos = firstStartInNanos;
135-
@Override
136-
public void call() {
137-
action.call();
138-
139-
if (!mas.isUnsubscribed()) {
140-
141-
long nextTick;
142-
143-
long nowNanos = TimeUnit.MILLISECONDS.toNanos(now());
144-
// If the clock moved in a direction quite a bit, rebase the repetition period
145-
if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
146-
|| nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) {
147-
nextTick = nowNanos + periodInNanos;
148-
/*
149-
* Shift the start point back by the drift as if the whole thing
150-
* started count periods ago.
151-
*/
152-
startInNanos = nextTick - (periodInNanos * (++count));
153-
} else {
154-
nextTick = startInNanos + (++count * periodInNanos);
155-
}
156-
lastNowNanos = nowNanos;
157-
158-
long delay = nextTick - nowNanos;
159-
mas.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
160-
}
161-
}
162-
};
163-
first.replace(schedule(recursiveAction, initialDelay, unit));
164-
return mas;
111+
return SchedulePeriodicHelper.schedulePeriodically(this, action,
112+
initialDelay, period, unit, null);
165113
}
166114

167115
/**
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.schedulers;
18+
19+
import java.util.concurrent.TimeUnit;
20+
21+
import rx.Scheduler.Worker;
22+
import rx.Subscription;
23+
import rx.functions.Action0;
24+
import rx.internal.subscriptions.SequentialSubscription;
25+
26+
/**
27+
* Utility method for scheduling tasks periodically (at a fixed rate) by using Worker.schedule(Action0, long, TimeUnit).
28+
*/
29+
public final class SchedulePeriodicHelper {
30+
31+
/** Utility class. */
32+
private SchedulePeriodicHelper() {
33+
throw new IllegalStateException("No instances!");
34+
}
35+
36+
/**
37+
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
38+
* <p>
39+
* The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
40+
*/
41+
public static final long CLOCK_DRIFT_TOLERANCE_NANOS;
42+
static {
43+
CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos(
44+
Long.getLong("rx.scheduler.drift-tolerance", 15));
45+
}
46+
47+
/**
48+
* Return the current time in nanoseconds.
49+
*/
50+
public interface NowNanoSupplier {
51+
long nowNanos();
52+
}
53+
54+
public static Subscription schedulePeriodically(
55+
final Worker worker,
56+
final Action0 action,
57+
long initialDelay, long period, TimeUnit unit,
58+
final NowNanoSupplier nowNanoSupplier) {
59+
final long periodInNanos = unit.toNanos(period);
60+
final long firstNowNanos = nowNanoSupplier != null ? nowNanoSupplier.nowNanos() : TimeUnit.MILLISECONDS.toNanos(worker.now());
61+
final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);
62+
63+
final SequentialSubscription first = new SequentialSubscription();
64+
final SequentialSubscription mas = new SequentialSubscription(first);
65+
66+
final Action0 recursiveAction = new Action0() {
67+
long count;
68+
long lastNowNanos = firstNowNanos;
69+
long startInNanos = firstStartInNanos;
70+
@Override
71+
public void call() {
72+
action.call();
73+
74+
if (!mas.isUnsubscribed()) {
75+
76+
long nextTick;
77+
78+
long nowNanos = nowNanoSupplier != null ? nowNanoSupplier.nowNanos() : TimeUnit.MILLISECONDS.toNanos(worker.now());
79+
// If the clock moved in a direction quite a bit, rebase the repetition period
80+
if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
81+
|| nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) {
82+
nextTick = nowNanos + periodInNanos;
83+
/*
84+
* Shift the start point back by the drift as if the whole thing
85+
* started count periods ago.
86+
*/
87+
startInNanos = nextTick - (periodInNanos * (++count));
88+
} else {
89+
nextTick = startInNanos + (++count * periodInNanos);
90+
}
91+
lastNowNanos = nowNanos;
92+
93+
long delay = nextTick - nowNanos;
94+
mas.replace(worker.schedule(this, delay, TimeUnit.NANOSECONDS));
95+
}
96+
}
97+
};
98+
first.replace(worker.schedule(recursiveAction, initialDelay, unit));
99+
return mas;
100+
}
101+
102+
}

src/main/java/rx/schedulers/TestScheduler.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import rx.Scheduler;
2424
import rx.Subscription;
2525
import rx.functions.Action0;
26+
import rx.internal.schedulers.SchedulePeriodicHelper;
27+
import rx.internal.schedulers.SchedulePeriodicHelper.NowNanoSupplier;
2628
import rx.subscriptions.BooleanSubscription;
2729
import rx.subscriptions.Subscriptions;
2830

@@ -130,7 +132,7 @@ public Worker createWorker() {
130132
return new InnerTestScheduler();
131133
}
132134

133-
final class InnerTestScheduler extends Worker {
135+
final class InnerTestScheduler extends Worker implements NowNanoSupplier {
134136

135137
private final BooleanSubscription s = new BooleanSubscription();
136138

@@ -172,10 +174,21 @@ public void call() {
172174
});
173175
}
174176

177+
@Override
178+
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
179+
return SchedulePeriodicHelper.schedulePeriodically(this,
180+
action, initialDelay, period, unit, this);
181+
}
182+
175183
@Override
176184
public long now() {
177185
return TestScheduler.this.now();
178186
}
187+
188+
@Override
189+
public long nowNanos() {
190+
return TestScheduler.this.time;
191+
}
179192

180193
}
181194

src/test/java/rx/SchedulerWorkerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.junit.Test;
2424

2525
import rx.functions.Action0;
26+
import rx.internal.schedulers.SchedulePeriodicHelper;
2627
import rx.schedulers.Schedulers;
2728

2829
public class SchedulerWorkerTest {
@@ -85,7 +86,7 @@ public void call() {
8586

8687
Thread.sleep(150);
8788

88-
s.drift = -1000 - TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS);
89+
s.drift = -1000 - TimeUnit.NANOSECONDS.toMillis(SchedulePeriodicHelper.CLOCK_DRIFT_TOLERANCE_NANOS);
8990

9091
Thread.sleep(400);
9192

@@ -127,7 +128,7 @@ public void call() {
127128

128129
Thread.sleep(150);
129130

130-
s.drift = 1000 + TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS);
131+
s.drift = 1000 + TimeUnit.NANOSECONDS.toMillis(SchedulePeriodicHelper.CLOCK_DRIFT_TOLERANCE_NANOS);
131132

132133
Thread.sleep(400);
133134

src/test/java/rx/schedulers/TestSchedulerTest.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,18 @@
1717

1818
import static org.junit.Assert.assertEquals;
1919
import static org.mockito.Matchers.anyLong;
20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.never;
22-
import static org.mockito.Mockito.times;
23-
import static org.mockito.Mockito.verify;
20+
import static org.mockito.Mockito.*;
2421

2522
import java.util.concurrent.TimeUnit;
2623
import java.util.concurrent.atomic.AtomicInteger;
2724

2825
import org.junit.Test;
29-
import org.mockito.InOrder;
30-
import org.mockito.Mockito;
26+
import org.mockito.*;
3127

32-
import rx.Observable;
28+
import rx.*;
3329
import rx.Observable.OnSubscribe;
34-
import rx.Scheduler;
35-
import rx.Subscriber;
36-
import rx.Subscription;
37-
import rx.functions.Action0;
38-
import rx.functions.Func1;
30+
import rx.functions.*;
31+
import rx.observers.TestSubscriber;
3932

4033
public class TestSchedulerTest {
4134

@@ -222,4 +215,24 @@ public void call() {
222215
inner.unsubscribe();
223216
}
224217
}
218+
219+
@Test
220+
public void resolution() {
221+
for (final TimeUnit unit : TimeUnit.values()) {
222+
TestScheduler scheduler = new TestScheduler();
223+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
224+
225+
Observable.interval(30, unit, scheduler)
226+
.map(new Func1<Long, String>() {
227+
@Override
228+
public String call(Long v) {
229+
return v + "-" + unit;
230+
}
231+
})
232+
.subscribe(testSubscriber);
233+
scheduler.advanceTimeTo(60, unit);
234+
235+
testSubscriber.assertValues("0-" + unit, "1-" + unit);
236+
}
237+
}
225238
}

0 commit comments

Comments
 (0)