Skip to content

Commit 95ab2a1

Browse files
committed
Merge branch 'master' of https://github.com/Netflix/RxJava into docs
+ improve retry(Func2) javadocs
2 parents ea2249a + 7e8fc57 commit 95ab2a1

File tree

4 files changed

+413
-5
lines changed

4 files changed

+413
-5
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5392,6 +5392,23 @@ public final Observable<T> retry(int retryCount) {
53925392
return nest().lift(new OperatorRetry<T>(retryCount));
53935393
}
53945394

5395+
/**
5396+
* Returns an Observable that mirrors the source Observable, resubscribing to it if it calls {@code onError}
5397+
* and the predicate returns true for that specific exception and retry count.
5398+
* <p>
5399+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retry.png">
5400+
*
5401+
* @param predicate
5402+
* the predicate that determines if a resubscription may happen in case of a specific exception
5403+
* and retry count
5404+
* @return the source Observable modified with retry logic
5405+
* @see #retry()
5406+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-retry">RxJava Wiki: retry()</a>
5407+
*/
5408+
public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
5409+
return nest().lift(new OperatorRetryWithPredicate<T>(predicate));
5410+
}
5411+
53955412
/**
53965413
* Returns an Observable that emits the most recently emitted item (if any) emitted by the source Observable
53975414
* within periodic time intervals.
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19+
import rx.Observable;
20+
import rx.Scheduler;
21+
import rx.Subscriber;
22+
import rx.functions.Action0;
23+
import rx.functions.Func2;
24+
import rx.schedulers.Schedulers;
25+
import rx.subscriptions.SerialSubscription;
26+
27+
public final class OperatorRetryWithPredicate<T> implements Observable.Operator<T, Observable<T>> {
28+
final Func2<Integer, Throwable, Boolean> predicate;
29+
public OperatorRetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
30+
this.predicate = predicate;
31+
}
32+
33+
@Override
34+
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
35+
final Scheduler.Worker inner = Schedulers.trampoline().createWorker();
36+
child.add(inner);
37+
38+
final SerialSubscription serialSubscription = new SerialSubscription();
39+
// add serialSubscription so it gets unsubscribed if child is unsubscribed
40+
child.add(serialSubscription);
41+
42+
return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
43+
}
44+
45+
static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
46+
final Subscriber<? super T> child;
47+
final Func2<Integer, Throwable, Boolean> predicate;
48+
final Scheduler.Worker inner;
49+
final SerialSubscription serialSubscription;
50+
51+
volatile int attempts;
52+
@SuppressWarnings("rawtypes")
53+
static final AtomicIntegerFieldUpdater<SourceSubscriber> ATTEMPTS_UPDATER
54+
= AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts");
55+
56+
public SourceSubscriber(Subscriber<? super T> child, final Func2<Integer, Throwable, Boolean> predicate, Scheduler.Worker inner,
57+
SerialSubscription serialSubscription) {
58+
this.child = child;
59+
this.predicate = predicate;
60+
this.inner = inner;
61+
this.serialSubscription = serialSubscription;
62+
}
63+
64+
65+
@Override
66+
public void onCompleted() {
67+
// ignore as we expect a single nested Observable<T>
68+
}
69+
70+
@Override
71+
public void onError(Throwable e) {
72+
child.onError(e);
73+
}
74+
75+
@Override
76+
public void onNext(final Observable<T> o) {
77+
inner.schedule(new Action0() {
78+
79+
@Override
80+
public void call() {
81+
final Action0 _self = this;
82+
ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
83+
84+
// new subscription each time so if it unsubscribes itself it does not prevent retries
85+
// by unsubscribing the child subscription
86+
Subscriber<T> subscriber = new Subscriber<T>() {
87+
88+
@Override
89+
public void onCompleted() {
90+
child.onCompleted();
91+
}
92+
93+
@Override
94+
public void onError(Throwable e) {
95+
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
96+
// retry again
97+
inner.schedule(_self);
98+
} else {
99+
// give up and pass the failure
100+
child.onError(e);
101+
}
102+
}
103+
104+
@Override
105+
public void onNext(T v) {
106+
child.onNext(v);
107+
}
108+
109+
};
110+
// register this Subscription (and unsubscribe previous if exists)
111+
serialSubscription.set(subscriber);
112+
o.unsafeSubscribe(subscriber);
113+
}
114+
});
115+
}
116+
}
117+
}

rxjava-core/src/test/java/rx/operators/OperatorRetryTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,11 @@ public void call(Subscriber<? super String> s) {
260260
assertEquals(4, subsCount.get()); // 1 + 3 retries
261261
}
262262

263-
class SlowObservable implements Observable.OnSubscribe<Long> {
263+
static final class SlowObservable implements Observable.OnSubscribe<Long> {
264264

265-
private AtomicInteger efforts = new AtomicInteger(0);
266-
private AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0);
267-
private AtomicInteger nextBeforeFailure;
265+
final AtomicInteger efforts = new AtomicInteger(0);
266+
final AtomicInteger active = new AtomicInteger(0), maxActive = new AtomicInteger(0);
267+
final AtomicInteger nextBeforeFailure;
268268

269269
private final int emitDelay;
270270

@@ -273,6 +273,7 @@ public SlowObservable(int emitDelay, int countNext) {
273273
this.nextBeforeFailure = new AtomicInteger(countNext);
274274
}
275275

276+
@Override
276277
public void call(final Subscriber<? super Long> subscriber) {
277278
final AtomicBoolean terminate = new AtomicBoolean(false);
278279
efforts.getAndIncrement();
@@ -309,7 +310,7 @@ public void call() {
309310
}
310311

311312
/** Observer for listener on seperate thread */
312-
class AsyncObserver<T> implements Observer<T> {
313+
static final class AsyncObserver<T> implements Observer<T> {
313314

314315
protected CountDownLatch latch = new CountDownLatch(1);
315316

0 commit comments

Comments
 (0)