Skip to content

Commit 1363f61

Browse files
committed
Merge pull request ReactiveX#724 from benjchristensen/revert-current-thread-scheduler-usage
Revert use of CurrentThreadScheduler for Observable.from
2 parents 8de063e + 5db1b0d commit 1363f61

File tree

3 files changed

+177
-2
lines changed

3 files changed

+177
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ public static <T> Observable<T> error(Throwable exception, Scheduler scheduler)
748748
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
749749
*/
750750
public static <T> Observable<T> from(Iterable<? extends T> iterable) {
751-
return from(iterable, Schedulers.currentThread());
751+
return from(iterable, Schedulers.immediate());
752752
}
753753

754754
/**

rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T>
4141
}
4242

4343
public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list) {
44-
return toObservableIterable(list, Schedulers.currentThread());
44+
return toObservableIterable(list, Schedulers.immediate());
4545
}
4646

4747
private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import java.util.Arrays;
19+
20+
import rx.Observable;
21+
import rx.Observer;
22+
import rx.Scheduler;
23+
import rx.util.functions.Action0;
24+
25+
public class SchedulerPerformanceTests {
26+
27+
private static final int REPETITIONS = 5 * 1000 * 1000;
28+
private static final int NUM_PRODUCERS = 1;
29+
30+
public static void main(String args[]) {
31+
32+
final SchedulerPerformanceTests spt = new SchedulerPerformanceTests();
33+
try {
34+
spt.runTest(new Action0() {
35+
36+
@Override
37+
public void call() {
38+
// spt.singleResponse(Schedulers.immediate());
39+
// spt.singleResponse(Schedulers.currentThread());
40+
// spt.singleResponse(Schedulers.threadPoolForComputation());
41+
42+
spt.arrayResponse(Schedulers.immediate());
43+
// spt.arrayResponse(Schedulers.currentThread());
44+
// spt.arrayResponse(Schedulers.threadPoolForComputation());
45+
}
46+
});
47+
} catch (Exception e) {
48+
e.printStackTrace();
49+
}
50+
51+
}
52+
53+
private void runTest(Action0 action) throws InterruptedException {
54+
for (int runNum = 0; runNum < 15; runNum++) {
55+
System.gc();
56+
Thread.sleep(1000L);
57+
58+
final long start = System.nanoTime();
59+
60+
action.call();
61+
62+
long duration = System.nanoTime() - start;
63+
long opsPerSec = (REPETITIONS * NUM_PRODUCERS * 1000L * 1000L * 1000L) / duration;
64+
System.out.printf("Run: %d - %,d ops/sec \n",
65+
Integer.valueOf(runNum),
66+
Long.valueOf(opsPerSec));
67+
}
68+
}
69+
70+
/**
71+
* Baseline ops/second without a subject.
72+
*
73+
* Perf along this order of magnitude:
74+
*
75+
* Run: 10 - 316,235,532 ops/sec
76+
* Run: 11 - 301,886,792 ops/sec
77+
* Run: 12 - 310,472,228 ops/sec
78+
* Run: 13 - 313,469,797 ops/sec
79+
* Run: 14 - 305,380,809 ops/sec
80+
*/
81+
public long baseline() {
82+
LongObserver o = new LongObserver();
83+
for (long l = 0; l < REPETITIONS; l++) {
84+
o.onNext(l);
85+
}
86+
o.onCompleted();
87+
return o.sum;
88+
}
89+
90+
/**
91+
* Observable.from(Arrays.asList(1L), scheduler);
92+
*
93+
* --- Schedulers.immediate() ---
94+
*
95+
* Run: 10 - 4,113,672 ops/sec
96+
* Run: 11 - 4,068,351 ops/sec
97+
* Run: 12 - 4,070,318 ops/sec
98+
* Run: 13 - 4,161,793 ops/sec
99+
* Run: 14 - 4,156,725 ops/sec
100+
*
101+
* --- Schedulers.currentThread() ---
102+
*
103+
* Run: 10 - 1,692,286 ops/sec
104+
* Run: 11 - 1,765,054 ops/sec
105+
* Run: 12 - 1,763,100 ops/sec
106+
* Run: 13 - 1,770,907 ops/sec
107+
* Run: 14 - 1,732,291 ops/sec
108+
*
109+
* --- Schedulers.computation() ---
110+
*
111+
* Run: 0 - 224,004 ops/sec
112+
* Run: 1 - 227,101 ops/sec
113+
*
114+
*/
115+
public long singleResponse(Scheduler scheduler) {
116+
Observable<Long> s = Observable.from(Arrays.asList(1L), scheduler);
117+
LongObserver o = new LongObserver();
118+
119+
for (long l = 0; l < REPETITIONS; l++) {
120+
s.subscribe(o);
121+
}
122+
return o.sum;
123+
}
124+
125+
/**
126+
* Observable.from(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), scheduler);
127+
*
128+
* --- Schedulers.immediate() ---
129+
*
130+
* Run: 0 - 1,849,947 ops/sec
131+
* Run: 1 - 2,076,067 ops/sec
132+
* Run: 2 - 2,114,688 ops/sec
133+
* Run: 3 - 2,114,301 ops/sec
134+
* Run: 4 - 2,102,543 ops/sec
135+
*
136+
* --- Schedulers.currentThread() ---
137+
*
138+
* Run: 0 - 548,862 ops/sec
139+
* Run: 1 - 559,955 ops/sec
140+
* Run: 2 - 581,412 ops/sec
141+
* Run: 3 - 562,187 ops/sec
142+
* Run: 4 - 565,723 ops/sec
143+
*
144+
*/
145+
public long arrayResponse(Scheduler scheduler) {
146+
Observable<Long> s = Observable.from(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), scheduler);
147+
LongObserver o = new LongObserver();
148+
149+
for (long l = 0; l < REPETITIONS; l++) {
150+
s.subscribe(o);
151+
}
152+
return o.sum;
153+
}
154+
155+
private static class LongObserver implements Observer<Long> {
156+
157+
long sum = 0;
158+
159+
@Override
160+
public void onCompleted() {
161+
162+
}
163+
164+
@Override
165+
public void onError(Throwable e) {
166+
throw new RuntimeException(e);
167+
}
168+
169+
@Override
170+
public void onNext(Long l) {
171+
sum += l;
172+
}
173+
}
174+
175+
}

0 commit comments

Comments
 (0)