Skip to content

Commit a1c1394

Browse files
christophstroblmp911de
authored andcommitted
DATAES-518 - Use scroll for unpaged find operations in ReactiveElasticsearchTemplate.
We now use scroll instead of a max page size for unpaged queries. Original pull request: spring-projects#233.
1 parent fae1125 commit a1c1394

File tree

4 files changed

+85
-26
lines changed

4 files changed

+85
-26
lines changed

src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.elasticsearch.index.query.QueryBuilders;
2222
import org.reactivestreams.Publisher;
23+
import org.springframework.data.domain.Pageable;
2324
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
2425
import org.springframework.data.elasticsearch.core.query.Query;
2526
import org.springframework.data.elasticsearch.core.query.StringQuery;
@@ -205,7 +206,10 @@ default Mono<Boolean> exists(String id, Class<?> entityType, @Nullable String in
205206
Mono<Boolean> exists(String id, Class<?> entityType, @Nullable String index, @Nullable String type);
206207

207208
/**
208-
* Search the index for entities matching the given {@link Query query}.
209+
* Search the index for entities matching the given {@link Query query}. <br />
210+
* {@link Pageable#isUnpaged() Unpaged} queries may overrule elasticsearch server defaults for page size by either
211+
* delegating to the scroll API or using a max {@link org.elasticsearch.search.builder.SearchSourceBuilder#size(int)
212+
* size}.
209213
*
210214
* @param query must not be {@literal null}.
211215
* @param entityType must not be {@literal null}.
@@ -217,7 +221,10 @@ default <T> Flux<T> find(Query query, Class<T> entityType) {
217221
}
218222

219223
/**
220-
* Search the index for entities matching the given {@link Query query}.
224+
* Search the index for entities matching the given {@link Query query}. <br />
225+
* {@link Pageable#isUnpaged() Unpaged} queries may overrule elasticsearch server defaults for page size by either *
226+
* delegating to the scroll API or using a max {@link org.elasticsearch.search.builder.SearchSourceBuilder#size(int) *
227+
* size}.
221228
*
222229
* @param query must not be {@literal null}.
223230
* @param entityType The entity type for mapping the query. Must not be {@literal null}.

src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,16 @@ private Flux<SearchHit> doFind(Query query, ElasticsearchPersistentEntity<?> ent
253253
searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes());
254254
}
255255

256+
sort(query, entity).forEach(searchSourceBuilder::sort);
257+
258+
if (query.getMinScore() > 0) {
259+
searchSourceBuilder.minScore(query.getMinScore());
260+
}
261+
262+
if (query.getIndicesOptions() != null) {
263+
request.indicesOptions(query.getIndicesOptions());
264+
}
265+
256266
if (query.getPageable().isPaged()) {
257267

258268
long offset = query.getPageable().getOffset();
@@ -262,25 +272,15 @@ private Flux<SearchHit> doFind(Query query, ElasticsearchPersistentEntity<?> ent
262272

263273
searchSourceBuilder.from((int) offset);
264274
searchSourceBuilder.size(query.getPageable().getPageSize());
265-
} else {
266-
267-
searchSourceBuilder.from(0);
268-
searchSourceBuilder.size(10000); // this is the index.max_result_window default value
269-
}
270275

271-
if (query.getIndicesOptions() != null) {
272-
request.indicesOptions(query.getIndicesOptions());
273-
}
276+
request.source(searchSourceBuilder);
277+
return doFind(prepareSearchRequest(request));
274278

275-
sort(query, entity).forEach(searchSourceBuilder::sort);
279+
} else {
276280

277-
if (query.getMinScore() > 0) {
278-
searchSourceBuilder.minScore(query.getMinScore());
281+
request.source(searchSourceBuilder);
282+
return doScan(prepareSearchRequest(request));
279283
}
280-
281-
request.source(searchSourceBuilder);
282-
283-
return doFind(prepareSearchRequest(request));
284284
});
285285
}
286286

@@ -516,6 +516,21 @@ protected Flux<SearchHit> doFind(SearchRequest request) {
516516
return Flux.from(execute(client -> client.search(request)));
517517
}
518518

519+
/**
520+
* Customization hook on the actual execution result {@link Publisher}. <br />
521+
*
522+
* @param request the already prepared {@link SearchRequest} ready to be executed.
523+
* @return a {@link Flux} emitting the result of the operation.
524+
*/
525+
protected Flux<SearchHit> doScan(SearchRequest request) {
526+
527+
if (QUERY_LOGGER.isDebugEnabled()) {
528+
QUERY_LOGGER.debug("Executing doScan: {}", request);
529+
}
530+
531+
return Flux.from(execute(client -> client.scroll(request)));
532+
}
533+
519534
/**
520535
* Customization hook on the actual execution result {@link Publisher}. <br />
521536
*

src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,17 @@
3232
import java.util.Map;
3333
import java.util.UUID;
3434
import java.util.stream.Collectors;
35+
import java.util.stream.IntStream;
3536

3637
import org.junit.Before;
3738
import org.junit.Rule;
3839
import org.junit.Test;
3940
import org.junit.runner.RunWith;
4041
import org.springframework.dao.DataAccessResourceFailureException;
4142
import org.springframework.data.annotation.Id;
43+
import org.springframework.data.domain.PageRequest;
44+
import org.springframework.data.domain.Pageable;
45+
import org.springframework.data.domain.Sort;
4246
import org.springframework.data.elasticsearch.ElasticsearchVersion;
4347
import org.springframework.data.elasticsearch.ElasticsearchVersionRule;
4448
import org.springframework.data.elasticsearch.TestUtils;
@@ -354,6 +358,40 @@ public void shouldReturnProjectedTargetEntity() {
354358
.verifyComplete();
355359
}
356360

361+
@Test // DATAES-518
362+
public void findShouldApplyPagingCorrectly() {
363+
364+
List<SampleEntity> source = IntStream.range(0, 100).mapToObj(it -> randomEntity("entity - " + it))
365+
.collect(Collectors.toList());
366+
367+
index(source.toArray(new SampleEntity[0]));
368+
369+
CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("entity")) //
370+
.addSort(Sort.by("message"))//
371+
.setPageable(PageRequest.of(0, 20));
372+
373+
template.find(query, SampleEntity.class).as(StepVerifier::create) //
374+
.expectNextCount(20) //
375+
.verifyComplete();
376+
}
377+
378+
@Test // DATAES-518
379+
public void findWithoutPagingShouldReadAll() {
380+
381+
List<SampleEntity> source = IntStream.range(0, 100).mapToObj(it -> randomEntity("entity - " + it))
382+
.collect(Collectors.toList());
383+
384+
index(source.toArray(new SampleEntity[0]));
385+
386+
CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("entity")) //
387+
.addSort(Sort.by("message"))//
388+
.setPageable(Pageable.unpaged());
389+
390+
template.find(query, SampleEntity.class).as(StepVerifier::create) //
391+
.expectNextCount(100) //
392+
.verifyComplete();
393+
}
394+
357395
@Test // DATAES-504
358396
public void countShouldReturnCountAllWhenGivenNoQuery() {
359397

src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,28 +92,28 @@ public void insertShouldApplyRefreshPolicy() {
9292
assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL);
9393
}
9494

95-
@Test // DATAES-504
95+
@Test // DATAES-504, DATAES-518
9696
public void findShouldFallBackToDefaultIndexOptionsIfNotSet() {
9797

9898
ArgumentCaptor<SearchRequest> captor = ArgumentCaptor.forClass(SearchRequest.class);
9999
when(client.search(captor.capture())).thenReturn(Flux.empty());
100100

101-
template.find(new CriteriaQuery(new Criteria("*")), SampleEntity.class) //
101+
template.find(new CriteriaQuery(new Criteria("*")).setPageable(PageRequest.of(0, 10)), SampleEntity.class) //
102102
.as(StepVerifier::create) //
103103
.verifyComplete();
104104

105105
assertThat(captor.getValue().indicesOptions()).isEqualTo(DEFAULT_INDICES_OPTIONS);
106106
}
107107

108-
@Test // DATAES-504
108+
@Test // DATAES-504, DATAES-518
109109
public void findShouldApplyIndexOptionsIfSet() {
110110

111111
ArgumentCaptor<SearchRequest> captor = ArgumentCaptor.forClass(SearchRequest.class);
112112
when(client.search(captor.capture())).thenReturn(Flux.empty());
113113

114114
template.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
115115

116-
template.find(new CriteriaQuery(new Criteria("*")), SampleEntity.class) //
116+
template.find(new CriteriaQuery(new Criteria("*")).setPageable(PageRequest.of(0, 10)), SampleEntity.class) //
117117
.as(StepVerifier::create) //
118118
.verifyComplete();
119119

@@ -135,19 +135,18 @@ public void findShouldApplyPaginationIfSet() {
135135
assertThat(captor.getValue().source().size()).isEqualTo(50);
136136
}
137137

138-
@Test // DATAES-504
139-
public void findShouldApplyDefaultMaxIfPaginationNotSet() {
138+
@Test // DATAES-504, DATAES-518
139+
public void findShouldUseScrollIfPaginationNotSet() {
140140

141141
ArgumentCaptor<SearchRequest> captor = ArgumentCaptor.forClass(SearchRequest.class);
142-
when(client.search(captor.capture())).thenReturn(Flux.empty());
142+
when(client.scroll(captor.capture())).thenReturn(Flux.empty());
143143

144144

145145
template.find(new CriteriaQuery(new Criteria("*")).setPageable(Pageable.unpaged()), SampleEntity.class) //
146146
.as(StepVerifier::create) //
147147
.verifyComplete();
148148

149-
assertThat(captor.getValue().source().from()).isEqualTo(0);
150-
assertThat(captor.getValue().source().size()).isEqualTo(10000);
149+
verify(client).scroll(any());
151150
}
152151

153152
@Test // DATAES-504

0 commit comments

Comments
 (0)