Skip to content

Commit 6d238a8

Browse files
Merge pull request ReactiveX#133 from benjchristensen/fixes
Various fixes
2 parents 6e58701 + 321f3b0 commit 6d238a8

File tree

5 files changed

+221
-216
lines changed

5 files changed

+221
-216
lines changed

rxjava-core/src/main/java/rx/Notification.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public boolean equals(Object obj) {
149149
return true;
150150
if (obj.getClass() != getClass())
151151
return false;
152-
Notification notification = (Notification) obj;
152+
Notification<?> notification = (Notification<?>) obj;
153153
if (notification.getKind() != getKind())
154154
return false;
155155
if (hasValue() && !getValue().equals(notification.getValue()))

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,13 @@ public static <T> Observable<T> create(Func1<Observer<T>, Subscription> func) {
413413
return new Observable<T>(func);
414414
}
415415

416+
/*
417+
* Private version that creates a 'trusted' Observable to allow performance optimizations.
418+
*/
419+
private static <T> Observable<T> _create(Func1<Observer<T>, Subscription> func) {
420+
return new Observable<T>(func, true);
421+
}
422+
416423
/**
417424
* Creates an Observable that will execute the given function when a {@link Observer} subscribes to it.
418425
* <p>
@@ -484,7 +491,7 @@ public static <T> Observable<T> error(Exception exception) {
484491
* @return an Observable that emits only those items in the original Observable that the filter evaluates as true
485492
*/
486493
public static <T> Observable<T> filter(Observable<T> that, Func1<T, Boolean> predicate) {
487-
return create(OperationFilter.filter(that, predicate));
494+
return _create(OperationFilter.filter(that, predicate));
488495
}
489496

490497
/**
@@ -577,7 +584,7 @@ public static <T> Observable<T> just(T value) {
577584
* by the source Observable
578585
*/
579586
public static <T> Observable<T> last(final Observable<T> that) {
580-
return create(OperationLast.last(that));
587+
return _create(OperationLast.last(that));
581588
}
582589

583590
/**
@@ -598,7 +605,7 @@ public static <T> Observable<T> last(final Observable<T> that) {
598605
* in the sequence emitted by the source Observable
599606
*/
600607
public static <T, R> Observable<R> map(Observable<T> sequence, Func1<T, R> func) {
601-
return create(OperationMap.map(sequence, func));
608+
return _create(OperationMap.map(sequence, func));
602609
}
603610

604611
/**
@@ -654,7 +661,7 @@ public R call(T t1) {
654661
* the Observables obtained from this transformation
655662
*/
656663
public static <T, R> Observable<R> mapMany(Observable<T> sequence, Func1<T, Observable<R>> func) {
657-
return create(OperationMap.mapMany(sequence, func));
664+
return _create(OperationMap.mapMany(sequence, func));
658665
}
659666

660667
/**
@@ -703,7 +710,7 @@ public R call(T t1) {
703710
* @see http://msdn.microsoft.com/en-us/library/hh229453(v=VS.103).aspx
704711
*/
705712
public static <T> Observable<Notification<T>> materialize(final Observable<T> sequence) {
706-
return create(OperationMaterialize.materialize(sequence));
713+
return _create(OperationMaterialize.materialize(sequence));
707714
}
708715

709716
/**
@@ -720,7 +727,7 @@ public static <T> Observable<Notification<T>> materialize(final Observable<T> se
720727
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
721728
*/
722729
public static <T> Observable<T> merge(List<Observable<T>> source) {
723-
return create(OperationMerge.merge(source));
730+
return _create(OperationMerge.merge(source));
724731
}
725732

726733
/**
@@ -737,7 +744,7 @@ public static <T> Observable<T> merge(List<Observable<T>> source) {
737744
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
738745
*/
739746
public static <T> Observable<T> merge(Observable<Observable<T>> source) {
740-
return create(OperationMerge.merge(source));
747+
return _create(OperationMerge.merge(source));
741748
}
742749

743750
/**
@@ -754,7 +761,7 @@ public static <T> Observable<T> merge(Observable<Observable<T>> source) {
754761
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
755762
*/
756763
public static <T> Observable<T> merge(Observable<T>... source) {
757-
return create(OperationMerge.merge(source));
764+
return _create(OperationMerge.merge(source));
758765
}
759766

760767
/**
@@ -770,7 +777,7 @@ public static <T> Observable<T> merge(Observable<T>... source) {
770777
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.concat(v=vs.103).aspx">MSDN: Observable.Concat Method</a>
771778
*/
772779
public static <T> Observable<T> concat(Observable<T>... source) {
773-
return create(OperationConcat.concat(source));
780+
return _create(OperationConcat.concat(source));
774781
}
775782

776783
/**
@@ -789,7 +796,7 @@ public static <T> Observable<T> concat(Observable<T>... source) {
789796
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
790797
*/
791798
public static <T> Observable<T> mergeDelayError(List<Observable<T>> source) {
792-
return create(OperationMergeDelayError.mergeDelayError(source));
799+
return _create(OperationMergeDelayError.mergeDelayError(source));
793800
}
794801

795802
/**
@@ -808,7 +815,7 @@ public static <T> Observable<T> mergeDelayError(List<Observable<T>> source) {
808815
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
809816
*/
810817
public static <T> Observable<T> mergeDelayError(Observable<Observable<T>> source) {
811-
return create(OperationMergeDelayError.mergeDelayError(source));
818+
return _create(OperationMergeDelayError.mergeDelayError(source));
812819
}
813820

814821
/**
@@ -827,7 +834,7 @@ public static <T> Observable<T> mergeDelayError(Observable<Observable<T>> source
827834
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
828835
*/
829836
public static <T> Observable<T> mergeDelayError(Observable<T>... source) {
830-
return create(OperationMergeDelayError.mergeDelayError(source));
837+
return _create(OperationMergeDelayError.mergeDelayError(source));
831838
}
832839

833840
/**
@@ -916,7 +923,7 @@ public void unsubscribe() {
916923
* @return the source Observable, with its behavior modified as described
917924
*/
918925
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Func1<Exception, Observable<T>> resumeFunction) {
919-
return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction));
926+
return _create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction));
920927
}
921928

