Skip to content

Commit 22f2062

Browse files
author
Joakim Bodin
committed
RxScala: Add retryWhen/repeatWhen methods
- Add example usage of retryWhen/repeatWhen to RxScalaDemo
1 parent a7953e6 commit 22f2062

File tree

2 files changed

+173
-0
lines changed

2 files changed

+173
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

100644100755
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,6 +1055,30 @@ class RxScalaDemo extends JUnitSuite {
10551055
}.subscribe(s => println(s), e => e.printStackTrace())
10561056
}
10571057

1058+
@Test def retryWhenExample(): Unit = {
1059+
Observable[String]({ subscriber =>
1060+
println("subscribing")
1061+
subscriber.onError(new RuntimeException("always fails"))
1062+
}).retryWhen(attempts => {
1063+
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
1064+
println("delay retry by " + i + " second(s)")
1065+
Observable.timer(Duration(i, TimeUnit.SECONDS))
1066+
})
1067+
}).toBlocking.foreach(s => println(s))
1068+
}
1069+
1070+
@Test def repeatWhenExample(): Unit = {
1071+
Observable[String]({ subscriber =>
1072+
println("subscribing")
1073+
subscriber.onCompleted()
1074+
}).repeatWhen(attempts => {
1075+
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
1076+
println("delay repeat by " + i + " second(s)")
1077+
Observable.timer(Duration(i, TimeUnit.SECONDS)).materialize
1078+
})
1079+
}, NewThreadScheduler()).toBlocking.foreach(s => println(s))
1080+
}
1081+
10581082
@Test def liftExample1(): Unit = {
10591083
// Add "No. " in front of each item
10601084
val o = List(1, 2, 3).toObservable.lift {

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

100644100755
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3175,6 +3175,94 @@ trait Observable[+T]
31753175
toScalaObservable[T](asJavaObservable.retry(f))
31763176
}
31773177

3178+
/**
3179+
* Returns an Observable that emits the same values as the source observable with the exception of an
3180+
* {@code onError}. An {@code onError} notification from the source will result in the emission of a
3181+
* {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
3182+
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code retry} will call
3183+
* {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
3184+
* resubscribe to the source Observable.
3185+
* <p>
3186+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
3187+
*
3188+
* Example:
3189+
*
3190+
* This retries 3 times, each time incrementing the number of seconds it waits.
3191+
*
3192+
* <pre> {@code
3193+
* Observable[String]({ subscriber =>
3194+
* println("subscribing")
3195+
* subscriber.onError(new RuntimeException("always fails"))
3196+
* }).retryWhen(attempts => {
3197+
* attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
3198+
* println("delay retry by " + i + " second(s)")
3199+
* Observable.timer(Duration(i, TimeUnit.SECONDS))
3200+
* })
3201+
* }).toBlocking.foreach(s => println(s))
3202+
* } </pre>
3203+
*
3204+
* Output is:
3205+
*
3206+
* <pre> {@code
3207+
* subscribing
3208+
* delay retry by 1 second(s)
3209+
* subscribing
3210+
* delay retry by 2 second(s)
3211+
* subscribing
3212+
* delay retry by 3 second(s)
3213+
* subscribing
3214+
* } </pre>
3215+
* <dl>
3216+
* <dt><b>Scheduler:</b></dt>
3217+
* <dd>{@code retryWhen} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
3218+
* </dl>
3219+
*
3220+
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
3221+
* retry
3222+
* @return the source Observable modified with retry logic
3223+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
3224+
* @since 0.20
3225+
*/
3226+
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = {
3227+
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
3228+
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
3229+
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3230+
notificationHandler(on).asJavaObservable
3231+
}
3232+
3233+
toScalaObservable[T](asJavaObservable.retryWhen(f))
3234+
}
3235+
3236+
/**
3237+
* Returns an Observable that emits the same values as the source observable with the exception of an {@code onError}.
3238+
* An onError will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
3239+
* func. If the observable returned {@code onCompletes} or {@code onErrors} then retry will call {@code onCompleted}
3240+
* or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
3241+
* <p>
3242+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
3243+
* <p>
3244+
* <dl>
3245+
* <dt><b>Scheduler:</b></dt>
3246+
* <dd>you specify which {@link Scheduler} this operator will use</dd>
3247+
* </dl>
3248+
*
3249+
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
3250+
* retry
3251+
* @param scheduler the Scheduler on which to subscribe to the source Observable
3252+
* @return the source Observable modified with retry logic
3253+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
3254+
* @since 0.20
3255+
*/
3256+
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]], scheduler: Scheduler): Observable[T] = {
3257+
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
3258+
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
3259+
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3260+
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
3261+
}
3262+
3263+
toScalaObservable[T](asJavaObservable.retryWhen(f, scheduler))
3264+
}
3265+
31783266
/**
31793267
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
31803268
* <p>
@@ -3237,6 +3325,67 @@ trait Observable[+T]
32373325
toScalaObservable[T](asJavaObservable.repeat(count, scheduler))
32383326
}
32393327

3328+
/**
3329+
* Returns an Observable that emits the same values as the source Observable with the exception of an
3330+
* {@code onCompleted}. An {@code onCompleted} notification from the source will result in the emission of
3331+
* a {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
3332+
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code repeatWhen} will
3333+
* call {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
3334+
* resubscribe to the source Observable, on a particular Scheduler.
3335+
* <p>
3336+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
3337+
* <dl>
3338+
* <dt><b>Scheduler:</b></dt>
3339+
* <dd>you specify which {@link Scheduler} this operator will use</dd>
3340+
* </dl>
3341+
*
3342+
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
3343+
* @param scheduler the Scheduler to emit the items on
3344+
* @return the source Observable modified with repeat logic
3345+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
3346+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
3347+
* @since 0.20
3348+
*/
3349+
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]], scheduler: Scheduler): Observable[T] = {
3350+
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
3351+
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
3352+
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3353+
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
3354+
}
3355+
3356+
toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler))
3357+
}
3358+
3359+
/**
3360+
* Returns an Observable that emits the same values as the source Observable with the exception of an
3361+
* {@code onCompleted}. An {@code onCompleted} notification from the source will result in the emission of
3362+
* a {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
3363+
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code repeatWhen} will
3364+
* call {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
3365+
* resubscribe to the source observable.
3366+
* <p>
3367+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
3368+
* <dl>
3369+
* <dt><b>Scheduler:</b></dt>
3370+
* <dd>{@code repeatWhen} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
3371+
* </dl>
3372+
*
3373+
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
3374+
* @return the source Observable modified with repeat logic
3375+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
3376+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
3377+
* @since 0.20
3378+
*/
3379+
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]]): Observable[T] = {
3380+
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
3381+
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
3382+
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3383+
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
3384+
}
3385+
3386+
toScalaObservable[T](asJavaObservable.repeatWhen(f))
3387+
}
3388+
32403389
/**
32413390
* Converts an Observable into a [[BlockingObservable]] (an Observable with blocking operators).
32423391
*

0 commit comments

Comments
 (0)