Skip to content

Commit a2f963d

Browse files
committed
spring-projects#328 - Fix reactive MongoDB tailable cursor examples.
Replace flatMap(…) operator after collection drop with then(…) to create the collection regardless of whether the previous operation emitted an element or not. Collect emitted elements to add an assertion and fail if the tailing does not work.
1 parent 627a08b commit a2f963d

File tree

2 files changed

+24
-9
lines changed

2 files changed

+24
-9
lines changed

mongodb/reactive/src/test/java/example/springdata/mongodb/people/ReactivePersonRepositoryIntegrationTest.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2016 the original author or authors.
2+
* Copyright 2015-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
import reactor.core.publisher.Mono;
2323

2424
import java.util.List;
25+
import java.util.Queue;
26+
import java.util.concurrent.ConcurrentLinkedQueue;
2527
import java.util.concurrent.CountDownLatch;
2628

2729
import org.junit.Before;
@@ -50,8 +52,10 @@ public void setUp() {
5052

5153
operations.collectionExists(Person.class) //
5254
.flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) //
53-
.flatMap(o -> operations.createCollection(Person.class, CollectionOptions.empty().size(1024 * 1024).maxDocuments( 100).capped())) //
54-
.then() //
55+
.then(operations.createCollection(Person.class, CollectionOptions.empty() //
56+
.size(1024 * 1024) //
57+
.maxDocuments(100) //
58+
.capped())) //
5559
.block();
5660

5761
repository
@@ -61,7 +65,6 @@ public void setUp() {
6165
new Person("Jesse", "Pinkman", 27))) //
6266
.then() //
6367
.block();
64-
6568
}
6669

6770
/**
@@ -109,8 +112,11 @@ public void shouldPerformConversionBeforeResultProcessing() throws Exception {
109112
@Test
110113
public void shouldStreamDataWithTailableCursor() throws Exception {
111114

115+
Queue<Person> people = new ConcurrentLinkedQueue<>();
116+
112117
Disposable disposable = repository.findWithTailableCursorBy() //
113118
.doOnNext(System.out::println) //
119+
.doOnNext(people::add) //
114120
.doOnComplete(() -> System.out.println("Complete")) //
115121
.doOnTerminate(() -> System.out.println("Terminated")) //
116122
.subscribe();
@@ -127,6 +133,8 @@ public void shouldStreamDataWithTailableCursor() throws Exception {
127133

128134
repository.save(new Person("Gus", "Fring", 53)).subscribe();
129135
Thread.sleep(100);
136+
137+
assertThat(people).hasSize(6);
130138
}
131139

132140
/**

mongodb/reactive/src/test/java/example/springdata/mongodb/people/RxJava2PersonRepositoryIntegrationTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020
import io.reactivex.Flowable;
2121
import io.reactivex.Single;
2222
import io.reactivex.disposables.Disposable;
23-
import reactor.core.publisher.Flux;
2423
import reactor.core.publisher.Mono;
2524

2625
import java.util.List;
26+
import java.util.Queue;
27+
import java.util.concurrent.ConcurrentLinkedQueue;
2728
import java.util.concurrent.CountDownLatch;
2829

2930
import org.junit.Before;
@@ -56,9 +57,10 @@ public void setUp() {
5657

5758
operations.collectionExists(Person.class) //
5859
.flatMap(exists -> exists ? operations.dropCollection(Person.class) : Mono.just(exists)) //
59-
.flatMap(o -> operations.createCollection(Person.class,
60-
CollectionOptions.empty().size(1024 * 1024).maxDocuments(100).capped())) //
61-
.then() //
60+
.then(operations.createCollection(Person.class, CollectionOptions.empty() //
61+
.size(1024 * 1024) //
62+
.maxDocuments(100) //
63+
.capped())) //
6264
.block();
6365

6466
repository.saveAll(Flowable.just(new Person("Walter", "White", 50), //
@@ -112,13 +114,16 @@ public void shouldPerformConversionBeforeResultProcessing() throws Exception {
112114
}
113115

114116
/**
115-
* A tailable cursor streams data using {@link Flux} as it arrives inside the capped collection.
117+
* A tailable cursor streams data using {@link Flowable} as it arrives inside the capped collection.
116118
*/
117119
@Test
118120
public void shouldStreamDataWithTailableCursor() throws Exception {
119121

122+
Queue<Person> people = new ConcurrentLinkedQueue<>();
123+
120124
Disposable subscription = repository.findWithTailableCursorBy() //
121125
.doOnNext(System.out::println) //
126+
.doOnNext(people::add) //
122127
.doOnComplete(() -> System.out.println("Complete")) //
123128
.doOnTerminate(() -> System.out.println("Terminated")) //
124129
.subscribe();
@@ -135,6 +140,8 @@ public void shouldStreamDataWithTailableCursor() throws Exception {
135140

136141
repository.save(new Person("Gus", "Fring", 53)).subscribe();
137142
Thread.sleep(100);
143+
144+
assertThat(people).hasSize(6);
138145
}
139146

140147
/**

0 commit comments

Comments
 (0)