Skip to content

Commit 748b56d

Browse files
authored
1.x: fix groupBy consuming the upstream in an unbounded manner (ReactiveX#5030)
1 parent 56d94b2 commit 748b56d

File tree

2 files changed

+35
-6
lines changed

2 files changed

+35
-6
lines changed

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public void onNext(T t) {
202202
return;
203203
}
204204

205-
boolean notNew = true;
205+
boolean newGroup = false;
206206
Object mapKey = key != null ? key : NULL_KEY;
207207
GroupedUnicast<K, V> group = groups.get(mapKey);
208208
if (group == null) {
@@ -214,9 +214,7 @@ public void onNext(T t) {
214214

215215
groupCount.getAndIncrement();
216216

217-
notNew = false;
218-
q.offer(group);
219-
drain();
217+
newGroup = true;
220218
} else {
221219
return;
222220
}
@@ -243,8 +241,9 @@ public void onNext(T t) {
243241
}
244242
}
245243

246-
if (notNew) {
247-
s.request(1);
244+
if (newGroup) {
245+
q.offer(group);
246+
drain();
248247
}
249248
}
250249

src/test/java/rx/internal/operators/OperatorGroupByTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2017,4 +2017,34 @@ public Map<Integer, Object> call(Action1<Integer> t) {
20172017
throw exception;
20182018
}};
20192019
}
2020+
2021+
@Test
2022+
public void outerConsumedInABoundedManner() {
2023+
final int[] counter = { 0 };
2024+
2025+
Observable.range(1, 10000)
2026+
.doOnRequest(new Action1<Long>() {
2027+
@Override
2028+
public void call(Long v) {
2029+
counter[0] += v;
2030+
}
2031+
})
2032+
.groupBy(new Func1<Integer, Integer>() {
2033+
@Override
2034+
public Integer call(Integer v) {
2035+
return 1;
2036+
}
2037+
})
2038+
.flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<Integer>>() {
2039+
@Override
2040+
public Observable<Integer> call(GroupedObservable<Integer, Integer> v) {
2041+
return v;
2042+
}
2043+
})
2044+
.test(0);
2045+
2046+
int c = counter[0];
2047+
assertTrue("" + c, c > 0);
2048+
assertTrue("" + c, c < 10000);
2049+
}
20202050
}

0 commit comments

Comments
 (0)