Skip to content

Commit c030321

Browse files
committed
Added Observable.timeout wrappers to scala adapter
Added the four timeout methods on Observable in the Scala adaptor. Note for the java/scala type interop: http://stackoverflow.com/q/20912151
1 parent 7a75d4b commit c030321

File tree

1 file changed

+79
-4
lines changed

1 file changed

+79
-4
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1600,6 +1600,81 @@ trait Observable[+T]
16001600
toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler))
16011601
}
16021602

1603+
/**
1604+
* Applies a timeout policy for each item emitted by the Observable, using
1605+
* the specified scheduler to run timeout timers. If the next item isn't
1606+
* observed within the specified timeout duration starting from its
1607+
* predecessor, observers are notified of a `TimeoutException`.
1608+
* <p>
1609+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.1.png">
1610+
*
1611+
* @param timeout maximum duration between items before a timeout occurs
1612+
* @return the source Observable modified to notify observers of a
1613+
* `TimeoutException` in case of a timeout
1614+
*/
1615+
def timeout(timeout: Duration): Observable[T] = {
1616+
toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit))
1617+
}
1618+
1619+
/**
1620+
* Applies a timeout policy for each item emitted by the Observable, using
1621+
* the specified scheduler to run timeout timers. If the next item isn't
1622+
* observed within the specified timeout duration starting from its
1623+
* predecessor, a specified fallback Observable produces future items and
1624+
* notifications from that point on.
1625+
* <p>
1626+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.2.png">
1627+
*
1628+
* @param timeout maximum duration between items before a timeout occurs
1629+
* @param other fallback Observable to use in case of a timeout
1630+
* @return the source Observable modified to switch to the fallback
1631+
* Observable in case of a timeout
1632+
*/
1633+
def timeout[U >: T](timeout: Duration, other: Observable[U]): Observable[U] = {
1634+
val otherJava: rx.Observable[_ <: U] = other.asJavaObservable
1635+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1636+
toScalaObservable[U](thisJava.timeout(timeout.length, timeout.unit, otherJava))
1637+
}
1638+
1639+
/**
1640+
* Applies a timeout policy for each item emitted by the Observable, using
1641+
* the specified scheduler to run timeout timers. If the next item isn't
1642+
* observed within the specified timeout duration starting from its
1643+
* predecessor, the observer is notified of a `TimeoutException`.
1644+
* <p>
1645+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.1s.png">
1646+
*
1647+
* @param timeout maximum duration between items before a timeout occurs
1648+
* @param scheduler Scheduler to run the timeout timers on
1649+
* @return the source Observable modified to notify observers of a
1650+
* `TimeoutException` in case of a timeout
1651+
*/
1652+
def timeout(timeout: Duration, scheduler: Scheduler): Observable[T] = {
1653+
toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit, scheduler.asJavaScheduler))
1654+
}
1655+
1656+
/**
1657+
* Applies a timeout policy for each item emitted by the Observable, using
1658+
* the specified scheduler to run timeout timers. If the next item isn't
1659+
* observed within the specified timeout duration starting from its
1660+
* predecessor, a specified fallback Observable sequence produces future
1661+
* items and notifications from that point on.
1662+
* <p>
1663+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.2s.png">
1664+
*
1665+
* @param timeout maximum duration between items before a timeout occurs
1666+
* @param other Observable to use as the fallback in case of a timeout
1667+
* @param scheduler Scheduler to run the timeout timers on
1668+
* @return the source Observable modified so that it will switch to the
1669+
* fallback Observable in case of a timeout
1670+
*/
1671+
def timeout[U >: T](timeout: Duration, other: Observable[U], scheduler: Scheduler): Observable[U] = {
1672+
val otherJava: rx.Observable[_ <: U] = other.asJavaObservable
1673+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1674+
toScalaObservable[U](thisJava.timeout(timeout.length, timeout.unit, otherJava, scheduler.asJavaScheduler))
1675+
}
1676+
1677+
16031678
/**
16041679
* Returns an Observable that sums up the elements of this Observable.
16051680
*
@@ -1894,21 +1969,21 @@ trait Observable[+T]
18941969
}
18951970

18961971
/**
1897-
* Invokes an action if the source Observable calls <code>onError</code>.
1972+
* Invokes an action if the source Observable calls `onError`.
18981973
*
18991974
* @param onError the action to invoke if the source Observable calls
1900-
* <code>onError</code>
1975+
* `onError`
19011976
* @return the source Observable with the side-effecting behavior applied
19021977
*/
19031978
def doOnError(onError: Throwable => Unit): Observable[T] = {
19041979
toScalaObservable[T](asJavaObservable.doOnError(onError))
19051980
}
19061981

19071982
/**
1908-
* Invokes an action when the source Observable calls <code>onCompleted</code>.
1983+
* Invokes an action when the source Observable calls `onCompleted`.
19091984
*
19101985
* @param onCompleted the action to invoke when the source Observable calls
1911-
* <code>onCompleted</code>
1986+
* `onCompleted`
19121987
* @return the source Observable with the side-effecting behavior applied
19131988
*/
19141989
def doOnCompleted(onCompleted: () => Unit): Observable[T] = {

0 commit comments

Comments
 (0)