Skip to content

Commit 3f7ad8c

Browse files
Merge pull request ReactiveX#1583 from benjchristensen/repeat-concat
Redo/Repeat Backpressure
2 parents c7baab8 + 0475f5c commit 3f7ad8c

File tree

2 files changed

+67
-32
lines changed

2 files changed

+67
-32
lines changed

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535

3636
import java.util.concurrent.atomic.AtomicBoolean;
3737
import java.util.concurrent.atomic.AtomicLong;
38-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
3938
import java.util.concurrent.atomic.AtomicReference;
4039

4140
import rx.Notification;
@@ -50,7 +49,7 @@
5049
import rx.functions.Func2;
5150
import rx.schedulers.Schedulers;
5251
import rx.subjects.PublishSubject;
53-
import rx.subscriptions.CompositeSubscription;
52+
import rx.subscriptions.SerialSubscription;
5453

5554
public final class OnSubscribeRedo<T> implements OnSubscribe<T> {
5655

@@ -82,8 +81,10 @@ public Observable<?> call(Observable<? extends Notification<?>> ts) {
8281
@Override
8382
public Notification<Long> call(Notification<Long> n, Notification<?> term) {
8483
final long value = n.getValue();
85-
if (value < count) return Notification.createOnNext(value + 1);
86-
else return (Notification<Long>) term;
84+
if (value < count)
85+
return Notification.createOnNext(value + 1);
86+
else
87+
return (Notification<Long>) term;
8788
}
8889
}).dematerialize();
8990
}
@@ -103,8 +104,10 @@ public Observable<? extends Notification<?>> call(Observable<? extends Notificat
103104
@Override
104105
public Notification<Integer> call(Notification<Integer> n, Notification<?> term) {
105106
final int value = n.getValue();
106-
if (predicate.call(value, term.getThrowable()).booleanValue()) return Notification.createOnNext(value + 1);
107-
else return (Notification<Integer>) term;
107+
if (predicate.call(value, term.getThrowable()).booleanValue())
108+
return Notification.createOnNext(value + 1);
109+
else
110+
return (Notification<Integer>) term;
108111
}
109112
});
110113
}
@@ -115,8 +118,10 @@ public static <T> Observable<T> retry(Observable<T> source) {
115118
}
116119

117120
public static <T> Observable<T> retry(Observable<T> source, final long count) {
118-
if (count < 0) throw new IllegalArgumentException("count >= 0 expected");
119-
if (count == 0) return source;
121+
if (count < 0)
122+
throw new IllegalArgumentException("count >= 0 expected");
123+
if (count == 0)
124+
return source;
120125
return retry(source, new RedoFinite(count));
121126
}
122127

@@ -141,7 +146,8 @@ public static <T> Observable<T> repeat(Observable<T> source, final long count) {
141146
}
142147

143148
public static <T> Observable<T> repeat(Observable<T> source, final long count, Scheduler scheduler) {
144-
if (count < 0) throw new IllegalArgumentException("count >= 0 expected");
149+
if (count < 0)
150+
throw new IllegalArgumentException("count >= 0 expected");
145151
return repeat(source, new RedoFinite(count - 1), scheduler);
146152
}
147153

@@ -162,11 +168,6 @@ public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observa
162168
private boolean stopOnComplete;
163169
private boolean stopOnError;
164170
private final Scheduler scheduler;
165-
private final AtomicBoolean isLocked = new AtomicBoolean(true);
166-
private final AtomicBoolean isStarted = new AtomicBoolean(false);
167-
// incremented when requests are made, decremented when requests are fulfilled
168-
private final AtomicLong consumerCapacity = new AtomicLong(0l);
169-
private final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();
170171

