Skip to content

Commit 78c250b

Browse files
committed
Merge branch 'master' of https://github.com/Netflix/RxJava into docs
+ add a couple of javadoc improvements to merged changes
2 parents 6eeb249 + c6c17cc commit 78c250b

File tree

5 files changed

+264
-2
lines changed

5 files changed

+264
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2301,15 +2301,19 @@ public static final <K1, K2, T> Observable<GroupedObservable<K2, GroupedObservab
23012301
* the number of sequential Integers to generate
23022302
* @return an Observable that emits a range of sequential Integers
23032303
* @throws IllegalArgumentException
2304-
* if {@code count} is less than zero
2304+
* if {@code count} is less than zero, or if {@code start} + {@code count} &minus; 1 exceeds
2305+
* {@code Integer.MAX_VALUE}
23052306
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-range">RxJava Wiki: range()</a>
23062307
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229460.aspx">MSDN: Observable.Range</a>
23072308
*/
23082309
public final static Observable<Integer> range(int start, int count) {
23092310
if (count < 0) {
23102311
throw new IllegalArgumentException("Count can not be negative");
23112312
}
2312-
if ((start + count) > Integer.MAX_VALUE) {
2313+
if (count == 0) {
2314+
return Observable.empty();
2315+
}
2316+
if (start > Integer.MAX_VALUE - count + 1) {
23132317
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
23142318
}
23152319
return Observable.create(new OnSubscribeRange(start, start + (count - 1)));
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.Executor;
20+
import java.util.concurrent.Future;
21+
import java.util.concurrent.RejectedExecutionException;
22+
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
26+
import rx.Scheduler;
27+
import rx.Subscription;
28+
import rx.functions.Action0;
29+
import rx.plugins.RxJavaPlugins;
30+
import rx.subscriptions.CompositeSubscription;
31+
import rx.subscriptions.MultipleAssignmentSubscription;
32+
import rx.subscriptions.Subscriptions;
33+
34+
/**
35+
* Scheduler that wraps an Executor instance and establishes the Scheduler contract upon it.
36+
* <p>
37+
* Note that thread-hopping is unavoidable with this kind of Scheduler as we don't know about the underlying
38+
* threading behavior of the executor.
39+
*/
40+
/* public */final class ExecutorScheduler extends Scheduler {
41+
final Executor executor;
42+
public ExecutorScheduler(Executor executor) {
43+
this.executor = executor;
44+
}
45+
46+
/**
47+
* @warn javadoc missing
48+
* @return
49+
*/
50+
@Override
51+
public Worker createWorker() {
52+
return new ExecutorSchedulerWorker(executor);
53+
}
54+
55+
/** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */
56+
static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable {
57+
final Executor executor;
58+
// TODO: use a better performing structure for task tracking
59+
final CompositeSubscription tasks;
60+
// TODO: use MpscLinkedQueue once available
61+
final ConcurrentLinkedQueue<ExecutorAction> queue;
62+
final AtomicInteger wip;
63+
64+
public ExecutorSchedulerWorker(Executor executor) {
65+
this.executor = executor;
66+
this.queue = new ConcurrentLinkedQueue<ExecutorAction>();
67+
this.wip = new AtomicInteger();
68+
this.tasks = new CompositeSubscription();
69+
}
70+
71+
@Override
72+
public Subscription schedule(Action0 action) {
73+
if (isUnsubscribed()) {
74+
return Subscriptions.empty();
75+
}
76+
ExecutorAction ea = new ExecutorAction(action, tasks);
77+
tasks.add(ea);
78+
queue.offer(ea);
79+
if (wip.getAndIncrement() == 0) {
80+
try {
81+
executor.execute(this);
82+
} catch (RejectedExecutionException t) {
83+
// cleanup if rejected
84+
tasks.remove(ea);
85+
wip.decrementAndGet();
86+
// report the error to the plugin
87+
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
88+
// throw it to the caller
89+
throw t;
90+
}
91+
}
92+
93+
return ea;
94+
}
95+
96+
@Override
97+
public void run() {
98+
do {
99+
queue.poll().run();
100+
} while (wip.decrementAndGet() > 0);
101+
}
102+
103+
@Override
104+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
105+
if (delayTime <= 0) {
106+
return schedule(action);
107+
}
108+
if (isUnsubscribed()) {
109+
return Subscriptions.empty();
110+
}
111+
ScheduledExecutorService service;
112+
if (executor instanceof ScheduledExecutorService) {
113+
service = (ScheduledExecutorService)executor;
114+
} else {
115+
service = GenericScheduledExecutorService.getInstance();
116+
}
117+
118+
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
119+
// tasks.add(mas); // Needs a removal without unsubscription
120+
121+
try {
122+
Future<?> f = service.schedule(new Runnable() {
123+
@Override
124+
public void run() {
125+
if (mas.isUnsubscribed()) {
126+
return;
127+
}
128+
mas.set(schedule(action));
129+
// tasks.delete(mas); // Needs a removal without unsubscription
130+
}
131+
}, delayTime, unit);
132+
mas.set(Subscriptions.from(f));
133+
} catch (RejectedExecutionException t) {
134+
// report the rejection to plugins
135+
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
136+
throw t;
137+
}
138+
139+
return mas;
140+
}
141+
142+
@Override
143+
public boolean isUnsubscribed() {
144+
return tasks.isUnsubscribed();
145+
}
146+
147+
@Override
148+
public void unsubscribe() {
149+
tasks.unsubscribe();
150+
}
151+
152+
}
153+
154+
/** Runs the actual action and maintains an unsubscription state. */
155+
static final class ExecutorAction implements Runnable, Subscription {
156+
final Action0 actual;
157+
final CompositeSubscription parent;
158+
volatile int unsubscribed;
159+
static final AtomicIntegerFieldUpdater<ExecutorAction> UNSUBSCRIBED_UPDATER
160+
= AtomicIntegerFieldUpdater.newUpdater(ExecutorAction.class, "unsubscribed");
161+
162+
public ExecutorAction(Action0 actual, CompositeSubscription parent) {
163+
this.actual = actual;
164+
this.parent = parent;
165+
}
166+
167+
@Override
168+
public void run() {
169+
if (isUnsubscribed()) {
170+
return;
171+
}
172+
try {
173+
actual.call();
174+
} catch (Throwable t) {
175+
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
176+
} finally {
177+
unsubscribe();
178+
}
179+
}
180+
@Override
181+
public boolean isUnsubscribed() {
182+
return unsubscribed != 0;
183+
}
184+
185+
@Override
186+
public void unsubscribe() {
187+
if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
188+
parent.remove(this);
189+
}
190+
}
191+
192+
}
193+
}

