Skip to content

Commit 8222607

Browse files
Merge pull request ReactiveX#477 from benjchristensen/subscription-bugfixes
CompositeSubscription bugfixes
2 parents 442292c + 1d5991c commit 8222607

File tree

2 files changed

+94
-4
lines changed

2 files changed

+94
-4
lines changed

rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,36 @@ public CompositeSubscription(Subscription... subscriptions) {
5151
}
5252
}
5353

54+
/**
55+
* Remove and unsubscribe all subscriptions but do not unsubscribe the outer CompositeSubscription.
56+
*/
57+
public void clear() {
58+
Collection<Throwable> es = null;
59+
for (Subscription s : subscriptions.keySet()) {
60+
try {
61+
s.unsubscribe();
62+
this.subscriptions.remove(s);
63+
} catch (Throwable e) {
64+
if (es == null) {
65+
es = new ArrayList<Throwable>();
66+
}
67+
es.add(e);
68+
}
69+
}
70+
if (es != null) {
71+
throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es);
72+
}
73+
}
74+
75+
/**
76+
* Remove the {@link Subscription} and unsubscribe it.
77+
*
78+
* @param s
79+
*/
5480
public void remove(Subscription s) {
5581
this.subscriptions.remove(s);
82+
// also unsubscribe from it: http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
83+
s.unsubscribe();
5684
}
5785

5886
public boolean isUnsubscribed() {

rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -82,4 +82,66 @@ public void unsubscribe() {
8282
// we should still have unsubscribed to the second one
8383
assertEquals(1, counter.get());
8484
}
85+
86+
@Test
87+
public void testRemoveUnsubscribes() {
88+
BooleanSubscription s1 = new BooleanSubscription();
89+
BooleanSubscription s2 = new BooleanSubscription();
90+
91+
CompositeSubscription s = new CompositeSubscription();
92+
s.add(s1);
93+
s.add(s2);
94+
95+
s.remove(s1);
96+
97+
assertTrue(s1.isUnsubscribed());
98+
assertFalse(s2.isUnsubscribed());
99+
}
100+
101+
@Test
102+
public void testClear() {
103+
BooleanSubscription s1 = new BooleanSubscription();
104+
BooleanSubscription s2 = new BooleanSubscription();
105+
106+
CompositeSubscription s = new CompositeSubscription();
107+
s.add(s1);
108+
s.add(s2);
109+
110+
assertFalse(s1.isUnsubscribed());
111+
assertFalse(s2.isUnsubscribed());
112+
113+
s.clear();
114+
115+
assertTrue(s1.isUnsubscribed());
116+
assertTrue(s1.isUnsubscribed());
117+
assertFalse(s.isUnsubscribed());
118+
119+
BooleanSubscription s3 = new BooleanSubscription();
120+
121+
s.add(s3);
122+
s.unsubscribe();
123+
124+
assertTrue(s3.isUnsubscribed());
125+
assertTrue(s.isUnsubscribed());
126+
}
127+
128+
@Test
129+
public void testUnsubscribeIdempotence() {
130+
final AtomicInteger counter = new AtomicInteger();
131+
CompositeSubscription s = new CompositeSubscription();
132+
s.add(new Subscription() {
133+
134+
@Override
135+
public void unsubscribe() {
136+
counter.incrementAndGet();
137+
}
138+
});
139+
140+
s.unsubscribe();
141+
s.unsubscribe();
142+
s.unsubscribe();
143+
144+
// we should have only unsubscribed once
145+
assertEquals(1, counter.get());
146+
}
85147
}

0 commit comments

Comments
 (0)