Skip to content

Commit a7ba04b

Browse files
committed
Merge pull request ReactiveX#3351 from vqvu/blocking-iterator-backpressure
Make BlockingOperatorToIterator exert backpressure.
2 parents 125b10d + d1a8739 commit a7ba04b

File tree

2 files changed

+116
-50
lines changed

2 files changed

+116
-50
lines changed

src/main/java/rx/internal/operators/BlockingOperatorToIterator.java

Lines changed: 70 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import rx.Notification;
2424
import rx.Observable;
2525
import rx.Subscriber;
26-
import rx.Subscription;
2726
import rx.exceptions.Exceptions;
27+
import rx.internal.util.RxRingBuffer;
2828

2929
/**
3030
* Returns an Iterator that iterates over all items emitted by a specified Observable.
@@ -47,68 +47,88 @@ private BlockingOperatorToIterator() {
4747
* @return the iterator that could be used to iterate over the elements of the observable.
4848
*/
4949
public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
50-
final BlockingQueue<Notification<? extends T>> notifications = new LinkedBlockingQueue<Notification<? extends T>>();
50+
SubscriberIterator<T> subscriber = new SubscriberIterator<T>();
5151

5252
// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
53-
final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
54-
@Override
55-
public void onCompleted() {
56-
// ignore
57-
}
53+
source.materialize().subscribe(subscriber);
54+
return subscriber;
55+
}
5856

59-
@Override
60-
public void onError(Throwable e) {
61-
notifications.offer(Notification.<T>createOnError(e));
62-
}
57+
public static final class SubscriberIterator<T>
58+
extends Subscriber<Notification<? extends T>> implements Iterator<T> {
6359

64-
@Override
65-
public void onNext(Notification<? extends T> args) {
66-
notifications.offer(args);
67-
}
68-
});
60+
static final int LIMIT = 3 * RxRingBuffer.SIZE / 4;
6961

70-
return new Iterator<T>() {
71-
private Notification<? extends T> buf;
62+
private final BlockingQueue<Notification<? extends T>> notifications;
63+
private Notification<? extends T> buf;
64+
private int received;
7265

73-
@Override
74-
public boolean hasNext() {
75-
if (buf == null) {
76-
buf = take();
77-
}
78-
if (buf.isOnError()) {
79-
throw Exceptions.propagate(buf.getThrowable());
66+
public SubscriberIterator() {
67+
this.notifications = new LinkedBlockingQueue<Notification<? extends T>>();
68+
}
69+
70+
@Override
71+
public void onStart() {
72+
request(RxRingBuffer.SIZE);
73+
}
74+
75+
@Override
76+
public void onCompleted() {
77+
// ignore
78+
}
79+
80+
@Override
81+
public void onError(Throwable e) {
82+
notifications.offer(Notification.<T>createOnError(e));
83+
}
84+
85+
@Override
86+
public void onNext(Notification<? extends T> args) {
87+
notifications.offer(args);
88+
}
89+
90+
@Override
91+
public boolean hasNext() {
92+
if (buf == null) {
93+
buf = take();
94+
received++;
95+
if (received >= LIMIT) {
96+
request(received);
97+
received = 0;
8098
}
81-
return !buf.isOnCompleted();
8299
}
100+
if (buf.isOnError()) {
101+
throw Exceptions.propagate(buf.getThrowable());
102+
}
103+
return !buf.isOnCompleted();
104+
}
83105

84-
@Override
85-
public T next() {
86-
if (hasNext()) {
87-
T result = buf.getValue();
88-
buf = null;
89-
return result;
90-
}
91-
throw new NoSuchElementException();
106+
@Override
107+
public T next() {
108+
if (hasNext()) {
109+
T result = buf.getValue();
110+
buf = null;
111+
return result;
92112
}
113+
throw new NoSuchElementException();
114+
}
93115

94-
private Notification<? extends T> take() {
95-
try {
96-
Notification<? extends T> poll = notifications.poll();
97-
if (poll != null) {
98-
return poll;
99-
}
100-
return notifications.take();
101-
} catch (InterruptedException e) {
102-
subscription.unsubscribe();
103-
throw Exceptions.propagate(e);
116+
private Notification<? extends T> take() {
117+
try {
118+
Notification<? extends T> poll = notifications.poll();
119+
if (poll != null) {
120+
return poll;
104121
}
122+
return notifications.take();
123+
} catch (InterruptedException e) {
124+
unsubscribe();
125+
throw Exceptions.propagate(e);
105126
}
127+
}
106128

107-
@Override
108-
public void remove() {
109-
throw new UnsupportedOperationException("Read-only iterator");
110-
}
111-
};
129+
@Override
130+
public void remove() {
131+
throw new UnsupportedOperationException("Read-only iterator");
132+
}
112133
}
113-
114134
}

src/test/java/rx/internal/operators/BlockingOperatorToIteratorTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import rx.Observable.OnSubscribe;
2727
import rx.Subscriber;
2828
import rx.exceptions.TestException;
29+
import rx.internal.operators.BlockingOperatorToIterator.SubscriberIterator;
30+
import rx.internal.util.RxRingBuffer;
2931

3032
public class BlockingOperatorToIteratorTest {
3133

@@ -81,4 +83,48 @@ public void call(Subscriber<? super String> subscriber) {
8183
System.out.println(string);
8284
}
8385
}
86+
87+
@Test
88+
public void testIteratorExertBackpressure() {
89+
final Counter src = new Counter();
90+
91+
Observable<Integer> obs = Observable.from(new Iterable<Integer>() {
92+
@Override
93+
public Iterator<Integer> iterator() {
94+
return src;
95+
}
96+
});
97+
98+
Iterator<Integer> it = toIterator(obs);
99+
while (it.hasNext()) {
100+
// Correct backpressure should cause this interleaved behavior.
101+
// We first request RxRingBuffer.SIZE. Then in increments of
102+
// SubscriberIterator.LIMIT.
103+
int i = it.next();
104+
int expected = i - (i % SubscriberIterator.LIMIT) + RxRingBuffer.SIZE;
105+
expected = Math.min(expected, Counter.MAX);
106+
107+
assertEquals(expected, src.count);
108+
}
109+
}
110+
111+
public static final class Counter implements Iterator<Integer> {
112+
static final int MAX = 5 * RxRingBuffer.SIZE;
113+
public int count;
114+
115+
@Override
116+
public boolean hasNext() {
117+
return count < MAX;
118+
}
119+
120+
@Override
121+
public Integer next() {
122+
return ++count;
123+
}
124+
125+
@Override
126+
public void remove() {
127+
throw new UnsupportedOperationException();
128+
}
129+
}
84130
}

0 commit comments

Comments
 (0)