@@ -1600,6 +1600,81 @@ trait Observable[+T]
1600
1600
toScalaObservable[T ](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler))
1601
1601
}
1602
1602
1603
+ /**
1604
+ * Applies a timeout policy for each item emitted by the Observable, using
1605
+ * the specified scheduler to run timeout timers. If the next item isn't
1606
+ * observed within the specified timeout duration starting from its
1607
+ * predecessor, observers are notified of a `TimeoutException`.
1608
+ * <p>
1609
+ * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.1.png">
1610
+ *
1611
+ * @param timeout maximum duration between items before a timeout occurs
1612
+ * @return the source Observable modified to notify observers of a
1613
+ * `TimeoutException` in case of a timeout
1614
+ */
1615
+ def timeout (timeout : Duration ): Observable [T ] = {
1616
+ toScalaObservable[T ](asJavaObservable.timeout(timeout.length, timeout.unit))
1617
+ }
1618
+
1619
+ /**
1620
+ * Applies a timeout policy for each item emitted by the Observable, using
1621
+ * the specified scheduler to run timeout timers. If the next item isn't
1622
+ * observed within the specified timeout duration starting from its
1623
+ * predecessor, a specified fallback Observable produces future items and
1624
+ * notifications from that point on.
1625
+ * <p>
1626
+ * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.2.png">
1627
+ *
1628
+ * @param timeout maximum duration between items before a timeout occurs
1629
+ * @param other fallback Observable to use in case of a timeout
1630
+ * @return the source Observable modified to switch to the fallback
1631
+ * Observable in case of a timeout
1632
+ */
1633
+ def timeout [U >: T ](timeout : Duration , other : Observable [U ]): Observable [U ] = {
1634
+ val otherJava : rx.Observable [_ <: U ] = other.asJavaObservable
1635
+ val thisJava = this .asJavaObservable.asInstanceOf [rx.Observable [U ]]
1636
+ toScalaObservable[U ](thisJava.timeout(timeout.length, timeout.unit, otherJava))
1637
+ }
1638
+
1639
+ /**
1640
+ * Applies a timeout policy for each item emitted by the Observable, using
1641
+ * the specified scheduler to run timeout timers. If the next item isn't
1642
+ * observed within the specified timeout duration starting from its
1643
+ * predecessor, the observer is notified of a `TimeoutException`.
1644
+ * <p>
1645
+ * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.1s.png">
1646
+ *
1647
+ * @param timeout maximum duration between items before a timeout occurs
1648
+ * @param scheduler Scheduler to run the timeout timers on
1649
+ * @return the source Observable modified to notify observers of a
1650
+ * `TimeoutException` in case of a timeout
1651
+ */
1652
+ def timeout (timeout : Duration , scheduler : Scheduler ): Observable [T ] = {
1653
+ toScalaObservable[T ](asJavaObservable.timeout(timeout.length, timeout.unit, scheduler.asJavaScheduler))
1654
+ }
1655
+
1656
+ /**
1657
+ * Applies a timeout policy for each item emitted by the Observable, using
1658
+ * the specified scheduler to run timeout timers. If the next item isn't
1659
+ * observed within the specified timeout duration starting from its
1660
+ * predecessor, a specified fallback Observable sequence produces future
1661
+ * items and notifications from that point on.
1662
+ * <p>
1663
+ * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.2s.png">
1664
+ *
1665
+ * @param timeout maximum duration between items before a timeout occurs
1666
+ * @param other Observable to use as the fallback in case of a timeout
1667
+ * @param scheduler Scheduler to run the timeout timers on
1668
+ * @return the source Observable modified so that it will switch to the
1669
+ * fallback Observable in case of a timeout
1670
+ */
1671
+ def timeout [U >: T ](timeout : Duration , other : Observable [U ], scheduler : Scheduler ): Observable [U ] = {
1672
+ val otherJava : rx.Observable [_ <: U ] = other.asJavaObservable
1673
+ val thisJava = this .asJavaObservable.asInstanceOf [rx.Observable [U ]]
1674
+ toScalaObservable[U ](thisJava.timeout(timeout.length, timeout.unit, otherJava, scheduler.asJavaScheduler))
1675
+ }
1676
+
1677
+
1603
1678
/**
1604
1679
* Returns an Observable that sums up the elements of this Observable.
1605
1680
*
@@ -1894,21 +1969,21 @@ trait Observable[+T]
1894
1969
}
1895
1970
1896
1971
/**
1897
- * Invokes an action if the source Observable calls <code> onError</code> .
1972
+ * Invokes an action if the source Observable calls ` onError` .
1898
1973
*
1899
1974
* @param onError the action to invoke if the source Observable calls
1900
- * <code> onError</code>
1975
+ * ` onError`
1901
1976
* @return the source Observable with the side-effecting behavior applied
1902
1977
*/
1903
1978
def doOnError (onError : Throwable => Unit ): Observable [T ] = {
1904
1979
toScalaObservable[T ](asJavaObservable.doOnError(onError))
1905
1980
}
1906
1981
1907
1982
/**
1908
- * Invokes an action when the source Observable calls <code> onCompleted</code> .
1983
+ * Invokes an action when the source Observable calls ` onCompleted` .
1909
1984
*
1910
1985
* @param onCompleted the action to invoke when the source Observable calls
1911
- * <code> onCompleted</code>
1986
+ * ` onCompleted`
1912
1987
* @return the source Observable with the side-effecting behavior applied
1913
1988
*/
1914
1989
def doOnCompleted (onCompleted : () => Unit ): Observable [T ] = {
0 commit comments