Skip to content

Commit 9785ada

Browse files
TestSubject, TestObserver and TestScheduler Improvements
1 parent c573497 commit 9785ada

File tree

4 files changed

+191
-5
lines changed

4 files changed

+191
-5
lines changed

rxjava-core/src/main/java/rx/observers/TestObserver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
*/
2828
public class TestObserver<T> implements Observer<T> {
2929

30-
3130
private final Observer<T> delegate;
3231
private final ArrayList<T> onNextEvents = new ArrayList<T>();
3332
private final ArrayList<Throwable> onErrorEvents = new ArrayList<Throwable>();
@@ -91,7 +90,8 @@ public void assertReceivedOnNext(List<T> items) {
9190
throw new AssertionError("Value at index: " + i + " expected to be [null] but was: [" + onNextEvents.get(i) + "]");
9291
}
9392
} else if (!items.get(i).equals(onNextEvents.get(i))) {
94-
throw new AssertionError("Value at index: " + i + " expected to be [" + items.get(i) + "] but was: [" + onNextEvents.get(i) + "]");
93+
throw new AssertionError("Value at index: " + i + " expected to be [" + items.get(i) + "] (" + items.get(i).getClass().getSimpleName() + ") but was: [" + onNextEvents.get(i) + "] (" + onNextEvents.get(i).getClass().getSimpleName() + ")");
94+
9595
}
9696
}
9797

rxjava-core/src/main/java/rx/schedulers/TestScheduler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,17 +97,21 @@ private void triggerActions(long targetTimeInNanos) {
9797
time = targetTimeInNanos;
9898
}
9999

100+
public Inner createInnerScheduler() {
101+
return new InnerTestScheduler();
102+
}
103+
100104
@Override
101105
public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
102-
InnerTestScheduler inner = new InnerTestScheduler();
106+
Inner inner = createInnerScheduler();
103107
final TimedAction timedAction = new TimedAction(inner, time + unit.toNanos(delayTime), action);
104108
queue.add(timedAction);
105109
return inner;
106110
}
107111

