16
16
17
17
package rx .observables ;
18
18
19
- import static org .junit .Assert .assertEquals ;
20
- import static org .junit .Assert .assertFalse ;
21
- import static org .junit .Assert .assertNull ;
22
- import static org .junit .Assert .assertTrue ;
23
- import static org .mockito .Matchers .any ;
24
- import static org .mockito .Matchers .isA ;
25
- import static org .mockito .Mockito .inOrder ;
26
- import static org .mockito .Mockito .mock ;
27
- import static org .mockito .Mockito .never ;
28
- import static org .mockito .Mockito .times ;
29
- import static org .mockito .Mockito .verify ;
30
-
31
- import java .util .ArrayList ;
32
- import java .util .Arrays ;
33
- import java .util .Iterator ;
34
- import java .util .List ;
35
- import java .util .Map ;
36
- import java .util .concurrent .BrokenBarrierException ;
37
- import java .util .concurrent .Callable ;
38
- import java .util .concurrent .ConcurrentHashMap ;
39
- import java .util .concurrent .CountDownLatch ;
40
- import java .util .concurrent .CyclicBarrier ;
41
- import java .util .concurrent .ExecutionException ;
42
- import java .util .concurrent .ExecutorService ;
43
- import java .util .concurrent .Executors ;
44
- import java .util .concurrent .Future ;
45
- import java .util .concurrent .TimeUnit ;
46
- import java .util .concurrent .atomic .AtomicBoolean ;
47
- import java .util .concurrent .atomic .AtomicInteger ;
48
- import java .util .concurrent .atomic .AtomicReference ;
19
+ import static org .junit .Assert .*;
20
+ import static org .mockito .Matchers .*;
21
+ import static org .mockito .Mockito .*;
22
+
23
+ import java .util .*;
24
+ import java .util .concurrent .*;
25
+ import java .util .concurrent .atomic .*;
49
26
50
27
import org .junit .Test ;
51
- import org .mockito .InOrder ;
52
- import org .mockito .Matchers ;
53
- import org .mockito .Mockito ;
28
+ import org .mockito .*;
54
29
30
+ import rx .*;
55
31
import rx .Observable ;
56
- import rx .Observable .OnSubscribe ;
57
- import rx .Observable .Operator ;
32
+ import rx .Observable .*;
58
33
import rx .Observer ;
59
- import rx .Producer ;
60
- import rx .Subscriber ;
61
34
import rx .exceptions .TestException ;
62
- import rx .functions .Action0 ;
63
- import rx .functions .Action1 ;
64
- import rx .functions .Action2 ;
65
- import rx .functions .Func0 ;
66
- import rx .functions .Func2 ;
35
+ import rx .functions .*;
67
36
import rx .observers .TestSubscriber ;
68
- import rx .schedulers .Schedulers ;
69
- import rx .schedulers .TestScheduler ;
37
+ import rx .schedulers .*;
70
38
71
39
/**
72
40
* Test if SyncOnSubscribe adheres to the usual unsubscription and backpressure contracts.
@@ -489,6 +457,16 @@ public Integer call(Integer state, Observer<? super Integer> observer) {
489
457
verify (onUnSubscribe , times (1 )).call (any (Integer .class ));
490
458
}
491
459
460
+ @ Test
461
+ public void testConcurrentRequestsLoop () throws InterruptedException {
462
+ for (int i = 0 ; i < 100 ; i ++) {
463
+ if (i % 10 == 0 ) {
464
+ System .out .println ("testConcurrentRequestsLoop >> " + i );
465
+ }
466
+ testConcurrentRequests ();
467
+ }
468
+ }
469
+
492
470
@ Test
493
471
public void testConcurrentRequests () throws InterruptedException {
494
472
final int count1 = 1000 ;
@@ -514,12 +492,20 @@ public Integer call(Integer state, Observer<? super Integer> observer) {
514
492
l2 .countDown ();
515
493
// wait until the 2nd request returns then proceed
516
494
try {
517
- if (!l1 .await (1 , TimeUnit .SECONDS ))
518
- throw new IllegalStateException ();
519
- } catch (InterruptedException e ) {}
495
+ if (!l1 .await (2 , TimeUnit .SECONDS )) {
496
+ observer .onError (new TimeoutException ());
497
+ return state + 1 ;
498
+ }
499
+ } catch (InterruptedException e ) {
500
+ observer .onError (e );
501
+ return state + 1 ;
502
+ }
520
503
observer .onNext (state );
521
- if (state == finalCount )
504
+
505
+ if (state == finalCount ) {
522
506
observer .onCompleted ();
507
+ }
508
+
523
509
return state + 1 ;
524
510
}},
525
511
onUnSubscribe );
@@ -532,10 +518,9 @@ public Integer call(Integer state, Observer<? super Integer> observer) {
532
518
Observable .create (os ).subscribeOn (Schedulers .newThread ()).subscribe (ts );
533
519
534
520
// wait until the first request has started processing
535
- try {
536
- if (!l2 .await (1 , TimeUnit .SECONDS ))
537
- throw new IllegalStateException ();
538
- } catch (InterruptedException e ) {}
521
+ if (!l2 .await (2 , TimeUnit .SECONDS )) {
522
+ fail ("SyncOnSubscribe failed to countDown in time" );
523
+ }
539
524
// make a concurrent request, this should return
540
525
ts .requestMore (count2 );
541
526
// unblock the 1st thread to proceed fulfilling requests
0 commit comments