Skip to content

Commit 4b13ff6

Browse files
petermdbenjchristensen
authored andcommitted
unsubscribe before retry
1 parent b0460d0 commit 4b13ff6

File tree

2 files changed

+107
-3
lines changed

2 files changed

+107
-3
lines changed

rxjava-core/src/main/java/rx/operators/OperatorRetry.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,14 @@
3232
*/
3333

3434
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.concurrent.atomic.AtomicReference;
3536

3637
import rx.Observable;
3738
import rx.Observable.Operator;
39+
import rx.Scheduler;
3840
import rx.Scheduler.Inner;
3941
import rx.Subscriber;
42+
import rx.Subscription;
4043
import rx.functions.Action1;
4144
import rx.schedulers.Schedulers;
4245

@@ -71,13 +74,16 @@ public void onError(Throwable e) {
7174

7275
@Override
7376
public void onNext(final Observable<T> o) {
77+
78+
final AtomicReference<Subscription> retrySub=new AtomicReference<Subscription>();
79+
7480
Schedulers.trampoline().schedule(new Action1<Inner>() {
7581

7682
@Override
7783
public void call(final Inner inner) {
7884
final Action1<Inner> _self = this;
7985
attempts.incrementAndGet();
80-
o.unsafeSubscribe(new Subscriber<T>(s) {
86+
retrySub.set(o.unsafeSubscribe(new Subscriber<T>(s) {
8187

8288
@Override
8389
public void onCompleted() {
@@ -88,7 +94,15 @@ public void onCompleted() {
8894
public void onError(Throwable e) {
8995
if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !inner.isUnsubscribed()) {
9096
// retry again
91-
inner.schedule(_self);
97+
inner.schedule(new Action1<Inner>() {
98+
@Override
99+
public void call(Inner inner)
100+
{
101+
// Remove the failed subscription first
102+
retrySub.get().unsubscribe();
103+
_self.call(inner);
104+
}
105+
});
92106
} else {
93107
// give up and pass the failure
94108
s.onError(e);
@@ -100,7 +114,7 @@ public void onNext(T v) {
100114
s.onNext(v);
101115
}
102116

103-
});
117+
}));
104118
}
105119
});
106120
}

rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static org.mockito.Matchers.*;
2020
import static org.mockito.Mockito.*;
2121

22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
2224
import java.util.concurrent.atomic.AtomicInteger;
2325

2426
import org.junit.Test;
@@ -27,6 +29,7 @@
2729
import rx.Observable;
2830
import rx.Observable.OnSubscribeFunc;
2931
import rx.Observer;
32+
import rx.Subscriber;
3033
import rx.Subscription;
3134
import rx.functions.Action1;
3235
import rx.subjects.PublishSubject;
@@ -147,6 +150,93 @@ public void call(Integer n) {
147150
assertEquals(1, count.get());
148151
}
149152

153+
public static class SlowFuncAlwaysFails implements Observable.OnSubscribe<String> {
154+
155+
final AtomicInteger nextSeq=new AtomicInteger();
156+
final AtomicInteger activeSubs=new AtomicInteger();
157+
final AtomicInteger concurrentSubs=new AtomicInteger();
158+
159+
public void call(final Subscriber<? super String> s)
160+
{
161+
final int seq=nextSeq.incrementAndGet();
162+
163+
int cur=activeSubs.incrementAndGet();
164+
// Track concurrent subscriptions
165+
concurrentSubs.set(Math.max(cur,concurrentSubs.get()));
166+
167+
// Use async error
168+
new Thread(new Runnable() {
169+
@Override
170+
public void run() {
171+
try {
172+
Thread.sleep(100);
173+
} catch (InterruptedException e) {
174+
// ignore
175+
}
176+
s.onError(new RuntimeException("Subscriber #"+seq+" fails"));
177+
}
178+
}).start();
179+
180+
// Track unsubscribes
181+
s.add(new Subscription()
182+
{
183+
private boolean active=true;
184+
185+
public void unsubscribe()
186+
{
187+
if (active) {
188+
activeSubs.decrementAndGet();
189+
active=false;
190+
}
191+
}
192+
193+
public boolean isUnsubscribed()
194+
{
195+
return !active;
196+
}
197+
});
198+
}
199+
}
200+
201+
@Test
202+
public void testUnsubscribeAfterError() {
203+
204+
final CountDownLatch check=new CountDownLatch(1);
205+
final SlowFuncAlwaysFails sf=new SlowFuncAlwaysFails();
206+
207+
Observable
208+
.create(sf)
209+
.retry(4)
210+
.subscribe(
211+
new Action1<String>()
212+
{
213+
@Override
214+
public void call(String v)
215+
{
216+
fail("Should never happen");
217+
}
218+
},
219+
new Action1<Throwable>()
220+
{
221+
public void call(Throwable throwable)
222+
{
223+
check.countDown();
224+
}
225+
}
226+
);
227+
228+
try
229+
{
230+
check.await(1, TimeUnit.SECONDS);
231+
} catch (InterruptedException e)
232+
{
233+
fail("interrupted");
234+
}
235+
236+
assertEquals("5 Subscribers created", 5, sf.nextSeq.get());
237+
assertEquals("1 Active Subscriber", 1, sf.concurrentSubs.get());
238+
}
239+
150240
@Test
151241
public void testRetryAllowsSubscriptionAfterAllSubscriptionsUnsubsribed() throws InterruptedException {
152242
final AtomicInteger subsCount = new AtomicInteger(0);

0 commit comments

Comments
 (0)