Skip to content

Commit da9de6b

Browse files
DATAES-510 - Add reactive scroll support.
The ReactiveElasticsearchClient now supports scrolling through large result sets by issuing subsequent _search/scroll requests while emitting data on the outbound channel. Resources bound via their scrollId get freed on completion of the flux. Original Pull Request: spring-projects#231
1 parent ce124a2 commit da9de6b

File tree

3 files changed

+188
-3
lines changed

3 files changed

+188
-3
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import io.netty.handler.ssl.JdkSslContext;
2121
import io.netty.handler.timeout.ReadTimeoutHandler;
2222
import io.netty.handler.timeout.WriteTimeoutHandler;
23+
import reactor.core.publisher.EmitterProcessor;
2324
import reactor.core.publisher.Flux;
25+
import reactor.core.publisher.FluxSink;
2426
import reactor.core.publisher.Mono;
2527
import reactor.netty.http.client.HttpClient;
2628
import reactor.netty.tcp.TcpClient;
@@ -31,7 +33,10 @@
3133
import java.net.InetSocketAddress;
3234
import java.nio.charset.StandardCharsets;
3335
import java.time.Duration;
36+
import java.util.ArrayList;
3437
import java.util.Collection;
38+
import java.util.Collections;
39+
import java.util.List;
3540
import java.util.Map.Entry;
3641
import java.util.Optional;
3742
import java.util.concurrent.TimeUnit;
@@ -53,11 +58,15 @@
5358
import org.elasticsearch.action.index.IndexResponse;
5459
import org.elasticsearch.action.main.MainRequest;
5560
import org.elasticsearch.action.main.MainResponse;
61+
import org.elasticsearch.action.search.ClearScrollRequest;
62+
import org.elasticsearch.action.search.ClearScrollResponse;
5663
import org.elasticsearch.action.search.SearchRequest;
5764
import org.elasticsearch.action.search.SearchResponse;
65+
import org.elasticsearch.action.search.SearchScrollRequest;
5866
import org.elasticsearch.action.update.UpdateRequest;
5967
import org.elasticsearch.action.update.UpdateResponse;
6068
import org.elasticsearch.client.Request;
69+
import org.elasticsearch.common.unit.TimeValue;
6170
import org.elasticsearch.common.xcontent.DeprecationHandler;
6271
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
6372
import org.elasticsearch.common.xcontent.XContentParser;
@@ -67,6 +76,7 @@
6776
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
6877
import org.elasticsearch.rest.BytesRestResponse;
6978
import org.elasticsearch.rest.RestStatus;
79+
import org.elasticsearch.search.Scroll;
7080
import org.elasticsearch.search.SearchHit;
7181
import org.reactivestreams.Publisher;
7282
import org.springframework.data.elasticsearch.ElasticsearchException;
@@ -85,6 +95,7 @@
8595
import org.springframework.util.Assert;
8696
import org.springframework.util.ObjectUtils;
8797
import org.springframework.util.ReflectionUtils;
98+
import org.springframework.util.StringUtils;
8899
import org.springframework.web.client.HttpServerErrorException;
89100
import org.springframework.web.reactive.function.BodyExtractors;
90101
import org.springframework.web.reactive.function.client.ClientRequest;
@@ -297,6 +308,74 @@ public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest)
297308
.flatMap(Flux::fromIterable);
298309
}
299310

