Skip to content

Commit 1c20ab0

Browse files
akarnokdakarnokd
authored andcommitted
Removed the Func1 retry variant.
1 parent b4a0342 commit 1c20ab0

File tree

3 files changed

+126
-157
lines changed

3 files changed

+126
-157
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5388,27 +5388,15 @@ public final Observable<T> retry() {
53885388
public final Observable<T> retry(int retryCount) {
53895389
return nest().lift(new OperatorRetry<T>(retryCount));
53905390
}
5391-
/**
5392-
* Returns an Observable that mirrors the source Observable, resubscribing to it if it calls {@code onError}
5393-
* and the predicate returns true for that specific exception.
5394-
* @param predicate the predicate that determines if a resubscription may happen in case of a specific exception
5395-
* @return the Observable modified with retry logic
5396-
* @see #retry()
5397-
* @see #retryIf(rx.functions.Func2)
5398-
*/
5399-
public final Observable<T> retry(Func1<Throwable, Boolean> predicate) {
5400-
return nest().lift(new OperatorRetryWithPredicate<T>(predicate));
5401-
}
54025391
/**
54035392
* Returns an Observable that mirrors the source Observable, resubscribing to it if it calls {@code onError}
54045393
* and the predicate returns true for that specific exception and retry count.
54055394
* @param predicate the predicate that determines if a resubscription may happen in case of a specific exception and retry
54065395
* count
54075396
* @return the Observable modified with retry logic
54085397
* @see #retry()
5409-
* @see #retry(rx.functions.Func1)
54105398
*/
5411-
public final Observable<T> retryIf(Func2<Integer, Throwable, Boolean> predicate) {
5399+
public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
54125400
return nest().lift(new OperatorRetryWithPredicate<T>(predicate));
54135401
}
54145402

Lines changed: 117 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -1,136 +1,117 @@
1-
/**
2-
* Copyright 2014 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.operators;
17-
18-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19-
import rx.Observable;
20-
import rx.Scheduler;
21-
import rx.Subscriber;
22-
import rx.functions.Action0;
23-
import rx.functions.Func1;
24-
import rx.functions.Func2;
25-
import rx.schedulers.Schedulers;
26-
import rx.subscriptions.SerialSubscription;
27-
28-
public final class OperatorRetryWithPredicate<T> implements Observable.Operator<T, Observable<T>> {
29-
final Func2<Integer, Throwable, Boolean> predicate;
30-
public OperatorRetryWithPredicate(Func1<Throwable, Boolean> predicate) {
31-
this.predicate = new IgnoreIndexPredicate(predicate);
32-
}
33-
public OperatorRetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
34-
this.predicate = predicate;
35-
}
36-
37-
@Override
38-
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
39-
final Scheduler.Worker inner = Schedulers.trampoline().createWorker();
40-
child.add(inner);
41-
42-
final SerialSubscription serialSubscription = new SerialSubscription();
43-
// add serialSubscription so it gets unsubscribed if child is unsubscribed
44-
child.add(serialSubscription);
45-
46-
return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
47-
}
48-
49-
/** Ignore the index parameter and call a Func1 predicate with the throwable only. */
50-
static final class IgnoreIndexPredicate implements Func2<Integer, Throwable, Boolean> {
51-
final Func1<Throwable, Boolean> predicate;
52-
53-
public IgnoreIndexPredicate(Func1<Throwable, Boolean> predicate) {
54-
this.predicate = predicate;
55-
}
56-
57-
@Override
58-
public Boolean call(Integer t1, Throwable t2) {
59-
return predicate.call(t2);
60-
}
61-
62-
}
63-
64-
static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
65-
final Subscriber<? super T> child;
66-
final Func2<Integer, Throwable, Boolean> predicate;
67-
final Scheduler.Worker inner;
68-
final SerialSubscription serialSubscription;
69-
70-
volatile int attempts;
71-
@SuppressWarnings("rawtypes")
72-
static final AtomicIntegerFieldUpdater<SourceSubscriber> ATTEMPTS_UPDATER
73-
= AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts");
74-
75-
public SourceSubscriber(Subscriber<? super T> child, final Func2<Integer, Throwable, Boolean> predicate, Scheduler.Worker inner,
76-
SerialSubscription serialSubscription) {
77-
this.child = child;
78-
this.predicate = predicate;
79-
this.inner = inner;
80-
this.serialSubscription = serialSubscription;
81-
}
82-
83-
84-
@Override
85-
public void onCompleted() {
86-
// ignore as we expect a single nested Observable<T>
87-
}
88-
89-
@Override
90-
public void onError(Throwable e) {
91-
child.onError(e);
92-
}
93-
94-
@Override
95-
public void onNext(final Observable<T> o) {
96-
inner.schedule(new Action0() {
97-
98-
@Override
99-
public void call() {
100-
final Action0 _self = this;
101-
ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
102-
103-
// new subscription each time so if it unsubscribes itself it does not prevent retries
104-
// by unsubscribing the child subscription
105-
Subscriber<T> subscriber = new Subscriber<T>() {
106-
107-
@Override
108-
public void onCompleted() {
109-
child.onCompleted();
110-
}
111-
112-
@Override
113-
public void onError(Throwable e) {
114-
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
115-
// retry again
116-
inner.schedule(_self);
117-
} else {
118-
// give up and pass the failure
119-
child.onError(e);
120-
}
121-
}
122-
123-
@Override
124-
public void onNext(T v) {
125-
child.onNext(v);
126-
}
127-
128-
};
129-
// register this Subscription (and unsubscribe previous if exists)
130-
serialSubscription.set(subscriber);
131-
o.unsafeSubscribe(subscriber);
132-
}
133-
});
134-
}
135-
}
136-
}
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19+
import rx.Observable;
20+
import rx.Scheduler;
21+
import rx.Subscriber;
22+
import rx.functions.Action0;
23+
import rx.functions.Func2;
24+
import rx.schedulers.Schedulers;
25+
import rx.subscriptions.SerialSubscription;
26+
27+
public final class OperatorRetryWithPredicate<T> implements Observable.Operator<T, Observable<T>> {
28+
final Func2<Integer, Throwable, Boolean> predicate;
29+
public OperatorRetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
30+
this.predicate = predicate;
31+
}
32+
33+
@Override
34+
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
35+
final Scheduler.Worker inner = Schedulers.trampoline().createWorker();
36+
child.add(inner);
37+
38+
final SerialSubscription serialSubscription = new SerialSubscription();
39+
// add serialSubscription so it gets unsubscribed if child is unsubscribed
40+
child.add(serialSubscription);
41+
42+
return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
43+
}
44+
45+
static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
46+
final Subscriber<? super T> child;
47+
final Func2<Integer, Throwable, Boolean> predicate;
48+
final Scheduler.Worker inner;
49+
final SerialSubscription serialSubscription;
50+
51+
volatile int attempts;
52+
@SuppressWarnings("rawtypes")
53+
static final AtomicIntegerFieldUpdater<SourceSubscriber> ATTEMPTS_UPDATER
54+
= AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts");
55+
56+
public SourceSubscriber(Subscriber<? super T> child, final Func2<Integer, Throwable, Boolean> predicate, Scheduler.Worker inner,
57+
SerialSubscription serialSubscription) {
58+
this.child = child;
59+
this.predicate = predicate;
60+
this.inner = inner;
61+
this.serialSubscription = serialSubscription;
62+
}
63+
64+
65+
@Override
66+
public void onCompleted() {
67+
// ignore as we expect a single nested Observable<T>
68+
}
69+
70+
@Override
71+
public void onError(Throwable e) {
72+
child.onError(e);
73+
}
74+
75+
@Override
76+
public void onNext(final Observable<T> o) {
77+
inner.schedule(new Action0() {
78+
79+
@Override
80+
public void call() {
81+
final Action0 _self = this;
82+
ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
83+
84+
// new subscription each time so if it unsubscribes itself it does not prevent retries
85+
// by unsubscribing the child subscription
86+
Subscriber<T> subscriber = new Subscriber<T>() {
87+
88+
@Override
89+
public void onCompleted() {
90+
child.onCompleted();
91+
}
92+
93+
@Override
94+
public void onError(Throwable e) {
95+
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
96+
// retry again
97+
inner.schedule(_self);
98+
} else {
99+
// give up and pass the failure
100+
child.onError(e);
101+
}
102+
}
103+
104+
@Override
105+
public void onNext(T v) {
106+
child.onNext(v);
107+
}
108+
109+
};
110+
// register this Subscription (and unsubscribe previous if exists)
111+
serialSubscription.set(subscriber);
112+
o.unsafeSubscribe(subscriber);
113+
}
114+
});
115+
}
116+
}
117+
}

