Skip to content

Commit c1ec1f4

Browse files
committed
Make "single" support backpressure
Signed-off-by: zsxwing <[email protected]>
1 parent a2a7a29 commit c1ec1f4

File tree

2 files changed

+55
-3
lines changed

2 files changed

+55
-3
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorSingle.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ public void onNext(T value) {
6060
} else {
6161
this.value = value;
6262
isNonEmpty = true;
63+
// Issue: https://github.com/Netflix/RxJava/pull/1527
64+
// Because we cache a value and don't emit now, we need to request another one.
65+
request(1);
6366
}
6467
}
6568

rxjava-core/src/test/java/rx/internal/operators/OperatorSingleTest.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Matchers.isA;
19-
import static org.mockito.Mockito.inOrder;
20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.times;
20+
import static org.mockito.Mockito.*;
2221

2322
import java.util.NoSuchElementException;
2423

@@ -27,7 +26,9 @@
2726

2827
import rx.Observable;
2928
import rx.Observer;
29+
import rx.Subscriber;
3030
import rx.functions.Func1;
31+
import rx.functions.Func2;
3132

3233
public class OperatorSingleTest {
3334

@@ -241,4 +242,52 @@ public Boolean call(Integer t1) {
241242
inOrder.verify(observer, times(1)).onCompleted();
242243
inOrder.verifyNoMoreInteractions();
243244
}
245+
246+
@Test
247+
public void testSingleWithBackpressure() {
248+
Observable<Integer> observable = Observable.from(1, 2).single();
249+
250+
Subscriber<Integer> subscriber = spy(new Subscriber<Integer>() {
251+
252+
@Override
253+
public void onStart() {
254+
request(1);
255+
}
256+
257+
@Override
258+
public void onCompleted() {
259+
260+
}
261+
262+
@Override
263+
public void onError(Throwable e) {
264+
265+
}
266+
267+
@Override
268+
public void onNext(Integer integer) {
269+
request(1);
270+
}
271+
});
272+
observable.subscribe(subscriber);
273+
274+
InOrder inOrder = inOrder(subscriber);
275+
inOrder.verify(subscriber, times(1)).onError(isA(IllegalArgumentException.class));
276+
inOrder.verifyNoMoreInteractions();
277+
}
278+
279+
@Test(timeout = 30000)
280+
public void testIssue1527() throws InterruptedException {
281+
//https://github.com/Netflix/RxJava/pull/1527
282+
Observable<Integer> source = Observable.from(1, 2, 3, 4, 5, 6);
283+
Observable<Integer> reduced = source.reduce(new Func2<Integer, Integer, Integer>() {
284+
@Override
285+
public Integer call(Integer i1, Integer i2) {
286+
return i1 + i2;
287+
}
288+
});
289+
290+
Integer r = reduced.toBlocking().first();
291+
assertEquals(21, r.intValue());
292+
}
244293
}

0 commit comments

Comments
 (0)