Skip to content

Commit 53ff799

Browse files
committed
Fix for take() reentrancy bug.
1 parent c58a785 commit 53ff799

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

src/main/java/rx/internal/operators/OperatorTake.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ public void onError(Throwable e) {
6868

6969
@Override
7070
public void onNext(T i) {
71-
if (!isUnsubscribed()) {
72-
boolean stop = ++count >= limit;
71+
if (!isUnsubscribed() && count++ < limit) {
72+
boolean stop = count == limit;
7373
child.onNext(i);
7474
if (stop && !completed) {
7575
completed = true;

src/test/java/rx/internal/operators/OperatorTakeTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import rx.functions.*;
3333
import rx.observers.*;
3434
import rx.schedulers.Schedulers;
35+
import rx.subjects.PublishSubject;
3536

3637
public class OperatorTakeTest {
3738

@@ -417,4 +418,24 @@ public void onNext(Integer t) {
417418
ts.assertError(TestException.class);
418419
ts.assertNotCompleted();
419420
}
421+
422+
@Test
423+
public void testReentrantTake() {
424+
final PublishSubject<Integer> source = PublishSubject.create();
425+
426+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
427+
428+
source.take(1).doOnNext(new Action1<Integer>() {
429+
@Override
430+
public void call(Integer v) {
431+
source.onNext(2);
432+
}
433+
}).subscribe(ts);
434+
435+
source.onNext(1);
436+
437+
ts.assertValue(1);
438+
ts.assertNoErrors();
439+
ts.assertCompleted();
440+
}
420441
}

0 commit comments

Comments
 (0)