Skip to content

Commit 14a41f8

Browse files
Merge pull request ReactiveX#1533 from benjchristensen/issue-1516
Amb + Backpressure
2 parents d403f69 + c880788 commit 14a41f8

File tree

2 files changed

+154
-29
lines changed

2 files changed

+154
-29
lines changed

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeAmb.java

Lines changed: 92 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616
package rx.internal.operators;
1717

1818
import java.util.ArrayList;
19+
import java.util.Collection;
1920
import java.util.List;
20-
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.concurrent.ConcurrentLinkedQueue;
22+
import java.util.concurrent.atomic.AtomicReference;
2123

2224
import rx.Observable;
2325
import rx.Observable.OnSubscribe;
26+
import rx.Producer;
2427
import rx.Subscriber;
28+
import rx.functions.Action0;
29+
import rx.subscriptions.Subscriptions;
2530

2631
/**
2732
* Given multiple {@link Observable}s, propagates the one that first emits an item.
@@ -262,22 +267,23 @@ public static <T> OnSubscribe<T> amb(final Iterable<? extends Observable<? exten
262267

263268
private static final class AmbSubscriber<T> extends Subscriber<T> {
264269

265-
private static final int NONE = -1;
266-
267270
private final Subscriber<? super T> subscriber;
268-
private final int index;
269-
private final AtomicInteger choice;
271+
private final Selection<T> selection;
270272

271-
private AmbSubscriber(Subscriber<? super T> subscriber, int index, AtomicInteger choice) {
273+
private AmbSubscriber(long requested, Subscriber<? super T> subscriber, Selection<T> selection) {
272274
this.subscriber = subscriber;
273-
this.choice = choice;
274-
this.index = index;
275+
this.selection = selection;
276+
// initial request
277+
request(requested);
278+
}
279+
280+
private final void requestMore(long n) {
281+
request(n);
275282
}
276283

277284
@Override
278285
public void onNext(T args) {
279286
if (!isSelected()) {
280-
unsubscribe();
281287
return;
282288
}
283289
subscriber.onNext(args);
@@ -286,7 +292,6 @@ public void onNext(T args) {
286292
@Override
287293
public void onCompleted() {
288294
if (!isSelected()) {
289-
unsubscribe();
290295
return;
291296
}
292297
subscriber.onCompleted();
@@ -295,44 +300,102 @@ public void onCompleted() {
295300
@Override
296301
public void onError(Throwable e) {
297302
if (!isSelected()) {
298-
unsubscribe();
299303
return;
300304
}
301305
subscriber.onError(e);
302306
}
303307

304308
private boolean isSelected() {
305-
int ch = choice.get();
306-
if (ch == NONE) {
307-
return choice.compareAndSet(NONE, index);
309+
if (selection.choice.get() == this) {
310+
// fast-path
311+
return true;
312+
} else {
313+
if (selection.choice.compareAndSet(null, this)) {
314+
selection.unsubscribeOthers(this);
315+
return true;
316+
} else {
317+
// we lost so unsubscribe ... and force cleanup again due to possible race conditions
318+
selection.unsubscribeLosers();
319+
return false;
320+
}
321+
}
322+
}
323+
}
324+
325+
private static class Selection<T> {
326+
final AtomicReference<AmbSubscriber<T>> choice = new AtomicReference<AmbSubscriber<T>>();
327+
final Collection<AmbSubscriber<T>> ambSubscribers = new ConcurrentLinkedQueue<AmbSubscriber<T>>();
328+
329+
public void unsubscribeLosers() {
330+
AmbSubscriber<T> winner = choice.get();
331+
if(winner != null) {
332+
unsubscribeOthers(winner);
333+
}
334+
}
335+
336+
public void unsubscribeOthers(AmbSubscriber<T> notThis) {
337+
for (AmbSubscriber<T> other : ambSubscribers) {
338+
if (other != notThis) {
339+
other.unsubscribe();
340+
}
308341
}
309-
return ch == index;
342+
ambSubscribers.clear();
310343
}
344+
311345
}
312346

313347
private final Iterable<? extends Observable<? extends T>> sources;
348+
private final Selection<T> selection = new Selection<T>();
314349

315350
private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
316351
this.sources = sources;
317352
}
318353

319354
@Override
320-
public void call(Subscriber<? super T> subscriber) {
321-
AtomicInteger choice = new AtomicInteger(AmbSubscriber.NONE);
322-
int index = 0;
323-
for (Observable<? extends T> source : sources) {
324-
if (subscriber.isUnsubscribed()) {
325-
break;
355+
public void call(final Subscriber<? super T> subscriber) {
356+
subscriber.add(Subscriptions.create(new Action0() {
357+
358+
@Override
359+
public void call() {
360+
if (selection.choice.get() != null) {
361+
// there is a single winner so we unsubscribe it
362+
selection.choice.get().unsubscribe();
363+
}
364+
// if we are racing with others still existing, we'll also unsubscribe them
365+
if(!selection.ambSubscribers.isEmpty()) {
366+
for (AmbSubscriber<T> other : selection.ambSubscribers) {
367+
other.unsubscribe();
368+
}
369+
selection.ambSubscribers.clear();
370+
}
326371
}
327-
if (choice.get() != AmbSubscriber.NONE) {
328-
// Already choose someone, the rest Observables can be skipped.
329-
break;
372+
373+
}));
374+
subscriber.setProducer(new Producer() {
375+
376+
@Override
377+
public void request(long n) {
378+
if (selection.choice.get() != null) {
379+
// propagate the request to that single Subscriber that won
380+
selection.choice.get().requestMore(n);
381+
} else {
382+
for (Observable<? extends T> source : sources) {
383+
if (subscriber.isUnsubscribed()) {
384+
break;
385+
}
386+
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(n, subscriber, selection);
387+
selection.ambSubscribers.add(ambSubscriber);
388+
// possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389+
if (selection.choice.get() != null) {
390+
// Already chose one, the rest can be skipped and we can clean up
391+
selection.unsubscribeOthers(selection.choice.get());
392+
break;
393+
}
394+
source.unsafeSubscribe(ambSubscriber);
395+
}
396+
}
330397
}
331-
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(subscriber, index, choice);
332-
subscriber.add(ambSubscriber);
333-
source.unsafeSubscribe(ambSubscriber);
334-
index++;
335-
}
398+
});
336399
}
337400

338401
}

rxjava-core/src/test/java/rx/internal/operators/OnSubscribeAmbTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Mockito.inOrder;
1920
import static org.mockito.Mockito.mock;
2021
import static org.mockito.Mockito.times;
2122
import static rx.internal.operators.OnSubscribeAmb.amb;
2223

2324
import java.io.IOException;
2425
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicLong;
2527

2628
import org.junit.Before;
2729
import org.junit.Test;
@@ -30,9 +32,13 @@
3032
import rx.Observable;
3133
import rx.Observable.OnSubscribe;
3234
import rx.Observer;
35+
import rx.Producer;
3336
import rx.Scheduler;
3437
import rx.Subscriber;
3538
import rx.functions.Action0;
39+
import rx.internal.util.RxRingBuffer;
40+
import rx.observers.TestSubscriber;
41+
import rx.schedulers.Schedulers;
3642
import rx.schedulers.TestScheduler;
3743
import rx.subscriptions.CompositeSubscription;
3844

@@ -157,4 +163,60 @@ public void testAmb3() {
157163
inOrder.verifyNoMoreInteractions();
158164
}
159165

166+
@Test
167+
public void testProducerRequestThroughAmb() {
168+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
169+
ts.requestMore(3);
170+
final AtomicLong requested1 = new AtomicLong();
171+
final AtomicLong requested2 = new AtomicLong();
172+
Observable<Integer> o1 = Observable.create(new OnSubscribe<Integer>() {
173+
174+
@Override
175+
public void call(Subscriber<? super Integer> s) {
176+
s.setProducer(new Producer() {
177+
178+
@Override
179+
public void request(long n) {
180+
System.out.println("1-requested: " + n);
181+
requested1.set(n);
182+
}
183+
184+
});
185+
}
186+
187+
});
188+
Observable<Integer> o2 = Observable.create(new OnSubscribe<Integer>() {
189+
190+
@Override
191+
public void call(Subscriber<? super Integer> s) {
192+
s.setProducer(new Producer() {
193+
194+
@Override
195+
public void request(long n) {
196+
System.out.println("2-requested: " + n);
197+
requested2.set(n);
198+
}
199+
200+
});
201+
}
202+
203+
});
204+
Observable.amb(o1, o2).subscribe(ts);
205+
assertEquals(3, requested1.get());
206+
assertEquals(3, requested2.get());
207+
}
208+
209+
@Test
210+
public void testBackpressure() {
211+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
212+
Observable.range(0, RxRingBuffer.SIZE * 2)
213+
.ambWith(Observable.range(0, RxRingBuffer.SIZE * 2))
214+
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
215+
.delay(1, TimeUnit.MICROSECONDS) // make it a slightly slow consumer
216+
.subscribe(ts);
217+
218+
ts.awaitTerminalEvent();
219+
ts.assertNoErrors();
220+
assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size());
221+
}
160222
}

0 commit comments

Comments
 (0)