37
37
import rx .Subscriber ;
38
38
import rx .Subscription ;
39
39
40
- public class SerializedObserverTest {
40
+ public class SerializedObserverViaQueueAndCounterTest {
41
41
42
42
@ Mock
43
43
Subscriber <String > observer ;
@@ -53,7 +53,7 @@ public void testSingleThreadedBasic() {
53
53
TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable (s , "one" , "two" , "three" );
54
54
Observable <String > w = Observable .create (onSubscribe );
55
55
56
- SerializedObserver <String > aw = new SerializedObserver <String >(observer );
56
+ SerializedObserverViaQueueAndCounter <String > aw = new SerializedObserverViaQueueAndCounter <String >(observer );
57
57
58
58
w .subscribe (aw );
59
59
onSubscribe .waitToFinish ();
@@ -75,7 +75,7 @@ public void testMultiThreadedBasic() {
75
75
Observable <String > w = Observable .create (onSubscribe );
76
76
77
77
BusyObserver busyObserver = new BusyObserver ();
78
- SerializedObserver <String > aw = new SerializedObserver <String >(busyObserver );
78
+ SerializedObserverViaQueueAndCounter <String > aw = new SerializedObserverViaQueueAndCounter <String >(busyObserver );
79
79
80
80
w .subscribe (aw );
81
81
onSubscribe .waitToFinish ();
@@ -100,7 +100,7 @@ public void testMultiThreadedWithNPE() throws InterruptedException {
100
100
Observable <String > w = Observable .create (onSubscribe );
101
101
102
102
BusyObserver busyObserver = new BusyObserver ();
103
- SerializedObserver <String > aw = new SerializedObserver <String >(busyObserver );
103
+ SerializedObserverViaQueueAndCounter <String > aw = new SerializedObserverViaQueueAndCounter <String >(busyObserver );
104
104
105
105
w .subscribe (aw );
106
106
onSubscribe .waitToFinish ();
@@ -132,7 +132,7 @@ public void testMultiThreadedWithNPEinMiddle() {
132
132
Observable <String > w = Observable .create (onSubscribe );
133
133
134
134
BusyObserver busyObserver = new BusyObserver ();
135
- SerializedObserver <String > aw = new SerializedObserver <String >(busyObserver );
135
+ SerializedObserverViaQueueAndCounter <String > aw = new SerializedObserverViaQueueAndCounter <String >(busyObserver );
136
136
137
137
w .subscribe (aw );
138
138
onSubscribe .waitToFinish ();
@@ -168,7 +168,7 @@ public void runOutOfOrderConcurrencyTest() {
168
168
try {
169
169
TestConcurrencyObserver tw = new TestConcurrencyObserver ();
170
170
// we need Synchronized + SafeSubscriber to handle synchronization plus life-cycle
171
- SerializedObserver <String > w = new SerializedObserver <String >(new SafeSubscriber <String >(tw ));
171
+ SerializedObserverViaQueueAndCounter <String > w = new SerializedObserverViaQueueAndCounter <String >(new SafeSubscriber <String >(tw ));
172
172
173
173
Future <?> f1 = tp .submit (new OnNextThread (w , 12000 ));
174
174
Future <?> f2 = tp .submit (new OnNextThread (w , 5000 ));
@@ -223,7 +223,7 @@ public void runConcurrencyTest() {
223
223
try {
224
224
TestConcurrencyObserver tw = new TestConcurrencyObserver ();
225
225
// we need Synchronized + SafeSubscriber to handle synchronization plus life-cycle
226
- SerializedObserver <String > w = new SerializedObserver <String >(new SafeSubscriber <String >(tw ));
226
+ SerializedObserverViaQueueAndCounter <String > w = new SerializedObserverViaQueueAndCounter <String >(new SafeSubscriber <String >(tw ));
227
227
228
228
Future <?> f1 = tp .submit (new OnNextThread (w , 12000 ));
229
229
Future <?> f2 = tp .submit (new OnNextThread (w , 5000 ));
0 commit comments