311+
/*
312+
* (non-Javadoc)
313+
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#scroll(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
314+
*/
315+
@Override
316+
public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest) {
317+
318+
TimeValue scrollTimeout = searchRequest.scroll() != null ? searchRequest.scroll().keepAlive()
319+
: TimeValue.timeValueMinutes(1);
320+
321+
if (searchRequest.scroll() == null) {
322+
searchRequest.scroll(scrollTimeout);
323+
}
324+
325+
EmitterProcessor<ActionRequest> outbound = EmitterProcessor.create(false);
326+
FluxSink<ActionRequest> request = outbound.sink();
327+
328+
EmitterProcessor<SearchResponse> inbound = EmitterProcessor.create(false);
329+
330+
Flux<SearchResponse> exchange = outbound.startWith(searchRequest).flatMap(it -> {
331+
332+
if (it instanceof SearchRequest) {
333+
return sendRequest((SearchRequest) it, RequestCreator.search(), SearchResponse.class, headers);
334+
} else if (it instanceof SearchScrollRequest) {
335+
return sendRequest((SearchScrollRequest) it, RequestCreator.scroll(), SearchResponse.class, headers);
336+
} else if (it instanceof ClearScrollRequest) {
337+
return sendRequest((ClearScrollRequest) it, RequestCreator.clearScroll(), ClearScrollResponse.class, headers)
338+
.flatMap(discard -> Flux.empty());
339+
}
340+
341+
throw new IllegalArgumentException(
342+
String.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'."));
343+
});
344+
345+
ScrollState state = new ScrollState();
346+
347+
Flux<SearchHit> searchHits = inbound.doOnNext(searchResponse -> {
348+
state.updateScrollId(searchResponse.getScrollId());
349+
}).<SearchResponse> handle((searchResponse, sink) -> {
350+
351+
if (searchResponse.getHits() != null && searchResponse.getHits().getHits() != null
352+
&& searchResponse.getHits().getHits().length == 0) {
353+
354+
inbound.onComplete();
355+
outbound.onComplete();
356+
357+
} else {
358+
359+
sink.next(searchResponse);
360+
361+
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(state.getScrollId()).scroll(scrollTimeout);
362+
request.next(searchScrollRequest);
363+
}
364+
365+
}).map(SearchResponse::getHits) //
366+
.flatMap(Flux::fromIterable) //
367+
.doOnComplete(() -> {
368+
369+
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
370+
clearScrollRequest.scrollIds(state.getScrollIds());
371+
372+
// just send the request, resources get cleaned up anyways after scrollTimeout has been reached.
373+
sendRequest(clearScrollRequest, RequestCreator.clearScroll(), ClearScrollResponse.class, headers).subscribe();
374+
});
375+
376+
return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound));
377+
}
378+
300379
/*
301380
* (non-Javadoc)
302381
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest)
@@ -482,6 +561,14 @@ static Function<SearchRequest, Request> search() {
482561
return RequestConverters::search;
483562
}
484563

564+
static Function<SearchScrollRequest, Request> scroll() {
565+
return RequestConverters::searchScroll;
566+
}
567+
568+
static Function<ClearScrollRequest, Request> clearScroll() {
569+
return RequestConverters::clearScroll;
570+
}
571+
485572
static Function<IndexRequest, Request> index() {
486573
return RequestConverters::index;
487574
}
@@ -549,4 +636,39 @@ public Collection<ElasticsearchHost> hosts() {
549636
return connectedHosts;
550637
}
551638
}
639+
640+
/**
641+
* Mutable state object holding scrollId to be used for {@link SearchScrollRequest#scroll(Scroll)}
642+
*
643+
* @author Christoph Strobl
644+
* @since 4.0
645+
*/
646+
private static class ScrollState {
647+
648+
private Object lock = new Object();
649+
650+
private String scrollId;
651+
private List<String> pastIds = new ArrayList<>(1);
652+
653+
String getScrollId() {
654+
return scrollId;
655+
}
656+
657+
List<String> getScrollIds() {
658+
return Collections.unmodifiableList(pastIds);
659+
}
660+
661+
void updateScrollId(String scrollId) {
662+
663+
if (StringUtils.hasText(scrollId)) {
664+
665+
synchronized (lock) {
666+
667+
this.scrollId = scrollId;
668+
pastIds.add(scrollId);
669+
}
670+
}
671+
}
672+
673+
}
552674
}

src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.action.index.IndexResponse;
3131
import org.elasticsearch.action.main.MainResponse;
3232
import org.elasticsearch.action.search.SearchRequest;
33+
import org.elasticsearch.action.search.SearchResponse;
3334
import org.elasticsearch.action.update.UpdateRequest;
3435
import org.elasticsearch.action.update.UpdateResponse;
3536
import org.elasticsearch.index.get.GetResult;
@@ -350,6 +351,31 @@ default Flux<SearchHit> search(SearchRequest searchRequest) {
350351
*/
351352
Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest);
352353

