Skip to content

Commit a038c7e

Browse files
Start of Performance Testing Operators
These are examples of how we should proceed. Very likely though we want to use something like Caliper: https://code.google.com/p/caliper/
1 parent a79f8da commit a038c7e

8 files changed

+407
-0
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package rx.operators;
2+
3+
import java.util.Arrays;
4+
5+
import rx.Observable;
6+
import rx.perf.AbstractPerformanceTester;
7+
import rx.perf.LongSumObserver;
8+
import rx.util.functions.Action0;
9+
10+
public class OperatorFromIterablePerformance extends AbstractPerformanceTester {
11+
12+
public static void main(String args[]) {
13+
14+
final OperatorFromIterablePerformance spt = new OperatorFromIterablePerformance();
15+
try {
16+
spt.runTest(new Action0() {
17+
18+
@Override
19+
public void call() {
20+
spt.timeTenLongs();
21+
}
22+
});
23+
} catch (Exception e) {
24+
e.printStackTrace();
25+
}
26+
27+
}
28+
29+
/**
30+
* Observable.from(Iterable)
31+
*
32+
* Run: 10 - 10,629,658 ops/sec
33+
* Run: 11 - 9,573,775 ops/sec
34+
* Run: 12 - 10,589,787 ops/sec
35+
* Run: 13 - 10,514,141 ops/sec
36+
* Run: 14 - 9,765,586 ops/sec
37+
*/
38+
public long timeTenLongs() {
39+
40+
Observable<Long> s = Observable.from(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L));
41+
LongSumObserver o = new LongSumObserver();
42+
43+
for (long l = 0; l < REPETITIONS; l++) {
44+
s.subscribe(o);
45+
}
46+
return o.sum;
47+
}
48+
49+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.perf.AbstractPerformanceTester;
5+
import rx.perf.LongSumObserver;
6+
import rx.util.functions.Action0;
7+
import rx.util.functions.Func1;
8+
9+
public class OperatorMapPerformance extends AbstractPerformanceTester {
10+
11+
public static void main(String args[]) {
12+
13+
final OperatorMapPerformance spt = new OperatorMapPerformance();
14+
try {
15+
spt.runTest(new Action0() {
16+
17+
@Override
18+
public void call() {
19+
spt.timeMapPlusOne();
20+
}
21+
});
22+
} catch (Exception e) {
23+
e.printStackTrace();
24+
}
25+
26+
}
27+
28+
/**
29+
* Observable.from(1L).map((l) -> { l+1})
30+
*
31+
* Run: 10 - 7,377,982 ops/sec
32+
* Run: 11 - 7,714,715 ops/sec
33+
* Run: 12 - 7,783,579 ops/sec
34+
* Run: 13 - 7,693,372 ops/sec
35+
* Run: 14 - 7,567,777 ops/sec
36+
*/
37+
public long timeMapPlusOne() {
38+
39+
Observable<Long> s = Observable.from(1L).map(new Func1<Long, Long>() {
40+
41+
@Override
42+
public Long call(Long l) {
43+
return l + 1;
44+
}
45+
46+
});
47+
LongSumObserver o = new LongSumObserver();
48+
49+
for (long l = 0; l < REPETITIONS; l++) {
50+
s.subscribe(o);
51+
}
52+
return o.sum;
53+
}
54+
55+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.perf.AbstractPerformanceTester;
5+
import rx.perf.IntegerSumObserver;
6+
import rx.util.functions.Action0;
7+
8+
public class OperatorMergePerformance extends AbstractPerformanceTester {
9+
10+
public static void main(String args[]) {
11+
12+
final OperatorMergePerformance spt = new OperatorMergePerformance();
13+
try {
14+
spt.runTest(new Action0() {
15+
16+
@Override
17+
public void call() {
18+
// spt.timeMergeAandBwithSingleItems();
19+
spt.timeMergeAandBwith100Items();
20+
}
21+
});
22+
} catch (Exception e) {
23+
e.printStackTrace();
24+
}
25+
26+
}
27+
28+
/**
29+
* Observable.merge(from(1), from(1))
30+
*
31+
* Run: 10 - 2,308,617 ops/sec
32+
* Run: 11 - 2,309,602 ops/sec
33+
* Run: 12 - 2,318,590 ops/sec
34+
* Run: 13 - 2,270,100 ops/sec
35+
* Run: 14 - 2,312,006 ops/sec
36+
*
37+
*/
38+
public long timeMergeAandBwithSingleItems() {
39+
40+
Observable<Integer> sA = Observable.from(1);
41+
Observable<Integer> sB = Observable.from(2);
42+
Observable<Integer> s = Observable.merge(sA, sB);
43+
44+
IntegerSumObserver o = new IntegerSumObserver();
45+
46+
for (long l = 0; l < REPETITIONS; l++) {
47+
s.subscribe(o);
48+
}
49+
return o.sum;
50+
}
51+
52+
/**
53+
* Observable.merge(range(0, 100), range(100, 200))
54+
*
55+
* Run: 10 - 340,049 ops/sec
56+
* Run: 11 - 339,059 ops/sec
57+
* Run: 12 - 348,899 ops/sec
58+
* Run: 13 - 350,953 ops/sec
59+
* Run: 14 - 352,228 ops/sec
60+
*/
61+
public long timeMergeAandBwith100Items() {
62+
63+
Observable<Integer> sA = Observable.range(0, 100);
64+
Observable<Integer> sB = Observable.range(100, 200);
65+
Observable<Integer> s = Observable.merge(sA, sB);
66+
67+
IntegerSumObserver o = new IntegerSumObserver();
68+
69+
for (long l = 0; l < REPETITIONS; l++) {
70+
s.subscribe(o);
71+
}
72+
return o.sum;
73+
}
74+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.perf.AbstractPerformanceTester;
5+
import rx.perf.IntegerSumObserver;
6+
import rx.util.functions.Action0;
7+
8+
public class OperatorTakePerformance extends AbstractPerformanceTester {
9+
10+
public static void main(String args[]) {
11+
12+
final OperatorTakePerformance spt = new OperatorTakePerformance();
13+
try {
14+
spt.runTest(new Action0() {
15+
16+
@Override
17+
public void call() {
18+
spt.timeTake5();
19+
}
20+
});
21+
} catch (Exception e) {
22+
e.printStackTrace();
23+
}
24+
25+
}
26+
27+
/**
28+
* Observable.range(0, 10).take(5);
29+
*
30+
* Run: 10 - 3,951,557 ops/sec
31+
* Run: 11 - 3,981,329 ops/sec
32+
* Run: 12 - 3,988,949 ops/sec
33+
* Run: 13 - 3,925,971 ops/sec
34+
* Run: 14 - 4,033,468 ops/sec
35+
*/
36+
public long timeTake5() {
37+
38+
Observable<Integer> s = Observable.range(0, 10).take(5);
39+
40+
IntegerSumObserver o = new IntegerSumObserver();
41+
42+
for (long l = 0; l < REPETITIONS; l++) {
43+
s.subscribe(o);
44+
}
45+
return o.sum;
46+
}
47+
48+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.perf.AbstractPerformanceTester;
5+
import rx.perf.IntegerSumObserver;
6+
import rx.util.functions.Action0;
7+
import rx.util.functions.Func2;
8+
9+
public class OperatorZipPerformance extends AbstractPerformanceTester {
10+
11+
public static void main(String args[]) {
12+
13+
final OperatorZipPerformance spt = new OperatorZipPerformance();
14+
try {
15+
spt.runTest(new Action0() {
16+
17+
@Override
18+
public void call() {
19+
// spt.timeZipAandBwithSingleItems();
20+
spt.timeZipAandBwith100Items();
21+
}
22+
});
23+
} catch (Exception e) {
24+
e.printStackTrace();
25+
}
26+
27+
}
28+
29+
/**
30+
* Observable.zip(from(1), from(1), {a, b -> a+b})
31+
*
32+
* Run: 10 - 1,419,071 ops/sec
33+
* Run: 11 - 1,418,867 ops/sec
34+
* Run: 12 - 1,420,459 ops/sec
35+
* Run: 13 - 1,409,877 ops/sec
36+
* Run: 14 - 1,426,019 ops/sec
37+
*/
38+
public long timeZipAandBwithSingleItems() {
39+
40+
Observable<Integer> sA = Observable.from(1);
41+
Observable<Integer> sB = Observable.from(2);
42+
Observable<Integer> s = Observable.zip(sA, sB, new Func2<Integer, Integer, Integer>() {
43+
44+
@Override
45+
public Integer call(Integer t1, Integer t2) {
46+
return t1 + t2;
47+
}
48+
49+
});
50+
51+
IntegerSumObserver o = new IntegerSumObserver();
52+
53+
for (long l = 0; l < REPETITIONS; l++) {
54+
s.subscribe(o);
55+
}
56+
return o.sum;
57+
}
58+
59+
/**
60+
* Observable.zip(range(0, 100), range(100, 200), {a, b -> a+b})
61+
*
62+
* ... really slow ...
63+
*
64+
* Run: 0 - 30,698 ops/sec
65+
* Run: 1 - 31,061 ops/sec
66+
*
67+
*/
68+
public long timeZipAandBwith100Items() {
69+
70+
Observable<Integer> sA = Observable.range(0, 100);
71+
Observable<Integer> sB = Observable.range(100, 200);
72+
Observable<Integer> s = Observable.zip(sA, sB, new Func2<Integer, Integer, Integer>() {
73+
74+
@Override
75+
public Integer call(Integer t1, Integer t2) {
76+
return t1 + t2;
77+
}
78+
79+
});
80+
81+
IntegerSumObserver o = new IntegerSumObserver();
82+
83+
for (long l = 0; l < REPETITIONS; l++) {
84+
s.subscribe(o);
85+
}
86+
return o.sum;
87+
}
88+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package rx.perf;
2+
3+
import rx.util.functions.Action0;
4+
5+
public abstract class AbstractPerformanceTester {
6+
7+
public static final int REPETITIONS = 5 * 1000 * 1000;
8+
public static final int NUM_PRODUCERS = 1;
9+
10+
public final void runTest(Action0 action) throws InterruptedException {
11+
for (int runNum = 0; runNum < 15; runNum++) {
12+
System.gc();
13+
Thread.sleep(1000L);
14+
15+
final long start = System.nanoTime();
16+
17+
action.call();
18+
19+
long duration = System.nanoTime() - start;
20+
long opsPerSec = (REPETITIONS * NUM_PRODUCERS * 1000L * 1000L * 1000L) / duration;
21+
System.out.printf("Run: %d - %,d ops/sec \n",
22+
Integer.valueOf(runNum),
23+
Long.valueOf(opsPerSec));
24+
}
25+
}
26+
27+
/**
28+
* Baseline ops/second without a subject.
29+
*
30+
* Perf along this order of magnitude:
31+
*
32+
* Run: 10 - 316,235,532 ops/sec
33+
* Run: 11 - 301,886,792 ops/sec
34+
* Run: 12 - 310,472,228 ops/sec
35+
* Run: 13 - 313,469,797 ops/sec
36+
* Run: 14 - 305,380,809 ops/sec
37+
*/
38+
public long baseline() {
39+
LongSumObserver o = new LongSumObserver();
40+
for (long l = 0; l < REPETITIONS; l++) {
41+
o.onNext(l);
42+
}
43+
o.onCompleted();
44+
return o.sum;
45+
}
46+
47+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package rx.perf;
2+
3+
import rx.Observer;
4+
5+
public class IntegerSumObserver implements Observer<Integer> {
6+
7+
public int sum = 0;
8+
9+
@Override
10+
public void onCompleted() {
11+
12+
}
13+
14+
@Override
15+
public void onError(Throwable e) {
16+
throw new RuntimeException(e);
17+
}
18+
19+
@Override
20+
public void onNext(Integer l) {
21+
sum += l;
22+
}
23+
}

0 commit comments

Comments
 (0)