Skip to content

Commit eb58c59

Browse files
authored
3.x: Fix Flowable.groupBy eviction logic double decrement and hang (ReactiveX#6975)
1 parent 77c2ef1 commit eb58c59

File tree

2 files changed

+50
-3
lines changed

2 files changed

+50
-3
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,10 @@ private void completeEvictions() {
270270

271271
public void cancel(K key) {
272272
Object mapKey = key != null ? key : NULL_KEY;
273-
groups.remove(mapKey);
274-
if (groupCount.decrementAndGet() == 0) {
275-
upstream.cancel();
273+
if (groups.remove(mapKey) != null) {
274+
if (groupCount.decrementAndGet() == 0) {
275+
upstream.cancel();
276+
}
276277
}
277278
}
278279

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2558,4 +2558,50 @@ public void subscribeAbandonRace() throws Throwable {
25582558
ts.assertValueCount(1);
25592559
}
25602560
}
2561+
2562+
@Test
2563+
public void issue6974() {
2564+
2565+
FlowableTransformer<Integer, Integer> operation =
2566+
source -> source.publish(shared ->
2567+
shared
2568+
.firstElement()
2569+
.flatMapPublisher(firstElement ->
2570+
Flowable.just(firstElement).concatWith(shared)
2571+
)
2572+
);
2573+
2574+
issue6974Run(20, 500_000, 20 - 1, 20 * 2, operation, false);
2575+
2576+
issue6974Run(20, 500_000, 20, 20 * 2, operation, false);
2577+
}
2578+
2579+
static void issue6974Run(int groups, int iterations, int sizeCap, int flatMapConcurrency,
2580+
FlowableTransformer<Integer, Integer> operation, boolean notifyOnExplicitRevoke) {
2581+
TestSubscriber<Integer> test = Flowable
2582+
.range(1, groups)
2583+
.repeat(iterations / groups)
2584+
.groupBy(i -> i, i -> i, false, 128, sizeCap(sizeCap, notifyOnExplicitRevoke))
2585+
.flatMap(gf -> gf.compose(operation), flatMapConcurrency)
2586+
.test();
2587+
test.awaitDone(5, TimeUnit.SECONDS);
2588+
test.assertValueCount(iterations);
2589+
}
2590+
2591+
static <T> Function<Consumer<Object>, Map<T, Object>> sizeCap(int maxCapacity, boolean notifyOnExplicit) {
2592+
return itemEvictConsumer ->
2593+
CacheBuilder
2594+
.newBuilder()
2595+
.maximumSize(maxCapacity)
2596+
.removalListener(notification -> {
2597+
if (notification.getCause() != RemovalCause.EXPLICIT || notifyOnExplicit) {
2598+
try {
2599+
itemEvictConsumer.accept(notification.getValue());
2600+
} catch (Throwable throwable) {
2601+
throw new RuntimeException(throwable);
2602+
}
2603+
}
2604+
})
2605+
.<T, Object>build().asMap();
2606+
}
25612607
}

0 commit comments

Comments
 (0)