Skip to content

Commit 49cc022

Browse files
committed
Add "buffer(=> Observable[Any])" to replace "buffer(() => Observable[Any])" and "buffer(Observable[Any])""; Remove the cast for "window(=> Observable[Any])"
1 parent dc7621c commit 49cc022

File tree

2 files changed

+10
-27
lines changed

2 files changed

+10
-27
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -408,26 +408,6 @@ trait Observable[+T]
408408
zip(0 until Int.MaxValue)
409409
}
410410

411-
/**
412-
* Creates an Observable which produces buffers of collected values.
413-
*
414-
* This Observable produces connected non-overlapping buffers. The current buffer is
415-
* emitted and replaced with a new buffer when the Observable produced by the specified function produces an object. The function will then
416-
* be used to create a new Observable to listen for the end of the next buffer.
417-
*
418-
* @param closings
419-
* The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created.
420-
* When this [[rx.lang.scala.Observable]] produces an object, the associated buffer
421-
* is emitted and replaced with a new one.
422-
* @return
423-
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers, which are emitted
424-
* when the current [[rx.lang.scala.Observable]] created with the function argument produces an object.
425-
*/
426-
def buffer(closings: () => Observable[Any]) : Observable[Seq[T]] = {
427-
val f: Func0[_ <: rx.Observable[_ <: Any]] = closings().asJavaObservable
428-
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Any](f)
429-
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
430-
}
431411
/**
432412
* Creates an Observable which produces buffers of collected values.
433413
*
@@ -626,13 +606,16 @@ trait Observable[+T]
626606
* Completion of either the source or the boundary Observable causes the returned Observable to emit the
627607
* latest buffer and complete.
628608
*
629-
* @param boundary the boundary Observable
609+
* @param boundary the boundary Observable. Note: This is a by-name parameter,
610+
* so it is only evaluated when someone subscribes to the returned Observable.
630611
* @return an Observable that emits buffered items from the source Observable when the boundary Observable
631612
* emits an item
632613
*/
633-
def buffer(boundary: Observable[Any]): Observable[Seq[T]] = {
634-
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
635-
toScalaObservable(thisJava.buffer(boundary.asJavaObservable)).map(_.asScala)
614+
def buffer(boundary: => Observable[Any]): Observable[Seq[T]] = {
615+
val f = new Func0[rx.Observable[_ <: Any]]() {
616+
override def call(): rx.Observable[_ <: Any] = boundary.asJavaObservable
617+
}
618+
toScalaObservable(asJavaObservable.buffer[Any](f)).map(_.asScala)
636619
}
637620

638621
/**
@@ -670,7 +653,7 @@ trait Observable[+T]
670653
val func = new Func0[rx.Observable[_ <: Any]]() {
671654
override def call(): rx.Observable[_ <: Any] = boundary.asJavaObservable
672655
}
673-
val jo: rx.Observable[rx.Observable[T]] = asJavaObservable.asInstanceOf[rx.Observable[T]].window[Any](func)
656+
val jo: rx.Observable[_ <: rx.Observable[_ <: T]] = asJavaObservable.window[Any](func)
674657
toScalaObservable(jo).map(toScalaObservable[T](_))
675658
}
676659

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ class CompletenessTest extends JUnitSuite {
7272
"all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)",
7373
"buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)",
7474
"buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)",
75-
"buffer(Func0[_ <: Observable[_ <: TClosing]])" -> "buffer(() => Observable[Any])",
76-
"buffer(Observable[B])" -> "buffer(Observable[Any])",
75+
"buffer(Func0[_ <: Observable[_ <: TClosing]])" -> "buffer(=> Observable[Any])",
76+
"buffer(Observable[B])" -> "buffer(=> Observable[Any])",
7777
"buffer(Observable[B], Int)" -> "buffer(Observable[Any], Int)",
7878
"buffer(Observable[_ <: TOpening], Func1[_ >: TOpening, _ <: Observable[_ <: TClosing]])" -> "buffer(Observable[Opening], Opening => Observable[Any])",
7979
"contains(Any)" -> "contains(U)",

0 commit comments

Comments
 (0)