Skip to content

Commit 7e8fc57

Browse files
Merge pull request ReactiveX#1271 from akarnokd/OperatorRetryWithPredicate
Operator Retry with predicate
2 parents dd52daf + 1c20ab0 commit 7e8fc57

File tree

4 files changed

+408
-6
lines changed

4 files changed

+408
-6
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5389,7 +5389,18 @@ public final Observable<T> retry() {
53895389
public final Observable<T> retry(int retryCount) {
53905390
return nest().lift(new OperatorRetry<T>(retryCount));
53915391
}
5392-
5392+
/**
5393+
* Returns an Observable that mirrors the source Observable, resubscribing to it if it calls {@code onError}
5394+
* and the predicate returns true for that specific exception and retry count.
5395+
* @param predicate the predicate that determines if a resubscription may happen in case of a specific exception and retry
5396+
* count
5397+
* @return the Observable modified with retry logic
5398+
* @see #retry()
5399+
*/
5400+
public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
5401+
return nest().lift(new OperatorRetryWithPredicate<T>(predicate));
5402+
}
5403+
53935404
/**
53945405
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable
53955406
* within periodic time intervals.
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19+
import rx.Observable;
20+
import rx.Scheduler;
21+
import rx.Subscriber;
22+
import rx.functions.Action0;
23+
import rx.functions.Func2;
24+
import rx.schedulers.Schedulers;
25+
import rx.subscriptions.SerialSubscription;
26+
27+
public final class OperatorRetryWithPredicate<T> implements Observable.Operator<T, Observable<T>> {
28+
final Func2<Integer, Throwable, Boolean> predicate;
29+
public OperatorRetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
30+
this.predicate = predicate;
31+
}
32+
33+
@Override
34+
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
35+
final Scheduler.Worker inner = Schedulers.trampoline().createWorker();
36+
child.add(inner);
37+
38+
final SerialSubscription serialSubscription = new SerialSubscription();
39+
// add serialSubscription so it gets unsubscribed if child is unsubscribed
40+
child.add(serialSubscription);
41+
42+
return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
43+
}
44+
45+
static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
46+
final Subscriber<? super T> child;
47+
final Func2<Integer, Throwable, Boolean> predicate;
48+
final Scheduler.Worker inner;
49+
final SerialSubscription serialSubscription;
50+
51+
volatile int attempts;
52+
@SuppressWarnings("rawtypes")
53+
static final AtomicIntegerFieldUpdater<SourceSubscriber> ATTEMPTS_UPDATER
54+
= AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts");
55+
56+
public SourceSubscriber(Subscriber<? super T> child, final Func2<Integer, Throwable, Boolean> predicate, Scheduler.Worker inner,
57+
SerialSubscription serialSubscription) {
58+
this.child = child;
59+
this.predicate = predicate;
60+
this.inner = inner;
61+
this.serialSubscription = serialSubscription;
62+
}
63+
64+
65+
@Override
66+
public void onCompleted() {
67+
// ignore as we expect a single nested Observable<T>
68+
}
69+
70+
@Override
71+
public void onError(Throwable e) {
72+
child.onError(e);
73+
}
74+
75+
@Override
76+
public void onNext(final Observable<T> o) {
77+
inner.schedule(new Action0() {
78+
79+
@Override
80+
public void call() {
81+
final Action0 _self = this;
82+
ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
83+
84+
// new subscription each time so if it unsubscribes itself it does not prevent retries
85+
// by unsubscribing the child subscription
86+
Subscriber<T> subscriber = new Subscriber<T>() {
87+
88+
@Override
89+
public void onCompleted() {
90+
child.onCompleted();
91+
}
92+
93+
@Override
94+
public void onError(Throwable e) {
95+
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
96+
// retry again
97+
inner.schedule(_self);
98+
} else {
99+
// give up and pass the failure
100+
child.onError(e);
101+
}
102+
}
103+
104+
@Override
105+
public void onNext(T v) {
106+
child.onNext(v);
107+
}
108+
109+
};
110+
// register this Subscription (and unsubscribe previous if exists)
111+
serialSubscription.set(subscriber);
112+
o.unsafeSubscribe(subscriber);
113+
}
114+
});
115+
}
116+
}
117+
}

rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,11 @@ public void call(Subscriber<? super String> s) {
260260
assertEquals(4, subsCount.get()); // 1 + 3 retries
261261
}
262262

263-
class SlowObservable implements Observable.OnSubscribe<Long> {
263+
static final class SlowObservable implements Observable.OnSubscribe<Long> {
264264

265-
private AtomicInteger efforts = new AtomicInteger(0);
266-
private AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0);
267-
private AtomicInteger nextBeforeFailure;
265+
final AtomicInteger efforts = new AtomicInteger(0);
266+
final AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0);
267+
final AtomicInteger nextBeforeFailure;
268268

269269
private final int emitDelay;
270270

@@ -273,6 +273,7 @@ public SlowObservable(int emitDelay, int countNext) {
273273
this.nextBeforeFailure = new AtomicInteger(countNext);
274274
}
275275

276+
@Override
276277
public void call(final Subscriber<? super Long> subscriber) {
277278
final AtomicBoolean terminate = new AtomicBoolean(false);
278279
efforts.getAndIncrement();
@@ -309,7 +310,7 @@ public void call() {
309310
}
310311

311312
/** Observer for listener on seperate thread */
312-
class AsyncObserver<T> implements Observer<T> {
313+
static final class AsyncObserver<T> implements Observer<T> {
313314

314315
protected CountDownLatch latch = new CountDownLatch(1);
315316

0 commit comments

Comments
 (0)