Skip to content

Commit c6c17cc

Browse files
Merge pull request ReactiveX#1275 from benjchristensen/fix-formatting
Fix Encoding/Formatting
2 parents 5f2cf72 + 105d20f commit c6c17cc

File tree

2 files changed

+214
-214
lines changed

2 files changed

+214
-214
lines changed
Lines changed: 186 additions & 186 deletions
Original file line numberDiff line numberDiff line change
@@ -1,186 +1,186 @@
1-
/**
2-
* Copyright 2014 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.schedulers;
17-
18-
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.Executor;
20-
import java.util.concurrent.Future;
21-
import java.util.concurrent.RejectedExecutionException;
22-
import java.util.concurrent.ScheduledExecutorService;
23-
import java.util.concurrent.TimeUnit;
24-
import java.util.concurrent.atomic.AtomicInteger;
25-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
26-
import rx.Scheduler;
27-
import rx.Subscription;
28-
import rx.functions.Action0;
29-
import rx.plugins.RxJavaPlugins;
30-
import rx.subscriptions.CompositeSubscription;
31-
import rx.subscriptions.MultipleAssignmentSubscription;
32-
import rx.subscriptions.Subscriptions;
33-
34-
/**
35-
* Scheduler that wraps an Executor instance and establishes the Scheduler contract upon it.
36-
* <p>
37-
* Note that thread-hopping is unavoidable with this kind of Scheduler as we don't know
38-
* about the underlying threading behavior of the executor.
39-
*/
40-
/* public */final class ExecutorScheduler extends Scheduler {
41-
final Executor executor;
42-
public ExecutorScheduler(Executor executor) {
43-
this.executor = executor;
44-
}
45-
@Override
46-
public Worker createWorker() {
47-
return new ExecutorSchedulerWorker(executor);
48-
}
49-
/** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */
50-
static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable {
51-
final Executor executor;
52-
// TODO: use a better performing structure for task tracking
53-
final CompositeSubscription tasks;
54-
// TODO: use MpscLinkedQueue once available
55-
final ConcurrentLinkedQueue<ExecutorAction> queue;
56-
final AtomicInteger wip;
57-
58-
public ExecutorSchedulerWorker(Executor executor) {
59-
this.executor = executor;
60-
this.queue = new ConcurrentLinkedQueue<ExecutorAction>();
61-
this.wip = new AtomicInteger();
62-
this.tasks = new CompositeSubscription();
63-
}
64-
65-
@Override
66-
public Subscription schedule(Action0 action) {
67-
if (isUnsubscribed()) {
68-
return Subscriptions.empty();
69-
}
70-
ExecutorAction ea = new ExecutorAction(action, tasks);
71-
tasks.add(ea);
72-
queue.offer(ea);
73-
if (wip.getAndIncrement() == 0) {
74-
try {
75-
executor.execute(this);
76-
} catch (RejectedExecutionException t) {
77-
// cleanup if rejected
78-
tasks.remove(ea);
79-
wip.decrementAndGet();
80-
// report the error to the plugin
81-
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
82-
// throw it to the caller
83-
throw t;
84-
}
85-
}
86-
87-
return ea;
88-
}
89-
90-
@Override
91-
public void run() {
92-
do {
93-
queue.poll().run();
94-
} while (wip.decrementAndGet() > 0);
95-
}
96-
97-
@Override
98-
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
99-
if (delayTime <= 0) {
100-
return schedule(action);
101-
}
102-
if (isUnsubscribed()) {
103-
return Subscriptions.empty();
104-
}
105-
ScheduledExecutorService service;
106-
if (executor instanceof ScheduledExecutorService) {
107-
service = (ScheduledExecutorService)executor;
108-
} else {
109-
service = GenericScheduledExecutorService.getInstance();
110-
}
111-
112-
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
113-
// tasks.add(mas); // Needs a removal without unsubscription
114-
115-
try {
116-
Future<?> f = service.schedule(new Runnable() {
117-
@Override
118-
public void run() {
119-
if (mas.isUnsubscribed()) {
120-
return;
121-
}
122-
mas.set(schedule(action));
123-
// tasks.delete(mas); // Needs a removal without unsubscription
124-
}
125-
}, delayTime, unit);
126-
mas.set(Subscriptions.from(f));
127-
} catch (RejectedExecutionException t) {
128-
// report the rejection to plugins
129-
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
130-
throw t;
131-
}
132-
133-
return mas;
134-
}
135-
136-
@Override
137-
public boolean isUnsubscribed() {
138-
return tasks.isUnsubscribed();
139-
}
140-
141-
@Override
142-
public void unsubscribe() {
143-
tasks.unsubscribe();
144-
}
145-
146-
}
147-
/** Runs the actual action and maintains an unsubscription state. */
148-
static final class ExecutorAction implements Runnable, Subscription {
149-
final Action0 actual;
150-
final CompositeSubscription parent;
151-
volatile int unsubscribed;
152-
static final AtomicIntegerFieldUpdater<ExecutorAction> UNSUBSCRIBED_UPDATER
153-
= AtomicIntegerFieldUpdater.newUpdater(ExecutorAction.class, "unsubscribed");
154-
155-
public ExecutorAction(Action0 actual, CompositeSubscription parent) {
156-
this.actual = actual;
157-
this.parent = parent;
158-
}
159-
160-
@Override
161-
public void run() {
162-
if (isUnsubscribed()) {
163-
return;
164-
}
165-
try {
166-
actual.call();
167-
} catch (Throwable t) {
168-
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
169-
} finally {
170-
unsubscribe();
171-
}
172-
}
173-
@Override
174-
public boolean isUnsubscribed() {
175-
return unsubscribed != 0;
176-
}
177-
178-
@Override
179-
public void unsubscribe() {
180-
if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
181-
parent.remove(this);
182-
}
183-
}
184-
185-
}
186-
}
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.Executor;
20+
import java.util.concurrent.Future;
21+
import java.util.concurrent.RejectedExecutionException;
22+
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
26+
import rx.Scheduler;
27+
import rx.Subscription;
28+
import rx.functions.Action0;
29+
import rx.plugins.RxJavaPlugins;
30+
import rx.subscriptions.CompositeSubscription;
31+
import rx.subscriptions.MultipleAssignmentSubscription;
32+
import rx.subscriptions.Subscriptions;
33+
34+
/**
35+
* Scheduler that wraps an Executor instance and establishes the Scheduler contract upon it.
36+
* <p>
37+
* Note that thread-hopping is unavoidable with this kind of Scheduler as we don't know
38+
* about the underlying threading behavior of the executor.
39+
*/
40+
/* public */final class ExecutorScheduler extends Scheduler {
41+
final Executor executor;
42+
public ExecutorScheduler(Executor executor) {
43+
this.executor = executor;
44+
}
45+
@Override
46+
public Worker createWorker() {
47+
return new ExecutorSchedulerWorker(executor);
48+
}
49+
/** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */
50+
static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable {
51+
final Executor executor;
52+
// TODO: use a better performing structure for task tracking
53+
final CompositeSubscription tasks;
54+
// TODO: use MpscLinkedQueue once available
55+
final ConcurrentLinkedQueue<ExecutorAction> queue;
56+
final AtomicInteger wip;
57+
58+
public ExecutorSchedulerWorker(Executor executor) {
59+
this.executor = executor;
60+
this.queue = new ConcurrentLinkedQueue<ExecutorAction>();
61+
this.wip = new AtomicInteger();
62+
this.tasks = new CompositeSubscription();
63+
}
64+
65+
@Override
66+
public Subscription schedule(Action0 action) {
67+
if (isUnsubscribed()) {
68+
return Subscriptions.empty();
69+
}
70+
ExecutorAction ea = new ExecutorAction(action, tasks);
71+
tasks.add(ea);
72+
queue.offer(ea);
73+
if (wip.getAndIncrement() == 0) {
74+
try {
75+
executor.execute(this);
76+
} catch (RejectedExecutionException t) {
77+
// cleanup if rejected
78+
tasks.remove(ea);
79+
wip.decrementAndGet();
80+
// report the error to the plugin
81+
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
82+
// throw it to the caller
83+
throw t;
84+
}
85+
}
86+
87+
return ea;
88+
}
89+
90+
@Override
91+
public void run() {
92+
do {
93+
queue.poll().run();
94+
} while (wip.decrementAndGet() > 0);
95+
}
96+
97+
@Override
98+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
99+
if (delayTime <= 0) {
100+
return schedule(action);
101+
}
102+
if (isUnsubscribed()) {
103+
return Subscriptions.empty();
104+
}
105+
ScheduledExecutorService service;
106+
if (executor instanceof ScheduledExecutorService) {
107+
service = (ScheduledExecutorService)executor;
108+
} else {
109+
service = GenericScheduledExecutorService.getInstance();
110+
}
111+
112+
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
113+
// tasks.add(mas); // Needs a removal without unsubscription
114+
115+
try {
116+
Future<?> f = service.schedule(new Runnable() {
117+
@Override
118+
public void run() {
119+
if (mas.isUnsubscribed()) {
120+
return;
121+
}
122+
mas.set(schedule(action));
123+
// tasks.delete(mas); // Needs a removal without unsubscription
124+
}
125+
}, delayTime, unit);
126+
mas.set(Subscriptions.from(f));
127+
} catch (RejectedExecutionException t) {
128+
// report the rejection to plugins
129+
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
130+
throw t;
131+
}
132+
133+
return mas;
134+
}
135+
136+
@Override
137+
public boolean isUnsubscribed() {
138+
return tasks.isUnsubscribed();
139+
}
140+
141+
@Override
142+
public void unsubscribe() {
143+
tasks.unsubscribe();
144+
}
145+
146+
}
147+
/** Runs the actual action and maintains an unsubscription state. */
148+
static final class ExecutorAction implements Runnable, Subscription {
149+
final Action0 actual;
150+
final CompositeSubscription parent;
151+
volatile int unsubscribed;
152+
static final AtomicIntegerFieldUpdater<ExecutorAction> UNSUBSCRIBED_UPDATER
153+
= AtomicIntegerFieldUpdater.newUpdater(ExecutorAction.class, "unsubscribed");
154+
155+
public ExecutorAction(Action0 actual, CompositeSubscription parent) {
156+
this.actual = actual;
157+
this.parent = parent;
158+
}
159+
160+
@Override
161+
public void run() {
162+
if (isUnsubscribed()) {
163+
return;
164+
}
165+
try {
166+
actual.call();
167+
} catch (Throwable t) {
168+
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
169+
} finally {
170+
unsubscribe();
171+
}
172+
}
173+
@Override
174+
public boolean isUnsubscribed() {
175+
return unsubscribed != 0;
176+
}
177+
178+
@Override
179+
public void unsubscribe() {
180+
if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
181+
parent.remove(this);
182+
}
183+
}
184+
185+
}
186+
}

0 commit comments

Comments
 (0)