Skip to content

Commit 2eda9d8

Browse files
authored
2.x: Fix publish not requesting upon client change (ReactiveX#6364)
* 2.x: Fix publish not requesting upon client change * Add fused test, rename test method
1 parent d7d0a33 commit 2eda9d8

File tree

2 files changed

+139
-1
lines changed

2 files changed

+139
-1
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,12 +557,20 @@ void dispatch() {
557557
InnerSubscriber<T>[] freshArray = subscribers.get();
558558
if (subscribersChanged || freshArray != ps) {
559559
ps = freshArray;
560+
561+
// if we did emit at least one element, request more to replenish the queue
562+
if (d != 0) {
563+
if (sourceMode != QueueSubscription.SYNC) {
564+
upstream.get().request(d);
565+
}
566+
}
567+
560568
continue outer;
561569
}
562570
}
563571

564572
// if we did emit at least one element, request more to replenish the queue
565-
if (d > 0) {
573+
if (d != 0) {
566574
if (sourceMode != QueueSubscription.SYNC) {
567575
upstream.get().request(d);
568576
}

src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1368,4 +1368,134 @@ public String apply(Integer t) throws Exception {
13681368
public void badRequest() {
13691369
TestHelper.assertBadRequestReported(Flowable.range(1, 5).publish());
13701370
}
1371+
1372+
@Test
1373+
@SuppressWarnings("unchecked")
1374+
public void splitCombineSubscriberChangeAfterOnNext() {
1375+
Flowable<Integer> source = Flowable.range(0, 20)
1376+
.doOnSubscribe(new Consumer<Subscription>() {
1377+
@Override
1378+
public void accept(Subscription v) throws Exception {
1379+
System.out.println("Subscribed");
1380+
}
1381+
})
1382+
.publish(10)
1383+
.refCount()
1384+
;
1385+
1386+
Flowable<Integer> evenNumbers = source.filter(new Predicate<Integer>() {
1387+
@Override
1388+
public boolean test(Integer v) throws Exception {
1389+
return v % 2 == 0;
1390+
}
1391+
});
1392+
1393+
Flowable<Integer> oddNumbers = source.filter(new Predicate<Integer>() {
1394+
@Override
1395+
public boolean test(Integer v) throws Exception {
1396+
return v % 2 != 0;
1397+
}
1398+
});
1399+
1400+
final Single<Integer> getNextOdd = oddNumbers.first(0);
1401+
1402+
TestSubscriber<List<Integer>> ts = evenNumbers.concatMap(new Function<Integer, Publisher<List<Integer>>>() {
1403+
@Override
1404+
public Publisher<List<Integer>> apply(Integer v) throws Exception {
1405+
return Single.zip(
1406+
Single.just(v), getNextOdd,
1407+
new BiFunction<Integer, Integer, List<Integer>>() {
1408+
@Override
1409+
public List<Integer> apply(Integer a, Integer b) throws Exception {
1410+
return Arrays.asList( a, b );
1411+
}
1412+
}
1413+
)
1414+
.toFlowable();
1415+
}
1416+
})
1417+
.takeWhile(new Predicate<List<Integer>>() {
1418+
@Override
1419+
public boolean test(List<Integer> v) throws Exception {
1420+
return v.get(0) < 20;
1421+
}
1422+
})
1423+
.test();
1424+
1425+
ts
1426+
.assertResult(
1427+
Arrays.asList(0, 1),
1428+
Arrays.asList(2, 3),
1429+
Arrays.asList(4, 5),
1430+
Arrays.asList(6, 7),
1431+
Arrays.asList(8, 9),
1432+
Arrays.asList(10, 11),
1433+
Arrays.asList(12, 13),
1434+
Arrays.asList(14, 15),
1435+
Arrays.asList(16, 17),
1436+
Arrays.asList(18, 19)
1437+
);
1438+
}
1439+
1440+
@Test
1441+
@SuppressWarnings("unchecked")
1442+
public void splitCombineSubscriberChangeAfterOnNextFused() {
1443+
Flowable<Integer> source = Flowable.range(0, 20)
1444+
.publish(10)
1445+
.refCount()
1446+
;
1447+
1448+
Flowable<Integer> evenNumbers = source.filter(new Predicate<Integer>() {
1449+
@Override
1450+
public boolean test(Integer v) throws Exception {
1451+
return v % 2 == 0;
1452+
}
1453+
});
1454+
1455+
Flowable<Integer> oddNumbers = source.filter(new Predicate<Integer>() {
1456+
@Override
1457+
public boolean test(Integer v) throws Exception {
1458+
return v % 2 != 0;
1459+
}
1460+
});
1461+
1462+
final Single<Integer> getNextOdd = oddNumbers.first(0);
1463+
1464+
TestSubscriber<List<Integer>> ts = evenNumbers.concatMap(new Function<Integer, Publisher<List<Integer>>>() {
1465+
@Override
1466+
public Publisher<List<Integer>> apply(Integer v) throws Exception {
1467+
return Single.zip(
1468+
Single.just(v), getNextOdd,
1469+
new BiFunction<Integer, Integer, List<Integer>>() {
1470+
@Override
1471+
public List<Integer> apply(Integer a, Integer b) throws Exception {
1472+
return Arrays.asList( a, b );
1473+
}
1474+
}
1475+
)
1476+
.toFlowable();
1477+
}
1478+
})
1479+
.takeWhile(new Predicate<List<Integer>>() {
1480+
@Override
1481+
public boolean test(List<Integer> v) throws Exception {
1482+
return v.get(0) < 20;
1483+
}
1484+
})
1485+
.test();
1486+
1487+
ts
1488+
.assertResult(
1489+
Arrays.asList(0, 1),
1490+
Arrays.asList(2, 3),
1491+
Arrays.asList(4, 5),
1492+
Arrays.asList(6, 7),
1493+
Arrays.asList(8, 9),
1494+
Arrays.asList(10, 11),
1495+
Arrays.asList(12, 13),
1496+
Arrays.asList(14, 15),
1497+
Arrays.asList(16, 17),
1498+
Arrays.asList(18, 19)
1499+
);
1500+
}
13711501
}

0 commit comments

Comments
 (0)