354+
/**
355+
* Execute the given {@link SearchRequest} against the {@literal search scroll} API.
356+
*
357+
* @param searchRequest must not be {@literal null}.
358+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html">Search
359+
* Scroll API on elastic.co</a>
360+
* @return the {@link Flux} emitting {@link SearchHit hits} one by one.
361+
*/
362+
default Flux<SearchHit> scroll(SearchRequest searchRequest) {
363+
return scroll(HttpHeaders.EMPTY, searchRequest);
364+
}
365+
366+
/**
367+
* Execute the given {@link SearchRequest} against the {@literal search scroll} API. <br />
368+
* Scroll keeps track of {@link SearchResponse#getScrollId() scrollIds} returned by the server and provides them when
369+
* requesting more results via {@code _search/scroll}. All bound server resources are freed on completion.
370+
*
371+
* @param headers Use {@link HttpHeaders} to provide e.g. authentication data. Must not be {@literal null}.
372+
* @param searchRequest must not be {@literal null}.
373+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html">Search
374+
* Scroll API on elastic.co</a>
375+
* @return the {@link Flux} emitting {@link SearchHit hits} one by one.
376+
*/
377+
Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest);
378+
353379
/**
354380
* Execute a {@link DeleteByQueryRequest} against the {@literal delete by query} API.
355381
*

src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.LinkedHashMap;
2626
import java.util.Map;
2727
import java.util.UUID;
28+
import java.util.stream.IntStream;
2829

2930
import org.elasticsearch.ElasticsearchStatusException;
3031
import org.elasticsearch.Version;
@@ -36,7 +37,9 @@
3637
import org.elasticsearch.action.search.SearchRequest;
3738
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
3839
import org.elasticsearch.action.update.UpdateRequest;
40+
import org.elasticsearch.client.RequestOptions;
3941
import org.elasticsearch.client.RestHighLevelClient;
42+
import org.elasticsearch.common.unit.TimeValue;
4043
import org.elasticsearch.index.query.QueryBuilders;
4144
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
4245
import org.elasticsearch.rest.RestStatus;
@@ -50,6 +53,7 @@
5053
import org.springframework.data.elasticsearch.ElasticsearchVersionRule;
5154
import org.springframework.data.elasticsearch.TestUtils;
5255
import org.springframework.data.elasticsearch.client.ClientConfiguration;
56+
import org.springframework.http.HttpHeaders;
5357
import org.springframework.lang.Nullable;
5458
import org.springframework.test.context.ContextConfiguration;
5559
import org.springframework.test.context.junit4.SpringRunner;
@@ -111,8 +115,8 @@ public void pingForActiveHostShouldReturnTrue() {
111115
public void pingForUnknownHostShouldReturnFalse() {
112116

113117
DefaultReactiveElasticsearchClient
114-
.create(ClientConfiguration.builder().connectedTo("localhost:4711")
115-
.withConnectTimeout(Duration.ofSeconds(2)).build())
118+
.create(ClientConfiguration.builder().connectedTo("localhost:4711").withConnectTimeout(Duration.ofSeconds(2))
119+
.build())
116120
.ping() //
117121
.as(StepVerifier::create) //
118122
.expectNext(false) //
@@ -413,7 +417,7 @@ public void searchShouldFindExistingDocuments() {
413417
@Test // DATAES-488
414418
public void searchShouldCompleteIfNothingFound() throws IOException {
415419

416-
syncClient.indices().create(new CreateIndexRequest(INDEX_I));
420+
syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT);
417421

418422
SearchRequest request = new SearchRequest(INDEX_I).types(TYPE_I) //
419423
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
@@ -460,6 +464,39 @@ public void deleteByEmitResultWhenNothingRemoved() {
460464
.verifyComplete();
461465
}
462466

467+
@Test // DATAES-510
468+
public void scrollShouldReadWhileEndNotReached() {
469+
470+
IntStream.range(0, 100).forEach(it -> add(Collections.singletonMap(it + "-foo", "bar")).ofType(TYPE_I).to(INDEX_I));
471+
472+
SearchRequest request = new SearchRequest(INDEX_I).types(TYPE_I) //
473+
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
474+
475+
request = request.scroll(TimeValue.timeValueMinutes(1));
476+
477+
client.scroll(HttpHeaders.EMPTY, request) //
478+
.as(StepVerifier::create) //
479+
.expectNextCount(100) //
480+
.verifyComplete();
481+
}
482+
483+
@Test // DATAES-510
484+
public void scrollShouldReadWhileTakeNotReached() {
485+
486+
IntStream.range(0, 100).forEach(it -> add(Collections.singletonMap(it + "-foo", "bar")).ofType(TYPE_I).to(INDEX_I));
487+
488+
SearchRequest request = new SearchRequest(INDEX_I).types(TYPE_I) //
489+
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
490+
491+
request = request.scroll(TimeValue.timeValueMinutes(1));
492+
493+
client.scroll(HttpHeaders.EMPTY, request) //
494+
.take(73)
495+
.as(StepVerifier::create) //
496+
.expectNextCount(73) //
497+
.verifyComplete();
498+
}
499+
463500
AddToIndexOfType addSourceDocument() {
464501
return add(DOC_SOURCE);
465502
}

0 commit comments

Comments
 (0)