922929
/**
@@ -980,7 +987,7 @@ public Observable<T> call(Exception e) {
980987
* @return the source Observable, with its behavior modified as described
981988
*/
982989
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Observable<T> resumeSequence) {
983-
return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence));
990+
return _create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence));
984991
}
985992

986993
/**
@@ -1004,7 +1011,7 @@ public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, fina
10041011
* @return the source Observable, with its behavior modified as described
10051012
*/
10061013
public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Exception, T> resumeFunction) {
1007-
return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
1014+
return _create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
10081015
}
10091016

10101017
/**
@@ -1034,7 +1041,7 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Ex
10341041
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
10351042
*/
10361043
public static <T> Observable<T> reduce(Observable<T> sequence, Func2<T, T, T> accumulator) {
1037-
return last(create(OperationScan.scan(sequence, accumulator)));
1044+
return last(_create(OperationScan.scan(sequence, accumulator)));
10381045
}
10391046

10401047
/**
@@ -1106,7 +1113,7 @@ public T call(T t1, T t2) {
11061113
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
11071114
*/
11081115
public static <T> Observable<T> reduce(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
1109-
return last(create(OperationScan.scan(sequence, initialValue, accumulator)));
1116+
return last(_create(OperationScan.scan(sequence, initialValue, accumulator)));
11101117
}
11111118

11121119
/**
@@ -1170,7 +1177,7 @@ public T call(T t1, T t2) {
11701177
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
11711178
*/
11721179
public static <T> Observable<T> scan(Observable<T> sequence, Func2<T, T, T> accumulator) {
1173-
return create(OperationScan.scan(sequence, accumulator));
1180+
return _create(OperationScan.scan(sequence, accumulator));
11741181
}
11751182

11761183
/**
@@ -1228,7 +1235,7 @@ public T call(T t1, T t2) {
12281235
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
12291236
*/
12301237
public static <T> Observable<T> scan(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
1231-
return create(OperationScan.scan(sequence, initialValue, accumulator));
1238+
return _create(OperationScan.scan(sequence, initialValue, accumulator));
12321239
}
12331240

12341241
/**
@@ -1282,7 +1289,7 @@ public T call(T t1, T t2) {
12821289
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229847(v=vs.103).aspx">MSDN: Observable.Skip Method</a>
12831290
*/
12841291
public static <T> Observable<T> skip(final Observable<T> items, int num) {
1285-
return create(OperationSkip.skip(items, num));
1292+
return _create(OperationSkip.skip(items, num));
12861293
}
12871294

12881295
/**
@@ -1300,7 +1307,7 @@ public static <T> Observable<T> skip(final Observable<T> items, int num) {
13001307
* @return an Observable that is a chronologically well-behaved version of the source Observable
13011308
*/
13021309
public static <T> Observable<T> synchronize(Observable<T> observable) {
1303-
return create(OperationSynchronize.synchronize(observable));
1310+
return _create(OperationSynchronize.synchronize(observable));
13041311
}
13051312

