Skip to content

Commit 68b0ae6

Browse files
committed
ExecutorScheduler to wrap an Executor
1 parent e8afd05 commit 68b0ae6

File tree

3 files changed

+222
-0
lines changed

3 files changed

+222
-0
lines changed
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.Executor;
20+
import java.util.concurrent.Future;
21+
import java.util.concurrent.RejectedExecutionException;
22+
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
26+
import rx.Scheduler;
27+
import rx.Subscription;
28+
import rx.functions.Action0;
29+
import rx.plugins.RxJavaPlugins;
30+
import rx.subscriptions.CompositeSubscription;
31+
import rx.subscriptions.MultipleAssignmentSubscription;
32+
import rx.subscriptions.Subscriptions;
33+
34+
/**
35+
* Scheduler that wraps an Executor instance and establishes the Scheduler contract upon it.
36+
* <p>
37+
* Note that thread-hopping is unavoidable with this kind of Scheduler as we don't know
38+
* about the underlying threading behavior of the executor.
39+
*/
40+
/* public */final class ExecutorScheduler extends Scheduler {
41+
final Executor executor;
42+
public ExecutorScheduler(Executor executor) {
43+
this.executor = executor;
44+
}
45+
@Override
46+
public Worker createWorker() {
47+
return new ExecutorSchedulerWorker(executor);
48+
}
49+
/** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */
50+
static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable {
51+
final Executor executor;
52+
// TODO: use a better performing structure for task tracking
53+
final CompositeSubscription tasks;
54+
// TODO: use MpscLinkedQueue once available
55+
final ConcurrentLinkedQueue<ExecutorAction> queue;
56+
final AtomicInteger wip;
57+
58+
public ExecutorSchedulerWorker(Executor executor) {
59+
this.executor = executor;
60+
this.queue = new ConcurrentLinkedQueue<ExecutorAction>();
61+
this.wip = new AtomicInteger();
62+
this.tasks = new CompositeSubscription();
63+
}
64+
65+
@Override
66+
public Subscription schedule(Action0 action) {
67+
if (isUnsubscribed()) {
68+
return Subscriptions.empty();
69+
}
70+
ExecutorAction ea = new ExecutorAction(action, tasks);
71+
tasks.add(ea);
72+
queue.offer(ea);
73+
if (wip.getAndIncrement() == 0) {
74+
try {
75+
executor.execute(this);
76+
} catch (RejectedExecutionException t) {
77+
// cleanup if rejected
78+
tasks.remove(ea);
79+
wip.decrementAndGet();
80+
// report the error to the plugin
81+
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
82+
// throw it to the caller
83+
throw t;
84+
}
85+
}
86+
87+
return ea;
88+
}
89+
90+
@Override
91+
public void run() {
92+
do {
93+
queue.poll().run();
94+
} while (wip.decrementAndGet() > 0);
95+
}
96+
97+
@Override
98+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
99+
if (delayTime <= 0) {
100+
return schedule(action);
101+
}
102+
if (isUnsubscribed()) {
103+
return Subscriptions.empty();
104+
}
105+
ScheduledExecutorService service;
106+
if (executor instanceof ScheduledExecutorService) {
107+
service = (ScheduledExecutorService)executor;
108+
} else {
109+
service = GenericScheduledExecutorService.getInstance();
110+
}
111+
112+
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
113+
// tasks.add(mas); // Needs a removal without unsubscription
114+
115+
try {
116+
Future<?> f = service.schedule(new Runnable() {
117+
@Override
118+
public void run() {
119+
if (mas.isUnsubscribed()) {
120+
return;
121+
}
122+
mas.set(schedule(action));
123+
// tasks.delete(mas); // Needs a removal without unsubscription
124+
}
125+
}, delayTime, unit);
126+
mas.set(Subscriptions.from(f));
127+
} catch (RejectedExecutionException t) {
128+
// report the rejection to plugins
129+
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
130+
throw t;
131+
}
132+
133+
return mas;
134+
}
135+
136+
@Override
137+
public boolean isUnsubscribed() {
138+
return tasks.isUnsubscribed();
139+
}
140+
141+
@Override
142+
public void unsubscribe() {
143+
tasks.unsubscribe();
144+
}
145+
146+
}
147+
/** Runs the actual action and maintains an unsubscription state. */
148+
static final class ExecutorAction implements Runnable, Subscription {
149+
final Action0 actual;
150+
final CompositeSubscription parent;
151+
volatile int unsubscribed;
152+
static final AtomicIntegerFieldUpdater<ExecutorAction> UNSUBSCRIBED_UPDATER
153+
= AtomicIntegerFieldUpdater.newUpdater(ExecutorAction.class, "unsubscribed");
154+
155+
public ExecutorAction(Action0 actual, CompositeSubscription parent) {
156+
this.actual = actual;
157+
this.parent = parent;
158+
}
159+
160+
@Override
161+
public void run() {
162+
if (isUnsubscribed()) {
163+
return;
164+
}
165+
try {
166+
actual.call();
167+
} catch (Throwable t) {
168+
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
169+
} finally {
170+
unsubscribe();
171+
}
172+
}
173+
@Override
174+
public boolean isUnsubscribed() {
175+
return unsubscribed != 0;
176+
}
177+
178+
@Override
179+
public void unsubscribe() {
180+
if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
181+
parent.remove(this);
182+
}
183+
}
184+
185+
}
186+
}

rxjava-core/src/main/java/rx/schedulers/Schedulers.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,12 @@ public static Scheduler io() {
112112
public static TestScheduler test() {
113113
return new TestScheduler();
114114
}
115+
/**
116+
* Wraps the given Executor into a new Scheduler instance.
117+
* @param executor the executor to wrap
118+
* @return the new scheduler wrapping the executor
119+
*/
120+
public static Scheduler newExecutor(Executor executor) {
121+
return new ExecutorScheduler(executor);
122+
}
115123
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import java.util.concurrent.Executors;
19+
import rx.Scheduler;
20+
21+
public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests {
22+
23+
@Override
24+
protected Scheduler getScheduler() {
25+
return Schedulers.newExecutor(Executors.newFixedThreadPool(2, new NewThreadScheduler.RxThreadFactory("TestCustomPool-")));
26+
}
27+
28+
}

0 commit comments

Comments
 (0)