@@ -39,10 +39,11 @@ protected RxRingBuffer createRingBuffer() {
39
39
/**
40
40
* Single producer, 2 consumers. The request() ensures it gets scheduled back on the same Producer thread.
41
41
*/
42
- @ Test ( timeout = 2000 )
42
+ @ Test
43
43
public void testConcurrency () throws InterruptedException {
44
44
final RxRingBuffer b = createRingBuffer ();
45
- final CountDownLatch latch = new CountDownLatch (255 );
45
+ final CountDownLatch emitLatch = new CountDownLatch (255 );
46
+ final CountDownLatch drainLatch = new CountDownLatch (2 );
46
47
47
48
final Scheduler .Worker w1 = Schedulers .newThread ().createWorker ();
48
49
Scheduler .Worker w2 = Schedulers .newThread ().createWorker ();
@@ -58,33 +59,32 @@ public void testConcurrency() throws InterruptedException {
58
59
59
60
@ Override
60
61
public void request (final long n ) {
61
- System .out .println ("request[" + c .incrementAndGet () + "]: " + n + " Thread: " + Thread .currentThread ());
62
+ // System.out.println("request[" + c.incrementAndGet() + "]: " + n + " Thread: " + Thread.currentThread());
62
63
w1 .schedule (new Action0 () {
63
64
64
65
@ Override
65
66
public void call () {
66
- if (latch .getCount () == 0 ) {
67
+ if (emitLatch .getCount () == 0 ) {
67
68
return ;
68
69
}
69
70
for (int i = 0 ; i < n ; i ++) {
70
71
try {
71
- emit .incrementAndGet ();
72
72
b .onNext ("one" );
73
+ emit .incrementAndGet ();
73
74
} catch (MissingBackpressureException e ) {
74
75
System .out .println ("BackpressureException => item: " + i + " requested: " + n + " emit: " + emit .get () + " poll: " + poll .get ());
75
76
backpressureExceptions .incrementAndGet ();
76
77
}
77
78
}
78
79
// we'll release after n batches
79
- latch .countDown ();
80
+ emitLatch .countDown ();
80
81
}
81
82
82
83
});
83
84
}
84
85
85
86
};
86
87
final TestSubscriber <String > ts = new TestSubscriber <String >();
87
-
88
88
w1 .schedule (new Action0 () {
89
89
90
90
@ Override
@@ -95,7 +95,7 @@ public void call() {
95
95
96
96
});
97
97
98
- w2 . schedule ( new Action0 () {
98
+ Action0 drainer = new Action0 () {
99
99
100
100
@ Override
101
101
public void call () {
@@ -109,39 +109,30 @@ public void call() {
109
109
if (emitted > 0 ) {
110
110
ts .requestMore (emitted );
111
111
emitted = 0 ;
112
+ } else {
113
+ if (emitLatch .getCount () == 0 ) {
114
+ // this works with SynchronizedQueue, if changing to a non-blocking Queue
115
+ // then this will likely need to change like the SpmcTest version
116
+ drainLatch .countDown ();
117
+ return ;
118
+ }
112
119
}
113
120
}
114
121
}
115
122
116
123
}
117
124
118
- });
119
-
120
- w3 .schedule (new Action0 () {
125
+ };
121
126
122
- @ Override
123
- public void call () {
124
- int emitted = 0 ;
125
- while (true ) {
126
- Object o = b .poll ();
127
- if (o != null ) {
128
- emitted ++;
129
- poll .incrementAndGet ();
130
- } else {
131
- if (emitted > 0 ) {
132
- ts .requestMore (emitted );
133
- emitted = 0 ;
134
- }
135
- }
136
- }
137
- }
127
+ w2 .schedule (drainer );
128
+ w3 .schedule (drainer );
138
129
139
- });
130
+ emitLatch .await ();
131
+ drainLatch .await ();
140
132
141
- latch .await ();
142
- w1 .unsubscribe ();
143
133
w2 .unsubscribe ();
144
134
w3 .unsubscribe ();
135
+ w1 .unsubscribe (); // put this one last as unsubscribing from it can cause Exceptions to be throw in w2/w3
145
136
146
137
System .out .println ("emit: " + emit .get () + " poll: " + poll .get ());
147
138
assertEquals (0 , backpressureExceptions .get ());
0 commit comments