Skip to content

Commit a85ddd1

Browse files
authored
2.x: Add interruptible mode to Schedulers.from (ReactiveX#6370)
1 parent e1b3838 commit a85ddd1

File tree

3 files changed

+861
-15
lines changed

3 files changed

+861
-15
lines changed

src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java

Lines changed: 129 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.reactivex.internal.disposables.*;
2323
import io.reactivex.internal.functions.Functions;
2424
import io.reactivex.internal.queue.MpscLinkedQueue;
25-
import io.reactivex.internal.schedulers.ExecutorScheduler.ExecutorWorker.BooleanRunnable;
25+
import io.reactivex.internal.schedulers.ExecutorScheduler.ExecutorWorker.*;
2626
import io.reactivex.plugins.RxJavaPlugins;
2727
import io.reactivex.schedulers.*;
2828

@@ -31,19 +31,22 @@
3131
*/
3232
public final class ExecutorScheduler extends Scheduler {
3333

34+
final boolean interruptibleWorker;
35+
3436
@NonNull
3537
final Executor executor;
3638

3739
static final Scheduler HELPER = Schedulers.single();
3840

39-
public ExecutorScheduler(@NonNull Executor executor) {
41+
public ExecutorScheduler(@NonNull Executor executor, boolean interruptibleWorker) {
4042
this.executor = executor;
43+
this.interruptibleWorker = interruptibleWorker;
4144
}
4245

4346
@NonNull
4447
@Override
4548
public Worker createWorker() {
46-
return new ExecutorWorker(executor);
49+
return new ExecutorWorker(executor, interruptibleWorker);
4750
}
4851

4952
@NonNull
@@ -58,9 +61,15 @@ public Disposable scheduleDirect(@NonNull Runnable run) {
5861
return task;
5962
}
6063

61-
BooleanRunnable br = new BooleanRunnable(decoratedRun);
62-
executor.execute(br);
63-
return br;
64+
if (interruptibleWorker) {
65+
InterruptibleRunnable interruptibleTask = new InterruptibleRunnable(decoratedRun, null);
66+
executor.execute(interruptibleTask);
67+
return interruptibleTask;
68+
} else {
69+
BooleanRunnable br = new BooleanRunnable(decoratedRun);
70+
executor.execute(br);
71+
return br;
72+
}
6473
} catch (RejectedExecutionException ex) {
6574
RxJavaPlugins.onError(ex);
6675
return EmptyDisposable.INSTANCE;
@@ -111,6 +120,9 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
111120
}
112121
/* public: test support. */
113122
public static final class ExecutorWorker extends Scheduler.Worker implements Runnable {
123+
124+
final boolean interruptibleWorker;
125+
114126
final Executor executor;
115127

116128
final MpscLinkedQueue<Runnable> queue;
@@ -121,9 +133,10 @@ public static final class ExecutorWorker extends Scheduler.Worker implements Run
121133

122134
final CompositeDisposable tasks = new CompositeDisposable();
123135

124-
public ExecutorWorker(Executor executor) {
136+
public ExecutorWorker(Executor executor, boolean interruptibleWorker) {
125137
this.executor = executor;
126138
this.queue = new MpscLinkedQueue<Runnable>();
139+
this.interruptibleWorker = interruptibleWorker;
127140
}
128141

129142
@NonNull
@@ -134,9 +147,24 @@ public Disposable schedule(@NonNull Runnable run) {
134147
}
135148

136149
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
137-
BooleanRunnable br = new BooleanRunnable(decoratedRun);
138150

139-
queue.offer(br);
151+
Runnable task;
152+
Disposable disposable;
153+
154+
if (interruptibleWorker) {
155+
InterruptibleRunnable interruptibleTask = new InterruptibleRunnable(decoratedRun, tasks);
156+
tasks.add(interruptibleTask);
157+
158+
task = interruptibleTask;
159+
disposable = interruptibleTask;
160+
} else {
161+
BooleanRunnable runnableTask = new BooleanRunnable(decoratedRun);
162+
163+
task = runnableTask;
164+
disposable = runnableTask;
165+
}
166+
167+
queue.offer(task);
140168

141169
if (wip.getAndIncrement() == 0) {
142170
try {
@@ -149,7 +177,7 @@ public Disposable schedule(@NonNull Runnable run) {
149177
}
150178
}
151179

152-
return br;
180+
return disposable;
153181
}
154182

155183
@NonNull
@@ -288,6 +316,97 @@ public void run() {
288316
mar.replace(schedule(decoratedRun));
289317
}
290318
}
319+
320+
/**
321+
* Wrapper for a {@link Runnable} with additional logic for handling interruption on
322+
* a shared thread, similar to how Java Executors do it.
323+
*/
324+
static final class InterruptibleRunnable extends AtomicInteger implements Runnable, Disposable {
325+
326+
private static final long serialVersionUID = -3603436687413320876L;
327+
328+
final Runnable run;
329+
330+
final DisposableContainer tasks;
331+
332+
volatile Thread thread;
333+
334+
static final int READY = 0;
335+
336+
static final int RUNNING = 1;
337+
338+
static final int FINISHED = 2;
339+
340+
static final int INTERRUPTING = 3;
341+
342+
static final int INTERRUPTED = 4;
343+
344+
InterruptibleRunnable(Runnable run, DisposableContainer tasks) {
345+
this.run = run;
346+
this.tasks = tasks;
347+
}
348+
349+
@Override
350+
public void run() {
351+
if (get() == READY) {
352+
thread = Thread.currentThread();
353+
if (compareAndSet(READY, RUNNING)) {
354+
try {
355+
run.run();
356+
} finally {
357+
thread = null;
358+
if (compareAndSet(RUNNING, FINISHED)) {
359+
cleanup();
360+
} else {
361+
while (get() == INTERRUPTING) {
362+
Thread.yield();
363+
}
364+
Thread.interrupted();
365+
}
366+
}
367+
} else {
368+
thread = null;
369+
}
370+
}
371+
}
372+
373+
@Override
374+
public void dispose() {
375+
for (;;) {
376+
int state = get();
377+
if (state >= FINISHED) {
378+
break;
379+
} else if (state == READY) {
380+
if (compareAndSet(READY, INTERRUPTED)) {
381+
cleanup();
382+
break;
383+
}
384+
} else {
385+
if (compareAndSet(RUNNING, INTERRUPTING)) {
386+
Thread t = thread;
387+
if (t != null) {
388+
t.interrupt();
389+
thread = null;
390+
}
391+
set(INTERRUPTED);
392+
cleanup();
393+
break;
394+
}
395+
}
396+
}
397+
}
398+
399+
void cleanup() {
400+
if (tasks != null) {
401+
tasks.delete(this);
402+
}
403+
}
404+
405+
@Override
406+
public boolean isDisposed() {
407+
return get() >= FINISHED;
408+
}
409+
}
291410
}
292411

293412
static final class DelayedRunnable extends AtomicReference<Runnable>

src/main/java/io/reactivex/schedulers/Schedulers.java

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313

1414
package io.reactivex.schedulers;
1515

16+
import java.util.concurrent.*;
17+
1618
import io.reactivex.Scheduler;
17-
import io.reactivex.annotations.NonNull;
19+
import io.reactivex.annotations.*;
1820
import io.reactivex.internal.schedulers.*;
1921
import io.reactivex.plugins.RxJavaPlugins;
2022

21-
import java.util.concurrent.*;
22-
2323
/**
2424
* Static factory methods for returning standard Scheduler instances.
2525
* <p>
@@ -299,6 +299,9 @@ public static Scheduler single() {
299299
* a time delay or periodically will use the {@link #single()} scheduler for the timed waiting
300300
* before posting the actual task to the given executor.
301301
* <p>
302+
* Tasks submitted to the {@link Scheduler.Worker} of this {@code Scheduler} are also not interruptible. Use the
303+
* {@link #from(Executor, boolean)} overload to enable task interruption via this wrapper.
304+
* <p>
302305
* If the provided executor supports the standard Java {@link ExecutorService} API,
303306
* cancelling tasks scheduled by this scheduler can be cancelled/interrupted by calling
304307
* {@link io.reactivex.disposables.Disposable#dispose()}. In addition, tasks scheduled with
@@ -329,7 +332,7 @@ public static Scheduler single() {
329332
* }
330333
* </code></pre>
331334
* <p>
332-
* This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker} instances, although
335+
* This type of scheduler is less sensitive to leaking {@link Scheduler.Worker} instances, although
333336
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
334337
* execute those tasks "unexpectedly".
335338
* <p>
@@ -340,7 +343,67 @@ public static Scheduler single() {
340343
*/
341344
@NonNull
342345
public static Scheduler from(@NonNull Executor executor) {
343-
return new ExecutorScheduler(executor);
346+
return new ExecutorScheduler(executor, false);
347+
}
348+
349+
/**
350+
* Wraps an {@link Executor} into a new Scheduler instance and delegates {@code schedule()}
351+
* calls to it.
352+
* <p>
353+
* The tasks scheduled by the returned {@link Scheduler} and its {@link Scheduler.Worker}
354+
* can be optionally interrupted.
355+
* <p>
356+
* If the provided executor doesn't support any of the more specific standard Java executor
357+
* APIs, tasks scheduled with a time delay or periodically will use the
358+
* {@link #single()} scheduler for the timed waiting
359+
* before posting the actual task to the given executor.
360+
* <p>
361+
* If the provided executor supports the standard Java {@link ExecutorService} API,
362+
* canceling tasks scheduled by this scheduler can be cancelled/interrupted by calling
363+
* {@link io.reactivex.disposables.Disposable#dispose()}. In addition, tasks scheduled with
364+
* a time delay or periodically will use the {@link #single()} scheduler for the timed waiting
365+
* before posting the actual task to the given executor.
366+
* <p>
367+
* If the provided executor supports the standard Java {@link ScheduledExecutorService} API,
368+
* canceling tasks scheduled by this scheduler can be cancelled/interrupted by calling
369+
* {@link io.reactivex.disposables.Disposable#dispose()}. In addition, tasks scheduled with
370+
* a time delay or periodically will use the provided executor. Note, however, if the provided
371+
* {@code ScheduledExecutorService} instance is not single threaded, tasks scheduled
372+
* with a time delay close to each other may end up executing in different order than
373+
* the original schedule() call was issued. This limitation may be lifted in a future patch.
374+
* <p>
375+
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
376+
* executor's lifecycle must be managed externally:
377+
* <pre><code>
378+
* ExecutorService exec = Executors.newSingleThreadedExecutor();
379+
* try {
380+
* Scheduler scheduler = Schedulers.from(exec, true);
381+
* Flowable.just(1)
382+
* .subscribeOn(scheduler)
383+
* .map(v -&gt; v + 1)
384+
* .observeOn(scheduler)
385+
* .blockingSubscribe(System.out::println);
386+
* } finally {
387+
* exec.shutdown();
388+
* }
389+
* </code></pre>
390+
* <p>
391+
* This type of scheduler is less sensitive to leaking {@link Scheduler.Worker} instances, although
392+
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
393+
* execute those tasks "unexpectedly".
394+
* <p>
395+
* Note that this method returns a new {@link Scheduler} instance, even for the same {@link Executor} instance.
396+
* @param executor
397+
* the executor to wrap
398+
* @param interruptibleWorker if {@code true} the tasks submitted to the {@link Scheduler.Worker} will
399+
* be interrupted when the task is disposed.
400+
* @return the new Scheduler wrapping the Executor
401+
* @since 2.2.6 - experimental
402+
*/
403+
@NonNull
404+
@Experimental
405+
public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker) {
406+
return new ExecutorScheduler(executor, interruptibleWorker);
344407
}
345408

346409
/**

0 commit comments

Comments
 (0)