Skip to content

Commit 3f6e570

Browse files
authored
2.x: improve the documentation of Schedulers utility class. (ReactiveX#5223)
1 parent 59dc7a3 commit 3f6e570

File tree

2 files changed

+196
-18
lines changed

2 files changed

+196
-18
lines changed

src/main/java/io/reactivex/annotations/SchedulerSupport.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@
5959
* or takes timing information from it.
6060
*/
6161
String TRAMPOLINE = "io.reactivex:trampoline";
62+
/**
63+
* The operator/class runs on RxJava's {@linkplain Schedulers#single() single scheduler}
64+
* or takes timing information from it.
65+
* @since 2.0.8 - experimental
66+
*/
67+
@Experimental
68+
String SINGLE = "io.reactivex:single";
6269

6370
/**
6471
* The kind of scheduler the class or method uses.

src/main/java/io/reactivex/schedulers/Schedulers.java

Lines changed: 189 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
/**
2424
* Static factory methods for returning standard Scheduler instances.
2525
* <p>
26+
* The initial and runtime values of the various scheduler types can be overridden via the
27+
* {@code RxJavaPlugins.setInit(scheduler name)SchedulerHandler()} and
28+
* {@code RxJavaPlugins.set(scheduler name)SchedulerHandler()} respectively.
29+
* <p>
2630
* <strong>Supported system properties ({@code System.getProperty()}):</strong>
2731
* <ul>
2832
* <li>{@code rx2.io-priority} (int): sets the thread priority of the {@link #io()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
@@ -84,14 +88,46 @@ private Schedulers() {
8488
}
8589

8690
/**
87-
* Creates and returns a {@link Scheduler} intended for computational work.
91+
* Returns a default, shared {@link Scheduler} instance intended for computational work.
8892
* <p>
8993
* This can be used for event-loops, processing callbacks and other computational work.
9094
* <p>
91-
* Do not perform IO-bound work on this scheduler. Use {@link #io()} instead.
95+
* It is not recommended to perform blocking, IO-bound work on this scheduler. Use {@link #io()} instead.
96+
* <p>
97+
* The default instance has a backing pool of single-threaded {@link ScheduledExecutorService} instances equal to
98+
* the number of available processors ({@link java.lang.Runtime#availableProcessors()}) to the Java VM.
9299
* <p>
93100
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
94-
*
101+
* <p>
102+
* This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker} instances, although
103+
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
104+
* execute those tasks "unexpectedly".
105+
* <p>
106+
* If the {@link RxJavaPlugins#setFailOnNonBlockingScheduler(boolean)} is set to true, attempting to execute
107+
* operators that block while running on this scheduler will throw an {@link IllegalStateException}.
108+
* <p>
109+
* You can control certain properties of this standard scheduler via system properties that have to be set
110+
* before the {@link Schedulers} class is referenced in your code.
111+
* <br><strong>Supported system properties ({@code System.getProperty()}):</strong>
112+
* <ul>
113+
* <li>{@code rx2.computation-threads} (int): sets the number of threads in the {@link #computation()} Scheduler, default is the number of available CPUs</li>
114+
* <li>{@code rx2.computation-priority} (int): sets the thread priority of the {@link #computation()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
115+
* </ul>
116+
* <p>
117+
* The default value of this scheduler can be overridden at initialization time via the
118+
* {@link RxJavaPlugins#setInitComputationSchedulerHandler(io.reactivex.functions.Function)} plugin method.
119+
* Note that due to possible initialization cycles, using any of the other scheduler-returning methods will
120+
* result in a {@code NullPointerException}.
121+
* Once the {@link Schedulers} class has been initialized, you can override the returned {@link Scheduler} instance
122+
* via the {@link RxJavaPlugins#setComputationSchedulerHandler(io.reactivex.functions.Function)} method.
123+
* <p>
124+
* It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the
125+
* {@link RxJavaPlugins#createComputationScheduler(ThreadFactory)} method. Note that such custom
126+
* instances require a manual call to {@link Scheduler#shutdown()} to allow the JVM to exit or the
127+
* (J2EE) container to unload properly.
128+
* <p>Operators on the base reactive classes that use this scheduler are marked with the
129+
* &#64;{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#COMPUTATION COMPUTATION})
130+
* annotation.
95131
* @return a {@link Scheduler} meant for computation-bound work
96132
*/
97133
@NonNull
@@ -100,16 +136,42 @@ public static Scheduler computation() {
100136
}
101137

102138
/**
103-
* Creates and returns a {@link Scheduler} intended for IO-bound work.
104-
* <p>
105-
* The implementation is backed by an {@link Executor} thread-pool that will grow as needed.
139+
* Returns a default, shared {@link Scheduler} instance intended for IO-bound work.
106140
* <p>
107141
* This can be used for asynchronously performing blocking IO.
108142
* <p>
109-
* Do not perform computational work on this scheduler. Use {@link #computation()} instead.
143+
* The implementation is backed by a pool of single-threaded {link ScheduledExecutorService} instances
144+
* that will try to reuse previoulsy started instances used by the worker
145+
* returned by {@link io.reactivex.Scheduler#createWorker()} but otherwise will start a new backing
146+
* {link ScheduledExecutorService} instance. Note that this scheduler may create an unbounded number
147+
* of worker threads that can result in system slowdowns or {@code OutOfMemoryError}. Therefore, for casual uses
148+
* or when implementing an operator, the Worker instances must be disposed via {@link io.reactivex.Scheduler.Worker#dispose()}.
149+
* <p>
150+
* It is not recommended to perform computational work on this scheduler. Use {@link #computation()} instead.
110151
* <p>
111152
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
112-
*
153+
* <p>
154+
* You can control certain properties of this standard scheduler via system properties that have to be set
155+
* before the {@link Schedulers} class is referenced in your code.
156+
* <br><strong>Supported system properties ({@code System.getProperty()}):</strong>
157+
* <ul>
158+
* <li>{@code rx2.io-priority} (int): sets the thread priority of the {@link #io()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
159+
* </ul>
160+
* <p>
161+
* The default value of this scheduler can be overridden at initialization time via the
162+
* {@link RxJavaPlugins#setInitIoSchedulerHandler(io.reactivex.functions.Function)} plugin method.
163+
* Note that due to possible initialization cycles, using any of the other scheduler-returning methods will
164+
* result in a {@code NullPointerException}.
165+
* Once the {@link Schedulers} class has been initialized, you can override the returned {@link Scheduler} instance
166+
* via the {@link RxJavaPlugins#setIoSchedulerHandler(io.reactivex.functions.Function)} method.
167+
* <p>
168+
* It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the
169+
* {@link RxJavaPlugins#createIoScheduler(ThreadFactory)} method. Note that such custom
170+
* instances require a manual call to {@link Scheduler#shutdown()} to allow the JVM to exit or the
171+
* (J2EE) container to unload properly.
172+
* <p>Operators on the base reactive classes that use this scheduler are marked with the
173+
* &#64;{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#IO IO})
174+
* annotation.
113175
* @return a {@link Scheduler} meant for IO-bound work
114176
*/
115177
@NonNull
@@ -118,9 +180,17 @@ public static Scheduler io() {
118180
}
119181

120182
/**
121-
* Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the
122-
* current work completes.
123-
*
183+
* Returns a default, shared {@link Scheduler} instance whose {@link io.reactivex.Scheduler.Worker}
184+
* instances queue work and execute them in a FIFO manner on one of the participating threads.
185+
* <p>
186+
* The default implementation's {@link Scheduler#scheduleDirect(Runnable)} methods execute the tasks on the current thread
187+
* without any queueing and the timed overloads use blocking sleep as well.
188+
* <p>
189+
* Note that this scheduler can't be reliably used to return the execution of
190+
* tasks to the "main" thread. Such behavior requires a blocking-queueing scheduler currently not provided
191+
* by RxJava itself but may be found in external libraries.
192+
* <p>
193+
* This scheduler can't be overridden via an {@link RxJavaPlugins} method.
124194
* @return a {@link Scheduler} that queues work on the current thread
125195
*/
126196
@NonNull
@@ -129,10 +199,37 @@ public static Scheduler trampoline() {
129199
}
130200

131201
/**
132-
* Creates and returns a {@link Scheduler} that creates a new {@link Thread} for each unit of work.
202+
* Returns a default, shared {@link Scheduler} instance that creates a new {@link Thread} for each unit of work.
203+
* <p>
204+
* The default implementation of this scheduler creates a new, single-threaded {@link ScheduledExecutorService} for
205+
* each invocation of the {@link Scheduler#scheduleDirect(Runnable)} (plus its overloads) and {@link Scheduler#createWorker()}
206+
* methods, thus an unbounded number of worker threads may be created that can
207+
* result in system slowdowns or {@code OutOfMemoryError}. Therefore, for casual uses or when implementing an operator,
208+
* the Worker instances must be disposed via {@link io.reactivex.Scheduler.Worker#dispose()}.
133209
* <p>
134210
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
135-
*
211+
* <p>
212+
* You can control certain properties of this standard scheduler via system properties that have to be set
213+
* before the {@link Schedulers} class is referenced in your code.
214+
* <br><strong>Supported system properties ({@code System.getProperty()}):</strong>
215+
* <ul>
216+
* <li>{@code rx2.newthread-priority} (int): sets the thread priority of the {@link #newThread()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
217+
* </ul>
218+
* <p>
219+
* The default value of this scheduler can be overridden at initialization time via the
220+
* {@link RxJavaPlugins#setInitNewThreadSchedulerHandler(io.reactivex.functions.Function)} plugin method.
221+
* Note that due to possible initialization cycles, using any of the other scheduler-returning methods will
222+
* result in a {@code NullPointerException}.
223+
* Once the {@link Schedulers} class has been initialized, you can override the returned {@link Scheduler} instance
224+
* via the {@link RxJavaPlugins#setNewThreadSchedulerHandler(io.reactivex.functions.Function)} method.
225+
* <p>
226+
* It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the
227+
* {@link RxJavaPlugins#createNewThreadScheduler(ThreadFactory)} method. Note that such custom
228+
* instances require a manual call to {@link Scheduler#shutdown()} to allow the JVM to exit or the
229+
* (J2EE) container to unload properly.
230+
* <p>Operators on the base reactive classes that use this scheduler are marked with the
231+
* &#64;{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#NEW_THREAD NEW_TRHEAD})
232+
* annotation.
136233
* @return a {@link Scheduler} that creates new threads
137234
*/
138235
@NonNull
@@ -141,7 +238,8 @@ public static Scheduler newThread() {
141238
}
142239

143240
/**
144-
* Returns the common, single-thread backed Scheduler instance.
241+
* Returns a default, shared, single-thread-backed {@link Scheduler} instance for work
242+
* requiring strongly-sequential execution on the same background thread.
145243
* <p>
146244
* Uses:
147245
* <ul>
@@ -150,6 +248,37 @@ public static Scheduler newThread() {
150248
* <li>support benchmarks that pipeline data from the main thread to some other thread and
151249
* avoid core-bashing of computation's round-robin nature</li>
152250
* </ul>
251+
* <p>
252+
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
253+
* <p>
254+
* This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker} instances, although
255+
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
256+
* execute those tasks "unexpectedly".
257+
* <p>
258+
* If the {@link RxJavaPlugins#setFailOnNonBlockingScheduler(boolean)} is set to true, attempting to execute
259+
* operators that block while running on this scheduler will throw an {@link IllegalStateException}.
260+
* <p>
261+
* You can control certain properties of this standard scheduler via system properties that have to be set
262+
* before the {@link Schedulers} class is referenced in your code.
263+
* <br><strong>Supported system properties ({@code System.getProperty()}):</strong>
264+
* <ul>
265+
* <li>{@code rx2.single-priority} (int): sets the thread priority of the {@link #single()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
266+
* </ul>
267+
* <p>
268+
* The default value of this scheduler can be overridden at initialization time via the
269+
* {@link RxJavaPlugins#setInitSingleSchedulerHandler(io.reactivex.functions.Function)} plugin method.
270+
* Note that due to possible initialization cycles, using any of the other scheduler-returning methods will
271+
* result in a {@code NullPointerException}.
272+
* Once the {@link Schedulers} class has been initialized, you can override the returned {@link Scheduler} instance
273+
* via the {@link RxJavaPlugins#setSingleSchedulerHandler(io.reactivex.functions.Function)} method.
274+
* <p>
275+
* It is possible to create a fresh instance of this scheduler with a custom ThreadFactory, via the
276+
* {@link RxJavaPlugins#createSingleScheduler(ThreadFactory)} method. Note that such custom
277+
* instances require a manual call to {@link Scheduler#shutdown()} to allow the JVM to exit or the
278+
* (J2EE) container to unload properly.
279+
* <p>Operators on the base reactive classes that use this scheduler are marked with the
280+
* &#64;{@link io.reactivex.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.annotations.SchedulerSupport#SINGLE SINGLE})
281+
* annotation.
153282
* @return a {@link Scheduler} that shares a single backing thread.
154283
* @since 2.0
155284
*/
@@ -159,8 +288,50 @@ public static Scheduler single() {
159288
}
160289

161290
/**
162-
* Converts an {@link Executor} into a new Scheduler instance.
163-
*
291+
* Wraps an {@link Executor} into a new Scheduler instance and delegates {@code schedule()}
292+
* calls to it.
293+
* <p>
294+
* If the provided executor doesn't support any of the more specific standard Java executor
295+
* APIs, cancelling tasks scheduled by this scheduler can't be interrupted when they are
296+
* executing but only prevented from running prior to that. In addition, tasks scheduled with
297+
* a time delay or periodically will use the {@link #single()} scheduler for the timed waiting
298+
* before posting the actual task to the given executor.
299+
* <p>
300+
* If the provided executor supports the standard Java {@link ExecutorService} API,
301+
* cancelling tasks scheduled by this scheduler can be cancelled/interrupted by calling
302+
* {@link io.reactivex.disposables.Disposable#dispose()}. In addition, tasks scheduled with
303+
* a time delay or periodically will use the {@link #single()} scheduler for the timed waiting
304+
* before posting the actual task to the given executor.
305+
* <p>
306+
* If the provided executor supports the standard Java {@link ScheduledExecutorService} API,
307+
* cancelling tasks scheduled by this scheduler can be cancelled/interrupted by calling
308+
* {@link io.reactivex.disposables.Disposable#dispose()}. In addition, tasks scheduled with
309+
* a time delay or periodically will use the provided executor. Note, however, if the provided
310+
* {@code ScheduledExecutorService} instance is not single threaded, tasks scheduled
311+
* with a time delay close to each other may end up executing in different order than
312+
* the original schedule() call was issued. This limitation may be lifted in a future patch.
313+
* <p>
314+
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
315+
* executor's lifecycle must be managed externally:
316+
* <code><pre>
317+
* ExecutorService exec = Executors.newSingleThreadedExecutor();
318+
* try {
319+
* Scheduler scheduler = Schedulers.from(exec);
320+
* Flowable.just(1)
321+
* .subscribeOn(scheduler)
322+
* .map(v -&gt; v + 1)
323+
* .observeOn(scheduler)
324+
* .blockingSubscribe(System.out::println);
325+
* } finally {
326+
* exec.shutdown();
327+
* }
328+
* </pre></code>
329+
* <p>
330+
* This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker} instances, although
331+
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
332+
* execute those tasks "unexpectedly".
333+
* <p>
334+
* Note that this method returns a new {@link Scheduler} instance, even for the same {@link Executor} instance.
164335
* @param executor
165336
* the executor to wrap
166337
* @return the new Scheduler wrapping the Executor
@@ -171,7 +342,7 @@ public static Scheduler from(@NonNull Executor executor) {
171342
}
172343

173344
/**
174-
* Shuts down those standard Schedulers which support the SchedulerLifecycle interface.
345+
* Shuts down the standard Schedulers.
175346
* <p>The operation is idempotent and thread-safe.
176347
*/
177348
public static void shutdown() {
@@ -184,7 +355,7 @@ public static void shutdown() {
184355
}
185356

186357
/**
187-
* Starts those standard Schedulers which support the SchedulerLifecycle interface.
358+
* Starts the standard Schedulers.
188359
* <p>The operation is idempotent and thread-safe.
189360
*/
190361
public static void start() {

0 commit comments

Comments
 (0)