Skip to content

Commit d6345dc

Browse files
davidmotenakarnokd
authored andcommitted
2.x: groupBy add overload with evicting map factory (ReactiveX#5860)
1 parent 0f73283 commit d6345dc

File tree

4 files changed

+416
-7
lines changed

4 files changed

+416
-7
lines changed

build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ apply plugin: "me.champeau.gradle.jmh"
4343
apply plugin: "com.github.hierynomus.license"
4444
apply plugin: "com.jfrog.bintray"
4545
apply plugin: "com.jfrog.artifactory"
46+
apply plugin: "eclipse"
4647

4748
sourceCompatibility = JavaVersion.VERSION_1_6
4849
targetCompatibility = JavaVersion.VERSION_1_6
@@ -55,7 +56,7 @@ def reactiveStreamsVersion = "1.0.2"
5556
def mockitoVersion = "2.1.0"
5657
def jmhLibVersion = "1.19"
5758
def testNgVersion = "6.11"
58-
59+
def guavaVersion = "24.0-jre"
5960
// --------------------------------------
6061

6162
repositories {
@@ -73,6 +74,7 @@ dependencies {
7374

7475
testImplementation "org.reactivestreams:reactive-streams-tck:$reactiveStreamsVersion"
7576
testImplementation "org.testng:testng:$testNgVersion"
77+
testImplementation "com.google.guava:guava:$guavaVersion"
7678
}
7779

7880
javadoc {

src/main/java/io/reactivex/Flowable.java

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9708,9 +9708,116 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
97089708
ObjectHelper.requireNonNull(valueSelector, "valueSelector is null");
97099709
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
97109710

9711-
return RxJavaPlugins.onAssembly(new FlowableGroupBy<T, K, V>(this, keySelector, valueSelector, bufferSize, delayError));
9711+
return RxJavaPlugins.onAssembly(new FlowableGroupBy<T, K, V>(this, keySelector, valueSelector, bufferSize, delayError, null));
97129712
}
97139713

9714+
/**
9715+
* Groups the items emitted by a {@code Publisher} according to a specified criterion, and emits these
9716+
* grouped items as {@link GroupedFlowable}s. The emitted {@code GroupedFlowable} allows only a single
9717+
* {@link Subscriber} during its lifetime and if this {@code Subscriber} cancels before the
9718+
* source terminates, the next emission by the source having the same key will trigger a new
9719+
* {@code GroupedPublisher} emission. The {@code evictingMapFactory} is used to create a map that will
9720+
* be used to hold the {@link GroupedFlowable}s by key. The evicting map created by this factory must
9721+
* notify the provided {@code Consumer<Object>} with the entry value (not the key!) when an entry in this
9722+
* map has been evicted. The next source emission will bring about the completion of the evicted
9723+
* {@link GroupedFlowable}s and the arrival of an item with the same key as a completed {@link GroupedFlowable}
9724+
* will prompt the creation and emission of a new {@link GroupedFlowable} with that key.
9725+
*
9726+
* <p>A use case for specifying an {@code evictingMapFactory} is where the source is infinite and fast and
9727+
* over time the number of keys grows enough to be a concern in terms of the memory footprint of the
9728+
* internal hash map containing the {@link GroupedFlowable}s.
9729+
*
9730+
* <p>The map created by an {@code evictingMapFactory} must be thread-safe.
9731+
*
9732+
* <p>An example of an {@code evictingMapFactory} using <a href="https://google.github.io/guava/releases/24.0-jre/api/docs/com/google/common/cache/CacheBuilder.html">CacheBuilder</a> from the Guava library is below:
9733+
*
9734+
* <pre>
9735+
* Function&lt;Consumer&lt;Object&gt;, Map&lt;Integer, Object&gt;&gt; evictingMapFactory =
9736+
* notify -&gt;
9737+
* CacheBuilder
9738+
* .newBuilder()
9739+
* .maximumSize(3)
9740+
* .removalListener(entry -&gt; {
9741+
* try {
9742+
* // emit the value not the key!
9743+
* notify.accept(entry.getValue());
9744+
* } catch (Exception e) {
9745+
* throw new RuntimeException(e);
9746+
* }
9747+
* })
9748+
* .&lt;Integer, Object&gt; build()
9749+
* .asMap();
9750+
*
9751+
* // Emit 1000 items but ensure that the
9752+
* // internal map never has more than 3 items in it
9753+
* Flowable
9754+
* .range(1, 1000)
9755+
* // note that number of keys is 10
9756+
* .groupBy(x -&gt; x % 10, x -&gt; x, true, 16, evictingMapFactory)
9757+
* .flatMap(g -&gt; g)
9758+
* .forEach(System.out::println);
9759+
* </pre>
9760+
*
9761+
* <p>
9762+
* <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupBy.png" alt="">
9763+
* <p>
9764+
* <em>Note:</em> A {@link GroupedFlowable} will cache the items it is to emit until such time as it
9765+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
9766+
* {@code GroupedFlowable}s that do not concern you. Instead, you can signal to them that they may
9767+
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
9768+
* <dl>
9769+
* <dt><b>Backpressure:</b></dt>
9770+
* <dd>Both the returned and its inner {@code GroupedFlowable}s honor backpressure and the source {@code Publisher}
9771+
* is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on
9772+
* downstream consumption). Note that both the returned and its inner {@code GroupedFlowable}s use
9773+
* unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that <em>may</em>
9774+
* lead to {@code OutOfMemoryError}.</dd>
9775+
* <dt><b>Scheduler:</b></dt>
9776+
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
9777+
* </dl>
9778+
*
9779+
* @param keySelector
9780+
* a function that extracts the key for each item
9781+
* @param valueSelector
9782+
* a function that extracts the return element for each item
9783+
* @param delayError
9784+
* if true, the exception from the current Flowable is delayed in each group until that specific group emitted
9785+
* the normal values; if false, the exception bypasses values in the groups and is reported immediately.
9786+
* @param bufferSize
9787+
* the hint for how many {@link GroupedFlowable}s and element in each {@link GroupedFlowable} should be buffered
9788+
* @param evictingMapFactory
9789+
* The factory used to create a map that will be used by the implementation to hold the
9790+
* {@link GroupedFlowable}s. The evicting map created by this factory must
9791+
* notify the provided {@code Consumer<Object>} with the entry value (not the key!) when
9792+
* an entry in this map has been evicted. The next source emission will bring about the
9793+
* completion of the evicted {@link GroupedFlowable}s. See example above.
9794+
* @param <K>
9795+
* the key type
9796+
* @param <V>
9797+
* the element type
9798+
* @return a {@code Publisher} that emits {@link GroupedFlowable}s, each of which corresponds to a
9799+
* unique key value and each of which emits those items from the source Publisher that share that
9800+
* key value
9801+
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
9802+
*
9803+
* @since 2.1.10
9804+
*/
9805+
@CheckReturnValue
9806+
@BackpressureSupport(BackpressureKind.FULL)
9807+
@SchedulerSupport(SchedulerSupport.NONE)
9808+
@Beta
9809+
public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T, ? extends K> keySelector,
9810+
Function<? super T, ? extends V> valueSelector,
9811+
boolean delayError, int bufferSize,
9812+
Function<? super Consumer<Object>, ? extends Map<K, Object>> evictingMapFactory) {
9813+
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
9814+
ObjectHelper.requireNonNull(valueSelector, "valueSelector is null");
9815+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
9816+
ObjectHelper.requireNonNull(evictingMapFactory, "evictingMapFactory is null");
9817+
9818+
return RxJavaPlugins.onAssembly(new FlowableGroupBy<T, K, V>(this, keySelector, valueSelector, bufferSize, delayError, evictingMapFactory));
9819+
}
9820+
97149821
/**
97159822
* Returns a Flowable that correlates two Publishers when they overlap in time and groups the results.
97169823
* <p>

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
package io.reactivex.internal.operators.flowable;
1515

1616
import java.util.Map;
17+
import java.util.Queue;
1718
import java.util.concurrent.ConcurrentHashMap;
19+
import java.util.concurrent.ConcurrentLinkedQueue;
1820
import java.util.concurrent.atomic.*;
1921

2022
import org.reactivestreams.*;
@@ -23,30 +25,56 @@
2325
import io.reactivex.annotations.Nullable;
2426
import io.reactivex.exceptions.Exceptions;
2527
import io.reactivex.flowables.GroupedFlowable;
28+
import io.reactivex.functions.Consumer;
2629
import io.reactivex.functions.Function;
2730
import io.reactivex.internal.functions.ObjectHelper;
2831
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
2932
import io.reactivex.internal.subscriptions.*;
3033
import io.reactivex.internal.util.BackpressureHelper;
34+
import io.reactivex.internal.util.EmptyComponent;
3135
import io.reactivex.plugins.RxJavaPlugins;
3236

3337
public final class FlowableGroupBy<T, K, V> extends AbstractFlowableWithUpstream<T, GroupedFlowable<K, V>> {
3438
final Function<? super T, ? extends K> keySelector;
3539
final Function<? super T, ? extends V> valueSelector;
3640
final int bufferSize;
3741
final boolean delayError;
42+
final Function<? super Consumer<Object>, ? extends Map<K, Object>> mapFactory;
3843

39-
public FlowableGroupBy(Flowable<T> source, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
44+
public FlowableGroupBy(Flowable<T> source, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector,
45+
int bufferSize, boolean delayError, Function<? super Consumer<Object>, ? extends Map<K, Object>> mapFactory) {
4046
super(source);
4147
this.keySelector = keySelector;
4248
this.valueSelector = valueSelector;
4349
this.bufferSize = bufferSize;
4450
this.delayError = delayError;
51+
this.mapFactory = mapFactory;
4552
}
4653

4754
@Override
4855
protected void subscribeActual(Subscriber<? super GroupedFlowable<K, V>> s) {
49-
source.subscribe(new GroupBySubscriber<T, K, V>(s, keySelector, valueSelector, bufferSize, delayError));
56+
57+
final Map<Object, GroupedUnicast<K, V>> groups;
58+
final Queue<GroupedUnicast<K, V>> evictedGroups;
59+
60+
try {
61+
if (mapFactory == null) {
62+
evictedGroups = null;
63+
groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
64+
} else {
65+
evictedGroups = new ConcurrentLinkedQueue<GroupedUnicast<K, V>>();
66+
Consumer<Object> evictionAction = (Consumer<Object>)(Consumer<?>) new EvictionAction<K, V>(evictedGroups);
67+
groups = (Map<Object, GroupedUnicast<K,V>>)(Map<Object, ?>) mapFactory.apply(evictionAction);
68+
}
69+
} catch (Exception e) {
70+
Exceptions.throwIfFatal(e);
71+
s.onSubscribe(EmptyComponent.INSTANCE);
72+
s.onError(e);
73+
return;
74+
}
75+
GroupBySubscriber<T, K, V> subscriber =
76+
new GroupBySubscriber<T, K, V>(s, keySelector, valueSelector, bufferSize, delayError, groups, evictedGroups);
77+
source.subscribe(subscriber);
5078
}
5179

5280
public static final class GroupBySubscriber<T, K, V>
@@ -62,6 +90,7 @@ public static final class GroupBySubscriber<T, K, V>
6290
final boolean delayError;
6391
final Map<Object, GroupedUnicast<K, V>> groups;
6492
final SpscLinkedArrayQueue<GroupedFlowable<K, V>> queue;
93+
final Queue<GroupedUnicast<K, V>> evictedGroups;
6594

6695
static final Object NULL_KEY = new Object();
6796

@@ -78,13 +107,16 @@ public static final class GroupBySubscriber<T, K, V>
78107

79108
boolean outputFused;
80109

81-
public GroupBySubscriber(Subscriber<? super GroupedFlowable<K, V>> actual, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
110+
public GroupBySubscriber(Subscriber<? super GroupedFlowable<K, V>> actual, Function<? super T, ? extends K> keySelector,
111+
Function<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError,
112+
Map<Object, GroupedUnicast<K, V>> groups, Queue<GroupedUnicast<K, V>> evictedGroups) {
82113
this.actual = actual;
83114
this.keySelector = keySelector;
84115
this.valueSelector = valueSelector;
85116
this.bufferSize = bufferSize;
86117
this.delayError = delayError;
87-
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
118+
this.groups = groups;
119+
this.evictedGroups = evictedGroups;
88120
this.queue = new SpscLinkedArrayQueue<GroupedFlowable<K, V>>(bufferSize);
89121
}
90122

@@ -144,6 +176,13 @@ public void onNext(T t) {
144176
}
145177

146178
group.onNext(v);
179+
180+
if (evictedGroups != null) {
181+
GroupedUnicast<K, V> evictedGroup;
182+
while ((evictedGroup = evictedGroups.poll()) != null) {
183+
evictedGroup.onComplete();
184+
}
185+
}
147186

148187
if (newGroup) {
149188
q.offer(group);
@@ -161,7 +200,9 @@ public void onError(Throwable t) {
161200
g.onError(t);
162201
}
163202
groups.clear();
164-
203+
if (evictedGroups != null) {
204+
evictedGroups.clear();
205+
}
165206
error = t;
166207
done = true;
167208
drain();
@@ -174,6 +215,9 @@ public void onComplete() {
174215
g.onComplete();
175216
}
176217
groups.clear();
218+
if (evictedGroups != null) {
219+
evictedGroups.clear();
220+
}
177221
done = true;
178222
drain();
179223
}
@@ -372,6 +416,20 @@ public boolean isEmpty() {
372416
}
373417
}
374418

419+
static final class EvictionAction<K, V> implements Consumer<GroupedUnicast<K,V>> {
420+
421+
final Queue<GroupedUnicast<K, V>> evictedGroups;
422+
423+
EvictionAction(Queue<GroupedUnicast<K, V>> evictedGroups) {
424+
this.evictedGroups = evictedGroups;
425+
}
426+
427+
@Override
428+
public void accept(GroupedUnicast<K,V> value) {
429+
evictedGroups.offer(value);
430+
}
431+
}
432+
375433
static final class GroupedUnicast<K, T> extends GroupedFlowable<K, T> {
376434

377435
final State<T, K> state;

0 commit comments

Comments
 (0)