Skip to content

Commit d0c86cc

Browse files
committed
Add debounce variant to RxScala
1 parent ec24d16 commit d0c86cc

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,4 +1070,17 @@ class RxScalaDemo extends JUnitSuite {
10701070
assertEquals(10, o2.size.toBlockingObservable.single)
10711071
assertEquals(1000, o2.flatten.size.toBlockingObservable.single)
10721072
}
1073+
1074+
@Test def debounceExample() {
1075+
val o = Observable.interval(100 millis).take(20).debounce {
1076+
n =>
1077+
if (n % 2 == 0) {
1078+
Observable.interval(50 millis)
1079+
}
1080+
else {
1081+
Observable.interval(150 millis)
1082+
}
1083+
}
1084+
o.toBlockingObservable.foreach(println(_))
1085+
}
10731086
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2299,6 +2299,23 @@ trait Observable[+T]
22992299
toScalaObservable[T](asJavaObservable.throttleWithTimeout(timeout.length, timeout.unit))
23002300
}
23012301

2302+
/**
2303+
* Return an Observable that mirrors the source Observable, except that it drops items emitted by the source
2304+
* Observable that are followed by another item within a computed debounce duration.
2305+
*
2306+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/debounce.f.png">
2307+
*
2308+
* @param debounceSelector function to retrieve a sequence that indicates the throttle duration for each item
2309+
* @return an Observable that omits items emitted by the source Observable that are followed by another item
2310+
* within a computed debounce duration
2311+
*/
2312+
def debounce(debounceSelector: T => Observable[Any]): Observable[T] = {
2313+
val fJava = new rx.functions.Func1[T, rx.Observable[Any]] {
2314+
override def call(t: T) = debounceSelector(t).asJavaObservable.asInstanceOf[rx.Observable[Any]]
2315+
}
2316+
toScalaObservable[T](asJavaObservable.debounce[Any](fJava))
2317+
}
2318+
23022319
/**
23032320
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
23042321
*

0 commit comments

Comments
 (0)