Skip to content

Commit 241e154

Browse files
committed
Merge pull request ReactiveX#3149 from akarnokd/SchedulerShutdownV3
Scheduler shutdown capability
2 parents ebe28de + 566ee93 commit 241e154

File tree

10 files changed

+457
-94
lines changed

10 files changed

+457
-94
lines changed

src/main/java/rx/internal/schedulers/EventLoopsScheduler.java

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616
package rx.internal.schedulers;
1717

1818
import java.util.concurrent.*;
19+
import java.util.concurrent.atomic.AtomicReference;
1920

2021
import rx.*;
2122
import rx.functions.Action0;
2223
import rx.internal.util.*;
2324
import rx.subscriptions.*;
2425

25-
public class EventLoopsScheduler extends Scheduler {
26+
public class EventLoopsScheduler extends Scheduler implements SchedulerLifecycle {
2627
/** Manages a fixed number of workers. */
2728
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
2829
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
@@ -44,40 +45,82 @@ public class EventLoopsScheduler extends Scheduler {
4445
}
4546
MAX_THREADS = max;
4647
}
48+
49+
static final PoolWorker SHUTDOWN_WORKER;
50+
static {
51+
SHUTDOWN_WORKER = new PoolWorker(new RxThreadFactory("RxComputationShutdown-"));
52+
SHUTDOWN_WORKER.unsubscribe();
53+
}
54+
4755
static final class FixedSchedulerPool {
4856
final int cores;
4957

5058
final PoolWorker[] eventLoops;
5159
long n;
5260

53-
FixedSchedulerPool() {
61+
FixedSchedulerPool(int maxThreads) {
5462
// initialize event loops
55-
this.cores = MAX_THREADS;
56-
this.eventLoops = new PoolWorker[cores];
57-
for (int i = 0; i < cores; i++) {
63+
this.cores = maxThreads;
64+
this.eventLoops = new PoolWorker[maxThreads];
65+
for (int i = 0; i < maxThreads; i++) {
5866
this.eventLoops[i] = new PoolWorker(THREAD_FACTORY);
5967
}
6068
}
6169

6270
public PoolWorker getEventLoop() {
71+
int c = cores;
72+
if (c == 0) {
73+
return SHUTDOWN_WORKER;
74+
}
6375
// simple round robin, improvements to come
64-
return eventLoops[(int)(n++ % cores)];
76+
return eventLoops[(int)(n++ % c)];
77+
}
78+
79+
public void shutdown() {
80+
for (PoolWorker w : eventLoops) {
81+
w.unsubscribe();
82+
}
6583
}
6684
}
85+
/** This will indicate no pool is active. */
86+
static final FixedSchedulerPool NONE = new FixedSchedulerPool(0);
6787

68-
final FixedSchedulerPool pool;
88+
final AtomicReference<FixedSchedulerPool> pool;
6989

7090
/**
7191
* Create a scheduler with pool size equal to the available processor
7292
* count and using least-recent worker selection policy.
7393
*/
7494
public EventLoopsScheduler() {
75-
pool = new FixedSchedulerPool();
95+
this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
96+
start();
7697
}
7798

7899
@Override
79100
public Worker createWorker() {
80-
return new EventLoopWorker(pool.getEventLoop());
101+
return new EventLoopWorker(pool.get().getEventLoop());
102+
}
103+
104+
@Override
105+
public void start() {
106+
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS);
107+
if (!pool.compareAndSet(NONE, update)) {
108+
update.shutdown();
109+
}
110+
}
111+
112+
@Override
113+
public void shutdown() {
114+
for (;;) {
115+
FixedSchedulerPool curr = pool.get();
116+
if (curr == NONE) {
117+
return;
118+
}
119+
if (pool.compareAndSet(curr, NONE)) {
120+
curr.shutdown();
121+
return;
122+
}
123+
}
81124
}
82125

83126
/**
@@ -87,7 +130,7 @@ public Worker createWorker() {
87130
* @return the subscription
88131
*/
89132
public Subscription scheduleDirect(Action0 action) {
90-
PoolWorker pw = pool.getEventLoop();
133+
PoolWorker pw = pool.get().getEventLoop();
91134
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
92135
}
93136

@@ -137,4 +180,4 @@ private static final class PoolWorker extends NewThreadWorker {
137180
super(threadFactory);
138181
}
139182
}
140-
}
183+
}

src/main/java/rx/schedulers/GenericScheduledExecutorService.java renamed to src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package rx.schedulers;
16+
package rx.internal.schedulers;
17+
18+
import java.util.concurrent.*;
19+
import java.util.concurrent.atomic.AtomicReference;
1720

