Skip to content

Commit 2875fef

Browse files
authored
2.x: Expand the JavaDocs of the Scheduler API (ReactiveX#5843)
* 2.x: Expand the JavaDocs of the Scheduler API * Fix typo and grammar.
1 parent f7a2870 commit 2875fef

File tree

2 files changed

+142
-29
lines changed

2 files changed

+142
-29
lines changed

src/main/java/io/reactivex/Scheduler.java

Lines changed: 133 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,72 @@
2727

2828
/**
2929
* A {@code Scheduler} is an object that specifies an API for scheduling
30-
* units of work with or without delays or periodically.
31-
* You can get common instances of this class in {@link io.reactivex.schedulers.Schedulers}.
30+
* units of work provided in the form of {@link Runnable}s to be
31+
* executed without delay (effectively as soon as possible), after a specified time delay or periodically
32+
* and represents an abstraction over an asynchronous boundary that ensures
33+
* these units of work get executed by some underlying task-execution scheme
34+
* (such as custom Threads, event loop, {@link java.util.concurrent.Executor Executor} or Actor system)
35+
* with some uniform properties and guarantees regardless of the particular underlying
36+
* scheme.
37+
* <p>
38+
* You can get various standard, RxJava-specific instances of this class via
39+
* the static methods of the {@link io.reactivex.schedulers.Schedulers} utility class.
40+
* <p>
41+
* The so-called {@link Worker}s of a {@code Scheduler} can be created via the {@link #createWorker()} method which allow the scheduling
42+
* of multiple {@link Runnable} tasks in an isolated manner. {@code Runnable} tasks scheduled on a {@code Worker} are guaranteed to be
43+
* executed sequentially and in a non-overlapping fashion. Non-delayed {@code Runnable} tasks are guaranteed to execute in a
44+
* First-In-First-Out order but their execution may be interleaved with delayed tasks.
45+
* In addition, outstanding or running tasks can be cancelled together via
46+
* {@link Worker#dispose()} without affecting any other {@code Worker} instances of the same {@code Scheduler}.
47+
* <p>
48+
* Implementations of the {@link #scheduleDirect} and {@link Worker#schedule} methods are encouraged to call the {@link io.reactivex.plugins.RxJavaPlugins#onSchedule(Runnable)}
49+
* method to allow a scheduler hook to manipulate (wrap or replace) the original {@code Runnable} task before it is submitted to the
50+
* underlying task-execution scheme.
51+
* <p>
52+
* The default implementations of the {@code scheduleDirect} methods provided by this abstract class
53+
* delegate to the respective {@code schedule} methods in the {@link Worker} instance created via {@link #createWorker()}
54+
* for each individual {@link Runnable} task submitted. Implementors of this class are encouraged to provide
55+
* a more efficient direct scheduling implementation to avoid the time and memory overhead of creating such {@code Worker}s
56+
* for every task.
57+
* This delegation is done via special wrapper instances around the original {@code Runnable} before calling the respective
58+
* {@code Worker.schedule} method. Note that this can lead to multiple {@code RxJavaPlugins.onSchedule} calls and potentially
59+
* multiple hooks applied. Therefore, the default implementations of {@code scheduleDirect} (and the {@link Worker#schedulePeriodically(Runnable, long, long, TimeUnit)})
60+
* wrap the incoming {@code Runnable} into a class that implements the {@link io.reactivex.schedulers.SchedulerRunnableIntrospection}
61+
* interface which can grant access to the original or hooked {@code Runnable}, thus, a repeated {@code RxJavaPlugins.onSchedule}
62+
* can detect the earlier hook and not apply a new one over again.
63+
* <p>
64+
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current
65+
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Scheduler} implementations can override this
66+
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
67+
* Note that operators requiring a {@code Scheduler} may rely on either of the {@code now()} calls provided by
68+
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
69+
* consistent source of the current time.
70+
* <p>
71+
* The default implementation of the {@link Worker#schedulePeriodically(Runnable, long, long, TimeUnit)} method uses
72+
* the {@link Worker#schedule(Runnable, long, TimeUnit)} for scheduling the {@code Runnable} task periodically.
73+
* The algorithm calculates the next absolute time when the task should run again and schedules this execution
74+
* based on the relative time between it and {@link Worker#now(TimeUnit)}. However, drifts or changes in the
75+
* system clock could affect this calculation either by scheduling subsequent runs too frequently or too far apart.
76+
* Therefore, the default implementation uses the {@link #clockDriftTolerance()} value (set via
77+
* {@code rx2.scheduler.drift-tolerance} in minutes) to detect a drift in {@link Worker#now(TimeUnit)} and
78+
* re-adjust the absolute/relative time calculation accordingly.
79+
* <p>
80+
* The default implementations of {@link #start()} and {@link #shutdown()} do nothing and should be overridden if the
81+
* underlying task-execution scheme supports stopping and restarting itself.
82+
* <p>
83+
* If the {@code Scheduler} is shut down or a {@code Worker} is disposed, the {@code schedule} methods
84+
* should return the {@link io.reactivex.disposables.Disposables#disposed()} singleton instance indicating the shut down/disposed
85+
* state to the caller. Since the shutdown or dispose can happen from any thread, the {@code schedule} implementations
86+
* should make best effort to cancel tasks immediately after those tasks have been submitted to the
87+
* underlying task-execution scheme if the shutdown/dispose was detected after this submission.
88+
* <p>
89+
* All methods on the {@code Scheduler} and {@code Worker} classes should be thread safe.
3290
*/
3391
public abstract class Scheduler {
3492
/**
3593
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
3694
* <p>
37-
* The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
95+
* The associated system parameter, {@code rx2.scheduler.drift-tolerance}, expects its value in minutes.
3896
*/
3997
static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
4098
static {
@@ -44,7 +102,7 @@ public abstract class Scheduler {
44102

45103
/**
46104
* Returns the clock drift tolerance in nanoseconds.
47-
* <p>Related system property: {@code rx2.scheduler.drift-tolerance} in minutes
105+
* <p>Related system property: {@code rx2.scheduler.drift-tolerance} in minutes.
48106
* @return the tolerance in nanoseconds
49107
* @since 2.0
50108
*/
@@ -54,11 +112,13 @@ public static long clockDriftTolerance() {
54112

55113

56114
/**
57-
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
115+
* Retrieves or creates a new {@link Scheduler.Worker} that represents sequential execution of actions.
58116
* <p>
59-
* When work is completed it should be unsubscribed using {@link Scheduler.Worker#dispose()}.
117+
* When work is completed, the {@code Worker} instance should be released
118+
* by calling {@link Scheduler.Worker#dispose()} to avoid potential resource leaks in the
119+
* underlying task-execution scheme.
60120
* <p>
61-
* Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
121+
* Work on a {@link Scheduler.Worker} is guaranteed to be sequential and non-overlapping.
62122
*
63123
* @return a Worker representing a serial queue of actions to be executed
64124
*/
@@ -78,29 +138,37 @@ public long now(@NonNull TimeUnit unit) {
78138
/**
79139
* Allows the Scheduler instance to start threads
80140
* and accept tasks on them.
81-
* <p>Implementations should make sure the call is idempotent and thread-safe.
141+
* <p>
142+
* Implementations should make sure the call is idempotent, thread-safe and
143+
* should not throw any {@code RuntimeException} if it doesn't support this
144+
* functionality.
145+
*
82146
* @since 2.0
83147
*/
84148
public void start() {
85149

86150
}
87151

88152
/**
89-
* Instructs the Scheduler instance to stop threads
90-
* and stop accepting tasks on any outstanding Workers.
91-
* <p>Implementations should make sure the call is idempotent and thread-safe.
153+
* Instructs the Scheduler instance to stop threads,
154+
* stop accepting tasks on any outstanding {@link Worker} instances
155+
* and clean up any associated resources with this Scheduler.
156+
* <p>
157+
* Implementations should make sure the call is idempotent, thread-safe and
158+
* should not throw any {@code RuntimeException} if it doesn't support this
159+
* functionality.
92160
* @since 2.0
93161
*/
94162
public void shutdown() {
95163

96164
}
97165

98166
/**
99-
* Schedules the given task on this scheduler non-delayed execution.
167+
* Schedules the given task on this Scheduler without any time delay.
100168
*
101169
* <p>
102170
* This method is safe to be called from multiple threads but there are no
103-
* ordering guarantees between tasks.
171+
* ordering or non-overlapping guarantees between tasks.
104172
*
105173
* @param run the task to execute
106174
*
@@ -113,7 +181,7 @@ public Disposable scheduleDirect(@NonNull Runnable run) {
113181
}
114182

115183
/**
116-
* Schedules the execution of the given task with the given delay amount.
184+
* Schedules the execution of the given task with the given time delay.
117185
*
118186
* <p>
119187
* This method is safe to be called from multiple threads but there are no
@@ -139,15 +207,16 @@ public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull Tim
139207
}
140208

141209
/**
142-
* Schedules a periodic execution of the given task with the given initial delay and period.
210+
* Schedules a periodic execution of the given task with the given initial time delay and repeat period.
143211
*
144212
* <p>
145213
* This method is safe to be called from multiple threads but there are no
146214
* ordering guarantees between tasks.
147215
*
148216
* <p>
149-
* The periodic execution is at a fixed rate, that is, the first execution will be after the initial
150-
* delay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on.
217+
* The periodic execution is at a fixed rate, that is, the first execution will be after the
218+
* {@code initialDelay}, the second after {@code initialDelay + period}, the third after
219+
* {@code initialDelay + 2 * period}, and so on.
151220
*
152221
* @param run the task to schedule
153222
* @param initialDelay the initial delay amount, non-positive values indicate non-delayed scheduling
@@ -254,13 +323,43 @@ public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flow
254323
}
255324

256325
/**
257-
* Sequential Scheduler for executing actions on a single thread or event loop.
326+
* Represents an isolated, sequential worker of a parent Scheduler for executing {@code Runnable} tasks on
327+
* an underlying task-execution scheme (such as custom Threads, event loop, {@link java.util.concurrent.Executor Executor} or Actor system).
328+
* <p>
329+
* Disposing the {@link Worker} should cancel all outstanding work and allows resource cleanup.
258330
* <p>
259-
* Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup.
331+
* The default implementations of {@link #schedule(Runnable)} and {@link #schedulePeriodically(Runnable, long, long, TimeUnit)}
332+
* delegate to the abstract {@link #schedule(Runnable, long, TimeUnit)} method. Its implementation is encouraged to
333+
* track the individual {@code Runnable} tasks while they are waiting to be executed (with or without delay) so that
334+
* {@link #dispose()} can prevent their execution or potentially interrupt them if they are currently running.
335+
* <p>
336+
* The default implementation of the {@link #now(TimeUnit)} method returns current
337+
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Worker} implementations can override this
338+
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
339+
* Note that operators requiring a scheduler may rely on either of the {@code now()} calls provided by
340+
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
341+
* consistent source of the current time.
342+
* <p>
343+
* The default implementation of the {@link #schedulePeriodically(Runnable, long, long, TimeUnit)} method uses
344+
* the {@link #schedule(Runnable, long, TimeUnit)} for scheduling the {@code Runnable} task periodically.
345+
* The algorithm calculates the next absolute time when the task should run again and schedules this execution
346+
* based on the relative time between it and {@link #now(TimeUnit)}. However, drifts or changes in the
347+
* system clock would affect this calculation either by scheduling subsequent runs too frequently or too far apart.
348+
* Therefore, the default implementation uses the {@link #clockDriftTolerance()} value (set via
349+
* {@code rx2.scheduler.drift-tolerance} in minutes) to detect a drift in {@link #now(TimeUnit)} and
350+
* re-adjust the absolute/relative time calculation accordingly.
351+
* <p>
352+
* If the {@code Worker} is disposed, the {@code schedule} methods
353+
* should return the {@link io.reactivex.disposables.Disposables#disposed()} singleton instance indicating the disposed
354+
* state to the caller. Since the {@link #dispose()} call can happen on any thread, the {@code schedule} implementations
355+
* should make best effort to cancel tasks immediately after those tasks have been submitted to the
356+
* underlying task-execution scheme if the dispose was detected after this submission.
357+
* <p>
358+
* All methods on the {@code Worker} class should be thread safe.
260359
*/
261360
public abstract static class Worker implements Disposable {
262361
/**
263-
* Schedules a Runnable for execution without delay.
362+
* Schedules a Runnable for execution without any time delay.
264363
*
265364
* <p>The default implementation delegates to {@link #schedule(Runnable, long, TimeUnit)}.
266365
*
@@ -274,15 +373,16 @@ public Disposable schedule(@NonNull Runnable run) {
274373
}
275374

276375
/**
277-
* Schedules an Runnable for execution at some point in the future.
376+
* Schedules an Runnable for execution at some point in the future specified by a time delay
377+
* relative to the current time.
278378
* <p>
279379
* Note to implementors: non-positive {@code delayTime} should be regarded as non-delayed schedule, i.e.,
280380
* as if the {@link #schedule(Runnable)} was called.
281381
*
282382
* @param run
283383
* the Runnable to schedule
284384
* @param delay
285-
* time to wait before executing the action; non-positive values indicate an non-delayed
385+
* time to "wait" before executing the action; non-positive values indicate an non-delayed
286386
* schedule
287387
* @param unit
288388
* the time unit of {@code delayTime}
@@ -292,12 +392,20 @@ public Disposable schedule(@NonNull Runnable run) {
292392
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
293393

294394
/**
295-
* Schedules a cancelable action to be executed periodically. This default implementation schedules
296-
* recursively and waits for actions to complete (instead of potentially executing long-running actions
297-
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
395+
* Schedules a periodic execution of the given task with the given initial time delay and repeat period.
396+
* <p>
397+
* The default implementation schedules and reschedules the {@code Runnable} task via the
398+
* {@link #schedule(Runnable, long, TimeUnit)}
399+
* method over and over and at a fixed rate, that is, the first execution will be after the
400+
* {@code initialDelay}, the second after {@code initialDelay + period}, the third after
401+
* {@code initialDelay + 2 * period}, and so on.
298402
* <p>
299403
* Note to implementors: non-positive {@code initialTime} and {@code period} should be regarded as
300404
* non-delayed scheduling of the first and any subsequent executions.
405+
* In addition, a more specific {@code Worker} implementation should override this method
406+
* if it can perform the periodic task execution with less overhead (such as by avoiding the
407+
* creation of the wrapper and tracker objects upon each periodic invocation of the
408+
* common {@link #schedule(Runnable, long, TimeUnit)} method).
301409
*
302410
* @param run
303411
* the Runnable to execute periodically

src/main/java/io/reactivex/schedulers/SchedulerRunnableIntrospection.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,15 @@
1515
import io.reactivex.plugins.RxJavaPlugins;
1616

1717
/**
18-
* Interface to wrap an action inside internal scheduler's task.
19-
*
20-
* You can check if runnable implements this interface and unwrap original runnable.
21-
* For example inside of the {@link RxJavaPlugins#setScheduleHandler(Function)}
18+
* Interface to indicate the implementor class wraps a {@code Runnable} that can
19+
* be accessed via {@link #getWrappedRunnable()}.
20+
* <p>
21+
* You can check if a {@link Runnable} task submitted to a {@link io.reactivex.Scheduler Scheduler} (or its
22+
* {@link io.reactivex.Scheduler.Worker Scheduler.Worker}) implements this interface and unwrap the
23+
* original {@code Runnable} instance. This could help to avoid hooking the same underlying {@code Runnable}
24+
* task in a custom {@link RxJavaPlugins#onSchedule(Runnable)} hook set via
25+
* the {@link RxJavaPlugins#setScheduleHandler(Function)} method multiple times due to internal delegation
26+
* of the default {@code Scheduler.scheduleDirect} or {@code Scheduler.Worker.schedule} methods.
2227
*
2328
* @since 2.1.7 - experimental
2429
*/

0 commit comments

Comments
 (0)