rxjava-core/src/main/java/rx/schedulers/Schedulers.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,15 @@ public static Scheduler io() {
117117
public static TestScheduler test() {
118118
return new TestScheduler();
119119
}
120+
121+
/**
122+
* Converts an {@link Executor} into a new Scheduler instance.
123+
*
124+
* @param executor
125+
* the executor to wrap
126+
* @return the new Scheduler wrapping the Executor
127+
*/
128+
public static Scheduler newExecutor(Executor executor) {
129+
return new ExecutorScheduler(executor);
130+
}
120131
}

rxjava-core/src/test/java/rx/operators/OnSubscribeRangeTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.operators;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
1920
import static org.mockito.Mockito.mock;
2021
import static org.mockito.Mockito.never;
2122
import static org.mockito.Mockito.times;
@@ -67,4 +68,29 @@ public void call(Integer t1) {
6768
verify(observer, times(1)).onCompleted();
6869
assertEquals(3, count.get());
6970
}
71+
72+
@Test
73+
public void testRangeWithOverflow() {
74+
Observable.range(1, 0);
75+
}
76+
77+
@Test
78+
public void testRangeWithOverflow2() {
79+
Observable.range(Integer.MAX_VALUE, 0);
80+
}
81+
82+
@Test
83+
public void testRangeWithOverflow3() {
84+
Observable.range(1, Integer.MAX_VALUE);
85+
}
86+
87+
@Test(expected = IllegalArgumentException.class)
88+
public void testRangeWithOverflow4() {
89+
Observable.range(2, Integer.MAX_VALUE);
90+
}
91+
92+
@Test
93+
public void testRangeWithOverflow5() {
94+
assertFalse(Observable.range(Integer.MIN_VALUE, 0).toBlocking().getIterator().hasNext());
95+
}
7096
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import java.util.concurrent.Executors;
19+
import rx.Scheduler;
20+
21+
public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests {
22+
23+
@Override
24+
protected Scheduler getScheduler() {
25+
return Schedulers.newExecutor(Executors.newFixedThreadPool(2, new NewThreadScheduler.RxThreadFactory("TestCustomPool-")));
26+
}
27+
28+
}

0 commit comments

Comments
 (0)