Skip to content

Commit cf0c25f

Browse files
Merge pull request ReactiveX#1534 from benjchristensen/concat-backpressure
Concat Backpressure
2 parents 14a41f8 + 04349fa commit cf0c25f

File tree

2 files changed

+149
-37
lines changed

2 files changed

+149
-37
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorConcat.java

Lines changed: 113 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
1919
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
20+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
21+
2022
import rx.Observable;
2123
import rx.Observable.Operator;
24+
import rx.Producer;
2225
import rx.Subscriber;
2326
import rx.functions.Action0;
2427
import rx.observers.SerializedSubscriber;
@@ -30,30 +33,55 @@
3033
* <p>
3134
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/concat.png" alt="">
3235
*
33-
* @param <T> the source and result value type
36+
* @param <T>
37+
* the source and result value type
3438
*/
3539
public final class OperatorConcat<T> implements Operator<T, Observable<? extends T>> {
3640
@Override
3741
public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) {
3842
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child);
3943
final SerialSubscription current = new SerialSubscription();
4044
child.add(current);
41-
return new ConcatSubscriber<T>(s, current);
45+
ConcatSubscriber<T> cs = new ConcatSubscriber<T>(s, current);
46+
ConcatProducer<T> cp = new ConcatProducer<T>(cs);
47+
child.setProducer(cp);
48+
return cs;
4249
}
43-
50+
51+
static final class ConcatProducer<T> implements Producer {
52+
final ConcatSubscriber<T> cs;
53+
54+
ConcatProducer(ConcatSubscriber<T> cs) {
55+
this.cs = cs;
56+
}
57+
58+
@Override
59+
public void request(long n) {
60+
cs.requestFromChild(n);
61+
}
62+
63+
}
64+
4465
static final class ConcatSubscriber<T> extends Subscriber<Observable<? extends T>> {
4566
final NotificationLite<Observable<? extends T>> nl = NotificationLite.instance();
46-
private final Subscriber<T> s;
67+
private final Subscriber<T> child;
4768
private final SerialSubscription current;
4869
final ConcurrentLinkedQueue<Object> queue;
70+
71+
volatile ConcatInnerSubscriber<T> currentSubscriber;
72+
4973
volatile int wip;
5074
@SuppressWarnings("rawtypes")
51-
static final AtomicIntegerFieldUpdater<ConcatSubscriber> WIP_UPDATER
52-
= AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip");
53-
75+
static final AtomicIntegerFieldUpdater<ConcatSubscriber> WIP_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip");
76+
77+
// accessed by REQUESTED_UPDATER
78+
private volatile long requested;
79+
@SuppressWarnings("rawtypes")
80+
private static final AtomicLongFieldUpdater<ConcatSubscriber> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcatSubscriber.class, "requested");
81+
5482
public ConcatSubscriber(Subscriber<T> s, SerialSubscription current) {
5583
super(s);
56-
this.s = s;
84+
this.child = s;
5785
this.current = current;
5886
this.queue = new ConcurrentLinkedQueue<Object>();
5987
add(Subscriptions.create(new Action0() {
@@ -71,20 +99,42 @@ public void onStart() {
7199
request(2);
72100
}
73101

102+
private void requestFromChild(long n) {
103+
// we track 'requested' so we know whether we should subscribe the next or not
104+
if (REQUESTED_UPDATER.getAndAdd(this, n) == 0) {
105+
if (currentSubscriber == null && wip > 0) {
106+
// this means we may be moving from one subscriber to another after having stopped processing
107+
// so need to kick off the subscribe via this request notification
108+
subscribeNext();
109+
// return here as we don't want to do the requestMore logic below (which would double request)
110+
return;
111+
}
112+
}
113+
114+
if (currentSubscriber != null) {
115+
// otherwise we are just passing it through to the currentSubscriber
116+
currentSubscriber.requestMore(n);
117+
}
118+
}
119+
120+
private void decrementRequested() {
121+
REQUESTED_UPDATER.decrementAndGet(this);
122+
}
123+
74124
@Override
75125
public void onNext(Observable<? extends T> t) {
76126
queue.add(nl.next(t));
77127
if (WIP_UPDATER.getAndIncrement(this) == 0) {
78128
subscribeNext();
79129
}
80130
}
81-
131+
82132
@Override
83133
public void onError(Throwable e) {
84-
s.onError(e);
134+
child.onError(e);
85135
unsubscribe();
86136
}
87-
137+
88138
@Override
89139
public void onCompleted() {
90140
queue.add(nl.completed());
@@ -95,39 +145,65 @@ public void onCompleted() {
95145

96146
void completeInner() {
97147
request(1);
148+
currentSubscriber = null;
98149
if (WIP_UPDATER.decrementAndGet(this) > 0) {
99150
subscribeNext();
100151
}
101152
}
102153

103154
void subscribeNext() {
104-
Object o = queue.poll();
105-
if (nl.isCompleted(o)) {
106-
s.onCompleted();
107-
} else if (o != null) {
108-
Observable<? extends T> obs = nl.getValue(o);
109-
Subscriber<T> sourceSub = new Subscriber<T>() {
110-
111-
@Override
112-
public void onNext(T t) {
113-
// TODO need to support backpressure here https://github.com/Netflix/RxJava/issues/1480
114-
s.onNext(t);
115-
}
116-
117-
@Override
118-
public void onError(Throwable e) {
119-
ConcatSubscriber.this.onError(e);
120-
}
121-
122-
@Override
123-
public void onCompleted() {
124-
completeInner();
125-
}
126-
127-
};
128-
current.set(sourceSub);
129-
obs.unsafeSubscribe(sourceSub);
155+
if (requested > 0) {
156+
Object o = queue.poll();
157+
if (nl.isCompleted(o)) {
158+
child.onCompleted();
159+
} else if (o != null) {
160+
Observable<? extends T> obs = nl.getValue(o);
161+
currentSubscriber = new ConcatInnerSubscriber<T>(this, child, requested);
162+
current.set(currentSubscriber);
163+
obs.unsafeSubscribe(currentSubscriber);
164+
}
165+
} else {
166+
// requested == 0, so we'll peek to see if we are completed, otherwise wait until another request
167+
Object o = queue.peek();
168+
if (nl.isCompleted(o)) {
169+
child.onCompleted();
170+
}
130171
}
131172
}
132173
}
174+
175+
static class ConcatInnerSubscriber<T> extends Subscriber<T> {
176+
177+
private final Subscriber<T> child;
178+
private final ConcatSubscriber<T> parent;
179+
180+
public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, long initialRequest) {
181+
this.parent = parent;
182+
this.child = child;
183+
request(initialRequest);
184+
}
185+
186+
void requestMore(long n) {
187+
request(n);
188+
}
189+
190+
@Override
191+
public void onNext(T t) {
192+
parent.decrementRequested();
193+
child.onNext(t);
194+
}
195+
196+
@Override
197+
public void onError(Throwable e) {
198+
// terminal error through parent so everything gets cleaned up, including this inner
199+
parent.onError(e);
200+
}
201+
202+
@Override
203+
public void onCompleted() {
204+
// terminal completion to parent so it continues to the next
205+
parent.completeInner();
206+
}
207+
208+
};
133209
}

rxjava-core/src/test/java/rx/internal/operators/OperatorConcatTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
import rx.Observer;
4141
import rx.Subscriber;
4242
import rx.Subscription;
43+
import rx.internal.util.RxRingBuffer;
44+
import rx.observers.TestSubscriber;
45+
import rx.schedulers.Schedulers;
4346
import rx.schedulers.TestScheduler;
4447
import rx.subscriptions.BooleanSubscription;
4548

@@ -660,4 +663,37 @@ public void testConcatOuterBackpressure() {
660663
.take(1)
661664
.toBlocking().single());
662665
}
666+
667+
@Test
668+
public void testInnerBackpressureWithAlignedBoundaries() {
669+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
670+
Observable.range(0, RxRingBuffer.SIZE * 2)
671+
.concatWith(Observable.range(0, RxRingBuffer.SIZE * 2))
672+
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
673+
.subscribe(ts);
674+
675+
ts.awaitTerminalEvent();
676+
ts.assertNoErrors();
677+
assertEquals(RxRingBuffer.SIZE * 4, ts.getOnNextEvents().size());
678+
}
679+
680+
/*
681+
* Testing without counts aligned with buffer sizes because concat must prevent the subscription
682+
* to the next Observable if request == 0 which can happen at the end of a subscription
683+
* if the request size == emitted size. It needs to delay subscription until the next request when aligned,
684+
* when not aligned, it just subscribesNext with the outstanding request amount.
685+
*/
686+
@Test
687+
public void testInnerBackpressureWithoutAlignedBoundaries() {
688+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
689+
Observable.range(0, (RxRingBuffer.SIZE * 2) + 10)
690+
.concatWith(Observable.range(0, (RxRingBuffer.SIZE * 2) + 10))
691+
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
692+
.subscribe(ts);
693+
694+
ts.awaitTerminalEvent();
695+
ts.assertNoErrors();
696+
assertEquals((RxRingBuffer.SIZE * 4) + 20, ts.getOnNextEvents().size());
697+
}
698+
663699
}

0 commit comments

Comments
 (0)