Skip to content

Commit 5c26064

Browse files
authored
3.x: Fix Flowable.groupBy cancellation/cleanup/eviction race hangs (ReactiveX#6979)
1 parent eb58c59 commit 5c26064

File tree

2 files changed

+155
-36
lines changed

2 files changed

+155
-36
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 66 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,9 @@ private void completeEvictions() {
259259
int count = 0;
260260
GroupedUnicast<K, V> evictedGroup;
261261
while ((evictedGroup = evictedGroups.poll()) != null) {
262-
evictedGroup.onComplete();
263-
count++;
262+
if (evictedGroup.state.tryComplete()) {
263+
count++;
264+
}
264265
}
265266
if (count != 0) {
266267
groupCount.addAndGet(-count);
@@ -383,6 +384,8 @@ static final class State<T, K> extends BasicIntQueueSubscription<T> implements P
383384
static final int ABANDONED = 2;
384385
static final int ABANDONED_HAS_SUBSCRIBER = ABANDONED | HAS_SUBSCRIBER;
385386

387+
final AtomicBoolean evictOnce = new AtomicBoolean();
388+
386389
State(int bufferSize, GroupBySubscriber<?, K, T> parent, K key, boolean delayError) {
387390
this.queue = new SpscLinkedArrayQueue<>(bufferSize);
388391
this.parent = parent;
@@ -444,9 +447,18 @@ public void onComplete() {
444447
drain();
445448
}
446449

450+
/**
 * Marks this group as done and drains it, while also trying to claim the
 * one-shot {@code evictOnce} flag.
 * <p>
 * Setting {@code done} and calling {@code drain()} happen unconditionally;
 * only the return value depends on the outcome of the flag update.
 *
 * @return {@code true} if this call changed {@code evictOnce} from
 *         {@code false} to {@code true}, {@code false} if the flag
 *         was already set
 */
boolean tryComplete() {
451+
// only the first caller to flip the flag gets true
boolean canEvict = evictOnce.compareAndSet(false, true);
452+
done = true;
453+
drain();
454+
return canEvict;
455+
}
456+
447457
void cancelParent() {
448458
if ((once.get() & ABANDONED) == 0) {
449-
parent.cancel(key);
459+
if (evictOnce.compareAndSet(false, true)) {
460+
parent.cancel(key);
461+
}
450462
}
451463
}
452464

@@ -518,37 +530,44 @@ void drainNormal() {
518530
final SpscLinkedArrayQueue<T> q = queue;
519531
final boolean delayError = this.delayError;
520532
Subscriber<? super T> a = actual.get();
533+
final AtomicBoolean cancelled = this.cancelled;
534+
535+
outer:
521536
for (;;) {
522-
if (a != null) {
523-
long r = requested.get();
524-
long e = 0;
537+
if (cancelled.get()) {
538+
cleanupQueue(0, false);
539+
} else {
540+
if (a != null) {
541+
long r = requested.get();
542+
long e = 0;
525543

526-
while (e != r) {
527-
boolean d = done;
528-
T v = q.poll();
529-
boolean empty = v == null;
544+
while (e != r) {
545+
boolean d = done;
546+
T v = q.poll();
547+
boolean empty = v == null;
530548

531-
if (checkTerminated(d, empty, a, delayError, e)) {
532-
return;
533-
}
549+
if (checkTerminated(d, empty, a, delayError, e, !empty)) {
550+
continue outer;
551+
}
534552

535-
if (empty) {
536-
break;
537-
}
553+
if (empty) {
554+
break;
555+
}
538556

539-
a.onNext(v);
557+
a.onNext(v);
540558

541-
e++;
542-
}
559+
e++;
560+
}
543561

544-
if (e == r && checkTerminated(done, q.isEmpty(), a, delayError, e)) {
545-
return;
546-
}
562+
if (e == r && checkTerminated(done, q.isEmpty(), a, delayError, e, false)) {
563+
continue outer;
564+
}
547565

548-
if (e != 0L) {
549-
BackpressureHelper.produced(requested, e);
550-
// replenish based on this batch run
551-
requestParent(e);
566+
if (e != 0L) {
567+
BackpressureHelper.produced(requested, e);
568+
// replenish based on this batch run
569+
requestParent(e);
570+
}
552571
}
553572
}
554573

@@ -568,23 +587,32 @@ void requestParent(long e) {
568587
}
569588
}
570589

571-
boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, boolean delayError, long emitted) {
590+
/**
 * Discards everything left in {@code queue} and requests a matching
 * amount from the parent via {@code requestParent}.
 *
 * @param emitted the starting count to which the discarded items are added
 * @param polled if {@code true}, one extra item is added to the count;
 *               {@code drainNormal} passes {@code !empty} here right after
 *               {@code q.poll()}, before that item has been counted
 */
void cleanupQueue(long emitted, boolean polled) {
591+
// if this group is canceled, all accumulated emissions and
592+
// remaining items in the queue should be requested
593+
// so that other groups can proceed
594+
while (queue.poll() != null) {
595+
emitted++;
596+
}
597+
// account for the item the caller already took out of the queue
if (polled) {
598+
emitted++;
599+
}
600+
// nothing to replenish if no item was counted
if (emitted != 0L) {
601+
requestParent(emitted);
602+
}
603+
}
604+
605+
boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a,
606+
boolean delayError, long emitted, boolean polled) {
572607
if (cancelled.get()) {
573-
// if this group is canceled, all accumulated emissions and
574-
// remaining items in the queue should be requested
575-
// so that other groups can proceed
576-
while (queue.poll() != null) {
577-
emitted++;
578-
}
579-
if (emitted != 0L) {
580-
requestParent(emitted);
581-
}
608+
cleanupQueue(emitted, polled);
582609
return true;
583610
}
584611

585612
if (d) {
586613
if (delayError) {
587614
if (empty) {
615+
cancelled.lazySet(true);
588616
Throwable e = error;
589617
if (e != null) {
590618
a.onError(e);
@@ -597,10 +625,12 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, boole
597625
Throwable e = error;
598626
if (e != null) {
599627
queue.clear();
628+
cancelled.lazySet(true);
600629
a.onError(e);
601630
return true;
602631
} else
603632
if (empty) {
633+
cancelled.lazySet(true);
604634
a.onComplete();
605635
return true;
606636
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2604,4 +2604,93 @@ static <T> Function<Consumer<Object>, Map<T, Object>> sizeCap(int maxCapacity, b
26042604
})
26052605
.<T, Object>build().asMap();
26062606
}
2607+
2608+
/**
 * Runs 500,000 integers, each mapped to {@code i % groups}, through
 * {@code groupBy} with an evicting map built by {@code sizeCap}, flat-maps
 * the groups with a 10 millisecond {@code take} on each, and asserts that
 * the flow completes without errors within a 5 second wait.
 *
 * @param groupByBufferSize the buffer size passed to {@code groupBy}
 * @param flatMapMaxConcurrency the max concurrency passed to {@code flatMap}
 * @param groups the modulus applied to the source values; also scales the
 *               capacity given to {@code sizeCap} ({@code groups * 100})
 * @param notifyOnExplicitEviction forwarded as the second argument of {@code sizeCap}
 */
static void issue6974RunPart2(int groupByBufferSize, int flatMapMaxConcurrency, int groups,
2609+
boolean notifyOnExplicitEviction) {
2610+
TestSubscriber<Integer> ts = Flowable
2611+
.range(1, 500_000)
2612+
.map(i -> i % groups)
2613+
.groupBy(i -> i, i -> i, false, groupByBufferSize,
2614+
// set cap too high: 100x the number of distinct keys
2615+
sizeCap(groups * 100, notifyOnExplicitEviction))
2616+
.flatMap(gf -> gf
2617+
.take(10, TimeUnit.MILLISECONDS)
2618+
, flatMapMaxConcurrency)
2619+
.test();
2620+
2621+
ts
2622+
.awaitDone(5, TimeUnit.SECONDS)
2623+
.assertNoErrors()
2624+
.assertComplete();
2625+
}
2626+
2627+
/**
 * Runs {@code issue6974RunPart2} with 20 groups, a groupBy buffer of
 * {@code groups * 2}, a flatMap concurrency of {@code 2 * groups} and
 * {@code notifyOnExplicitEviction} set to {@code false}.
 */
@Test
2628+
public void issue6974Part2Case1() {
2629+
final int groups = 20;
2630+
2631+
// Reported symptom for this configuration: not completed (timed out), buffer is too small
// NOTE(review): presumably describes the behavior before this commit's fix - confirm against issue 6974
2632+
int groupByBufferSize = groups * 2;
2633+
int flatMapMaxConcurrency = 2 * groups;
2634+
boolean notifyOnExplicitEviction = false;
2635+
issue6974RunPart2(groupByBufferSize, flatMapMaxConcurrency, groups, notifyOnExplicitEviction);
2636+
}
2637+
2638+
/**
 * Runs {@code issue6974RunPart2} with 20 groups, a groupBy buffer of
 * {@code groups * 30}, a flatMap concurrency of {@code 2 * groups} and
 * {@code notifyOnExplicitEviction} set to {@code true}.
 */
@Test
2639+
public void issue6974Part2Case2() {
2640+
final int groups = 20;
2641+
2642+
// Reported symptom for this configuration: timeout; explicit eviction notification makes a difference
// NOTE(review): presumably describes the behavior before this commit's fix - confirm against issue 6974
2643+
int groupByBufferSize = groups * 30;
2644+
int flatMapMaxConcurrency = 2 * groups;
2645+
boolean notifyOnExplicitEviction = true;
2646+
issue6974RunPart2(groupByBufferSize, flatMapMaxConcurrency, groups, notifyOnExplicitEviction);
2647+
}
2648+
2649+
/*
2650+
* Disabled: takes a very long time to run; run it locally only.
2651+
@Test
2652+
public void issue6974Part2Case2Loop() {
2653+
for (int i = 0; i < 1000; i++) {
2654+
issue6974Part2Case2();
2655+
}
2656+
}
2657+
*/
2658+
2659+
/**
 * Runs 500,000 integers, each mapped to {@code i % groups}, through the
 * single-argument {@code groupBy(keySelector)} overload (no evicting map
 * is supplied), flat-maps the groups with a 10 millisecond {@code take}
 * on each, and asserts that the flow completes without errors within a
 * 5 second wait.
 * <p>
 * NOTE(review): {@code groupByBufferSize} and {@code notifyOnExplicitEviction}
 * are accepted but never read by this method; only {@code groups} and
 * {@code flatMapMaxConcurrency} influence the flow.
 *
 * @param groupByBufferSize unused by this method
 * @param flatMapMaxConcurrency the max concurrency passed to {@code flatMap}
 * @param groups the modulus applied to the source values
 * @param notifyOnExplicitEviction unused by this method
 */
static void issue6974RunPart2NoEvict(int groupByBufferSize, int flatMapMaxConcurrency, int groups,
2660+
boolean notifyOnExplicitEviction) {
2661+
TestSubscriber<Integer> ts = Flowable
2662+
.range(1, 500_000)
2663+
.map(i -> i % groups)
2664+
.groupBy(i -> i)
2665+
.flatMap(gf -> gf
2666+
.take(10, TimeUnit.MILLISECONDS)
2667+
, flatMapMaxConcurrency)
2668+
.test();
2669+
2670+
ts
2671+
.awaitDone(5, TimeUnit.SECONDS)
2672+
.assertNoErrors()
2673+
.assertComplete();
2674+
}
2675+
2676+
/**
 * Runs {@code issue6974RunPart2NoEvict} with 20 groups and a flatMap
 * concurrency of {@code 2 * groups}.
 */
@Test
2677+
public void issue6974Part2Case1NoEvict() {
2678+
final int groups = 20;
2679+
2680+
// Reported symptom: not completed (timed out), buffer is too small
// NOTE(review): issue6974RunPart2NoEvict calls groupBy(i -> i) and does not read
// groupByBufferSize or notifyOnExplicitEviction, so the two values below have no effect here
2681+
int groupByBufferSize = groups * 2;
2682+
int flatMapMaxConcurrency = 2 * groups;
2683+
boolean notifyOnExplicitEviction = false;
2684+
issue6974RunPart2NoEvict(groupByBufferSize, flatMapMaxConcurrency, groups, notifyOnExplicitEviction);
2685+
}
2686+
2687+
/*
2688+
* Disabled: takes a very long time to run; run it locally only.
2689+
@Test
2690+
public void issue6974Part2Case1NoEvictLoop() {
2691+
for (int i = 0; i < 1000; i++) {
2692+
issue6974Part2Case1NoEvict();
2693+
}
2694+
}
2695+
*/
26072696
}

0 commit comments

Comments
 (0)