|
4 | 4 |
|
5 | 5 | import java.util.concurrent.CountDownLatch;
|
6 | 6 | import java.util.concurrent.TimeUnit;
|
| 7 | +import java.util.concurrent.atomic.AtomicBoolean; |
7 | 8 | import java.util.concurrent.atomic.AtomicInteger;
|
8 | 9 |
|
9 | 10 | import org.junit.Test;
|
10 | 11 |
|
11 | 12 | import rx.Observable;
|
| 13 | +import rx.Observable.OnSubscribeFunc; |
12 | 14 | import rx.Observer;
|
13 | 15 | import rx.Scheduler;
|
14 | 16 | import rx.Subscription;
|
15 | 17 | import rx.operators.SafeObservableSubscription;
|
| 18 | +import rx.subscriptions.BooleanSubscription; |
16 | 19 | import rx.subscriptions.Subscriptions;
|
17 | 20 | import rx.util.functions.Action0;
|
18 | 21 | import rx.util.functions.Action1;
|
@@ -192,4 +195,135 @@ public void call(Action0 self) {
|
192 | 195 | latch.await();
|
193 | 196 | }
|
194 | 197 |
|
| 198 | + @Test |
| 199 | + public void testRecursiveScheduler2() throws InterruptedException { |
| 200 | + // use latches instead of Thread.sleep |
| 201 | + final CountDownLatch latch = new CountDownLatch(10); |
| 202 | + final CountDownLatch completionLatch = new CountDownLatch(1); |
| 203 | + |
| 204 | + Observable<Integer> obs = Observable.create(new OnSubscribeFunc<Integer>() { |
| 205 | + @Override |
| 206 | + public Subscription onSubscribe(final Observer<? super Integer> observer) { |
| 207 | + |
| 208 | + return getScheduler().schedule(null, new Func2<Scheduler, Void, Subscription>() { |
| 209 | + @Override |
| 210 | + public Subscription call(Scheduler scheduler, Void v) { |
| 211 | + observer.onNext(42); |
| 212 | + latch.countDown(); |
| 213 | + |
| 214 | + // this will recursively schedule this task for execution again |
| 215 | + scheduler.schedule(null, this); |
| 216 | + |
| 217 | + return Subscriptions.create(new Action0() { |
| 218 | + |
| 219 | + @Override |
| 220 | + public void call() { |
| 221 | + observer.onCompleted(); |
| 222 | + completionLatch.countDown(); |
| 223 | + } |
| 224 | + |
| 225 | + }); |
| 226 | + } |
| 227 | + }); |
| 228 | + } |
| 229 | + }); |
| 230 | + |
| 231 | + final AtomicInteger count = new AtomicInteger(); |
| 232 | + final AtomicBoolean completed = new AtomicBoolean(false); |
| 233 | + Subscription subscribe = obs.subscribe(new Observer<Integer>() { |
| 234 | + @Override |
| 235 | + public void onCompleted() { |
| 236 | + System.out.println("Completed"); |
| 237 | + completed.set(true); |
| 238 | + } |
| 239 | + |
| 240 | + @Override |
| 241 | + public void onError(Throwable e) { |
| 242 | + System.out.println("Error"); |
| 243 | + } |
| 244 | + |
| 245 | + @Override |
| 246 | + public void onNext(Integer args) { |
| 247 | + count.incrementAndGet(); |
| 248 | + System.out.println(args); |
| 249 | + } |
| 250 | + }); |
| 251 | + |
| 252 | + if (!latch.await(5000, TimeUnit.MILLISECONDS)) { |
| 253 | + fail("Timed out waiting on onNext latch"); |
| 254 | + } |
| 255 | + |
| 256 | + // now unsubscribe and ensure it stops the recursive loop |
| 257 | + subscribe.unsubscribe(); |
| 258 | + System.out.println("unsubscribe"); |
| 259 | + |
| 260 | + if (!completionLatch.await(5000, TimeUnit.MILLISECONDS)) { |
| 261 | + fail("Timed out waiting on completion latch"); |
| 262 | + } |
| 263 | + |
| 264 | + // the count can be 10 or higher due to thread scheduling of the unsubscribe vs the scheduler looping to emit the count |
| 265 | + assertTrue(count.get() >= 10); |
| 266 | + assertTrue(completed.get()); |
| 267 | + } |
| 268 | + |
| 269 | + @Test |
| 270 | + public final void testSubscribeWithScheduler() throws InterruptedException { |
| 271 | + final Scheduler scheduler = getScheduler(); |
| 272 | + |
| 273 | + final AtomicInteger count = new AtomicInteger(); |
| 274 | + |
| 275 | + Observable<Integer> o1 = Observable.<Integer> from(1, 2, 3, 4, 5); |
| 276 | + |
| 277 | + o1.subscribe(new Action1<Integer>() { |
| 278 | + |
| 279 | + @Override |
| 280 | + public void call(Integer t) { |
| 281 | + System.out.println("Thread: " + Thread.currentThread().getName()); |
| 282 | + System.out.println("t: " + t); |
| 283 | + count.incrementAndGet(); |
| 284 | + } |
| 285 | + }); |
| 286 | + |
| 287 | + // the above should be blocking so we should see a count of 5 |
| 288 | + assertEquals(5, count.get()); |
| 289 | + |
| 290 | + count.set(0); |
| 291 | + |
| 292 | + // now we'll subscribe with a scheduler and it should be async |
| 293 | + |
| 294 | + final String currentThreadName = Thread.currentThread().getName(); |
| 295 | + |
| 296 | + // latches for deterministically controlling the test below across threads |
| 297 | + final CountDownLatch latch = new CountDownLatch(5); |
| 298 | + final CountDownLatch first = new CountDownLatch(1); |
| 299 | + |
| 300 | + o1.subscribe(new Action1<Integer>() { |
| 301 | + |
| 302 | + @Override |
| 303 | + public void call(Integer t) { |
| 304 | + try { |
| 305 | + // we block the first one so we can assert this executes asynchronously with a count |
| 306 | + first.await(1000, TimeUnit.SECONDS); |
| 307 | + } catch (InterruptedException e) { |
| 308 | + throw new RuntimeException("The latch should have released if we are async.", e); |
| 309 | + } |
| 310 | + |
| 311 | + assertFalse(Thread.currentThread().getName().equals(currentThreadName)); |
| 312 | + System.out.println("Thread: " + Thread.currentThread().getName()); |
| 313 | + System.out.println("t: " + t); |
| 314 | + count.incrementAndGet(); |
| 315 | + latch.countDown(); |
| 316 | + } |
| 317 | + }, scheduler); |
| 318 | + |
| 319 | + // assert we are async |
| 320 | + assertEquals(0, count.get()); |
| 321 | + // release the latch so it can go forward |
| 322 | + first.countDown(); |
| 323 | + |
| 324 | + // wait for all 5 responses |
| 325 | + latch.await(); |
| 326 | + assertEquals(5, count.get()); |
| 327 | + } |
| 328 | + |
195 | 329 | }
|
0 commit comments