13061313
/**
@@ -1322,7 +1329,7 @@ public static <T> Observable<T> synchronize(Observable<T> observable) {
13221329
* Observable
13231330
*/
13241331
public static <T> Observable<T> take(final Observable<T> items, final int num) {
1325-
return create(OperationTake.take(items, num));
1332+
return _create(OperationTake.take(items, num));
13261333
}
13271334

13281335
/**
@@ -1345,7 +1352,7 @@ public static <T> Observable<T> take(final Observable<T> items, final int num) {
13451352
* items emitted by the source Observable
13461353
*/
13471354
public static <T> Observable<List<T>> toList(final Observable<T> that) {
1348-
return create(OperationToObservableList.toObservableList(that));
1355+
return _create(OperationToObservableList.toObservableList(that));
13491356
}
13501357

13511358
/**
@@ -1364,7 +1371,7 @@ public static <T> Observable<List<T>> toList(final Observable<T> that) {
13641371
* @return an Observable that emits each item in the source Iterable sequence
13651372
*/
13661373
public static <T> Observable<T> toObservable(Iterable<T> iterable) {
1367-
return create(OperationToObservableIterable.toObservableIterable(iterable));
1374+
return _create(OperationToObservableIterable.toObservableIterable(iterable));
13681375
}
13691376

13701377
/**
@@ -1382,7 +1389,7 @@ public static <T> Observable<T> toObservable(Iterable<T> iterable) {
13821389
* @return an Observable that emits the item from the source Future
13831390
*/
13841391
public static <T> Observable<T> toObservable(Future<T> future) {
1385-
return create(OperationToObservableFuture.toObservableFuture(future));
1392+
return _create(OperationToObservableFuture.toObservableFuture(future));
13861393
}
13871394

13881395
/**
@@ -1405,7 +1412,7 @@ public static <T> Observable<T> toObservable(Future<T> future) {
14051412
* @return an Observable that emits the item from the source Future
14061413
*/
14071414
public static <T> Observable<T> toObservable(Future<T> future, long time, TimeUnit unit) {
1408-
return create(OperationToObservableFuture.toObservableFuture(future, time, unit));
1415+
return _create(OperationToObservableFuture.toObservableFuture(future, time, unit));
14091416
}
14101417

14111418
/**
@@ -1438,7 +1445,7 @@ public static <T> Observable<T> toObservable(T... items) {
14381445
* @return
14391446
*/
14401447
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence) {
1441-
return create(OperationToObservableSortedList.toSortedList(sequence));
1448+
return _create(OperationToObservableSortedList.toSortedList(sequence));
14421449
}
14431450

14441451
/**
@@ -1451,7 +1458,7 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence) {
14511458
* @return
14521459
*/
14531460
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2<T, T, Integer> sortFunction) {
1454-
return create(OperationToObservableSortedList.toSortedList(sequence, sortFunction));
1461+
return _create(OperationToObservableSortedList.toSortedList(sequence, sortFunction));
14551462
}
14561463

14571464
/**
@@ -1466,7 +1473,7 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2
14661473
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, final Object sortFunction) {
14671474
@SuppressWarnings("rawtypes")
14681475
final FuncN _f = Functions.from(sortFunction);
1469-
return create(OperationToObservableSortedList.toSortedList(sequence, new Func2<T, T, Integer>() {
1476+
return _create(OperationToObservableSortedList.toSortedList(sequence, new Func2<T, T, Integer>() {
14701477

14711478
@Override
14721479
public Integer call(T t1, T t2) {
@@ -1501,7 +1508,7 @@ public Integer call(T t1, T t2) {
15011508
* @return an Observable that emits the zipped results
15021509
*/
15031510
public static <R, T0, T1> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> reduceFunction) {
1504-
return create(OperationZip.zip(w0, w1, reduceFunction));
1511+
return _create(OperationZip.zip(w0, w1, reduceFunction));
15051512
}
15061513

15071514
/**
@@ -1571,7 +1578,7 @@ public R call(T0 t0, T1 t1) {
15711578
* @return an Observable that emits the zipped results
15721579
*/
15731580
public static <R, T0, T1, T2> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<T0, T1, T2, R> function) {
1574-
return create(OperationZip.zip(w0, w1, w2, function));
1581+
return _create(OperationZip.zip(w0, w1, w2, function));
15751582
}
15761583

15771584
/**
@@ -1646,7 +1653,7 @@ public R call(T0 t0, T1 t1, T2 t2) {
16461653
* @return an Observable that emits the zipped results
16471654
*/
16481655
public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> reduceFunction) {
1649-
return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction));
1656+
return _create(OperationZip.zip(w0, w1, w2, w3, reduceFunction));
16501657
}
16511658

16521659
/**

0 commit comments

Comments
 (0)