rxjava-core/src/test/java/rx/operators/OperatorRetryWithPredicateTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void testWithNothingToRetry() {
5959
Observer<Integer> o = mock(Observer.class);
6060
InOrder inOrder = inOrder(o);
6161

62-
source.retryIf(retryTwice).subscribe(o);
62+
source.retry(retryTwice).subscribe(o);
6363

6464
inOrder.verify(o).onNext(0);
6565
inOrder.verify(o).onNext(1);
@@ -90,7 +90,7 @@ public void call(Subscriber<? super Integer> t1) {
9090
Observer<Integer> o = mock(Observer.class);
9191
InOrder inOrder = inOrder(o);
9292

93-
source.retryIf(retryTwice).subscribe(o);
93+
source.retry(retryTwice).subscribe(o);
9494

9595
inOrder.verify(o).onNext(0);
9696
inOrder.verify(o).onNext(1);
@@ -117,7 +117,7 @@ public void call(Subscriber<? super Integer> t1) {
117117
Observer<Integer> o = mock(Observer.class);
118118
InOrder inOrder = inOrder(o);
119119

120-
source.retryIf(retryTwice).subscribe(o);
120+
source.retry(retryTwice).subscribe(o);
121121

122122
inOrder.verify(o).onNext(0);
123123
inOrder.verify(o).onNext(1);
@@ -152,7 +152,7 @@ public void call(Subscriber<? super Integer> t1) {
152152
Observer<Integer> o = mock(Observer.class);
153153
InOrder inOrder = inOrder(o);
154154

155-
source.retryIf(retryOnTestException).subscribe(o);
155+
source.retry(retryOnTestException).subscribe(o);
156156

157157
inOrder.verify(o).onNext(0);
158158
inOrder.verify(o).onNext(1);
@@ -188,7 +188,7 @@ public void call(Subscriber<? super Integer> t1) {
188188
Observer<Integer> o = mock(Observer.class);
189189
InOrder inOrder = inOrder(o);
190190

191-
source.retryIf(retryOnTestException).subscribe(o);
191+
source.retry(retryOnTestException).subscribe(o);
192192

193193
inOrder.verify(o).onNext(0);
194194
inOrder.verify(o).onNext(1);
@@ -205,7 +205,7 @@ public void call(Subscriber<? super Integer> t1) {
205205
public void testUnsubscribeFromRetry() {
206206
PublishSubject<Integer> subject = PublishSubject.create();
207207
final AtomicInteger count = new AtomicInteger(0);
208-
Subscription sub = subject.retryIf(retryTwice).subscribe(new Action1<Integer>() {
208+
Subscription sub = subject.retry(retryTwice).subscribe(new Action1<Integer>() {
209209
@Override
210210
public void call(Integer n) {
211211
count.incrementAndGet();
@@ -227,7 +227,7 @@ public void testUnsubscribeAfterError() {
227227
OperatorRetryTest.SlowObservable so = new OperatorRetryTest.SlowObservable(100, 0);
228228
Observable<Long> o = Observable
229229
.create(so)
230-
.retryIf(retry5);
230+
.retry(retry5);
231231

232232
OperatorRetryTest.AsyncObserver<Long> async = new OperatorRetryTest.AsyncObserver<Long>(observer);
233233

@@ -255,7 +255,7 @@ public void testTimeoutWithRetry() {
255255
Observable<Long> o = Observable
256256
.create(so)
257257
.timeout(80, TimeUnit.MILLISECONDS)
258-
.retryIf(retry5);
258+
.retry(retry5);
259259

260260
OperatorRetryTest.AsyncObserver<Long> async = new OperatorRetryTest.AsyncObserver<Long>(observer);
261261

0 commit comments

Comments
 (0)