Skip to content

Commit 22a56d0

Browse files
zsxwingbenjchristensen
authored andcommitted
Amb with backpressure support
1 parent d403f69 commit 22a56d0

File tree

2 files changed

+75
-1
lines changed

2 files changed

+75
-1
lines changed

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeAmb.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import rx.Observable;
2323
import rx.Observable.OnSubscribe;
24+
import rx.Producer;
2425
import rx.Subscriber;
2526

2627
/**
@@ -274,6 +275,10 @@ private AmbSubscriber(Subscriber<? super T> subscriber, int index, AtomicInteger
274275
this.index = index;
275276
}
276277

278+
public void requestMore(long n) {
279+
request(n);
280+
}
281+
277282
@Override
278283
public void onNext(T args) {
279284
if (!isSelected()) {
@@ -318,7 +323,8 @@ private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
318323

319324
@Override
320325
public void call(Subscriber<? super T> subscriber) {
321-
AtomicInteger choice = new AtomicInteger(AmbSubscriber.NONE);
326+
final AtomicInteger choice = new AtomicInteger(AmbSubscriber.NONE);
327+
final List<AmbSubscriber<T>> ambSubscribers = new ArrayList<AmbSubscriber<T>>();
322328
int index = 0;
323329
for (Observable<? extends T> source : sources) {
324330
if (subscriber.isUnsubscribed()) {
@@ -329,10 +335,34 @@ public void call(Subscriber<? super T> subscriber) {
329335
break;
330336
}
331337
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(subscriber, index, choice);
338+
ambSubscribers.add(ambSubscriber);
332339
subscriber.add(ambSubscriber);
333340
source.unsafeSubscribe(ambSubscriber);
334341
index++;
335342
}
343+
// setProducer at the end so that `ambSubscribers` can be finalized before `subscriber` calls `request`
344+
subscriber.setProducer(new Producer() {
345+
346+
private volatile AmbSubscriber<T> selectedAmbSubscriber;
347+
348+
@Override
349+
public void request(long n) {
350+
if (choice.get() == AmbSubscriber.NONE) {
351+
for (AmbSubscriber<T> ambSubscriber : ambSubscribers) {
352+
// Once one Observable emits a message, `unsubscribe` of other Observables will be called
353+
// and further messages will be dropped. Therefore, requesting all sources won't cause
354+
// the backpressure issue.
355+
ambSubscriber.requestMore(n);
356+
}
357+
}
358+
else {
359+
if (selectedAmbSubscriber == null) {
360+
selectedAmbSubscriber = ambSubscribers.get(choice.get());
361+
}
362+
selectedAmbSubscriber.requestMore(n);
363+
}
364+
}
365+
});
336366
}
337367

338368
}

rxjava-core/src/test/java/rx/internal/operators/OnSubscribeAmbTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Mockito.inOrder;
1920
import static org.mockito.Mockito.mock;
2021
import static org.mockito.Mockito.times;
2122
import static rx.internal.operators.OnSubscribeAmb.amb;
2223

2324
import java.io.IOException;
2425
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicLong;
2527

2628
import org.junit.Before;
2729
import org.junit.Test;
@@ -30,9 +32,11 @@
3032
import rx.Observable;
3133
import rx.Observable.OnSubscribe;
3234
import rx.Observer;
35+
import rx.Producer;
3336
import rx.Scheduler;
3437
import rx.Subscriber;
3538
import rx.functions.Action0;
39+
import rx.observers.TestSubscriber;
3640
import rx.schedulers.TestScheduler;
3741
import rx.subscriptions.CompositeSubscription;
3842

@@ -157,4 +161,44 @@ public void testAmb3() {
157161
inOrder.verifyNoMoreInteractions();
158162
}
159163

164+
@Test
165+
public void testProducerRequestThroughAmb() {
166+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
167+
ts.requestMore(3);
168+
final AtomicLong requested1 = new AtomicLong();
169+
final AtomicLong requested2 = new AtomicLong();
170+
Observable<Integer> o1 = Observable.create(new OnSubscribe<Integer>() {
171+
172+
@Override
173+
public void call(Subscriber<? super Integer> s) {
174+
s.setProducer(new Producer() {
175+
176+
@Override
177+
public void request(long n) {
178+
requested1.set(n);
179+
}
180+
181+
});
182+
}
183+
184+
});
185+
Observable<Integer> o2 = Observable.create(new OnSubscribe<Integer>() {
186+
187+
@Override
188+
public void call(Subscriber<? super Integer> s) {
189+
s.setProducer(new Producer() {
190+
191+
@Override
192+
public void request(long n) {
193+
requested2.set(n);
194+
}
195+
196+
});
197+
}
198+
199+
});
200+
Observable.amb(o1, o2).subscribe(ts);
201+
assertEquals(3, requested1.get());
202+
assertEquals(3, requested2.get());
203+
}
160204
}

0 commit comments

Comments
 (0)