Skip to content

Commit 9a0f54f

Browse files
OperationSynchronize -> OperatorSynchronize
1 parent fc2e45f commit 9a0f54f

File tree

4 files changed

+767
-103
lines changed

4 files changed

+767
-103
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@
9494
import rx.operators.OperatorSkipWhile;
9595
import rx.operators.OperationSum;
9696
import rx.operators.OperationSwitch;
97-
import rx.operators.OperationSynchronize;
9897
import rx.operators.OperationTakeLast;
9998
import rx.operators.OperationTakeTimed;
10099
import rx.operators.OperationTakeUntil;
@@ -122,6 +121,7 @@
122121
import rx.operators.OperatorScan;
123122
import rx.operators.OperatorSkip;
124123
import rx.operators.OperatorSubscribeOn;
124+
import rx.operators.OperatorSynchronize;
125125
import rx.operators.OperatorTake;
126126
import rx.operators.OperatorTimeout;
127127
import rx.operators.OperatorTimeoutWithSelector;
@@ -2712,7 +2712,7 @@ public final static <T> Observable<T> switchOnNext(Observable<? extends Observab
27122712
*/
27132713
@Deprecated
27142714
public final static <T> Observable<T> synchronize(Observable<T> source) {
2715-
return create(OperationSynchronize.synchronize(source));
2715+
return source.synchronize();
27162716
}
27172717

27182718
/**
@@ -7261,7 +7261,7 @@ public final <R> Observable<R> switchMap(Func1<? super T, ? extends Observable<?
72617261
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-synchronize">RxJava Wiki: synchronize()</a>
72627262
*/
72637263
public final Observable<T> synchronize() {
7264-
return create(OperationSynchronize.synchronize(this));
7264+
return lift(new OperatorSynchronize<T>());
72657265
}
72667266

72677267
/**
@@ -7285,7 +7285,7 @@ public final Observable<T> synchronize() {
72857285
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-synchronize">RxJava Wiki: synchronize()</a>
72867286
*/
72877287
public final Observable<T> synchronize(Object lock) {
7288-
return create(OperationSynchronize.synchronize(this, lock));
7288+
return lift(new OperatorSynchronize<T>(lock));
72897289
}
72907290

72917291
/**

rxjava-core/src/main/java/rx/operators/OperationSynchronize.java

Lines changed: 0 additions & 99 deletions
This file was deleted.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.Operator;
20+
import rx.Subscriber;
21+
import rx.observers.SynchronizedSubscriber;
22+
23+
/**
24+
* Wraps an Observable in another Observable that ensures that the resulting Observable is
25+
* chronologically well-behaved.
26+
* <p>
27+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/synchronize.png">
28+
* <p>
29+
* A well-behaved Observable does not interleave its invocations of the <code>onNext</code>,
30+
* <code>onCompleted</code>, and <code>onError</code> methods of its Observers; it invokes
31+
* <code>onCompleted</code> or <code>onError</code> only once; and it never invokes
32+
* <code>onNext</code> after invoking either <code>onCompleted</code> or <code>onError</code>. The
33+
* synchronize operation enforces this, and the Observable it returns invokes <code>onNext</code>
34+
* and <code>onCompleted</code> or <code>onError</code> synchronously.
35+
* <p>
36+
* NOTE: {@link Observable#create} already wraps Observables so this is generally redundant.
37+
*
38+
* @param <T>
39+
* The type of the observable sequence.
40+
*/
41+
public final class OperatorSynchronize<T> implements Operator<T, T> {
42+
43+
final Object lock;
44+
45+
public OperatorSynchronize(Object lock) {
46+
this.lock = lock;
47+
}
48+
49+
public OperatorSynchronize() {
50+
this.lock = new Object();
51+
}
52+
53+
@Override
54+
public Subscriber<? super T> call(final Subscriber<? super T> s) {
55+
return new SynchronizedSubscriber<T>(new Subscriber<T>(s) {
56+
57+
@Override
58+
public void onCompleted() {
59+
s.onCompleted();
60+
}
61+
62+
@Override
63+
public void onError(Throwable e) {
64+
s.onError(e);
65+
}
66+
67+
@Override
68+
public void onNext(T t) {
69+
s.onNext(t);
70+
}
71+
72+
}, lock);
73+
}
74+
75+
}

0 commit comments

Comments
 (0)