Skip to content

Commit b50e60f

Browse files
mp911dechristophstrobl
authored andcommitted
DATAES-510 - Polishing.
Wrap Scroll execution with usingWhen and run cleanup through usingWhen callback to clean up scrolls state on success/on error/on cancellation. Extract isEmpty(SearchHits) check into own method. Improve synchronization in ScrollState to prevent concurrent modification exceptions during read. Original Pull Request: spring-projects#231
1 parent da9de6b commit b50e60f

File tree

1 file changed

+45
-27
lines changed

1 file changed

+45
-27
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import org.elasticsearch.rest.RestStatus;
7979
import org.elasticsearch.search.Scroll;
8080
import org.elasticsearch.search.SearchHit;
81+
import org.elasticsearch.search.SearchHits;
8182
import org.reactivestreams.Publisher;
8283
import org.springframework.data.elasticsearch.ElasticsearchException;
8384
import org.springframework.data.elasticsearch.client.ClientConfiguration;
@@ -92,6 +93,7 @@
9293
import org.springframework.http.HttpStatus;
9394
import org.springframework.http.MediaType;
9495
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
96+
import org.springframework.lang.Nullable;
9597
import org.springframework.util.Assert;
9698
import org.springframework.util.ObjectUtils;
9799
import org.springframework.util.ReflectionUtils;
@@ -339,41 +341,55 @@ public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest)
339341
}
340342

341343
throw new IllegalArgumentException(
342-
String.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'."));
344+
String.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it));
343345
});
344346

345-
ScrollState state = new ScrollState();
347+
return Flux.usingWhen(Mono.fromSupplier(ScrollState::new),
346348

347-
Flux<SearchHit> searchHits = inbound.doOnNext(searchResponse -> {
348-
state.updateScrollId(searchResponse.getScrollId());
349-
}).<SearchResponse> handle((searchResponse, sink) -> {
349+
scrollState -> {
350350

351-
if (searchResponse.getHits() != null && searchResponse.getHits().getHits() != null
352-
&& searchResponse.getHits().getHits().length == 0) {
351+
Flux<SearchHit> searchHits = inbound.<SearchResponse> handle((searchResponse, sink) -> {
353352

354-
inbound.onComplete();
355-
outbound.onComplete();
353+
scrollState.updateScrollId(searchResponse.getScrollId());
354+
if (isEmpty(searchResponse.getHits())) {
356355

357-
} else {
356+
inbound.onComplete();
357+
outbound.onComplete();
358358

359-
sink.next(searchResponse);
359+
} else {
360360

361-
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(state.getScrollId()).scroll(scrollTimeout);
362-
request.next(searchScrollRequest);
363-
}
361+
sink.next(searchResponse);
362+
363+
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
364+
.scroll(scrollTimeout);
365+
request.next(searchScrollRequest);
366+
}
364367

365-
}).map(SearchResponse::getHits) //
366-
.flatMap(Flux::fromIterable) //
367-
.doOnComplete(() -> {
368+
}).map(SearchResponse::getHits) //
369+
.flatMap(Flux::fromIterable);
368370

369-
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
370-
clearScrollRequest.scrollIds(state.getScrollIds());
371+
return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound));
371372

372-
// just send the request, resources get cleaned up anyways after scrollTimeout has been reached.
373-
sendRequest(clearScrollRequest, RequestCreator.clearScroll(), ClearScrollResponse.class, headers).subscribe();
374-
});
373+
}, state -> cleanupScroll(headers, state), //
374+
state -> cleanupScroll(headers, state), //
375+
state -> cleanupScroll(headers, state)); //
376+
}
377+
378+
private static boolean isEmpty(@Nullable SearchHits hits) {
379+
return hits != null && hits.getHits() != null && hits.getHits().length == 0;
380+
}
375381

376-
return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound));
382+
private Publisher<?> cleanupScroll(HttpHeaders headers, ScrollState state) {
383+
384+
if (state.getScrollIds().isEmpty()) {
385+
return Mono.empty();
386+
}
387+
388+
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
389+
clearScrollRequest.scrollIds(state.getScrollIds());
390+
391+
// just send the request, resources get cleaned up anyways after scrollTimeout has been reached.
392+
return sendRequest(clearScrollRequest, RequestCreator.clearScroll(), ClearScrollResponse.class, headers);
377393
}
378394

379395
/*
@@ -645,17 +661,20 @@ public Collection<ElasticsearchHost> hosts() {
645661
*/
646662
private static class ScrollState {
647663

648-
private Object lock = new Object();
664+
private final Object lock = new Object();
649665

666+
private final List<String> pastIds = new ArrayList<>(1);
650667
private String scrollId;
651-
private List<String> pastIds = new ArrayList<>(1);
652668

653669
String getScrollId() {
654670
return scrollId;
655671
}
656672

657673
List<String> getScrollIds() {
658-
return Collections.unmodifiableList(pastIds);
674+
675+
synchronized (lock) {
676+
return Collections.unmodifiableList(new ArrayList<>(pastIds));
677+
}
659678
}
660679

661680
void updateScrollId(String scrollId) {
@@ -669,6 +688,5 @@ void updateScrollId(String scrollId) {
669688
}
670689
}
671690
}
672-
673691
}
674692
}

0 commit comments

Comments
 (0)