Skip to content

Commit 15c385d

Browse files
Reduce Subscription Object Allocation
- significant reduction in object allocations - details on research available at ReactiveX#1204
1 parent c6c17cc commit 15c385d

File tree

8 files changed

+471
-127
lines changed

8 files changed

+471
-127
lines changed

rxjava-core/src/main/java/rx/Subscriber.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx;
1717

18+
import rx.subscriptions.ChainedSubscription;
1819
import rx.subscriptions.CompositeSubscription;
1920

2021
/**
@@ -30,17 +31,23 @@
3031
*/
3132
public abstract class Subscriber<T> implements Observer<T>, Subscription {
3233

33-
private final CompositeSubscription cs;
34+
private final ChainedSubscription cs;
3435

35-
protected Subscriber(CompositeSubscription cs) {
36+
protected Subscriber(ChainedSubscription cs) {
3637
if (cs == null) {
3738
throw new IllegalArgumentException("The CompositeSubscription can not be null");
3839
}
3940
this.cs = cs;
4041
}
42+
43+
@Deprecated
44+
protected Subscriber(CompositeSubscription cs) {
45+
this(new ChainedSubscription());
46+
add(cs);
47+
}
4148

4249
protected Subscriber() {
43-
this(new CompositeSubscription());
50+
this(new ChainedSubscription());
4451
}
4552

4653
protected Subscriber(Subscriber<?> op) {

rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import rx.functions.Action0;
2727
import rx.functions.Func1;
2828
import rx.observables.GroupedObservable;
29-
import rx.subscriptions.CompositeSubscription;
29+
import rx.subscriptions.ChainedSubscription;
3030
import rx.subscriptions.Subscriptions;
3131

3232
/**
@@ -55,7 +55,7 @@ static final class GroupBySubscriber<K, T> extends Subscriber<T> {
5555
public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<? super GroupedObservable<K, T>> childObserver) {
5656
// a new CompositeSubscription to decouple the subscription as the inner subscriptions need a separate lifecycle
5757
// and will unsubscribe on this parent if they are all unsubscribed
58-
super(new CompositeSubscription());
58+
super(new ChainedSubscription());
5959
this.keySelector = keySelector;
6060
this.childObserver = childObserver;
6161
}

rxjava-core/src/main/java/rx/operators/OperatorPivot.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@
2828
import rx.Subscriber;
2929
import rx.functions.Action0;
3030
import rx.observables.GroupedObservable;
31-
import rx.subscriptions.CompositeSubscription;
31+
import rx.subscriptions.ChainedSubscription;
3232
import rx.subscriptions.Subscriptions;
3333

3434
public final class OperatorPivot<K1, K2, T> implements Operator<GroupedObservable<K2, GroupedObservable<K1, T>>, GroupedObservable<K1, GroupedObservable<K2, T>>> {
3535

3636
@Override
3737
public Subscriber<? super GroupedObservable<K1, GroupedObservable<K2, T>>> call(final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
3838
final AtomicReference<State> state = new AtomicReference<State>(State.create());
39-
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new CompositeSubscription(), child, state);
39+
final PivotSubscriber<K1, K2, T> pivotSubscriber = new PivotSubscriber<K1, K2, T>(new ChainedSubscription(), child, state);
4040
child.add(Subscriptions.create(new Action0() {
4141

4242
@Override
@@ -65,12 +65,12 @@ private static final class PivotSubscriber<K1, K2, T> extends Subscriber<Grouped
6565
* needs to decouple the subscription as the inner subscriptions need a separate lifecycle
6666
* and will unsubscribe on this parent if they are all unsubscribed
6767
*/
68-
private final CompositeSubscription parentSubscription;
68+
private final ChainedSubscription parentSubscription;
6969
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
7070
private final AtomicReference<State> state;
7171
private final GroupState<K1, K2, T> groups;
7272

73-
private PivotSubscriber(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
73+
private PivotSubscriber(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child, AtomicReference<State> state) {
7474
super(parentSubscription);
7575
this.parentSubscription = parentSubscription;
7676
this.child = child;
@@ -158,7 +158,7 @@ public void onNext(T t) {
158158
private static final class GroupState<K1, K2, T> {
159159
private final ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>> innerSubjects = new ConcurrentHashMap<KeyPair<K1, K2>, Inner<K1, K2, T>>();
160160
private final ConcurrentHashMap<K2, Outer<K1, K2, T>> outerSubjects = new ConcurrentHashMap<K2, Outer<K1, K2, T>>();
161-
private final CompositeSubscription parentSubscription;
161+
private final ChainedSubscription parentSubscription;
162162
private final Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child;
163163
/** Indicates a terminal state. */
164164
volatile int completed;
@@ -167,7 +167,7 @@ private static final class GroupState<K1, K2, T> {
167167
static final AtomicIntegerFieldUpdater<GroupState> COMPLETED_UPDATER
168168
= AtomicIntegerFieldUpdater.newUpdater(GroupState.class, "completed");
169169

170-
public GroupState(CompositeSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
170+
public GroupState(ChainedSubscription parentSubscription, Subscriber<? super GroupedObservable<K2, GroupedObservable<K1, T>>> child) {
171171
this.parentSubscription = parentSubscription;
172172
this.child = child;
173173
}

rxjava-core/src/main/java/rx/operators/OperatorTake.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
20-
import rx.subscriptions.CompositeSubscription;
20+
import rx.subscriptions.ChainedSubscription;
2121

2222
/**
2323
* Returns an Observable that emits the first <code>num</code> items emitted by the source
@@ -40,7 +40,7 @@ public OperatorTake(int limit) {
4040

4141
@Override
4242
public Subscriber<? super T> call(final Subscriber<? super T> child) {
43-
final CompositeSubscription parent = new CompositeSubscription();
43+
final ChainedSubscription parent = new ChainedSubscription();
4444
if (limit == 0) {
4545
child.onCompleted();
4646
parent.unsubscribe();

rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import rx.Scheduler;
2020
import rx.Subscriber;
2121
import rx.functions.Action0;
22-
import rx.subscriptions.CompositeSubscription;
22+
import rx.subscriptions.ChainedSubscription;
2323
import rx.subscriptions.Subscriptions;
2424

2525
/**
@@ -36,7 +36,7 @@ public OperatorUnsubscribeOn(Scheduler scheduler) {
3636

3737
@Override
3838
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
39-
final CompositeSubscription parentSubscription = new CompositeSubscription();
39+
final ChainedSubscription parentSubscription = new ChainedSubscription();
4040
subscriber.add(Subscriptions.create(new Action0() {
4141

4242
@Override
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subscriptions;
17+
18+
import java.util.ArrayList;
19+
import java.util.Arrays;
20+
import java.util.Collection;
21+
import java.util.LinkedList;
22+
import java.util.List;
23+
24+
import rx.Subscription;
25+
import rx.exceptions.CompositeException;
26+
27+
/**
28+
* Subscription that represents a group of Subscriptions that are unsubscribed together.
29+
*
30+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
31+
*/
32+
public final class ChainedSubscription implements Subscription {
33+
34+
private List<Subscription> subscriptions;
35+
private boolean unsubscribed = false;
36+
37+
public ChainedSubscription() {
38+
}
39+
40+
public ChainedSubscription(final Subscription... subscriptions) {
41+
this.subscriptions = new LinkedList<Subscription>(Arrays.asList(subscriptions));
42+
}
43+
44+
@Override
45+
public synchronized boolean isUnsubscribed() {
46+
return unsubscribed;
47+
}
48+
49+
public void add(final Subscription s) {
50+
Subscription unsubscribe = null;
51+
synchronized (this) {
52+
if (unsubscribed) {
53+
unsubscribe = s;
54+
} else {
55+
if (subscriptions == null) {
56+
subscriptions = new LinkedList<Subscription>();
57+
}
58+
subscriptions.add(s);
59+
}
60+
}
61+
if (unsubscribe != null) {
62+
// call after leaving the synchronized block so we're not holding a lock while executing this
63+
unsubscribe.unsubscribe();
64+
}
65+
}
66+
67+
@Override
68+
public void unsubscribe() {
69+
synchronized (this) {
70+
if (unsubscribed) {
71+
return;
72+
}
73+
unsubscribed = true;
74+
}
75+
// we will only get here once
76+
unsubscribeFromAll(subscriptions);
77+
}
78+
79+
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
80+
if (subscriptions == null) {
81+
return;
82+
}
83+
List<Throwable> es = null;
84+
for (Subscription s : subscriptions) {
85+
try {
86+
s.unsubscribe();
87+
} catch (Throwable e) {
88+
if (es == null) {
89+
es = new ArrayList<Throwable>();
90+
}
91+
es.add(e);
92+
}
93+
}
94+
if (es != null) {
95+
if (es.size() == 1) {
96+
Throwable t = es.get(0);
97+
if (t instanceof RuntimeException) {
98+
throw (RuntimeException) t;
99+
} else {
100+
throw new CompositeException(
101+
"Failed to unsubscribe to 1 or more subscriptions.", es);
102+
}
103+
} else {
104+
throw new CompositeException(
105+
"Failed to unsubscribe to 2 or more subscriptions.", es);
106+
}
107+
}
108+
}
109+
}

0 commit comments

Comments
 (0)