Skip to content

Commit 42cff54

Browse files
Merge pull request ReactiveX#264 from benjchristensen/pull-256-merge-BehaviorSubject
Merge BehaviorSubject from Pull 256
2 parents 7ffc515 + a0a18ac commit 42cff54

File tree

3 files changed

+266
-9
lines changed

3 files changed

+266
-9
lines changed

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515
*/
1616
package rx.subjects;
1717

18-
import static org.mockito.Matchers.any;
19-
import static org.mockito.Mockito.mock;
20-
import static org.mockito.Mockito.times;
21-
import static org.mockito.Mockito.verify;
22-
import static org.mockito.Matchers.anyString;
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
2320

2421
import java.util.concurrent.ConcurrentHashMap;
2522
import java.util.concurrent.atomic.AtomicReference;
@@ -30,7 +27,6 @@
3027
import rx.Observer;
3128
import rx.Subscription;
3229
import rx.util.AtomicObservableSubscription;
33-
import rx.util.SynchronizedObserver;
3430
import rx.util.functions.Action1;
3531
import rx.util.functions.Func0;
3632
import rx.util.functions.Func1;
@@ -80,7 +76,7 @@ public void unsubscribe() {
8076
});
8177

8278
// on subscribe add it to the map of outbound observers to notify
83-
observers.put(subscription, new SynchronizedObserver<T>(observer, subscription));
79+
observers.put(subscription, observer);
8480
return subscription;
8581
}
8682
};
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subjects;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
import org.junit.Test;
25+
import org.mockito.Mockito;
26+
27+
import rx.Observer;
28+
import rx.Subscription;
29+
import rx.util.AtomicObservableSubscription;
30+
import rx.util.functions.Action1;
31+
import rx.util.functions.Func0;
32+
import rx.util.functions.Func1;
33+
34+
/**
35+
* Subject that publishes the last and all subsequent events to each {@link Observer} that subscribes.
36+
* <p>
37+
* Example usage:
38+
* <p>
39+
* <pre> {@code
40+
41+
// observer will receive all events.
42+
BehaviorSubject<Object> subject = BehaviorSubject.createWithDefaultValue("default");
43+
subject.subscribe(observer);
44+
subject.onNext("one");
45+
subject.onNext("two");
46+
subject.onNext("three");
47+
48+
// observer will receive the "one", "two" and "three" events.
49+
BehaviorSubject<Object> subject = BehaviorSubject.createWithDefaultValue("default");
50+
subject.onNext("one");
51+
subject.subscribe(observer);
52+
subject.onNext("two");
53+
subject.onNext("three");
54+
55+
} </pre>
56+
*
57+
* @param <T>
58+
*/
59+
public class BehaviorSubject<T> extends Subject<T, T> {
60+
61+
/**
62+
* Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each
63+
* {@link Observer} that subscribes to it.
64+
*
65+
* @param defaultValue
66+
* The value which will be published to any {@link Observer} as long as the
67+
* {@link BehaviorSubject} has not yet received any events.
68+
* @return the constructed {@link BehaviorSubject}.
69+
*/
70+
public static <T> BehaviorSubject<T> createWithDefaultValue(T defaultValue) {
71+
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();
72+
73+
final AtomicReference<T> currentValue = new AtomicReference<T>(defaultValue);
74+
75+
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
76+
@Override
77+
public Subscription call(Observer<T> observer) {
78+
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
79+
80+
subscription.wrap(new Subscription() {
81+
@Override
82+
public void unsubscribe() {
83+
// on unsubscribe remove it from the map of outbound observers to notify
84+
observers.remove(subscription);
85+
}
86+
});
87+
88+
observer.onNext(currentValue.get());
89+
90+
// on subscribe add it to the map of outbound observers to notify
91+
observers.put(subscription, observer);
92+
return subscription;
93+
}
94+
};
95+
96+
return new BehaviorSubject<T>(currentValue, onSubscribe, observers);
97+
}
98+
99+
private final ConcurrentHashMap<Subscription, Observer<T>> observers;
100+
private final AtomicReference<T> currentValue;
101+
102+
protected BehaviorSubject(AtomicReference<T> currentValue, Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
103+
super(onSubscribe);
104+
this.currentValue = currentValue;
105+
this.observers = observers;
106+
}
107+
108+
@Override
109+
public void onCompleted() {
110+
for (Observer<T> observer : observers.values()) {
111+
observer.onCompleted();
112+
}
113+
}
114+
115+
@Override
116+
public void onError(Exception e) {
117+
for (Observer<T> observer : observers.values()) {
118+
observer.onError(e);
119+
}
120+
}
121+
122+
@Override
123+
public void onNext(T args) {
124+
currentValue.set(args);
125+
for (Observer<T> observer : observers.values()) {
126+
observer.onNext(args);
127+
}
128+
}
129+
130+
public static class UnitTest {
131+
132+
private final Exception testException = new Exception();
133+
134+
@Test
135+
public void testThatObserverReceivesDefaultValueIfNothingWasPublished() {
136+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
137+
138+
@SuppressWarnings("unchecked")
139+
Observer<String> aObserver = mock(Observer.class);
140+
subject.subscribe(aObserver);
141+
142+
subject.onNext("one");
143+
subject.onNext("two");
144+
subject.onNext("three");
145+
146+
assertReceivedAllEvents(aObserver);
147+
}
148+
149+
private void assertReceivedAllEvents(Observer<String> aObserver) {
150+
verify(aObserver, times(1)).onNext("default");
151+
verify(aObserver, times(1)).onNext("one");
152+
verify(aObserver, times(1)).onNext("two");
153+
verify(aObserver, times(1)).onNext("three");
154+
verify(aObserver, Mockito.never()).onError(testException);
155+
verify(aObserver, Mockito.never()).onCompleted();
156+
}
157+
158+
@Test
159+
public void testThatObserverDoesNotReceiveDefaultValueIfSomethingWasPublished() {
160+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
161+
162+
subject.onNext("one");
163+
164+
@SuppressWarnings("unchecked")
165+
Observer<String> aObserver = mock(Observer.class);
166+
subject.subscribe(aObserver);
167+
168+
subject.onNext("two");
169+
subject.onNext("three");
170+
171+
assertDidNotReceiveTheDefaultValue(aObserver);
172+
}
173+
174+
private void assertDidNotReceiveTheDefaultValue(Observer<String> aObserver) {
175+
verify(aObserver, Mockito.never()).onNext("default");
176+
verify(aObserver, times(1)).onNext("one");
177+
verify(aObserver, times(1)).onNext("two");
178+
verify(aObserver, times(1)).onNext("three");
179+
verify(aObserver, Mockito.never()).onError(testException);
180+
verify(aObserver, Mockito.never()).onCompleted();
181+
}
182+
183+
@Test
184+
public void testCompleted() {
185+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
186+
187+
@SuppressWarnings("unchecked")
188+
Observer<String> aObserver = mock(Observer.class);
189+
subject.subscribe(aObserver);
190+
191+
subject.onNext("one");
192+
subject.onCompleted();
193+
194+
assertCompletedObserver(aObserver);
195+
}
196+
197+
private void assertCompletedObserver(Observer<String> aObserver)
198+
{
199+
verify(aObserver, times(1)).onNext("default");
200+
verify(aObserver, times(1)).onNext("one");
201+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
202+
verify(aObserver, times(1)).onCompleted();
203+
}
204+
205+
@Test
206+
public void testCompletedAfterError() {
207+
BehaviorSubject<String> subject = BehaviorSubject.createWithDefaultValue("default");
208+
209+
@SuppressWarnings("unchecked")
210+
Observer<String> aObserver = mock(Observer.class);
211+
subject.subscribe(aObserver);
212+
213+
subject.onNext("one");
214+
subject.onError(testException);
215+
subject.onNext("two");
216+
subject.onCompleted();
217+
218+
assertErrorObserver(aObserver);
219+
}
220+
221+
private void assertErrorObserver(Observer<String> aObserver)
222+
{
223+
verify(aObserver, times(1)).onNext("default");
224+
verify(aObserver, times(1)).onNext("one");
225+
verify(aObserver, times(1)).onError(testException);
226+
}
227+
228+
@Test
229+
public void testUnsubscribe()
230+
{
231+
UnsubscribeTester.test(new Func0<BehaviorSubject<String>>()
232+
{
233+
@Override
234+
public BehaviorSubject<String> call()
235+
{
236+
return BehaviorSubject.createWithDefaultValue("default");
237+
}
238+
}, new Action1<BehaviorSubject<String>>()
239+
{
240+
@Override
241+
public void call(BehaviorSubject<String> DefaultSubject)
242+
{
243+
DefaultSubject.onCompleted();
244+
}
245+
}, new Action1<BehaviorSubject<String>>()
246+
{
247+
@Override
248+
public void call(BehaviorSubject<String> DefaultSubject)
249+
{
250+
DefaultSubject.onError(new Exception());
251+
}
252+
}, new Action1<BehaviorSubject<String>>()
253+
{
254+
@Override
255+
public void call(BehaviorSubject<String> DefaultSubject)
256+
{
257+
DefaultSubject.onNext("one");
258+
}
259+
});
260+
}
261+
}
262+
}

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import rx.Observer;
3535
import rx.Subscription;
3636
import rx.util.AtomicObservableSubscription;
37-
import rx.util.SynchronizedObserver;
3837
import rx.util.functions.Action1;
3938
import rx.util.functions.Func0;
4039
import rx.util.functions.Func1;
@@ -78,7 +77,7 @@ public void unsubscribe() {
7877
});
7978

8079
// on subscribe add it to the map of outbound observers to notify
81-
observers.put(subscription, new SynchronizedObserver<T>(observer, subscription));
80+
observers.put(subscription, observer);
8281
return subscription;
8382
}
8483
};

0 commit comments

Comments
 (0)