Skip to content

Commit 6949e7b

Browse files
committed
Merge remote-tracking branch 'origin/master' into docs
2 parents 18ea25a + 92ba6e7 commit 6949e7b

File tree

6 files changed

+1078
-3
lines changed

6 files changed

+1078
-3
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4412,6 +4412,54 @@ public static Observable<Double> sumDoubles(Observable<Double> source) {
44124412
return OperationSum.sumDoubles(source);
44134413
}
44144414

4415+
/**
4416+
* Create an Observable that extracts integer values from this Observable via
4417+
* the provided function and computes the integer sum of the value sequence.
4418+
*
4419+
* @param valueExtractor the function to extract an integer from this Observable
4420+
* @return an Observable that extracts integer values from this Observable via
4421+
* the provided function and computes the integer sum of the value sequence.
4422+
*/
4423+
public Observable<Integer> sumInteger(Func1<? super T, Integer> valueExtractor) {
4424+
return create(new OperationSum.SumIntegerExtractor<T>(this, valueExtractor));
4425+
}
4426+
4427+
/**
4428+
* Create an Observable that extracts long values from this Observable via
4429+
* the provided function and computes the long sum of the value sequence.
4430+
*
4431+
* @param valueExtractor the function to extract an long from this Observable
4432+
* @return an Observable that extracts long values from this Observable via
4433+
* the provided function and computes the long sum of the value sequence.
4434+
*/
4435+
public Observable<Long> sumLong(Func1<? super T, Long> valueExtractor) {
4436+
return create(new OperationSum.SumLongExtractor<T>(this, valueExtractor));
4437+
}
4438+
4439+
/**
4440+
* Create an Observable that extracts float values from this Observable via
4441+
* the provided function and computes the float sum of the value sequence.
4442+
*
4443+
* @param valueExtractor the function to extract an float from this Observable
4444+
* @return an Observable that extracts float values from this Observable via
4445+
* the provided function and computes the float sum of the value sequence.
4446+
*/
4447+
public Observable<Float> sumFloat(Func1<? super T, Float> valueExtractor) {
4448+
return create(new OperationSum.SumFloatExtractor<T>(this, valueExtractor));
4449+
}
4450+
4451+
/**
4452+
* Create an Observable that extracts double values from this Observable via
4453+
* the provided function and computes the double sum of the value sequence.
4454+
*
4455+
* @param valueExtractor the function to extract an double from this Observable
4456+
* @return an Observable that extracts double values from this Observable via
4457+
* the provided function and computes the double sum of the value sequence.
4458+
*/
4459+
public Observable<Double> sumDouble(Func1<? super T, Double> valueExtractor) {
4460+
return create(new OperationSum.SumDoubleExtractor<T>(this, valueExtractor));
4461+
}
4462+
44154463
/**
44164464
* Returns an Observable that computes the average of the Integers emitted
44174465
* by the source Observable.
@@ -4477,6 +4525,54 @@ public static Observable<Double> averageDoubles(Observable<Double> source) {
44774525
return OperationAverage.averageDoubles(source);
44784526
}
44794527

4528+
/**
4529+
* Create an Observable that extracts integer values from this Observable via
4530+
* the provided function and computes the integer average of the value sequence.
4531+
*
4532+
* @param valueExtractor the function to extract an integer from this Observable
4533+
* @return an Observable that extracts integer values from this Observable via
4534+
* the provided function and computes the integer average of the value sequence.
4535+
*/
4536+
public Observable<Integer> averageInteger(Func1<? super T, Integer> valueExtractor) {
4537+
return create(new OperationAverage.AverageIntegerExtractor<T>(this, valueExtractor));
4538+
}
4539+
4540+
/**
4541+
* Create an Observable that extracts long values from this Observable via
4542+
* the provided function and computes the long average of the value sequence.
4543+
*
4544+
* @param valueExtractor the function to extract an long from this Observable
4545+
* @return an Observable that extracts long values from this Observable via
4546+
* the provided function and computes the long average of the value sequence.
4547+
*/
4548+
public Observable<Long> averageLong(Func1<? super T, Long> valueExtractor) {
4549+
return create(new OperationAverage.AverageLongExtractor<T>(this, valueExtractor));
4550+
}
4551+
4552+
/**
4553+
* Create an Observable that extracts float values from this Observable via
4554+
* the provided function and computes the float average of the value sequence.
4555+
*
4556+
* @param valueExtractor the function to extract an float from this Observable
4557+
* @return an Observable that extracts float values from this Observable via
4558+
* the provided function and computes the float average of the value sequence.
4559+
*/
4560+
public Observable<Float> averageFloat(Func1<? super T, Float> valueExtractor) {
4561+
return create(new OperationAverage.AverageFloatExtractor<T>(this, valueExtractor));
4562+
}
4563+
4564+
/**
4565+
* Create an Observable that extracts double values from this Observable via
4566+
* the provided function and computes the double average of the value sequence.
4567+
*
4568+
* @param valueExtractor the function to extract an double from this Observable
4569+
* @return an Observable that extracts double values from this Observable via
4570+
* the provided function and computes the double average of the value sequence.
4571+
*/
4572+
public Observable<Double> averageDouble(Func1<? super T, Double> valueExtractor) {
4573+
return create(new OperationAverage.AverageDoubleExtractor<T>(this, valueExtractor));
4574+
}
4575+
44804576
/**
44814577
* Returns an Observable that emits the minimum item emitted by the source
44824578
* Observable. If there is more than one such item, it returns the
@@ -5237,7 +5333,7 @@ public Observable<T> aggregate(Func2<T, T, T> accumulator) {
52375333
public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
52385334
return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
52395335
}
5240-
5336+
52415337
/**
52425338
* Synonymous with <code>reduce()</code>.
52435339
* <p>
@@ -5250,7 +5346,7 @@ public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulat
52505346
public <R> Observable<R> aggregate(R initialValue, Func2<R, ? super T, R> accumulator) {
52515347
return reduce(initialValue, accumulator);
52525348
}
5253-
5349+
52545350
/**
52555351
* Returns an Observable that applies a function of your choosing to the
52565352
* first item emitted by a source Observable, then feeds the result of that

rxjava-core/src/main/java/rx/operators/OperationAverage.java

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
package rx.operators;
1717

1818
import rx.Observable;
19+
import rx.Observable.OnSubscribeFunc;
20+
import rx.Observer;
21+
import rx.Subscription;
1922
import rx.util.functions.Func1;
2023
import rx.util.functions.Func2;
2124

@@ -102,4 +105,228 @@ public Double call(Tuple2<Double> result) {
102105
}
103106
});
104107
}
108+
109+
/**
110+
* Compute the average by extracting integer values from the source via an
111+
* extractor function.
112+
* @param <T> the source value type
113+
*/
114+
public static final class AverageIntegerExtractor<T> implements OnSubscribeFunc<Integer> {
115+
final Observable<? extends T> source;
116+
final Func1<? super T, Integer> valueExtractor;
117+
118+
public AverageIntegerExtractor(Observable<? extends T> source, Func1<? super T, Integer> valueExtractor) {
119+
this.source = source;
120+
this.valueExtractor = valueExtractor;
121+
}
122+
123+
@Override
124+
public Subscription onSubscribe(Observer<? super Integer> t1) {
125+
return source.subscribe(new AverageObserver(t1));
126+
}
127+
/** Computes the average. */
128+
private final class AverageObserver implements Observer<T> {
129+
final Observer<? super Integer> observer;
130+
int sum;
131+
int count;
132+
public AverageObserver(Observer<? super Integer> observer) {
133+
this.observer = observer;
134+
}
135+
136+
@Override
137+
public void onNext(T args) {
138+
sum += valueExtractor.call(args);
139+
count++;
140+
}
141+
142+
@Override
143+
public void onError(Throwable e) {
144+
observer.onError(e);
145+
}
146+
147+
@Override
148+
public void onCompleted() {
149+
if (count > 0) {
150+
try {
151+
observer.onNext(sum / count);
152+
} catch (Throwable t) {
153+
observer.onError(t);
154+
return;
155+
}
156+
observer.onCompleted();
157+
} else {
158+
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
159+
}
160+
}
161+
162+
}
163+
}
164+
165+
/**
166+
* Compute the average by extracting long values from the source via an
167+
* extractor function.
168+
* @param <T> the source value type
169+
*/
170+
public static final class AverageLongExtractor<T> implements OnSubscribeFunc<Long> {
171+
final Observable<? extends T> source;
172+
final Func1<? super T, Long> valueExtractor;
173+
174+
public AverageLongExtractor(Observable<? extends T> source, Func1<? super T, Long> valueExtractor) {
175+
this.source = source;
176+
this.valueExtractor = valueExtractor;
177+
}
178+
179+
@Override
180+
public Subscription onSubscribe(Observer<? super Long> t1) {
181+
return source.subscribe(new AverageObserver(t1));
182+
}
183+
/** Computes the average. */
184+
private final class AverageObserver implements Observer<T> {
185+
final Observer<? super Long> observer;
186+
long sum;
187+
int count;
188+
public AverageObserver(Observer<? super Long> observer) {
189+
this.observer = observer;
190+
}
191+
192+
@Override
193+
public void onNext(T args) {
194+
sum += valueExtractor.call(args);
195+
count++;
196+
}
197+
198+
@Override
199+
public void onError(Throwable e) {
200+
observer.onError(e);
201+
}
202+
203+
@Override
204+
public void onCompleted() {
205+
if (count > 0) {
206+
try {
207+
observer.onNext(sum / count);
208+
} catch (Throwable t) {
209+
observer.onError(t);
210+
return;
211+
}
212+
observer.onCompleted();
213+
} else {
214+
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
215+
}
216+
}
217+
218+
}
219+
}
220+
221+
/**
222+
* Compute the average by extracting float values from the source via an
223+
* extractor function.
224+
* @param <T> the source value type
225+
*/
226+
public static final class AverageFloatExtractor<T> implements OnSubscribeFunc<Float> {
227+
final Observable<? extends T> source;
228+
final Func1<? super T, Float> valueExtractor;
229+
230+
public AverageFloatExtractor(Observable<? extends T> source, Func1<? super T, Float> valueExtractor) {
231+
this.source = source;
232+
this.valueExtractor = valueExtractor;
233+
}
234+
235+
@Override
236+
public Subscription onSubscribe(Observer<? super Float> t1) {
237+
return source.subscribe(new AverageObserver(t1));
238+
}
239+
/** Computes the average. */
240+
private final class AverageObserver implements Observer<T> {
241+
final Observer<? super Float> observer;
242+
float sum;
243+
int count;
244+
public AverageObserver(Observer<? super Float> observer) {
245+
this.observer = observer;
246+
}
247+
248+
@Override
249+
public void onNext(T args) {
250+
sum += valueExtractor.call(args);
251+
count++;
252+
}
253+
254+
@Override
255+
public void onError(Throwable e) {
256+
observer.onError(e);
257+
}
258+
259+
@Override
260+
public void onCompleted() {
261+
if (count > 0) {
262+
try {
263+
observer.onNext(sum / count);
264+
} catch (Throwable t) {
265+
observer.onError(t);
266+
return;
267+
}
268+
observer.onCompleted();
269+
} else {
270+
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
271+
}
272+
}
273+
274+
}
275+
}
276+
277+
/**
278+
* Compute the average by extracting double values from the source via an
279+
* extractor function.
280+
* @param <T> the source value type
281+
*/
282+
public static final class AverageDoubleExtractor<T> implements OnSubscribeFunc<Double> {
283+
final Observable<? extends T> source;
284+
final Func1<? super T, Double> valueExtractor;
285+
286+
public AverageDoubleExtractor(Observable<? extends T> source, Func1<? super T, Double> valueExtractor) {
287+
this.source = source;
288+
this.valueExtractor = valueExtractor;
289+
}
290+
291+
@Override
292+
public Subscription onSubscribe(Observer<? super Double> t1) {
293+
return source.subscribe(new AverageObserver(t1));
294+
}
295+
/** Computes the average. */
296+
private final class AverageObserver implements Observer<T> {
297+
final Observer<? super Double> observer;
298+
double sum;
299+
int count;
300+
public AverageObserver(Observer<? super Double> observer) {
301+
this.observer = observer;
302+
}
303+
304+
@Override
305+
public void onNext(T args) {
306+
sum += valueExtractor.call(args);
307+
count++;
308+
}
309+
310+
@Override
311+
public void onError(Throwable e) {
312+
observer.onError(e);
313+
}
314+
315+
@Override
316+
public void onCompleted() {
317+
if (count > 0) {
318+
try {
319+
observer.onNext(sum / count);
320+
} catch (Throwable t) {
321+
observer.onError(t);
322+
return;
323+
}
324+
observer.onCompleted();
325+
} else {
326+
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
327+
}
328+
}
329+
330+
}
331+
}
105332
}

0 commit comments

Comments
 (0)