Skip to content

Commit 2a0427b

Browse files
authored
1.x: fix timed replay() not terminating when all items timeout (ReactiveX#5141)
1 parent ccc2b2c commit 2a0427b

File tree

3 files changed

+46
-3
lines changed

3 files changed

+46
-3
lines changed

src/main/java/rx/internal/operators/OperatorReplay.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,9 +1239,18 @@ Node getInitialHead() {
12391239
Node prev = get();
12401240

12411241
Node next = prev.get();
1242-
while (next != null && ((Timestamped<?>)next.value).getTimestampMillis() <= timeLimit) {
1243-
prev = next;
1244-
next = next.get();
1242+
while (next != null) {
1243+
Object o = next.value;
1244+
Object v = leaveTransform(o);
1245+
if (NotificationLite.isCompleted(v) || NotificationLite.isError(v)) {
1246+
break;
1247+
}
1248+
if (((Timestamped<?>)o).getTimestampMillis() <= timeLimit) {
1249+
prev = next;
1250+
next = next.get();
1251+
} else {
1252+
break;
1253+
}
12451254
}
12461255

12471256
return prev;

src/test/java/rx/internal/operators/OperatorReplayTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1577,4 +1577,20 @@ public ConnectableObservable<Integer> call(Observable<Integer> o) {
15771577
});
15781578
}
15791579

1580+
@Test
1581+
public void noOldEntries() {
1582+
TestScheduler scheduler = new TestScheduler();
1583+
1584+
Observable<Integer> source = Observable.just(1)
1585+
.replay(2, TimeUnit.SECONDS, scheduler)
1586+
.autoConnect();
1587+
1588+
source.test().assertResult(1);
1589+
1590+
source.test().assertResult(1);
1591+
1592+
scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
1593+
1594+
source.test().assertResult();
1595+
}
15801596
}

src/test/java/rx/subjects/ReplaySubjectTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1175,4 +1175,22 @@ public Boolean call(Integer v) {
11751175
ts2.assertValues(1, 2, 3, 6, 7);
11761176
}
11771177

1178+
@Test
1179+
public void noOldEntries() {
1180+
TestScheduler scheduler = new TestScheduler();
1181+
1182+
ReplaySubject<Integer> source = ReplaySubject.createWithTime(2, TimeUnit.SECONDS, scheduler);
1183+
1184+
source.onNext(1);
1185+
source.onCompleted();
1186+
1187+
source.test().assertResult(1);
1188+
1189+
source.test().assertResult(1);
1190+
1191+
scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
1192+
1193+
source.test().assertResult();
1194+
}
1195+
11781196
}

0 commit comments

Comments
 (0)