Skip to content

Commit 491092d

Browse files
committed
Merge pull request ReactiveX#2894 from akarnokd/ConcatOverflowFix
Concat: fixed reentrancy problem in completeInner
2 parents 5044a0b + 8d5d179 commit 491092d

File tree

2 files changed

+58
-10
lines changed

2 files changed

+58
-10
lines changed

src/main/java/rx/internal/operators/OperatorConcat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,11 @@ public void onCompleted() {
158158
}
159159

160160
void completeInner() {
161-
request(1);
162161
currentSubscriber = null;
163162
if (WIP_UPDATER.decrementAndGet(this) > 0) {
164163
subscribeNext();
165164
}
165+
request(1);
166166
}
167167

168168
void subscribeNext() {

src/test/java/rx/internal/operators/OperatorConcatTest.java

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,20 @@
2828
import java.util.ArrayList;
2929
import java.util.Arrays;
3030
import java.util.List;
31-
import java.util.concurrent.CountDownLatch;
32-
import java.util.concurrent.TimeUnit;
33-
import java.util.concurrent.atomic.AtomicReference;
31+
import java.util.concurrent.*;
32+
import java.util.concurrent.atomic.*;
3433

3534
import org.junit.Test;
3635
import org.mockito.InOrder;
3736

38-
import rx.Observable;
3937
import rx.Observable.OnSubscribe;
40-
import rx.Observer;
41-
import rx.Subscriber;
42-
import rx.Subscription;
38+
import rx.*;
39+
import rx.functions.Func1;
4340
import rx.internal.util.RxRingBuffer;
4441
import rx.observers.TestSubscriber;
4542
import rx.schedulers.Schedulers;
4643
import rx.schedulers.TestScheduler;
44+
import rx.subjects.Subject;
4745
import rx.subscriptions.BooleanSubscription;
4846

4947
public class OperatorConcatTest {
@@ -485,11 +483,11 @@ public boolean isUnsubscribed() {
485483
private final T seed;
486484
private final int size;
487485

488-
public TestObservable(@SuppressWarnings("unchecked") T... values) {
486+
public TestObservable(T... values) {
489487
this(null, null, values);
490488
}
491489

492-
public TestObservable(CountDownLatch once, CountDownLatch okToContinue, @SuppressWarnings("unchecked") T... values) {
490+
public TestObservable(CountDownLatch once, CountDownLatch okToContinue, T... values) {
493491
this.values = Arrays.asList(values);
494492
this.size = this.values.size();
495493
this.once = once;
@@ -718,4 +716,54 @@ public void call(Subscriber<? super String> s) {
718716
ts.assertReceivedOnNext(Arrays.asList("hello", "hello"));
719717
}
720718

719+
@Test(timeout = 10000)
720+
public void testIssue2890NoStackoverflow() throws InterruptedException {
721+
final ExecutorService executor = Executors.newFixedThreadPool(2);
722+
final Scheduler sch = Schedulers.from(executor);
723+
724+
Func1<Integer, Observable<Integer>> func = new Func1<Integer, Observable<Integer>>() {
725+
@Override
726+
public Observable<Integer> call(Integer t) {
727+
Observable<Integer> observable = Observable.just(t)
728+
.subscribeOn(sch)
729+
;
730+
Subject<Integer, Integer> subject = BufferUntilSubscriber.create();
731+
observable.subscribe(subject);
732+
return subject;
733+
}
734+
};
735+
736+
int n = 5000;
737+
final AtomicInteger counter = new AtomicInteger();
738+
739+
Observable.range(1, n).concatMap(func).subscribe(new Subscriber<Integer>() {
740+
@Override
741+
public void onNext(Integer t) {
742+
// Consume after sleep for 1 ms
743+
try {
744+
Thread.sleep(1);
745+
} catch (InterruptedException e) {
746+
// ignored
747+
}
748+
if (counter.getAndIncrement() % 100 == 0) {
749+
System.out.print("testIssue2890NoStackoverflow -> ");
750+
System.out.println(counter.get());
751+
};
752+
}
753+
754+
@Override
755+
public void onCompleted() {
756+
executor.shutdown();
757+
}
758+
759+
@Override
760+
public void onError(Throwable e) {
761+
executor.shutdown();
762+
}
763+
});
764+
765+
executor.awaitTermination(12000, TimeUnit.MILLISECONDS);
766+
767+
assertEquals(n, counter.get());
768+
}
721769
}

0 commit comments

Comments
 (0)