Skip to content

Commit b8f4310

Browse files
Merge branch 'subject-implementations' of git://github.com/michaeldejong/RxJava into pull-256-merge-BehaviorSubject
2 parents 7ffc515 + 24be900 commit b8f4310

File tree

1 file changed

+264
-0
lines changed

1 file changed

+264
-0
lines changed
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subjects;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
import org.junit.Test;
25+
import org.mockito.Mockito;
26+
27+
import rx.Observer;
28+
import rx.Subscription;
29+
import rx.util.AtomicObservableSubscription;
30+
import rx.util.SynchronizedObserver;
31+
import rx.util.functions.Action1;
32+
import rx.util.functions.Func0;
33+
import rx.util.functions.Func1;
34+
35+
/**
36+
* Subject that publishes the last and all subsequent events to each {@link Observer} that subscribes.
37+
* <p>
38+
* Example usage:
39+
* <p>
40+
* <pre> {@code
41+
42+
// observer will receive all events.
43+
BehaviorSubject<Object> subject = BehaviorSubject.createWithDefaultValue("default");
44+
subject.subscribe(observer);
45+
subject.onNext("one");
46+
subject.onNext("two");
47+
subject.onNext("three");
48+
49+
// observer will receive the "one", "two" and "three" events.
50+
BehaviorSubject<Object> subject = BehaviorSubject.createWithDefaultValue("default");
51+
subject.onNext("one");
52+
subject.subscribe(observer);
53+
subject.onNext("two");
54+
subject.onNext("three");
55+
56+
} </pre>
57+
*
58+
* @param <T>
59+
*/
60+
public class BehaviorSubject<T> extends Subject<T, T> {
61+
62+
/**
63+
* Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each
64+
* {@link Observer} that subscribes to it.
65+
*
66+
* @param defaultValue
67+
* The value which will be published to any {@link Observer} as long as the
68+
* {@link BehaviorSubject} has not yet received any events.
69+
* @return the constructed {@link BehaviorSubject}.
70+
*/
71+
public static <T> BehaviorSubject<T> createWithDefaultValue(T defaultValue) {
72+
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();
73+
74+
final AtomicReference<T> currentValue = new AtomicReference<T>(defaultValue);
75+
76+
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
77+
@Override
78+
public Subscription call(Observer<T> observer) {
79+
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
80+
81+
subscription.wrap(new Subscription() {
82+
@Override
83+
public void unsubscribe() {
84+
// on unsubscribe remove it from the map of outbound observers to notify
85+
observers.remove(subscription);
86+
}
87+
});
88+
89+
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(observer, subscription);
90+
synchronizedObserver.onNext(currentValue.get());
91+
92+
// on subscribe add it to the map of outbound observers to notify
93+
observers.put(subscription, synchronizedObserver);
94+
return subscription;
95+
}
96+
};
97+
98+
return new BehaviorSubject<T>(currentValue, onSubscribe, observers);
99+
}
100+
101+
private final ConcurrentHashMap<Subscription, Observer<T>> observers;
102+
private final AtomicReference<T> currentValue;
103+
104+
protected BehaviorSubject(AtomicReference<T> currentValue, Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
105+
super(onSubscribe);
106+
this.currentValue = currentValue;
107+
this.observers = observers;
108+
}
109+
110+
@Override
111+
public void onCompleted() {
112+
for (Observer<T> observer : observers.values()) {
113+
observer.onCompleted();
114+
}
115+
}
116+
117+
@Override
118+
public void onError(Exception e) {
119+
for (Observer<T> observer : observers.values()) {
120+
observer.onError(e);
121+
}
122+
}
123+
124+
@Override
125+
public void onNext(T args) {
126+
currentValue.set(args);
127+
for (Observer<T> observer : observers.values()) {
128+
observer.onNext(args);
129+
}
130+
}
131+
132+
public static class UnitTest {
133+
134+
private final Exception testException = new Exception();
135+
136+
@Test
137+
public void testThatObserverReceivesDefaultValueIfNothingWasPublished() {
138+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
139+
140+
@SuppressWarnings("unchecked")
141+
Observer<String> aObserver = mock(Observer.class);
142+
subject.subscribe(aObserver);
143+
144+
subject.onNext("one");
145+
subject.onNext("two");
146+
subject.onNext("three");
147+
148+
assertReceivedAllEvents(aObserver);
149+
}
150+
151+
private void assertReceivedAllEvents(Observer<String> aObserver) {
152+
verify(aObserver, times(1)).onNext("default");
153+
verify(aObserver, times(1)).onNext("one");
154+
verify(aObserver, times(1)).onNext("two");
155+
verify(aObserver, times(1)).onNext("three");
156+
verify(aObserver, Mockito.never()).onError(testException);
157+
verify(aObserver, Mockito.never()).onCompleted();
158+
}
159+
160+
@Test
161+
public void testThatObserverDoesNotReceiveDefaultValueIfSomethingWasPublished() {
162+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
163+
164+
subject.onNext("one");
165+
166+
@SuppressWarnings("unchecked")
167+
Observer<String> aObserver = mock(Observer.class);
168+
subject.subscribe(aObserver);
169+
170+
subject.onNext("two");
171+
subject.onNext("three");
172+
173+
assertDidNotReceiveTheDefaultValue(aObserver);
174+
}
175+
176+
private void assertDidNotReceiveTheDefaultValue(Observer<String> aObserver) {
177+
verify(aObserver, Mockito.never()).onNext("default");
178+
verify(aObserver, times(1)).onNext("one");
179+
verify(aObserver, times(1)).onNext("two");
180+
verify(aObserver, times(1)).onNext("three");
181+
verify(aObserver, Mockito.never()).onError(testException);
182+
verify(aObserver, Mockito.never()).onCompleted();
183+
}
184+
185+
@Test
186+
public void testCompleted() {
187+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
188+
189+
@SuppressWarnings("unchecked")
190+
Observer<String> aObserver = mock(Observer.class);
191+
subject.subscribe(aObserver);
192+
193+
subject.onNext("one");
194+
subject.onCompleted();
195+
196+
assertCompletedObserver(aObserver);
197+
}
198+
199+
private void assertCompletedObserver(Observer<String> aObserver)
200+
{
201+
verify(aObserver, times(1)).onNext("default");
202+
verify(aObserver, times(1)).onNext("one");
203+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
204+
verify(aObserver, times(1)).onCompleted();
205+
}
206+
207+
@Test
208+
public void testCompletedAfterError() {
209+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
210+
211+
@SuppressWarnings("unchecked")
212+
Observer<String> aObserver = mock(Observer.class);
213+
subject.subscribe(aObserver);
214+
215+
subject.onNext("one");
216+
subject.onError(testException);
217+
subject.onNext("two");
218+
subject.onCompleted();
219+
220+
assertErrorObserver(aObserver);
221+
}
222+
223+
private void assertErrorObserver(Observer<String> aObserver)
224+
{
225+
verify(aObserver, times(1)).onNext("default");
226+
verify(aObserver, times(1)).onNext("one");
227+
verify(aObserver, times(1)).onError(testException);
228+
}
229+
230+
@Test
231+
public void testUnsubscribe()
232+
{
233+
UnsubscribeTester.test(new Func0<BehaviorSubject<String>>()
234+
{
235+
@Override
236+
public BehaviorSubject<String> call()
237+
{
238+
return BehaviorSubject.createWithDefaultValue("default");
239+
}
240+
}, new Action1<BehaviorSubject<String>>()
241+
{
242+
@Override
243+
public void call(BehaviorSubject<String> DefaultSubject)
244+
{
245+
DefaultSubject.onCompleted();
246+
}
247+
}, new Action1<BehaviorSubject<String>>()
248+
{
249+
@Override
250+
public void call(BehaviorSubject<String> DefaultSubject)
251+
{
252+
DefaultSubject.onError(new Exception());
253+
}
254+
}, new Action1<BehaviorSubject<String>>()
255+
{
256+
@Override
257+
public void call(BehaviorSubject<String> DefaultSubject)
258+
{
259+
DefaultSubject.onNext("one");
260+
}
261+
});
262+
}
263+
}
264+
}

0 commit comments

Comments
 (0)