Skip to content

Commit fe438ca

Browse files
Merge pull request ReactiveX#731 from zsxwing/flaky-unit-tests
Replaced 'Thread.sleep' with 'CountDownLatch' to fix the flaky test failures
2 parents ab689bf + 5ef9075 commit fe438ca

File tree

1 file changed

+17
-20
lines changed

1 file changed

+17
-20
lines changed

rxjava-core/src/test/java/rx/operators/OperationConcatTest.java

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public void testNestedAsyncConcat() throws Throwable {
156156
final CountDownLatch allowThird = new CountDownLatch(1);
157157

158158
final AtomicReference<Thread> parent = new AtomicReference<Thread>();
159+
final CountDownLatch parentHasStarted = new CountDownLatch(1);
159160
Observable<Observable<String>> observableOfObservables = Observable.create(new Observable.OnSubscribeFunc<Observable<String>>() {
160161

161162
@Override
@@ -197,29 +198,22 @@ public void run() {
197198
}
198199
}));
199200
parent.get().start();
201+
parentHasStarted.countDown();
200202
return s;
201203
}
202204
});
203205

204206
Observable.create(concat(observableOfObservables)).subscribe(observer);
205207

206208
// wait for parent to start
207-
while (parent.get() == null) {
208-
Thread.sleep(1);
209-
}
209+
parentHasStarted.await();
210210

211211
try {
212212
// wait for first 2 async observables to complete
213-
while (o1.t == null) {
214-
Thread.sleep(1);
215-
}
216-
System.out.println("Thread1 started ... waiting for it to complete ...");
217-
o1.t.join();
218-
while (o2.t == null) {
219-
Thread.sleep(1);
220-
}
221-
System.out.println("Thread2 started ... waiting for it to complete ...");
222-
o2.t.join();
213+
System.out.println("Thread1 is starting ... waiting for it to complete ...");
214+
o1.waitForThreadDone();
215+
System.out.println("Thread2 is starting ... waiting for it to complete ...");
216+
o2.waitForThreadDone();
223217
} catch (Throwable e) {
224218
throw new RuntimeException("failed waiting on threads", e);
225219
}
@@ -243,11 +237,8 @@ public void run() {
243237
allowThird.countDown();
244238

245239
try {
246-
while (o3.t == null) {
247-
Thread.sleep(1);
248-
}
249240
// wait for 3rd to complete
250-
o3.t.join();
241+
o3.waitForThreadDone();
251242
} catch (Throwable e) {
252243
throw new RuntimeException("failed waiting on threads", e);
253244
}
@@ -320,9 +311,8 @@ public void testConcatConcurrentWithInfinity() {
320311

321312
//Wait for the thread to start up.
322313
try {
323-
Thread.sleep(25);
324-
w1.t.join();
325-
w2.t.join();
314+
w1.waitForThreadDone();
315+
w2.waitForThreadDone();
326316
} catch (InterruptedException e) {
327317
// TODO Auto-generated catch block
328318
e.printStackTrace();
@@ -500,6 +490,7 @@ public void unsubscribe() {
500490
private boolean subscribed = true;
501491
private final CountDownLatch once;
502492
private final CountDownLatch okToContinue;
493+
private final CountDownLatch threadHasStarted = new CountDownLatch(1);
503494
private final T seed;
504495
private final int size;
505496

@@ -553,8 +544,14 @@ public void run() {
553544

554545
});
555546
t.start();
547+
threadHasStarted.countDown();
556548
return s;
557549
}
550+
551+
void waitForThreadDone() throws InterruptedException {
552+
threadHasStarted.await();
553+
t.join();
554+
}
558555
}
559556
@Test
560557
public void testMultipleObservers() {

0 commit comments

Comments
 (0)