Skip to content

Commit 7a75d4b

Browse files
Merge pull request ReactiveX#717 from Applied-Duality/ScalaPublishFix
Added ConnectableObservable
2 parents 7333bf1 + 679a88f commit 7a75d4b

File tree

5 files changed

+83
-13
lines changed

5 files changed

+83
-13
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -243,9 +243,9 @@ class RxScalaDemo extends JUnitSuite {
243243
}
244244

245245
@Test def olympicsExample() {
246-
val (go, medals) = Olympics.mountainBikeMedals.publish
246+
val medals = Olympics.mountainBikeMedals.publish
247247
medals.subscribe(println(_))
248-
go()
248+
medals.connect
249249
//waitFor(medals)
250250
}
251251

@@ -257,10 +257,10 @@ class RxScalaDemo extends JUnitSuite {
257257

258258
@Test def exampleWithPublish() {
259259
val unshared = List(1 to 4).toObservable
260-
val (startFunc, shared) = unshared.publish
260+
val shared = unshared.publish
261261
shared.subscribe(n => println(s"subscriber 1 gets $n"))
262262
shared.subscribe(n => println(s"subscriber 2 gets $n"))
263-
startFunc()
263+
shared.connect
264264
}
265265

266266
def doLater(waitTime: Duration, action: () => Unit): Unit = {
@@ -269,9 +269,9 @@ class RxScalaDemo extends JUnitSuite {
269269

270270
@Test def exampleWithoutReplay() {
271271
val numbers = Observable.interval(1000 millis).take(6)
272-
val (startFunc, sharedNumbers) = numbers.publish
272+
val sharedNumbers = numbers.publish
273273
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
274-
startFunc()
274+
sharedNumbers.connect
275275
// subscriber 2 misses 0, 1, 2!
276276
doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) })
277277
waitFor(sharedNumbers)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package rx.lang.scala
1818

1919
import rx.util.functions.FuncN
2020
import rx.Observable.OnSubscribeFunc
21-
21+
import rx.lang.scala.observables.ConnectableObservable
2222

2323

2424
/**
@@ -1052,12 +1052,10 @@ trait Observable[+T]
10521052
*
10531053
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/publishConnect.png">
10541054
*
1055-
* @return a pair of a start function and an [[rx.lang.scala.Observable]] such that when the start function
1056-
* is called, the Observable starts to emit items to its [[rx.lang.scala.Observer]]s
1055+
* @return an [[rx.lang.scala.observables.ConnectableObservable]].
10571056
*/
1058-
def publish: (() => Subscription, Observable[T]) = {
1059-
val javaCO = asJavaObservable.publish()
1060-
(() => javaCO.connect(), toScalaObservable[T](javaCO))
1057+
def publish: ConnectableObservable[T] = {
1058+
new ConnectableObservable[T](asJavaObservable.publish())
10611059
}
10621060

10631061
// TODO add Scala-like aggregate function
@@ -1136,7 +1134,8 @@ trait Observable[+T]
11361134
* the initial (seed) accumulator value
11371135
* @param accumulator
11381136
* an accumulator function to be invoked on each item emitted by the source
1139-
* Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call.
1137+
* Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via
1138+
* [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call.
11401139
* @return an Observable that emits the results of each call to the accumulator function
11411140
*/
11421141
def scan[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = {
@@ -1145,6 +1144,30 @@ trait Observable[+T]
11451144
}))
11461145
}
11471146

1147+
/**
1148+
* Returns an Observable that applies a function of your choosing to the
1149+
* first item emitted by a source Observable, then feeds the result of that
1150+
* function along with the second item emitted by an Observable into the
1151+
* same function, and so on until all items have been emitted by the source
1152+
* Observable, emitting the result of each of these iterations.
1153+
* <p>
1154+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/scan.png">
1155+
* <p>
1156+
*
1157+
* @param accumulator
1158+
* an accumulator function to be invoked on each item emitted by the source
1159+
* Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via
1160+
* [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call.
1161+
* @return
1162+
* an Observable that emits the results of each call to the
1163+
* accumulator function
1164+
*/
1165+
def scan[U >: T](accumulator: (U, U) => U): Observable[U] = {
1166+
val func: Func2[_ >: U, _ >: U, _ <: U] = accumulator
1167+
val func2 = func.asInstanceOf[Func2[T, T, T]]
1168+
toScalaObservable[U](asJavaObservable.asInstanceOf[rx.Observable[T]].scan(func2))
1169+
}
1170+
11481171
/**
11491172
* Returns an Observable that emits a Boolean that indicates whether all of the items emitted by
11501173
* the source Observable satisfy a condition.

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package rx.lang.scala.observables
1818
import scala.collection.JavaConverters._
1919
import rx.lang.scala.ImplicitFunctionConversions._
2020

21+
2122
/**
2223
* An Observable that provides blocking operators.
2324
*
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.lang.scala.observables
18+
19+
import rx.lang.scala.{Observable, Subscription}
20+
import rx.lang.scala.JavaConversions._
21+
22+
class ConnectableObservable[+T] private[scala](val asJavaObservable: rx.observables.ConnectableObservable[_ <: T])
23+
extends Observable[T] {
24+
25+
/**
26+
* Call a ConnectableObservable's connect method to instruct it to begin emitting the
27+
* items from its underlying [[rx.lang.scala.Observable]] to its [[rx.lang.scala.Observer]]s.
28+
*/
29+
def connect: Subscription = toScalaSubscription(asJavaObservable.connect())
30+
31+
/**
32+
* Returns an observable sequence that stays connected to the source as long
33+
* as there is at least one subscription to the observable sequence.
34+
*
35+
* @return a [[rx.lang.scala.Observable]]
36+
*/
37+
def refCount: Observable[T] = toScalaObservable[T](asJavaObservable.refCount())
38+
}

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ class ObservableTests extends JUnitSuite {
5757
assertEquals(demat.toBlockingObservable.toIterable.toList, List(1, 2, 3))
5858
}
5959

60+
@Test def TestScan() {
61+
val xs = Observable.items(0,1,2,3)
62+
val ys = xs.scan(0)(_+_)
63+
assertEquals(List(0,0,1,3,6), ys.toBlockingObservable.toList)
64+
val zs = xs.scan((x: Int, y:Int) => x*y)
65+
assertEquals(List(0, 0, 0, 0), zs.toBlockingObservable.toList)
66+
}
67+
6068
// Test that Java's firstOrDefault propagates errors.
6169
// If this changes (i.e. it suppresses errors and returns default) then Scala's firstOrElse
6270
// should be changed accordingly.

0 commit comments

Comments
 (0)