1821
import rx.Scheduler;
19-
import rx.internal.schedulers.NewThreadWorker;
2022
import rx.internal.util.RxThreadFactory;
21-
22-
import java.util.concurrent.*;
23+
import rx.schedulers.*;
2324

2425
/**
2526
* A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
@@ -30,15 +31,29 @@
3031
* the work asynchronously on the appropriate {@link Scheduler} implementation. This means for example that you would not use this approach
3132
* along with {@link TrampolineScheduler} or {@link ImmediateScheduler}.
3233
*/
33-
/* package */final class GenericScheduledExecutorService {
34+
public final class GenericScheduledExecutorService implements SchedulerLifecycle{
3435

3536
private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
3637
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
3738

38-
private final static GenericScheduledExecutorService INSTANCE = new GenericScheduledExecutorService();
39-
private final ScheduledExecutorService executor;
40-
39+
/* Schedulers needs acces to this in order to work with the lifecycle. */
40+
public final static GenericScheduledExecutorService INSTANCE = new GenericScheduledExecutorService();
41+
42+
private final AtomicReference<ScheduledExecutorService> executor;
43+
44+
static final ScheduledExecutorService NONE;
45+
static {
46+
NONE = Executors.newScheduledThreadPool(0);
47+
NONE.shutdownNow();
48+
}
49+
4150
private GenericScheduledExecutorService() {
51+
executor = new AtomicReference<ScheduledExecutorService>(NONE);
52+
start();
53+
}
54+
55+
@Override
56+
public void start() {
4257
int count = Runtime.getRuntime().availableProcessors();
4358
if (count > 4) {
4459
count = count / 2;
@@ -47,21 +62,41 @@ private GenericScheduledExecutorService() {
4762
if (count > 8) {
4863
count = 8;
4964
}
65+
5066
ScheduledExecutorService exec = Executors.newScheduledThreadPool(count, THREAD_FACTORY);
51-
if (!NewThreadWorker.tryEnableCancelPolicy(exec)) {
52-
if (exec instanceof ScheduledThreadPoolExecutor) {
53-
NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec);
67+
if (executor.compareAndSet(NONE, exec)) {
68+
if (!NewThreadWorker.tryEnableCancelPolicy(exec)) {
69+
if (exec instanceof ScheduledThreadPoolExecutor) {
70+
NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec);
71+
}
5472
}
73+
return;
74+
} else {
75+
exec.shutdownNow();
5576
}
56-
executor = exec;
5777
}
58-
78+
79+
@Override
80+
public void shutdown() {
81+
for (;;) {
82+
ScheduledExecutorService exec = executor.get();
83+
if (exec == NONE) {
84+
return;
85+
}
86+
if (executor.compareAndSet(exec, NONE)) {
87+
NewThreadWorker.deregisterExecutor(exec);
88+
exec.shutdownNow();
89+
return;
90+
}
91+
}
92+
}
93+
5994
/**
6095
* See class Javadoc for information on what this is for and how to use.
6196
*
6297
* @return {@link ScheduledExecutorService} for generic use.
6398
*/
6499
public static ScheduledExecutorService getInstance() {
65-
return INSTANCE.executor;
100+
return INSTANCE.executor.get();
66101
}
67-
}
102+
}

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ public void run() {
8282
}, PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS);
8383

8484
break;
85+
} else {
86+
exec.shutdownNow();
8587
}
8688
} while (true);
8789

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package rx.internal.schedulers;
2+
3+
/**
4+
* Represents the capability of a Scheduler to be start or shut down its maintained
5+
* threads.
6+
*/
7+
public interface SchedulerLifecycle {
8+
/**
9+
* Allows the Scheduler instance to start threads
10+
* and accept tasks on them.
11+
* <p>Implementations should make sure the call is idempotent and threadsafe.
12+
*/
13+
void start();
14+
/**
15+
* Instructs the Scheduler instance to stop threads
16+
* and stop accepting tasks on any outstanding Workers.
17+
* <p>Implementations should make sure the call is idempotent and threadsafe.
18+
*/
19+
void shutdown();
20+
}

src/main/java/rx/internal/util/ObjectPool.java

Lines changed: 47 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,22 @@
1818
package rx.internal.util;
1919

2020
import java.util.Queue;
21-
import java.util.concurrent.ConcurrentLinkedQueue;
22-
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.*;
22+
import java.util.concurrent.atomic.AtomicReference;
2323