171172
private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,
172173
Scheduler scheduler) {
@@ -179,17 +180,18 @@ private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends
179180

180181
@Override
181182
public void call(final Subscriber<? super T> child) {
182-
isStarted.set(false);
183-
isLocked.set(true);
184-
consumerCapacity.set(0l);
185-
currentProducer.set(null);
183+
final AtomicBoolean isLocked = new AtomicBoolean(true);
184+
final AtomicBoolean isStarted = new AtomicBoolean(false);
185+
// incremented when requests are made, decremented when requests are fulfilled
186+
final AtomicLong consumerCapacity = new AtomicLong(0l);
187+
final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();
186188

187-
final Scheduler.Worker inner = scheduler.createWorker();
188-
child.add(inner);
189+
final Scheduler.Worker worker = scheduler.createWorker();
190+
child.add(worker);
189191

190-
final CompositeSubscription sourceSubscriptions = new CompositeSubscription();
192+
final SerialSubscription sourceSubscriptions = new SerialSubscription();
191193
child.add(sourceSubscriptions);
192-
194+
193195
final PublishSubject<Notification<?>> terminals = PublishSubject.create();
194196

195197
final Action0 subscribeToSource = new Action0() {
@@ -222,7 +224,7 @@ public void setProducer(Producer producer) {
222224
};
223225
// new subscription each time so if it unsubscribes itself it does not prevent retries
224226
// by unsubscribing the child subscription
225-
sourceSubscriptions.add(terminalDelegatingSubscriber);
227+
sourceSubscriptions.set(terminalDelegatingSubscriber);
226228
source.unsafeSubscribe(terminalDelegatingSubscriber);
227229
}
228230
};
@@ -247,8 +249,10 @@ public void onError(Throwable e) {
247249

248250
@Override
249251
public void onNext(Notification<?> t) {
250-
if (t.isOnCompleted() && stopOnComplete) child.onCompleted();
251-
else if (t.isOnError() && stopOnError) child.onError(t.getThrowable());
252+
if (t.isOnCompleted() && stopOnComplete)
253+
child.onCompleted();
254+
else if (t.isOnError() && stopOnError)
255+
child.onError(t.getThrowable());
252256
else {
253257
isLocked.set(false);
254258
filteredTerminals.onNext(t);
@@ -264,7 +268,7 @@ public void setProducer(Producer producer) {
264268
}));
265269

266270
// subscribe to the restarts observable to know when to schedule the next redo.
267-
child.add(inner.schedule(new Action0() {
271+
worker.schedule(new Action0() {
268272
@Override
269273
public void call() {
270274
restarts.unsafeSubscribe(new Subscriber<Object>(child) {
@@ -281,7 +285,9 @@ public void onError(Throwable e) {
281285
@Override
282286
public void onNext(Object t) {
283287
if (!isLocked.get() && !child.isUnsubscribed()) {
284-
child.add(inner.schedule(subscribeToSource));
288+
if (consumerCapacity.get() > 0) {
289+
worker.schedule(subscribeToSource);
290+
}
285291
}
286292
}
287293

@@ -291,18 +297,24 @@ public void setProducer(Producer producer) {
291297
}
292298
});
293299
}
294-
}));
300+
});
295301

296302
child.setProducer(new Producer() {
297303

298304
@Override
299305
public void request(long n) {
300306
if (isStarted.compareAndSet(false, true)) {
301307
consumerCapacity.set(n);
302-
if (!child.isUnsubscribed()) child.add(inner.schedule(subscribeToSource));
303-
} else if (currentProducer.get() != null) {
304-
consumerCapacity.getAndAdd(n);
305-
currentProducer.get().request(n);
308+
worker.schedule(subscribeToSource);
309+
} else {
310+
if (consumerCapacity.getAndAdd(n) == 0) {
311+
// restart
312+
worker.schedule(subscribeToSource);
313+
} else {
314+
if (currentProducer.get() != null) {
315+
currentProducer.get().request(n);
316+
}
317+
}
306318
}
307319
}
308320
});

rxjava-core/src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import rx.functions.Action1;
4444
import rx.functions.Func1;
4545
import rx.functions.Func2;
46+
import rx.internal.util.RxRingBuffer;
4647
import rx.observers.TestSubscriber;
4748
import rx.schedulers.Schedulers;
4849
import rx.subjects.PublishSubject;
@@ -630,5 +631,27 @@ public void testTimeoutWithRetry() {
630631

631632
assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
632633
}
634+
635+
@Test
636+
public void testRetryWithBackpressure() {
637+
@SuppressWarnings("unchecked")
638+
Observer<String> observer = mock(Observer.class);
639+
int NUM_RETRIES = RxRingBuffer.SIZE * 2;
640+
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
641+
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
642+
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
643+
ts.awaitTerminalEvent();
644+
645+
InOrder inOrder = inOrder(observer);
646+
// should show 3 attempts
647+
inOrder.verify(observer, times(NUM_RETRIES + 1)).onNext("beginningEveryTime");
648+
// should have no errors
649+
inOrder.verify(observer, never()).onError(any(Throwable.class));
650+
// should have a single success
651+
inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
652+
// should have a single successful onCompleted
653+
inOrder.verify(observer, times(1)).onCompleted();
654+
inOrder.verifyNoMoreInteractions();
655+
}
633656

634657
}

0 commit comments

Comments
 (0)