Skip to content

Commit 442292c

Browse files
Merge pull request ReactiveX#476 from benjchristensen/bugfix-asyncsubject-empty
Don't emit null onComplete when no onNext received in AsyncSubject
2 parents 4856f64 + fe7e8a7 commit 442292c

File tree

2 files changed

+32
-24
lines changed

2 files changed

+32
-24
lines changed

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.subjects;
1717

1818
import java.util.concurrent.ConcurrentHashMap;
19+
import java.util.concurrent.atomic.AtomicBoolean;
1920
import java.util.concurrent.atomic.AtomicReference;
2021

2122
import rx.Observer;
@@ -85,6 +86,7 @@ public void unsubscribe() {
8586

8687
private final ConcurrentHashMap<Subscription, Observer<? super T>> observers;
8788
private final AtomicReference<T> currentValue;
89+
private final AtomicBoolean hasValue = new AtomicBoolean();
8890

8991
protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers) {
9092
super(onSubscribe);
@@ -96,9 +98,9 @@ protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, ConcurrentHashMap<Subscri
9698
public void onCompleted() {
9799
T finalValue = currentValue.get();
98100
for (Observer<? super T> observer : observers.values()) {
99-
observer.onNext(finalValue);
100-
}
101-
for (Observer<? super T> observer : observers.values()) {
101+
if (hasValue.get()) {
102+
observer.onNext(finalValue);
103+
}
102104
observer.onCompleted();
103105
}
104106
}
@@ -112,6 +114,7 @@ public void onError(Throwable e) {
112114

113115
@Override
114116
public void onNext(T args) {
117+
hasValue.set(true);
115118
currentValue.set(args);
116119
}
117120
}

rxjava-core/src/test/java/rx/subjects/AsyncSubjectTest.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,6 +19,7 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import org.junit.Test;
22+
import org.mockito.InOrder;
2223
import org.mockito.Mockito;
2324

2425
import rx.Observer;
@@ -42,10 +43,6 @@ public void testNeverCompleted() {
4243
subject.onNext("two");
4344
subject.onNext("three");
4445

45-
assertNeverCompletedObserver(aObserver);
46-
}
47-
48-
private void assertNeverCompletedObserver(Observer<String> aObserver) {
4946
verify(aObserver, Mockito.never()).onNext(anyString());
5047
verify(aObserver, Mockito.never()).onError(testException);
5148
verify(aObserver, Mockito.never()).onCompleted();
@@ -64,10 +61,6 @@ public void testCompleted() {
6461
subject.onNext("three");
6562
subject.onCompleted();
6663

67-
assertCompletedObserver(aObserver);
68-
}
69-
70-
private void assertCompletedObserver(Observer<String> aObserver) {
7164
verify(aObserver, times(1)).onNext("three");
7265
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
7366
verify(aObserver, times(1)).onCompleted();
@@ -89,10 +82,6 @@ public void testError() {
8982
subject.onError(new Throwable());
9083
subject.onCompleted();
9184

92-
assertErrorObserver(aObserver);
93-
}
94-
95-
private void assertErrorObserver(Observer<String> aObserver) {
9685
verify(aObserver, Mockito.never()).onNext(anyString());
9786
verify(aObserver, times(1)).onError(testException);
9887
verify(aObserver, Mockito.never()).onCompleted();
@@ -110,15 +99,14 @@ public void testUnsubscribeBeforeCompleted() {
11099
subject.onNext("two");
111100

112101
subscription.unsubscribe();
113-
assertNoOnNextEventsReceived(aObserver);
102+
103+
verify(aObserver, Mockito.never()).onNext(anyString());
104+
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
105+
verify(aObserver, Mockito.never()).onCompleted();
114106

115107
subject.onNext("three");
116108
subject.onCompleted();
117109

118-
assertNoOnNextEventsReceived(aObserver);
119-
}
120-
121-
private void assertNoOnNextEventsReceived(Observer<String> aObserver) {
122110
verify(aObserver, Mockito.never()).onNext(anyString());
123111
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
124112
verify(aObserver, Mockito.never()).onCompleted();
@@ -146,4 +134,21 @@ public void call(AsyncSubject<Object> DefaultSubject) {
146134
null
147135
);
148136
}
137+
138+
@Test
139+
public void testEmptySubjectCompleted() {
140+
AsyncSubject<String> subject = AsyncSubject.create();
141+
142+
@SuppressWarnings("unchecked")
143+
Observer<String> aObserver = mock(Observer.class);
144+
subject.subscribe(aObserver);
145+
146+
subject.onCompleted();
147+
148+
InOrder inOrder = inOrder(aObserver);
149+
inOrder.verify(aObserver, never()).onNext(null);
150+
inOrder.verify(aObserver, never()).onNext(any(String.class));
151+
inOrder.verify(aObserver, times(1)).onCompleted();
152+
inOrder.verifyNoMoreInteractions();
153+
}
149154
}

0 commit comments

Comments
 (0)