Skip to content

Commit 2fa35e5

Browse files
committed
Updated ObservableBenchmark to follow better benchmarking practices
1 parent 2052da3 commit 2fa35e5

File tree

1 file changed

+62
-87
lines changed

1 file changed

+62
-87
lines changed
Lines changed: 62 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
package rx.operators;
22

3+
import java.util.ArrayList;
4+
import java.util.Collection;
35
import java.util.concurrent.CountDownLatch;
4-
import java.util.concurrent.atomic.AtomicInteger;
56

6-
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
7-
import org.openjdk.jmh.runner.Runner;
8-
import org.openjdk.jmh.runner.RunnerException;
9-
import org.openjdk.jmh.runner.options.Options;
10-
import org.openjdk.jmh.runner.options.OptionsBuilder;
7+
import org.openjdk.jmh.annotations.*;
118

9+
import org.openjdk.jmh.logic.BlackHole;
1210
import rx.Observable;
1311
import rx.Observable.OnSubscribe;
1412
import rx.Observable.Operator;
@@ -19,107 +17,84 @@
1917
public class ObservableBenchmark {
2018

2119
@GenerateMicroBenchmark
22-
public void timeBaseline() {
23-
observableOfInts.subscribe(newObserver());
24-
awaitAllObservers();
25-
}
26-
27-
@GenerateMicroBenchmark
28-
public int timeMapIterate() {
29-
int x = 0;
30-
for (int j = 0; j < intValues.length; j++) {
31-
// use hash code to make sure the JIT doesn't optimize too much and remove all of
32-
// our code.
33-
x |= ident.call(intValues[j]).hashCode();
20+
public void measureBaseline(BlackHole bh, Input input) {
21+
for (Integer value : input.values) {
22+
bh.consume(IDENTITY_FUNCTION.call(value));
3423
}
35-
return x;
3624
}
3725

3826
@GenerateMicroBenchmark
39-
public void timeMap() {
40-
timeOperator(new OperatorMap<Integer, Object>(ident));
27+
public void measureMap(Input input) throws InterruptedException {
28+
input.observable.lift(MAP_OPERATOR).subscribe(input.observer);
29+
30+
input.awaitCompletion();
4131
}
4232

43-
/**************************************************************************
44-
* Below is internal stuff to avoid object allocation and time overhead of anything that isn't
45-
* being tested.
46-
*
47-
* @throws RunnerException
48-
**************************************************************************/
33+
private static final Func1<Integer, Integer> IDENTITY_FUNCTION = new Func1<Integer, Integer>() {
34+
@Override
35+
public Integer call(Integer value) {
36+
return value;
37+
}
38+
};
4939

50-
public static void main(String[] args) throws RunnerException {
51-
Options opt = new OptionsBuilder()
52-
.include(ObservableBenchmark.class.getName()+".*")
53-
.forks(1)
54-
.build();
40+
private static final Operator<Integer, Integer> MAP_OPERATOR = new OperatorMap<Integer, Integer>(IDENTITY_FUNCTION);
5541

56-
new Runner(opt).run();
57-
}
42+
@State(Scope.Thread)
43+
public static class Input {
5844

59-
private void timeOperator(Operator<Object, Integer> op) {
60-
observableOfInts.lift(op).subscribe(newObserver());
61-
awaitAllObservers();
62-
}
45+
@Param({"1", "1024", "1048576"})
46+
public int size;
6347

64-
private final static AtomicInteger outstanding = new AtomicInteger(0);
65-
private final static CountDownLatch latch = new CountDownLatch(1);
48+
public Collection<Integer> values;
49+
public Observable<Integer> observable;
50+
public Observer<Integer> observer;
6651

67-
private static <T> Observer<T> newObserver() {
68-
outstanding.incrementAndGet();
69-
return new Observer<T>() {
70-
@Override
71-
public void onCompleted() {
72-
int left = outstanding.decrementAndGet();
73-
if (left == 0) {
74-
latch.countDown();
75-
}
52+
private CountDownLatch latch;
53+
54+
@Setup
55+
public void setup() {
56+
values = new ArrayList<Integer>();
57+
for(int i = 0; i < size; i ++) {
58+
values.add(i);
7659
}
7760

78-
@Override
79-
public void onError(Throwable e) {
80-
int left = outstanding.decrementAndGet();
81-
if (left == 0) {
61+
observable = Observable.create(new OnSubscribe<Integer>() {
62+
@Override
63+
public void call(Subscriber<? super Integer> o) {
64+
for (Integer value : values) {
65+
if (o.isUnsubscribed())
66+
return;
67+
o.onNext(value);
68+
}
69+
o.onCompleted();
70+
}
71+
});
72+
73+
final BlackHole bh = new BlackHole();
74+
latch = new CountDownLatch(1);
75+
76+
observer = new Observer<Integer>() {
77+
@Override
78+
public void onCompleted() {
8279
latch.countDown();
8380
}
84-
}
8581

86-
@Override
87-
public void onNext(T t) {
88-
// do nothing
89-
}
90-
};
91-
}
82+
@Override
83+
public void onError(Throwable e) {
84+
throw new RuntimeException(e);
85+
}
86+
87+
@Override
88+
public void onNext(Integer value) {
89+
bh.consume(value);
90+
}
91+
};
9292

93-
private static void awaitAllObservers() {
94-
try {
95-
latch.await();
96-
} catch (InterruptedException e) {
97-
return;
9893
}
99-
}
10094

101-
private static final Integer[] intValues = new Integer[1000];
102-
static {
103-
for (int i = 0; i < intValues.length; i++) {
104-
intValues[i] = i;
95+
public void awaitCompletion() throws InterruptedException {
96+
latch.await();
10597
}
10698
}
10799

108-
private static final Observable<Integer> observableOfInts = Observable.create(new OnSubscribe<Integer>() {
109-
@Override
110-
public void call(Subscriber<? super Integer> o) {
111-
for (int i = 0; i < intValues.length; i++) {
112-
if (o.isUnsubscribed())
113-
return;
114-
o.onNext(intValues[i]);
115-
}
116-
o.onCompleted();
117-
}
118-
});
119-
private static final Func1<Integer, Object> ident = new Func1<Integer, Object>() {
120-
@Override
121-
public Object call(Integer t) {
122-
return t;
123-
}
124-
};
125100
}

0 commit comments

Comments
 (0)