Skip to content

Commit 3013ec9

Browse files
parallel-merge unit test assertions
Using serialize for merge allows less threads to be used under contention instead of blocking and using them all. This changes the assertion to be <= 3 instead of == 3 because of that.
1 parent e68f0b2 commit 3013ec9

File tree

2 files changed

+5
-4
lines changed

2 files changed

+5
-4
lines changed

rxjava-core/src/main/java/rx/operators/OperationParallelMerge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
public class OperationParallelMerge {
2727

2828
public static <T> Observable<Observable<T>> parallelMerge(final Observable<Observable<T>> source, final int parallelObservables) {
29-
return parallelMerge(source, parallelObservables, Schedulers.currentThread());
29+
return parallelMerge(source, parallelObservables, Schedulers.immediate());
3030
}
3131

3232
public static <T> Observable<Observable<T>> parallelMerge(final Observable<Observable<T>> source, final int parallelObservables, final Scheduler scheduler) {

rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.junit.Assert.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
1920

2021
import java.util.List;
2122
import java.util.concurrent.ConcurrentHashMap;
@@ -79,7 +80,7 @@ public void call(String o) {
7980
}
8081
});
8182

82-
assertEquals(3, threads.keySet().size());
83+
assertTrue(threads.keySet().size() <= 3); // can be less than since merge doesn't block threads and may not use all of them
8384
}
8485

8586
@Test
@@ -98,7 +99,7 @@ public void call(String o) {
9899
}
99100
});
100101

101-
assertEquals(3, threads.keySet().size());
102+
assertTrue(threads.keySet().size() <= 3); // can be less than since merge doesn't block threads and may not use all of them
102103
}
103104

104105
private static Observable<Observable<String>> getStreams() {

0 commit comments

Comments
 (0)