24-
import rx.Scheduler;
24+
import rx.Scheduler.Worker;
2525
import rx.functions.Action0;
26-
import rx.internal.util.unsafe.MpmcArrayQueue;
27-
import rx.internal.util.unsafe.UnsafeAccess;
26+
import rx.internal.schedulers.SchedulerLifecycle;
27+
import rx.internal.util.unsafe.*;
2828
import rx.schedulers.Schedulers;
2929

30-
public abstract class ObjectPool<T> {
30+
public abstract class ObjectPool<T> implements SchedulerLifecycle {
3131
private Queue<T> pool;
32+
private final int minSize;
3233
private final int maxSize;
34+
private final long validationInterval;
3335

34-
private Scheduler.Worker schedulerWorker;
36+
private final AtomicReference<Worker> schedulerWorker;
3537

3638
public ObjectPool() {
3739
this(0, 0, 67);
@@ -50,31 +52,14 @@ public ObjectPool() {
5052
* When the number of objects is greater than maxIdle, too many instances will be removed.
5153
*/
5254
private ObjectPool(final int min, final int max, final long validationInterval) {
55+
this.minSize = min;
5356
this.maxSize = max;
57+
this.validationInterval = validationInterval;
58+
this.schedulerWorker = new AtomicReference<Worker>();
5459
// initialize pool
5560
initialize(min);
5661

57-
schedulerWorker = Schedulers.computation().createWorker();
58-
schedulerWorker.schedulePeriodically(new Action0() {
59-
60-
@Override
61-
public void call() {
62-
int size = pool.size();
63-
if (size < min) {
64-
int sizeToBeAdded = max - size;
65-
for (int i = 0; i < sizeToBeAdded; i++) {
66-
pool.add(createObject());
67-
}
68-
} else if (size > max) {
69-
int sizeToBeRemoved = size - max;
70-
for (int i = 0; i < sizeToBeRemoved; i++) {
71-
// pool.pollLast();
72-
pool.poll();
73-
}
74-
}
75-
}
76-
77-
}, validationInterval, validationInterval, TimeUnit.SECONDS);
62+
start();
7863
}
7964

8065
/**
@@ -109,10 +94,43 @@ public void returnObject(T object) {
10994
/**
11095
* Shutdown this pool.
11196
*/
97+
@Override
11298
public void shutdown() {
113-
schedulerWorker.unsubscribe();
99+
Worker w = schedulerWorker.getAndSet(null);
100+
if (w != null) {
101+
w.unsubscribe();
102+
}
114103
}
115104

105+
@Override
106+
public void start() {
107+
Worker w = Schedulers.computation().createWorker();
108+
if (schedulerWorker.compareAndSet(null, w)) {
109+
w.schedulePeriodically(new Action0() {
110+
111+
@Override
112+
public void call() {
113+
int size = pool.size();
114+
if (size < minSize) {
115+
int sizeToBeAdded = maxSize - size;
116+
for (int i = 0; i < sizeToBeAdded; i++) {
117+
pool.add(createObject());
118+
}
119+
} else if (size > maxSize) {
120+
int sizeToBeRemoved = size - maxSize;
121+
for (int i = 0; i < sizeToBeRemoved; i++) {
122+
// pool.pollLast();
123+
pool.poll();
124+
}
125+
}
126+
}
127+
128+
}, validationInterval, validationInterval, TimeUnit.SECONDS);
129+
} else {
130+
w.unsubscribe();
131+
}
132+
}
133+
116134
/**
117135
* Creates a new object.
118136
*

src/main/java/rx/internal/util/RxRingBuffer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,8 @@ public static RxRingBuffer getSpmcInstance() {
276276
}
277277
public static final int SIZE = _size;
278278

279-
private static ObjectPool<Queue<Object>> SPSC_POOL = new ObjectPool<Queue<Object>>() {
279+
/* Public so Schedulers can manage the lifecycle of the inner worker. */
280+
public static ObjectPool<Queue<Object>> SPSC_POOL = new ObjectPool<Queue<Object>>() {
280281

281282
@Override
282283
protected SpscArrayQueue<Object> createObject() {
@@ -285,7 +286,8 @@ protected SpscArrayQueue<Object> createObject() {
285286

286287
};
287288

288-
private static ObjectPool<Queue<Object>> SPMC_POOL = new ObjectPool<Queue<Object>>() {
289+
/* Public so Schedulers can manage the lifecycle of the inner worker. */
290+
public static ObjectPool<Queue<Object>> SPMC_POOL = new ObjectPool<Queue<Object>>() {
289291

290292
@Override
291293
protected SpmcArrayQueue<Object> createObject() {
@@ -452,4 +454,4 @@ public boolean isUnsubscribed() {
452454
return queue == null;
453455
}
454456

455-
}
457+
}

0 commit comments

Comments
 (0)