Skip to content

Commit a0a18ac

Browse files
Removing SynchronizedObserver usage from Subject implementations.
- We don't need to add synchronization as the subjects can trust their source Observables to comply with the Rx contract. - This optimization follows Rx Design Guidelines 6.8. Avoid serializing operators This was discussed at ReactiveX#256
1 parent b8f4310 commit a0a18ac

File tree

3 files changed

+6
-13
lines changed

3 files changed

+6
-13
lines changed

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515
*/
1616
package rx.subjects;
1717

18-
import static org.mockito.Matchers.any;
19-
import static org.mockito.Mockito.mock;
20-
import static org.mockito.Mockito.times;
21-
import static org.mockito.Mockito.verify;
22-
import static org.mockito.Matchers.anyString;
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
2320

2421
import java.util.concurrent.ConcurrentHashMap;
2522
import java.util.concurrent.atomic.AtomicReference;
@@ -30,7 +27,6 @@
3027
import rx.Observer;
3128
import rx.Subscription;
3229
import rx.util.AtomicObservableSubscription;
33-
import rx.util.SynchronizedObserver;
3430
import rx.util.functions.Action1;
3531
import rx.util.functions.Func0;
3632
import rx.util.functions.Func1;
@@ -80,7 +76,7 @@ public void unsubscribe() {
8076
});
8177

8278
// on subscribe add it to the map of outbound observers to notify
83-
observers.put(subscription, new SynchronizedObserver<T>(observer, subscription));
79+
observers.put(subscription, observer);
8480
return subscription;
8581
}
8682
};

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import rx.Observer;
2828
import rx.Subscription;
2929
import rx.util.AtomicObservableSubscription;
30-
import rx.util.SynchronizedObserver;
3130
import rx.util.functions.Action1;
3231
import rx.util.functions.Func0;
3332
import rx.util.functions.Func1;
@@ -86,11 +85,10 @@ public void unsubscribe() {
8685
}
8786
});
8887

89-
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(observer, subscription);
90-
synchronizedObserver.onNext(currentValue.get());
88+
observer.onNext(currentValue.get());
9189

9290
// on subscribe add it to the map of outbound observers to notify
93-
observers.put(subscription, synchronizedObserver);
91+
observers.put(subscription, observer);
9492
return subscription;
9593
}
9694
};

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import rx.Observer;
3535
import rx.Subscription;
3636
import rx.util.AtomicObservableSubscription;
37-
import rx.util.SynchronizedObserver;
3837
import rx.util.functions.Action1;
3938
import rx.util.functions.Func0;
4039
import rx.util.functions.Func1;
@@ -78,7 +77,7 @@ public void unsubscribe() {
7877
});
7978

8079
// on subscribe add it to the map of outbound observers to notify
81-
observers.put(subscription, new SynchronizedObserver<T>(observer, subscription));
80+
observers.put(subscription, observer);
8281
return subscription;
8382
}
8483
};

0 commit comments

Comments
 (0)