Skip to content

Commit 28a9a99

Browse files
Merge pull request ReactiveX#1265 from zsxwing/rxscala-more
Add more operators to RxScala
2 parents 95ab2a1 + 49cc022 commit 28a9a99

File tree

4 files changed

+453
-57
lines changed

4 files changed

+453
-57
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,14 @@ class RxScalaDemo extends JUnitSuite {
156156
).subscribe(output(_))
157157
}
158158

159+
@Test def windowExample2() {
160+
val windowObservable = Observable.interval(500 millis)
161+
val o = Observable.from(1 to 20).zip(Observable.interval(100 millis)).map(_._1)
162+
(for ((o, i) <- o.window(windowObservable).zipWithIndex; n <- o)
163+
yield s"Observable#$i emits $n"
164+
).toBlocking.foreach(println)
165+
}
166+
159167
@Test def testReduce() {
160168
assertEquals(10, List(1, 2, 3, 4).toObservable.reduce(_ + _).toBlockingObservable.single)
161169
}
@@ -731,6 +739,23 @@ class RxScalaDemo extends JUnitSuite {
731739
println(result)
732740
}
733741

742+
@Test def ambWithVarargsExample(): Unit = {
743+
val o1 = List(100L, 200L, 300L).toObservable.delay(4 seconds)
744+
val o2 = List(1000L, 2000L, 3000L).toObservable.delay(2 seconds)
745+
val o3 = List(10000L, 20000L, 30000L).toObservable.delay(4 seconds)
746+
val result = Observable.amb(o1, o2, o3).toBlocking.toList
747+
println(result)
748+
}
749+
750+
@Test def ambWithSeqExample(): Unit = {
751+
val o1 = List(100L, 200L, 300L).toObservable.delay(4 seconds)
752+
val o2 = List(1000L, 2000L, 3000L).toObservable.delay(2 seconds)
753+
val o3 = List(10000L, 20000L, 30000L).toObservable.delay(4 seconds)
754+
val o = Seq(o1, o2, o3)
755+
val result = Observable.amb(o: _*).toBlocking.toList
756+
println(result)
757+
}
758+
734759
@Test def delayExample(): Unit = {
735760
val o = List(100L, 200L, 300L).toObservable.delay(2 seconds)
736761
val result = o.toBlockingObservable.toList
@@ -1012,4 +1037,75 @@ class RxScalaDemo extends JUnitSuite {
10121037
subscription.unsubscribe()
10131038
}
10141039

1040+
def createAHotObservable: Observable[String] = {
1041+
var first = true
1042+
Observable[String] {
1043+
subscriber =>
1044+
if (first) {
1045+
subscriber.onNext("1st: First")
1046+
subscriber.onNext("1st: Last")
1047+
first = false
1048+
}
1049+
else {
1050+
subscriber.onNext("2nd: First")
1051+
subscriber.onNext("2nd: Last")
1052+
}
1053+
subscriber.onCompleted()
1054+
}
1055+
}
1056+
1057+
@Test def withoutPublishLastExample() {
1058+
val hot = createAHotObservable
1059+
hot.takeRight(1).subscribe(n => println(s"subscriber 1 gets $n"))
1060+
hot.takeRight(1).subscribe(n => println(s"subscriber 2 gets $n"))
1061+
}
1062+
1063+
@Test def publishLastExample() {
1064+
val hot = createAHotObservable
1065+
val o = hot.publishLast
1066+
o.subscribe(n => println(s"subscriber 1 gets $n"))
1067+
o.subscribe(n => println(s"subscriber 2 gets $n"))
1068+
o.connect
1069+
}
1070+
1071+
@Test def publishLastExample2() {
1072+
val hot = createAHotObservable
1073+
val o = hot.publishLast(co => co ++ co) // "++" subscribes "co" twice
1074+
o.subscribe(n => println(s"subscriber gets $n"))
1075+
}
1076+
1077+
@Test def unsubscribeOnExample() {
1078+
val o = Observable[String] {
1079+
subscriber =>
1080+
subscriber.add(Subscription {
1081+
println("unsubscribe on " + Thread.currentThread().getName())
1082+
})
1083+
subscriber.onNext("RxScala")
1084+
subscriber.onCompleted()
1085+
}
1086+
o.unsubscribeOn(NewThreadScheduler()).subscribe(println(_))
1087+
}
1088+
1089+
@Test def parallelMergeExample() {
1090+
val o: Observable[Observable[Int]] = (1 to 100).toObservable.map(_ => (1 to 10).toObservable)
1091+
assertEquals(100, o.size.toBlockingObservable.single)
1092+
assertEquals(1000, o.flatten.size.toBlockingObservable.single)
1093+
1094+
val o2: Observable[Observable[Int]] = o.parallelMerge(10, ComputationScheduler())
1095+
assertEquals(10, o2.size.toBlockingObservable.single)
1096+
assertEquals(1000, o2.flatten.size.toBlockingObservable.single)
1097+
}
1098+
1099+
@Test def debounceExample() {
1100+
val o = Observable.interval(100 millis).take(20).debounce {
1101+
n =>
1102+
if (n % 2 == 0) {
1103+
Observable.interval(50 millis)
1104+
}
1105+
else {
1106+
Observable.interval(150 millis)
1107+
}
1108+
}
1109+
o.toBlockingObservable.foreach(println(_))
1110+
}
10151111
}

0 commit comments

Comments
 (0)