108112
@Override
109113
public Subscription schedule(Action1<Inner> action) {
110-
InnerTestScheduler inner = new InnerTestScheduler();
114+
Inner inner = createInnerScheduler();
111115
final TimedAction timedAction = new TimedAction(inner, 0, action);
112116
queue.add(timedAction);
113117
return inner;
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subjects;
17+
18+
import java.util.Collection;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import rx.Notification;
23+
import rx.Observer;
24+
import rx.Scheduler;
25+
import rx.Scheduler.Inner;
26+
import rx.functions.Action1;
27+
import rx.schedulers.TestScheduler;
28+
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
29+
30+
/**
31+
* Subject that, once and {@link Observer} has subscribed, publishes all subsequent events to the subscriber.
32+
* <p>
33+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/S.PublishSubject.png">
34+
* <p>
35+
* Example usage:
36+
* <p>
37+
* <pre> {@code
38+
39+
* PublishSubject<Object> subject = PublishSubject.create();
40+
// observer1 will receive all onNext and onCompleted events
41+
subject.subscribe(observer1);
42+
subject.onNext("one");
43+
subject.onNext("two");
44+
// observer2 will only receive "three" and onCompleted
45+
subject.subscribe(observer2);
46+
subject.onNext("three");
47+
subject.onCompleted();
48+
49+
} </pre>
50+
*
51+
* @param <T>
52+
*/
53+
public final class TestSubject<T> extends Subject<T, T> {
54+
55+
public static <T> TestSubject<T> create(TestScheduler scheduler) {
56+
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
57+
// set a default value so subscriptions will immediately receive this until a new notification is received
58+
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>();
59+
60+
OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
61+
/**
62+
* This function executes at beginning of subscription.
63+
*
64+
* This will always run, even if Subject is in terminal state.
65+
*/
66+
new Action1<SubjectObserver<? super T>>() {
67+
68+
@Override
69+
public void call(SubjectObserver<? super T> o) {
70+
// nothing onSubscribe unless in terminal state which is the next function
71+
}
72+
},
73+
/**
74+
* This function executes if the Subject is terminated before subscription occurs.
75+
*/
76+
new Action1<SubjectObserver<? super T>>() {
77+
78+
@Override
79+
public void call(SubjectObserver<? super T> o) {
80+
/*
81+
* If we are already terminated, or termination happens while trying to subscribe
82+
* this will be invoked and we emit whatever the last terminal value was.
83+
*/
84+
lastNotification.get().accept(o);
85+
}
86+
});
87+
88+
return new TestSubject<T>(onSubscribe, subscriptionManager, lastNotification, scheduler);
89+
}
90+
91+
private final SubjectSubscriptionManager<T> subscriptionManager;
92+
private final AtomicReference<Notification<T>> lastNotification;
93+
private final Scheduler.Inner innerScheduler;
94+
95+
protected TestSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification, TestScheduler scheduler) {
96+
super(onSubscribe);
97+
this.subscriptionManager = subscriptionManager;
98+
this.lastNotification = lastNotification;
99+
this.innerScheduler = scheduler.createInnerScheduler();
100+
}
101+
102+
@Override
103+
public void onCompleted() {
104+
onCompleted(innerScheduler.now());
105+
}
106+
107+
private void _onCompleted() {
108+
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
109+
110+
@Override
111+
public void call(Collection<SubjectObserver<? super T>> observers) {
112+
lastNotification.set(Notification.<T> createOnCompleted());
113+
for (Observer<? super T> o : observers) {
114+
o.onCompleted();
115+
}
116+
}
117+
});
118+
}
119+
120+
public void onCompleted(long timeInMilliseconds) {
121+
innerScheduler.schedule(new Action1<Inner>() {
122+
123+
@Override
124+
public void call(Inner t1) {
125+
_onCompleted();
126+
}
127+
128+
}, timeInMilliseconds, TimeUnit.MILLISECONDS);
129+
}
130+
131+
@Override
132+
public void onError(final Throwable e) {
133+
onError(e, innerScheduler.now());
134+
}
135+
136+
private void _onError(final Throwable e) {
137+
subscriptionManager.terminate(new Action1<Collection<SubjectObserver<? super T>>>() {
138+
139+
@Override
140+
public void call(Collection<SubjectObserver<? super T>> observers) {
141+
lastNotification.set(Notification.<T> createOnError(e));
142+
for (Observer<? super T> o : observers) {
143+
o.onError(e);
144+
}
145+
}
146+
});
147+
148+
}
149+
150+
public void onError(final Throwable e, long timeInMilliseconds) {
151+
innerScheduler.schedule(new Action1<Inner>() {
152+
153+
@Override
154+
public void call(Inner t1) {
155+
_onError(e);
156+
}
157+
158+
}, timeInMilliseconds, TimeUnit.MILLISECONDS);
159+
}
160+
161+
@Override
162+
public void onNext(T v) {
163+
onNext(v, innerScheduler.now());
164+
}
165+
166+
private void _onNext(T v) {
167+
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
168+
o.onNext(v);
169+
}
170+
}
171+
172+
public void onNext(final T v, long timeInMilliseconds) {
173+
innerScheduler.schedule(new Action1<Inner>() {
174+
175+
@Override
176+
public void call(Inner t1) {
177+
_onNext(v);
178+
}
179+
180+
}, timeInMilliseconds, TimeUnit.MILLISECONDS);
181+
}
182+
}

rxjava-core/src/test/java/rx/observers/TestObserverTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void testAssertNotMatchValue() {
6666
oi.subscribe(o);
6767

6868
thrown.expect(AssertionError.class);
69-
thrown.expectMessage("Value at index: 1 expected to be [3] but was: [2]");
69+
thrown.expectMessage("Value at index: 1 expected to be [3] (Integer) but was: [2] (Integer)");
7070

7171
o.assertReceivedOnNext(Arrays.asList(1, 3));
7272
assertEquals(2, o.getOnNextEvents().size());

0 commit comments

Comments
 (0)