Skip to content

Commit da9aa93

Browse files
committed
add caveat about avoiding memory leaks when using GroupedObservables
1 parent 6020822 commit da9aa93

File tree

2 files changed

+50
-24
lines changed

2 files changed

+50
-24
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2255,12 +2255,12 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
22552255
}
22562256

22572257
/**
2258-
* Pivots a sequence of GroupedObservables emitted by an Observable so as to swap the group and and the set
2259-
* on which their items are grouped.
2258+
* Pivots a sequence of {@code GroupedObservable}s emitted by an {@code Observable} so as to swap the group
2259+
* and and the set on which their items are grouped.
22602260
* <p>
22612261
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.png">
22622262
*
2263-
* For example an Observable such as this =&gt;
2263+
* For example an {@code Observable} such as this =&gt;
22642264
*
22652265
* {@code Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>>}:
22662266
* <ul>
@@ -2280,10 +2280,16 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
22802280
* </ul>
22812281
* <p>
22822282
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.ex.png">
2283+
* <p>
2284+
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
2285+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
2286+
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
2287+
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
22832288
*
22842289
* @param groups
22852290
the {@link GroupedObservable} to pivot
2286-
* @return an Observable containing a stream of nested GroupedObservables with swapped inner-outer keys.
2291+
* @return an {@code Observable} containing a stream of nested {@code GroupedObservable}s with swapped
2292+
* inner-outer keys.
22872293
* @since 0.17
22882294
*/
22892295
public static final <K1, K2, T> Observable<GroupedObservable<K2, GroupedObservable<K1, T>>> pivot(Observable<GroupedObservable<K1, GroupedObservable<K2, T>>> groups) {
@@ -4027,36 +4033,47 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
40274033
}
40284034

40294035
/**
4030-
* Groups the items emitted by an Observable according to a specified criterion, and emits these grouped
4031-
* items as {@link GroupedObservable}s, one {@code GroupedObservable} per group.
4036+
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
4037+
* grouped items as {@link GroupedObservable}s, one {@code GroupedObservable} per group.
40324038
* <p>
40334039
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupBy.png">
4040+
* <p>
4041+
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
4042+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
4043+
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
4044+
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
40344045
*
40354046
* @param keySelector
40364047
* a function that extracts the key for each item
40374048
* @param <K>
40384049
* the key type
4039-
* @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a unique key
4040-
* value and each of which emits those items from the source Observable that share that key value
4050+
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a
4051+
* unique key value and each of which emits those items from the source Observable that share that
4052+
* key value
40414053
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-groupby-and-groupbyuntil">RxJava Wiki: groupBy</a>
40424054
*/
40434055
public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
40444056
return lift(new OperatorGroupBy<K, T>(keySelector));
40454057
}
40464058

40474059
/**
4048-
* Groups the items emitted by an Observable according to a specified key selector function until the
4049-
* duration Observable expires for the key.
4060+
* Groups the items emitted by an {@code Observable} according to a specified key selector function until
4061+
* the duration {@code Observable} expires for the key.
40504062
* <p>
40514063
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupByUntil.png">
4064+
* <p>
4065+
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
4066+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
4067+
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
4068+
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
40524069
*
40534070
* @param keySelector
40544071
* a function to extract the key for each item
40554072
* @param durationSelector
40564073
* a function to signal the expiration of a group
4057-
* @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a key value and
4058-
* each of which emits all items emitted by the source Observable during that key's duration that
4059-
* share that same key value
4074+
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a key
4075+
* value and each of which emits all items emitted by the source {@code Observable} during that
4076+
* key's duration that share that same key value
40604077
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-groupby-and-groupbyuntil">RxJava Wiki: groupByUntil()</a>
40614078
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211932.aspx">MSDN: Observable.GroupByUntil</a>
40624079
*/
@@ -4065,21 +4082,26 @@ public final <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUnt
40654082
}
40664083

40674084
/**
4068-
* Groups the items emitted by an Observable (transformed by a selector) according to a specified key
4069-
* selector function until the duration Observable expires for the key.
4085+
* Groups the items emitted by an {@code Observable} (transformed by a selector) according to a specified
4086+
* key selector function until the duration Observable expires for the key.
40704087
* <p>
40714088
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupByUntil.png">
4089+
* <p>
4090+
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
4091+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
4092+
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
4093+
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
40724094
*
40734095
* @param keySelector
40744096
* a function to extract the key for each item
40754097
* @param valueSelector
4076-
* a function to map each item emitted by the source Observable to an item emitted by one of the
4077-
* resulting {@link GroupedObservable}s
4098+
* a function to map each item emitted by the source {@code Observable} to an item emitted by one
4099+
* of the resulting {@link GroupedObservable}s
40784100
* @param durationSelector
40794101
* a function to signal the expiration of a group
4080-
* @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a key value and
4081-
* each of which emits all items emitted by the source Observable during that key's duration that
4082-
* share that same key value, transformed by the value selector
4102+
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a key
4103+
* value and each of which emits all items emitted by the source {@code Observable} during that
4104+
* key's duration that share that same key value, transformed by the value selector
40834105
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#wiki-groupby-and-groupbyuntil">RxJava Wiki: groupByUntil()</a>
40844106
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229433.aspx">MSDN: Observable.GroupByUntil</a>
40854107
*/

rxjava-core/src/main/java/rx/observables/GroupedObservable.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,19 @@
2222
/**
2323
* An {@link Observable} that has been grouped by key, the value of which can be obtained with
2424
* {@link #getKey()}.
25-
*
26-
* @see Observable#groupBy(Func1)
27-
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy() and groupByUntil()</a>
28-
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#pivot">RxJava Wiki: pivot()</a>
25+
* <p>
26+
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
27+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
28+
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
29+
* discard their buffers by applying an operator like {@link Observable#take}{@code (0)} to them.
2930
*
3031
* @param <K>
3132
* the type of the key
3233
* @param <T>
3334
* the type of the items emitted by the {@code GroupedObservable}
35+
* @see Observable#groupBy(Func1)
36+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy() and groupByUntil()</a>
37+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#pivot">RxJava Wiki: pivot()</a>
3438
*/
3539
public class GroupedObservable<K, T> extends Observable<T> {
3640
private final K key;

0 commit comments

Comments
 (0)