Skip to content

Commit 498c9dc

Browse files
committed
Add singleOption, singleOrElse, headOption, lastOption, lastOrElse to RxScala
1 parent ad1562d commit 498c9dc

File tree

3 files changed

+128
-8
lines changed

3 files changed

+128
-8
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2651,10 +2651,20 @@ trait Observable[+T]
26512651
* if the source Observable completes without emitting any item.
26522652
*/
26532653
def firstOrElse[U >: T](default: => U): Observable[U] = {
2654-
this.take(1).foldLeft[Option[U]](None)((v: Option[U], e: U) => Some(e)).map({
2655-
case Some(element) => element
2656-
case None => default
2657-
})
2654+
take(1).singleOrElse(default)
2655+
}
2656+
2657+
/**
2658+
* Returns an Observable that emits only an `Option` with the very first item emitted by the source Observable,
2659+
* or `None` if the source Observable is empty.
2660+
*
2661+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstOrDefault.png">
2662+
*
2663+
* @return an Observable that emits only an `Option` with the very first item from the source, or `None`
2664+
* if the source Observable completes without emitting any item.
2665+
*/
2666+
def headOption: Observable[Option[T]] = {
2667+
take(1).singleOption
26582668
}
26592669

26602670
/**
@@ -2736,6 +2746,34 @@ trait Observable[+T]
27362746
toScalaObservable[T](asJavaObservable.last)
27372747
}
27382748

2749+
/**
2750+
* Returns an Observable that emits only an `Option` with the last item emitted by the source Observable,
2751+
* or `None` if the source Observable completes without emitting any items.
2752+
*
2753+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/lastOrDefault.png">
2754+
*
2755+
* @return an Observable that emits only an `Option` with the last item emitted by the source Observable,
2756+
* or `None` if the source Observable is empty
2757+
*/
2758+
def lastOption: Observable[Option[T]] = {
2759+
takeRight(1).singleOption
2760+
}
2761+
2762+
/**
2763+
* Returns an Observable that emits only the last item emitted by the source Observable, or a default item
2764+
* if the source Observable completes without emitting any items.
2765+
*
2766+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/lastOrDefault.png">
2767+
*
2768+
* @param default the default item to emit if the source Observable is empty.
2769+
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
2770+
* @return an Observable that emits only the last item emitted by the source Observable, or a default item
2771+
* if the source Observable is empty
2772+
*/
2773+
def lastOrElse[U >: T](default: => U): Observable[U] = {
2774+
takeRight(1).singleOrElse(default)
2775+
}
2776+
27392777
/**
27402778
* If the source Observable completes after emitting a single item, return an Observable that emits that
27412779
* item. If the source Observable emits more than one item or no items, throw an `NoSuchElementException`.
@@ -2752,6 +2790,42 @@ trait Observable[+T]
27522790
toScalaObservable[T](asJavaObservable.single)
27532791
}
27542792

2793+
/**
2794+
* If the source Observable completes after emitting a single item, return an Observable that emits an `Option`
2795+
* with that item; if the source Observable is empty, return an Observable that emits `None`.
2796+
* If the source Observable emits more than one item, throw an `IllegalArgumentException`.
2797+
*
2798+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/singleOrDefault.png">
2799+
*
2800+
* @return an Observable that emits an `Option` with the single item emitted by the source Observable, or
2801+
* `None` if the source Observable is empty
2802+
* @throws IllegalArgumentException if the source Observable emits more than one item
2803+
*/
2804+
def singleOption: Observable[Option[T]] = {
2805+
val jObservableOption = map(Some(_)).asJavaObservable.asInstanceOf[rx.Observable[Option[T]]]
2806+
toScalaObservable[Option[T]](jObservableOption.singleOrDefault(None))
2807+
}
2808+
2809+
/**
2810+
* If the source Observable completes after emitting a single item, return an Observable that emits that
2811+
* item; if the source Observable is empty, return an Observable that emits a default item. If the source
2812+
* Observable emits more than one item, throw an `IllegalArgumentException`.
2813+
*
2814+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/singleOrDefault.png">
2815+
*
2816+
* @param default a default value to emit if the source Observable emits no item.
2817+
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
2818+
* @return an Observable that emits the single item emitted by the source Observable, or a default item if
2819+
* the source Observable is empty
2820+
* @throws IllegalArgumentException if the source Observable emits more than one item
2821+
*/
2822+
def singleOrElse[U >: T](default: => U): Observable[U] = {
2823+
singleOption.map {
2824+
case Some(element) => element
2825+
case None => default
2826+
}
2827+
}
2828+
27552829
/**
27562830
* Returns an Observable that forwards all sequentially distinct items emitted from the source Observable.
27572831
*

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ class CompletenessTest extends JUnitSuite {
8989
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
9090
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
9191
"ignoreElements()" -> "[use `filter(_ => false)`]",
92+
"last(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).last`]",
93+
"lastOrDefault(T)" -> "lastOrElse(=> U)",
94+
"lastOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `filter(predicate).lastOrElse(default)`]",
9295
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
9396
"limit(Int)" -> "take(Int)",
9497
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
@@ -117,13 +120,15 @@ class CompletenessTest extends JUnitSuite {
117120
"sample(Observable[U])" -> "sample(Observable[Any])",
118121
"scan(Func2[T, T, T])" -> unnecessary,
119122
"scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)",
123+
"single(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).single`]",
124+
"singleOrDefault(T)" -> "singleOrElse(=> U)",
125+
"singleOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `filter(predicate).singleOrElse(default)`]",
120126
"skip(Int)" -> "drop(Int)",
121127
"skip(Long, TimeUnit)" -> "drop(Duration)",
122128
"skip(Long, TimeUnit, Scheduler)" -> "drop(Duration, Scheduler)",
123129
"skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)",
124130
"skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary,
125131
"skipUntil(Observable[U])" -> "dropUntil(Observable[E])",
126-
"single(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).single`]",
127132
"startWith(T)" -> "[use `item +: o`]",
128133
"startWith(Array[T])" -> "[use `Observable.items(items) ++ o`]",
129134
"startWith(Array[T], Scheduler)" -> "[use `Observable.items(items).subscribeOn(scheduler) ++ o`]",
@@ -159,8 +164,6 @@ class CompletenessTest extends JUnitSuite {
159164
"window(Observable[_ <: TOpening], Func1[_ >: TOpening, _ <: Observable[_ <: TClosing]])" -> "window(Observable[Opening], Opening => Observable[Any])",
160165
"window(Long, Long, TimeUnit)" -> "window(Duration, Duration)",
161166
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",
162-
"zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)",
163-
"zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U], (T, U) => R)",
164167

165168
// manually added entries for Java static methods
166169
"average(Observable[Integer])" -> averageProblem,
@@ -197,7 +200,9 @@ class CompletenessTest extends JUnitSuite {
197200
"switchOnNext(Observable[_ <: Observable[_ <: T]])" -> "switch(<:<[Observable[T], Observable[Observable[U]]])",
198201
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method `zip` and `map`]",
199202
"zip(Observable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
200-
"zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]"
203+
"zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
204+
"zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)",
205+
"zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U], (T, U) => R)"
201206
) ++ List.iterate("T, T", 8)(s => s + ", T").map(
202207
// all 9 overloads of startWith:
203208
"startWith(" + _ + ")" -> "[use `Observable.items(...) ++ o`]"
@@ -345,6 +350,7 @@ class CompletenessTest extends JUnitSuite {
345350
println( "----------------------------------------------\n")
346351

347352
val actualMethods = getPublicInstanceAndCompanionMethods(typeOf[rx.lang.scala.Observable[_]]).toSet
353+
actualMethods.toList.sorted.foreach(println)
348354
var good = 0
349355
var bad = 0
350356
for ((javaM, scalaM) <- SortedMap(correspondence.toSeq :_*)) {

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,4 +176,44 @@ class ObservableTests extends JUnitSuite {
176176
assertEquals(List(0, 1, 2), o.toBlockingObservable.toList)
177177
}
178178

179+
@Test
180+
def testSingleOrElse() {
181+
val o = Observable.items(1).singleOrElse(2)
182+
assertEquals(1, o.toBlocking.single)
183+
}
184+
185+
@Test
186+
def testSingleOrElseWithEmptyObservable() {
187+
val o: Observable[Int] = Observable.empty.singleOrElse(1)
188+
assertEquals(1, o.toBlocking.single)
189+
}
190+
191+
@Test(expected = classOf[IllegalArgumentException])
192+
def testSingleOrElseWithTooManyItems() {
193+
Observable.items(1, 2).singleOrElse(1).toBlocking.single
194+
}
195+
196+
@Test
197+
def testSingleOrElseWithCallByName() {
198+
var called = false
199+
val o: Observable[Int] = Observable.empty.singleOrElse {
200+
called = true
201+
1
202+
}
203+
assertFalse(called)
204+
o.subscribe()
205+
assertTrue(called)
206+
}
207+
208+
@Test
209+
def testSingleOrElseWithCallByName2() {
210+
var called = false
211+
val o = Observable.items(1).singleOrElse {
212+
called = true
213+
2
214+
}
215+
assertFalse(called)
216+
o.subscribe()
217+
assertFalse(called)
218+
}
179219
}

0 commit comments

Comments
 (0)