Skip to content

Commit b4a0342

Browse files
committed
Operator Retry with predicate
1 parent e8afd05 commit b4a0342

File tree

4 files changed

+439
-6
lines changed

4 files changed

+439
-6
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5388,7 +5388,30 @@ public final Observable<T> retry() {
53885388
public final Observable<T> retry(int retryCount) {
53895389
return nest().lift(new OperatorRetry<T>(retryCount));
53905390
}
5391-
5391+
/**
5392+
* Returns an Observable that mirrors the source Observable, resubscribing to it if it calls {@code onError}
5393+
* and the predicate returns true for that specific exception.
5394+
* @param predicate the predicate that determines if a resubscription may happen in case of a specific exception
5395+
* @return the Observable modified with retry logic
5396+
* @see #retry()
5397+
* @see #retryIf(rx.functions.Func2)
5398+
*/
5399+
public final Observable<T> retry(Func1<Throwable, Boolean> predicate) {
5400+
return nest().lift(new OperatorRetryWithPredicate<T>(predicate));
5401+
}
5402+
/**
5403+
* Returns an Observable that mirrors the source Observable, resubscribing to it if it calls {@code onError}
5404+
* and the predicate returns true for that specific exception and retry count.
5405+
* @param predicate the predicate that determines if a resubscription may happen in case of a specific exception and retry
5406+
* count
5407+
* @return the Observable modified with retry logic
5408+
* @see #retry()
5409+
* @see #retry(rx.functions.Func1)
5410+
*/
5411+
public final Observable<T> retryIf(Func2<Integer, Throwable, Boolean> predicate) {
5412+
return nest().lift(new OperatorRetryWithPredicate<T>(predicate));
5413+
}
5414+
53925415
/**
53935416
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable
53945417
* within periodic time intervals.
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19+
import rx.Observable;
20+
import rx.Scheduler;
21+
import rx.Subscriber;
22+
import rx.functions.Action0;
23+
import rx.functions.Func1;
24+
import rx.functions.Func2;
25+
import rx.schedulers.Schedulers;
26+
import rx.subscriptions.SerialSubscription;
27+
28+
public final class OperatorRetryWithPredicate<T> implements Observable.Operator<T, Observable<T>> {
29+
final Func2<Integer, Throwable, Boolean> predicate;
30+
public OperatorRetryWithPredicate(Func1<Throwable, Boolean> predicate) {
31+
this.predicate = new IgnoreIndexPredicate(predicate);
32+
}
33+
public OperatorRetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
34+
this.predicate = predicate;
35+
}
36+
37+
@Override
38+
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
39+
final Scheduler.Worker inner = Schedulers.trampoline().createWorker();
40+
child.add(inner);
41+
42+
final SerialSubscription serialSubscription = new SerialSubscription();
43+
// add serialSubscription so it gets unsubscribed if child is unsubscribed
44+
child.add(serialSubscription);
45+
46+
return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
47+
}
48+
49+
/** Ignore the index parameter and call a Func1 predicate with the throwable only. */
50+
static final class IgnoreIndexPredicate implements Func2<Integer, Throwable, Boolean> {
51+
final Func1<Throwable, Boolean> predicate;
52+
53+
public IgnoreIndexPredicate(Func1<Throwable, Boolean> predicate) {
54+
this.predicate = predicate;
55+
}
56+
57+
@Override
58+
public Boolean call(Integer t1, Throwable t2) {
59+
return predicate.call(t2);
60+
}
61+
62+
}
63+
64+
static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
65+
final Subscriber<? super T> child;
66+
final Func2<Integer, Throwable, Boolean> predicate;
67+
final Scheduler.Worker inner;
68+
final SerialSubscription serialSubscription;
69+
70+
volatile int attempts;
71+
@SuppressWarnings("rawtypes")
72+
static final AtomicIntegerFieldUpdater<SourceSubscriber> ATTEMPTS_UPDATER
73+
= AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts");
74+
75+
public SourceSubscriber(Subscriber<? super T> child, final Func2<Integer, Throwable, Boolean> predicate, Scheduler.Worker inner,
76+
SerialSubscription serialSubscription) {
77+
this.child = child;
78+
this.predicate = predicate;
79+
this.inner = inner;
80+
this.serialSubscription = serialSubscription;
81+
}
82+
83+
84+
@Override
85+
public void onCompleted() {
86+
// ignore as we expect a single nested Observable<T>
87+
}
88+
89+
@Override
90+
public void onError(Throwable e) {
91+
child.onError(e);
92+
}
93+
94+
@Override
95+
public void onNext(final Observable<T> o) {
96+
inner.schedule(new Action0() {
97+
98+
@Override
99+
public void call() {
100+
final Action0 _self = this;
101+
ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
102+
103+
// new subscription each time so if it unsubscribes itself it does not prevent retries
104+
// by unsubscribing the child subscription
105+
Subscriber<T> subscriber = new Subscriber<T>() {
106+
107+
@Override
108+
public void onCompleted() {
109+
child.onCompleted();
110+
}
111+
112+
@Override
113+
public void onError(Throwable e) {
114+
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
115+
// retry again
116+
inner.schedule(_self);
117+
} else {
118+
// give up and pass the failure
119+
child.onError(e);
120+
}
121+
}
122+
123+
@Override
124+
public void onNext(T v) {
125+
child.onNext(v);
126+
}
127+
128+
};
129+
// register this Subscription (and unsubscribe previous if exists)
130+
serialSubscription.set(subscriber);
131+
o.unsafeSubscribe(subscriber);
132+
}
133+
});
134+
}
135+
}
136+
}

rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,11 @@ public void call(Subscriber<? super String> s) {
260260
assertEquals(4, subsCount.get()); // 1 + 3 retries
261261
}
262262

263-
class SlowObservable implements Observable.OnSubscribe<Long> {
263+
static final class SlowObservable implements Observable.OnSubscribe<Long> {
264264

265-
private AtomicInteger efforts = new AtomicInteger(0);
266-
private AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0);
267-
private AtomicInteger nextBeforeFailure;
265+
final AtomicInteger efforts = new AtomicInteger(0);
266+
final AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0);
267+
final AtomicInteger nextBeforeFailure;
268268

269269
private final int emitDelay;
270270

@@ -273,6 +273,7 @@ public SlowObservable(int emitDelay, int countNext) {
273273
this.nextBeforeFailure = new AtomicInteger(countNext);
274274
}
275275

276+
@Override
276277
public void call(final Subscriber<? super Long> subscriber) {
277278
final AtomicBoolean terminate = new AtomicBoolean(false);
278279
efforts.getAndIncrement();
@@ -309,7 +310,7 @@ public void call() {
309310
}
310311

311312
/** Observer for listener on seperate thread */
312-
class AsyncObserver<T> implements Observer<T> {
313+
static final class AsyncObserver<T> implements Observer<T> {
313314

314315
protected CountDownLatch latch = new CountDownLatch(1);
315316

0 commit comments

Comments
 (0)