Skip to content

Commit c573497

Browse files
Merge pull request ReactiveX#973 from benjchristensen/merge-handle-bad-observable
Merge with Serialize - Handle Bad Observables
2 parents 5ca609b + 352e7fa commit c573497

File tree

2 files changed

+127
-6
lines changed

2 files changed

+127
-6
lines changed

rxjava-core/src/main/java/rx/operators/OperatorMerge.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,22 @@ public void onNext(Observable<? extends T> innerObservable) {
6868

6969
final class InnerObserver extends Subscriber<T> {
7070

71+
private boolean innerCompleted = false;
72+
7173
public InnerObserver() {
7274
}
7375

7476
@Override
7577
public void onCompleted() {
76-
if (runningCount.decrementAndGet() == 0 && completed) {
77-
o.onCompleted();
78+
if (!innerCompleted) {
79+
// we check if already completed otherwise a misbehaving Observable that emits onComplete more than once
80+
// will cause the runningCount to decrement multiple times.
81+
innerCompleted = true;
82+
if (runningCount.decrementAndGet() == 0 && completed) {
83+
o.onCompleted();
84+
}
85+
cleanup();
7886
}
79-
cleanup();
8087
}
8188

8289
@Override

rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,14 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
20-
import static org.mockito.Mockito.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
20+
import static org.junit.Assert.assertTrue;
21+
import static org.junit.Assert.fail;
22+
import static org.mockito.Matchers.any;
23+
import static org.mockito.Mockito.never;
24+
import static org.mockito.Mockito.times;
25+
import static org.mockito.Mockito.verify;
2126

2227
import java.util.ArrayList;
2328
import java.util.Arrays;
@@ -37,6 +42,7 @@
3742
import rx.Observable.OnSubscribe;
3843
import rx.Observer;
3944
import rx.Scheduler;
45+
import rx.Scheduler.Inner;
4046
import rx.Subscriber;
4147
import rx.Subscription;
4248
import rx.functions.Action0;
@@ -472,4 +478,112 @@ public void call() {
472478
});
473479
}
474480

481+
@Test
482+
public void testConcurrency() {
483+
484+
Observable<Integer> o = Observable.create(new OnSubscribe<Integer>() {
485+
486+
@Override
487+
public void call(final Subscriber<? super Integer> s) {
488+
Schedulers.newThread().schedule(new Action1<Inner>() {
489+
490+
@Override
491+
public void call(Inner inner) {
492+
for (int i = 0; i < 10000; i++) {
493+
s.onNext(1);
494+
}
495+
s.onCompleted();
496+
}
497+
498+
});
499+
}
500+
});
501+
502+
for (int i = 0; i < 10; i++) {
503+
Observable<Integer> merge = Observable.merge(o, o, o);
504+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
505+
merge.subscribe(ts);
506+
507+
ts.awaitTerminalEvent();
508+
assertEquals(1, ts.getOnCompletedEvents().size());
509+
assertEquals(30000, ts.getOnNextEvents().size());
510+
List<Integer> onNextEvents = ts.getOnNextEvents();
511+
// System.out.println("onNext: " + onNextEvents.size() + " onCompleted: " + ts.getOnCompletedEvents().size());
512+
}
513+
}
514+
515+
@Test
516+
public void testConcurrencyWithSleeping() {
517+
518+
Observable<Integer> o = Observable.create(new OnSubscribe<Integer>() {
519+
520+
@Override
521+
public void call(final Subscriber<? super Integer> s) {
522+
Schedulers.newThread().schedule(new Action1<Inner>() {
523+
524+
@Override
525+
public void call(Inner inner) {
526+
for (int i = 0; i < 100; i++) {
527+
s.onNext(1);
528+
try {
529+
Thread.sleep(1);
530+
} catch (InterruptedException e) {
531+
e.printStackTrace();
532+
}
533+
}
534+
s.onCompleted();
535+
}
536+
537+
});
538+
}
539+
});
540+
541+
for (int i = 0; i < 10; i++) {
542+
Observable<Integer> merge = Observable.merge(o, o, o);
543+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
544+
merge.subscribe(ts);
545+
546+
ts.awaitTerminalEvent();
547+
assertEquals(1, ts.getOnCompletedEvents().size());
548+
assertEquals(300, ts.getOnNextEvents().size());
549+
List<Integer> onNextEvents = ts.getOnNextEvents();
550+
// System.out.println("onNext: " + onNextEvents.size() + " onCompleted: " + ts.getOnCompletedEvents().size());
551+
}
552+
}
553+
554+
@Test
555+
public void testConcurrencyWithBrokenOnCompleteContract() {
556+
557+
Observable<Integer> o = Observable.create(new OnSubscribe<Integer>() {
558+
559+
@Override
560+
public void call(final Subscriber<? super Integer> s) {
561+
Schedulers.newThread().schedule(new Action1<Inner>() {
562+
563+
@Override
564+
public void call(Inner inner) {
565+
for (int i = 0; i < 10000; i++) {
566+
s.onNext(1);
567+
}
568+
s.onCompleted();
569+
s.onCompleted();
570+
s.onCompleted();
571+
}
572+
573+
});
574+
}
575+
});
576+
577+
for (int i = 0; i < 100; i++) {
578+
Observable<Integer> merge = Observable.merge(o, o, o);
579+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
580+
merge.subscribe(ts);
581+
582+
ts.awaitTerminalEvent();
583+
assertEquals(1, ts.getOnCompletedEvents().size());
584+
assertEquals(30000, ts.getOnNextEvents().size());
585+
List<Integer> onNextEvents = ts.getOnNextEvents();
586+
// System.out.println("onNext: " + onNextEvents.size() + " onCompleted: " + ts.getOnCompletedEvents().size());
587+
}
588+
}
475589
}

0 commit comments

Comments
 (0)