Skip to content

Commit c880788

Browse files
Propagate request(n) to each AmbSubscriber
... and then keep propagating to the winner.
1 parent 22a56d0 commit c880788

File tree

2 files changed

+99
-48
lines changed

2 files changed

+99
-48
lines changed

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeAmb.java

Lines changed: 81 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
package rx.internal.operators;
1717

1818
import java.util.ArrayList;
19+
import java.util.Collection;
1920
import java.util.List;
20-
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.concurrent.ConcurrentLinkedQueue;
22+
import java.util.concurrent.atomic.AtomicReference;
2123

2224
import rx.Observable;
2325
import rx.Observable.OnSubscribe;
2426
import rx.Producer;
2527
import rx.Subscriber;
28+
import rx.functions.Action0;
29+
import rx.subscriptions.Subscriptions;
2630

2731
/**
2832
* Given multiple {@link Observable}s, propagates the one that first emits an item.
@@ -263,26 +267,23 @@ public static <T> OnSubscribe<T> amb(final Iterable<? extends Observable<? exten
263267

264268
private static final class AmbSubscriber<T> extends Subscriber<T> {
265269

266-
private static final int NONE = -1;
267-
268270
private final Subscriber<? super T> subscriber;
269-
private final int index;
270-
private final AtomicInteger choice;
271+
private final Selection<T> selection;
271272

272-
private AmbSubscriber(Subscriber<? super T> subscriber, int index, AtomicInteger choice) {
273+
private AmbSubscriber(long requested, Subscriber<? super T> subscriber, Selection<T> selection) {
273274
this.subscriber = subscriber;
274-
this.choice = choice;
275-
this.index = index;
275+
this.selection = selection;
276+
// initial request
277+
request(requested);
276278
}
277279

278-
public void requestMore(long n) {
280+
private final void requestMore(long n) {
279281
request(n);
280282
}
281283

282284
@Override
283285
public void onNext(T args) {
284286
if (!isSelected()) {
285-
unsubscribe();
286287
return;
287288
}
288289
subscriber.onNext(args);
@@ -291,7 +292,6 @@ public void onNext(T args) {
291292
@Override
292293
public void onCompleted() {
293294
if (!isSelected()) {
294-
unsubscribe();
295295
return;
296296
}
297297
subscriber.onCompleted();
@@ -300,66 +300,99 @@ public void onCompleted() {
300300
@Override
301301
public void onError(Throwable e) {
302302
if (!isSelected()) {
303-
unsubscribe();
304303
return;
305304
}
306305
subscriber.onError(e);
307306
}
308307

309308
private boolean isSelected() {
310-
int ch = choice.get();
311-
if (ch == NONE) {
312-
return choice.compareAndSet(NONE, index);
309+
if (selection.choice.get() == this) {
310+
// fast-path
311+
return true;
312+
} else {
313+
if (selection.choice.compareAndSet(null, this)) {
314+
selection.unsubscribeOthers(this);
315+
return true;
316+
} else {
317+
// we lost so unsubscribe ... and force cleanup again due to possible race conditions
318+
selection.unsubscribeLosers();
319+
return false;
320+
}
321+
}
322+
}
323+
}
324+
325+
private static class Selection<T> {
326+
final AtomicReference<AmbSubscriber<T>> choice = new AtomicReference<AmbSubscriber<T>>();
327+
final Collection<AmbSubscriber<T>> ambSubscribers = new ConcurrentLinkedQueue<AmbSubscriber<T>>();
328+
329+
public void unsubscribeLosers() {
330+
AmbSubscriber<T> winner = choice.get();
331+
if(winner != null) {
332+
unsubscribeOthers(winner);
333+
}
334+
}
335+
336+
public void unsubscribeOthers(AmbSubscriber<T> notThis) {
337+
for (AmbSubscriber<T> other : ambSubscribers) {
338+
if (other != notThis) {
339+
other.unsubscribe();
340+
}
313341
}
314-
return ch == index;
342+
ambSubscribers.clear();
315343
}
344+
316345
}
317346

318347
private final Iterable<? extends Observable<? extends T>> sources;
348+
private final Selection<T> selection = new Selection<T>();
319349

320350
private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
321351
this.sources = sources;
322352
}
323353

324354
@Override
325-
public void call(Subscriber<? super T> subscriber) {
326-
final AtomicInteger choice = new AtomicInteger(AmbSubscriber.NONE);
327-
final List<AmbSubscriber<T>> ambSubscribers = new ArrayList<AmbSubscriber<T>>();
328-
int index = 0;
329-
for (Observable<? extends T> source : sources) {
330-
if (subscriber.isUnsubscribed()) {
331-
break;
332-
}
333-
if (choice.get() != AmbSubscriber.NONE) {
334-
// Already choose someone, the rest Observables can be skipped.
335-
break;
355+
public void call(final Subscriber<? super T> subscriber) {
356+
subscriber.add(Subscriptions.create(new Action0() {
357+
358+
@Override
359+
public void call() {
360+
if (selection.choice.get() != null) {
361+
// there is a single winner so we unsubscribe it
362+
selection.choice.get().unsubscribe();
363+
}
364+
// if we are racing with others still existing, we'll also unsubscribe them
365+
if(!selection.ambSubscribers.isEmpty()) {
366+
for (AmbSubscriber<T> other : selection.ambSubscribers) {
367+
other.unsubscribe();
368+
}
369+
selection.ambSubscribers.clear();
370+
}
336371
}
337-
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(subscriber, index, choice);
338-
ambSubscribers.add(ambSubscriber);
339-
subscriber.add(ambSubscriber);
340-
source.unsafeSubscribe(ambSubscriber);
341-
index++;
342-
}
343-
// setProducer at the end so that `ambSubscribers` can be finalized before `subscriber` calls `request`
372+
373+
}));
344374
subscriber.setProducer(new Producer() {
345375

346-
private volatile AmbSubscriber<T> selectedAmbSubscriber;
347-
348376
@Override
349377
public void request(long n) {
350-
if (choice.get() == AmbSubscriber.NONE) {
351-
for (AmbSubscriber<T> ambSubscriber : ambSubscribers) {
352-
// Once one Observable emits a message, `unsubscribe` of other Observables will be called
353-
// and further messages will be dropped. Therefore, requesting all sources won't cause
354-
// the backpressure issue.
355-
ambSubscriber.requestMore(n);
356-
}
357-
}
358-
else {
359-
if (selectedAmbSubscriber == null) {
360-
selectedAmbSubscriber = ambSubscribers.get(choice.get());
378+
if (selection.choice.get() != null) {
379+
// propagate the request to that single Subscriber that won
380+
selection.choice.get().requestMore(n);
381+
} else {
382+
for (Observable<? extends T> source : sources) {
383+
if (subscriber.isUnsubscribed()) {
384+
break;
385+
}
386+
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(n, subscriber, selection);
387+
selection.ambSubscribers.add(ambSubscriber);
388+
// possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389+
if (selection.choice.get() != null) {
390+
// Already chose one, the rest can be skipped and we can clean up
391+
selection.unsubscribeOthers(selection.choice.get());
392+
break;
393+
}
394+
source.unsafeSubscribe(ambSubscriber);
361395
}
362-
selectedAmbSubscriber.requestMore(n);
363396
}
364397
}
365398
});

rxjava-core/src/test/java/rx/internal/operators/OnSubscribeAmbTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import rx.Scheduler;
3737
import rx.Subscriber;
3838
import rx.functions.Action0;
39+
import rx.internal.util.RxRingBuffer;
3940
import rx.observers.TestSubscriber;
41+
import rx.schedulers.Schedulers;
4042
import rx.schedulers.TestScheduler;
4143
import rx.subscriptions.CompositeSubscription;
4244

@@ -175,6 +177,7 @@ public void call(Subscriber<? super Integer> s) {
175177

176178
@Override
177179
public void request(long n) {
180+
System.out.println("1-requested: " + n);
178181
requested1.set(n);
179182
}
180183

@@ -190,6 +193,7 @@ public void call(Subscriber<? super Integer> s) {
190193

191194
@Override
192195
public void request(long n) {
196+
System.out.println("2-requested: " + n);
193197
requested2.set(n);
194198
}
195199

@@ -201,4 +205,18 @@ public void request(long n) {
201205
assertEquals(3, requested1.get());
202206
assertEquals(3, requested2.get());
203207
}
208+
209+
@Test
210+
public void testBackpressure() {
211+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
212+
Observable.range(0, RxRingBuffer.SIZE * 2)
213+
.ambWith(Observable.range(0, RxRingBuffer.SIZE * 2))
214+
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
215+
.delay(1, TimeUnit.MICROSECONDS) // make it a slightly slow consumer
216+
.subscribe(ts);
217+
218+
ts.awaitTerminalEvent();
219+
ts.assertNoErrors();
220+
assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size());
221+
}
204222
}

0 commit comments

Comments
 (0)