Skip to content

Commit 92ba6e7

Browse files
Merge pull request ReactiveX#698 from benjchristensen/pull-657-merge
Merge of Pull 657: Average and Sum
2 parents d39d9cf + c451ce9 commit 92ba6e7

File tree

6 files changed

+1078
-3
lines changed

6 files changed

+1078
-3
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4405,6 +4405,54 @@ public static Observable<Double> sumDoubles(Observable<Double> source) {
44054405
return OperationSum.sumDoubles(source);
44064406
}
44074407

4408+
/**
4409+
* Create an Observable that extracts integer values from this Observable via
4410+
* the provided function and computes the integer sum of the value sequence.
4411+
*
4412+
* @param valueExtractor the function to extract an integer from this Observable
4413+
* @return an Observable that extracts integer values from this Observable via
4414+
* the provided function and computes the integer sum of the value sequence.
4415+
*/
4416+
public Observable<Integer> sumInteger(Func1<? super T, Integer> valueExtractor) {
4417+
return create(new OperationSum.SumIntegerExtractor<T>(this, valueExtractor));
4418+
}
4419+
4420+
/**
4421+
* Create an Observable that extracts long values from this Observable via
4422+
* the provided function and computes the long sum of the value sequence.
4423+
*
4424+
* @param valueExtractor the function to extract an long from this Observable
4425+
* @return an Observable that extracts long values from this Observable via
4426+
* the provided function and computes the long sum of the value sequence.
4427+
*/
4428+
public Observable<Long> sumLong(Func1<? super T, Long> valueExtractor) {
4429+
return create(new OperationSum.SumLongExtractor<T>(this, valueExtractor));
4430+
}
4431+
4432+
/**
4433+
* Create an Observable that extracts float values from this Observable via
4434+
* the provided function and computes the float sum of the value sequence.
4435+
*
4436+
* @param valueExtractor the function to extract an float from this Observable
4437+
* @return an Observable that extracts float values from this Observable via
4438+
* the provided function and computes the float sum of the value sequence.
4439+
*/
4440+
public Observable<Float> sumFloat(Func1<? super T, Float> valueExtractor) {
4441+
return create(new OperationSum.SumFloatExtractor<T>(this, valueExtractor));
4442+
}
4443+
4444+
/**
4445+
* Create an Observable that extracts double values from this Observable via
4446+
* the provided function and computes the double sum of the value sequence.
4447+
*
4448+
* @param valueExtractor the function to extract an double from this Observable
4449+
* @return an Observable that extracts double values from this Observable via
4450+
* the provided function and computes the double sum of the value sequence.
4451+
*/
4452+
public Observable<Double> sumDouble(Func1<? super T, Double> valueExtractor) {
4453+
return create(new OperationSum.SumDoubleExtractor<T>(this, valueExtractor));
4454+
}
4455+
44084456
/**
44094457
* Returns an Observable that computes the average of the Integers emitted
44104458
* by the source Observable.
@@ -4470,6 +4518,54 @@ public static Observable<Double> averageDoubles(Observable<Double> source) {
44704518
return OperationAverage.averageDoubles(source);
44714519
}
44724520

4521+
/**
4522+
* Create an Observable that extracts integer values from this Observable via
4523+
* the provided function and computes the integer average of the value sequence.
4524+
*
4525+
* @param valueExtractor the function to extract an integer from this Observable
4526+
* @return an Observable that extracts integer values from this Observable via
4527+
* the provided function and computes the integer average of the value sequence.
4528+
*/
4529+
public Observable<Integer> averageInteger(Func1<? super T, Integer> valueExtractor) {
4530+
return create(new OperationAverage.AverageIntegerExtractor<T>(this, valueExtractor));
4531+
}
4532+
4533+
/**
4534+
* Create an Observable that extracts long values from this Observable via
4535+
* the provided function and computes the long average of the value sequence.
4536+
*
4537+
* @param valueExtractor the function to extract an long from this Observable
4538+
* @return an Observable that extracts long values from this Observable via
4539+
* the provided function and computes the long average of the value sequence.
4540+
*/
4541+
public Observable<Long> averageLong(Func1<? super T, Long> valueExtractor) {
4542+
return create(new OperationAverage.AverageLongExtractor<T>(this, valueExtractor));
4543+
}
4544+
4545+
/**
4546+
* Create an Observable that extracts float values from this Observable via
4547+
* the provided function and computes the float average of the value sequence.
4548+
*
4549+
* @param valueExtractor the function to extract an float from this Observable
4550+
* @return an Observable that extracts float values from this Observable via
4551+
* the provided function and computes the float average of the value sequence.
4552+
*/
4553+
public Observable<Float> averageFloat(Func1<? super T, Float> valueExtractor) {
4554+
return create(new OperationAverage.AverageFloatExtractor<T>(this, valueExtractor));
4555+
}
4556+
4557+
/**
4558+
* Create an Observable that extracts double values from this Observable via
4559+
* the provided function and computes the double average of the value sequence.
4560+
*
4561+
* @param valueExtractor the function to extract an double from this Observable
4562+
* @return an Observable that extracts double values from this Observable via
4563+
* the provided function and computes the double average of the value sequence.
4564+
*/
4565+
public Observable<Double> averageDouble(Func1<? super T, Double> valueExtractor) {
4566+
return create(new OperationAverage.AverageDoubleExtractor<T>(this, valueExtractor));
4567+
}
4568+
44734569
/**
44744570
* Returns an Observable that emits the minimum item emitted by the source
44754571
* Observable. If there is more than one such item, it returns the
@@ -5230,7 +5326,7 @@ public Observable<T> aggregate(Func2<T, T, T> accumulator) {
52305326
public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
52315327
return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
52325328
}
5233-
5329+
52345330
/**
52355331
* Synonymous with <code>reduce()</code>.
52365332
* <p>
@@ -5243,7 +5339,7 @@ public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulat
52435339
public <R> Observable<R> aggregate(R initialValue, Func2<R, ? super T, R> accumulator) {
52445340
return reduce(initialValue, accumulator);
52455341
}
5246-
5342+
52475343
/**
52485344
* Returns an Observable that applies a function of your choosing to the
52495345
* first item emitted by a source Observable, then feeds the result of that

rxjava-core/src/main/java/rx/operators/OperationAverage.java

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
package rx.operators;
1717

1818
import rx.Observable;
19+
import rx.Observable.OnSubscribeFunc;
20+
import rx.Observer;
21+
import rx.Subscription;
1922
import rx.util.functions.Func1;
2023
import rx.util.functions.Func2;
2124

@@ -102,4 +105,228 @@ public Double call(Tuple2<Double> result) {
102105
}
103106
});
104107
}
108+
109+
/**
110+
* Compute the average by extracting integer values from the source via an
111+
* extractor function.
112+
* @param <T> the source value type
113+
*/
114+
public static final class AverageIntegerExtractor<T> implements OnSubscribeFunc<Integer> {
115+
final Observable<? extends T> source;
116+
final Func1<? super T, Integer> valueExtractor;
117+
118+
public AverageIntegerExtractor(Observable<? extends T> source, Func1<? super T, Integer> valueExtractor) {
119+
this.source = source;
120+
this.valueExtractor = valueExtractor;
121+
}
122+
123+
@Override
124+
public Subscription onSubscribe(Observer<? super Integer> t1) {
125+
return source.subscribe(new AverageObserver(t1));
126+
}
127+
/** Computes the average. */
128+
private final class AverageObserver implements Observer<T> {
129+
final Observer<? super Integer> observer;
130+
int sum;
131+
int count;
132+
public AverageObserver(Observer<? super Integer> observer) {
133+
this.observer = observer;
134+
}
135+
136+
@Override
137+
public void onNext(T args) {
138+
sum += valueExtractor.call(args);
139+
count++;
140+
}
141+
142+
@Override
143+
public void onError(Throwable e) {
144+
observer.onError(e);
145+
}
146+
147+
@Override
148+
public void onCompleted() {
149+
if (count > 0) {
150+
try {
151+
observer.onNext(sum / count);
152+
} catch (Throwable t) {
153+
observer.onError(t);
154+
return;
155+
}
156+
observer.onCompleted();
157+
} else {
158+
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
159+
}
160+
}
161+
162+
}
163+
}
164+
165+
/**
166+
* Compute the average by extracting long values from the source via an
167+
* extractor function.
168+
* @param <T> the source value type
169+
*/
170+
public static final class AverageLongExtractor<T> implements OnSubscribeFunc<Long> {
171+
final Observable<? extends T> source;
172+
final Func1<? super T, Long> valueExtractor;
173+
174+
public AverageLongExtractor(Observable<? extends T> source, Func1<? super T, Long> valueExtractor) {
175+
this.source = source;
176+
this.valueExtractor = valueExtractor;
177+
}
178+
179+
@Override
180+
public Subscription onSubscribe(Observer<? super Long> t1) {
181+
return source.subscribe(new AverageObserver(t1));
182+
}
183+
/** Computes the average. */
184+
private final class AverageObserver implements Observer<T> {
185+
final Observer<? super Long> observer;
186+
long sum;
187+
int count;
188+
public AverageObserver(Observer<? super Long> observer) {
189+
this.observer = observer;
190+
}
191+
192+
@Override
193+
public void onNext(T args) {
194+
sum += valueExtractor.call(args);
195+
count++;
196+
}
197+
198+
@Override
199+
public void onError(Throwable e) {
200+
observer.onError(e);
201+
}
202+
203+
@Override
204+
public void onCompleted() {
205+
if (count > 0) {
206+
try {
207+
observer.onNext(sum / count);
208+
} catch (Throwable t) {
209+
observer.onError(t);
210+
return;
211+
}
212+
observer.onCompleted();
213+
} else {
214+
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
215+
}
216+
}
217+
218+
}
219+
}
220+
221+
/**
222+
* Compute the average by extracting float values from the source via an
223+
* extractor function.
224+
* @param <T> the source value type
225+
*/
226+
public static final class AverageFloatExtractor<T> implements OnSubscribeFunc<Float> {
227+
final Observable<? extends T> source;
228+
final Func1<? super T, Float> valueExtractor;
229+
230+
public AverageFloatExtractor(Observable<? extends T> source, Func1<? super T, Float> valueExtractor) {
231+
this.source = source;
232+
this.valueExtractor = valueExtractor;
233+
}
234+
235+
@Override
236+
public Subscription onSubscribe(Observer<? super Float> t1) {
237+
return source.subscribe(new AverageObserver(t1));
238+
}
239+
/** Computes the average. */
240+
private final class AverageObserver implements Observer<T> {
241+
final Observer<? super Float> observer;
242+
float sum;
243+
int count;
244+
public AverageObserver(Observer<? super Float> observer) {
245+
this.observer = observer;
246+
}
247+
248+
@Override
249+
public void onNext(T args) {
250+
sum += valueExtractor.call(args);
251+
count++;
252+
}
253+
254+
@Override
255+
public void onError(Throwable e) {
256+
observer.onError(e);
257+
}
258+
259+
@Override
260+
public void onCompleted() {
261+
if (count > 0) {
262+
try {
263+
observer.onNext(sum / count);
264+
} catch (Throwable t) {
265+
observer.onError(t);
266+
return;
267+
}
268+
observer.onCompleted();
269+
} else {
270+
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
271+
}
272+
}
273+
274+
}
275+
}
276+
277+
/**
278+
* Compute the average by extracting double values from the source via an
279+
* extractor function.
280+
* @param <T> the source value type
281+
*/
282+
public static final class AverageDoubleExtractor<T> implements OnSubscribeFunc<Double> {
283+
final Observable<? extends T> source;
284+
final Func1<? super T, Double> valueExtractor;
285+
286+
public AverageDoubleExtractor(Observable<? extends T> source, Func1<? super T, Double> valueExtractor) {
287+
this.source = source;
288+
this.valueExtractor = valueExtractor;
289+
}
290+
291+
@Override
292+
public Subscription onSubscribe(Observer<? super Double> t1) {
293+
return source.subscribe(new AverageObserver(t1));
294+
}
295+
/** Computes the average. */
296+
private final class AverageObserver implements Observer<T> {
297+
final Observer<? super Double> observer;
298+
double sum;
299+
int count;
300+
public AverageObserver(Observer<? super Double> observer) {
301+
this.observer = observer;
302+
}
303+
304+
@Override
305+
public void onNext(T args) {
306+
sum += valueExtractor.call(args);
307+
count++;
308+
}
309+
310+
@Override
311+
public void onError(Throwable e) {
312+
observer.onError(e);
313+
}
314+
315+
@Override
316+
public void onCompleted() {
317+
if (count > 0) {
318+
try {
319+
observer.onNext(sum / count);
320+
} catch (Throwable t) {
321+
observer.onError(t);
322+
return;
323+
}
324+
observer.onCompleted();
325+
} else {
326+
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
327+
}
328+
}
329+
330+
}
331+
}
105332
}

0 commit comments

Comments
 (0)