Skip to content

Commit 506f79a

Browse files
authored
DATAES-831 - SearchOperations.searchForStream does not use requested maxResults.
Original PR: spring-projects#459
1 parent 391e240 commit 506f79a

File tree

6 files changed

+119
-45
lines changed

6 files changed

+119
-45
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,11 @@ public <T> SearchHitsIterator<T> searchForStream(Query query, Class<T> clazz, In
258258

259259
long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis();
260260

261+
// noinspection ConstantConditions
262+
int maxCount = query.isLimiting() ? query.getMaxResults() : 0;
263+
261264
return StreamQueries.streamResults( //
265+
maxCount, //
262266
searchScrollStart(scrollTimeInMillis, query, clazz, index), //
263267
scrollId -> searchScrollContinue(scrollId, scrollTimeInMillis, clazz, index), //
264268
this::searchScrollClear);

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import org.elasticsearch.search.builder.SearchSourceBuilder;
4141
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
4242
import org.elasticsearch.search.suggest.SuggestBuilder;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
4345
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
4446
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
4547
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
@@ -88,6 +90,8 @@
8890
*/
8991
public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
9092

93+
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchRestTemplate.class);
94+
9195
private RestHighLevelClient client;
9296
private ElasticsearchExceptionTranslator exceptionTranslator;
9397

@@ -300,9 +304,13 @@ public <T> SearchScrollHits<T> searchScrollContinue(@Nullable String scrollId, l
300304

301305
@Override
302306
public void searchScrollClear(List<String> scrollIds) {
303-
ClearScrollRequest request = new ClearScrollRequest();
304-
request.scrollIds(scrollIds);
305-
execute(client -> client.clearScroll(request, RequestOptions.DEFAULT));
307+
try {
308+
ClearScrollRequest request = new ClearScrollRequest();
309+
request.scrollIds(scrollIds);
310+
execute(client -> client.clearScroll(request, RequestOptions.DEFAULT));
311+
} catch (Exception e) {
312+
LOGGER.warn("Could not clear scroll: {}", e.getMessage());
313+
}
306314
}
307315

308316
@Override

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
8787
private static final Logger QUERY_LOGGER = LoggerFactory
8888
.getLogger("org.springframework.data.elasticsearch.core.QUERY");
89+
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchTemplate.class);
8990

9091
private Client client;
9192
@Nullable private String searchTimeout;
@@ -322,7 +323,11 @@ public <T> SearchScrollHits<T> searchScrollContinue(@Nullable String scrollId, l
322323

323324
@Override
324325
public void searchScrollClear(List<String> scrollIds) {
325-
client.prepareClearScroll().setScrollIds(scrollIds).execute().actionGet();
326+
try {
327+
client.prepareClearScroll().setScrollIds(scrollIds).execute().actionGet();
328+
} catch (Exception e) {
329+
LOGGER.warn("Could not clear scroll: {}", e.getMessage());
330+
}
326331
}
327332

328333
@Override

src/main/java/org/springframework/data/elasticsearch/core/StreamQueries.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Iterator;
1919
import java.util.List;
2020
import java.util.NoSuchElementException;
21+
import java.util.concurrent.atomic.AtomicInteger;
2122
import java.util.function.Consumer;
2223
import java.util.function.Function;
2324

@@ -38,13 +39,15 @@ abstract class StreamQueries {
3839
/**
3940
* Stream query results using {@link SearchScrollHits}.
4041
*
42+
* @param maxCount the maximum number of entities to return, a value of 0 means that all available entities are
43+
* returned
4144
* @param searchHits the initial hits
4245
* @param continueScrollFunction function to continue scrolling applies to the current scrollId.
4346
* @param clearScrollConsumer consumer to clear the scroll context by accepting the scrollIds to clear.
44-
* @param <T>
47+
* @param <T> the entity type
4548
* @return the {@link SearchHitsIterator}.
4649
*/
47-
static <T> SearchHitsIterator<T> streamResults(SearchScrollHits<T> searchHits,
50+
static <T> SearchHitsIterator<T> streamResults(int maxCount, SearchScrollHits<T> searchHits,
4851
Function<String, SearchScrollHits<T>> continueScrollFunction, Consumer<List<String>> clearScrollConsumer) {
4952

5053
Assert.notNull(searchHits, "searchHits must not be null.");
@@ -59,20 +62,14 @@ static <T> SearchHitsIterator<T> streamResults(SearchScrollHits<T> searchHits,
5962

6063
return new SearchHitsIterator<T>() {
6164

62-
// As we couldn't retrieve single result with scroll, store current hits.
63-
private volatile Iterator<SearchHit<T>> scrollHits = searchHits.iterator();
64-
private volatile boolean continueScroll = scrollHits.hasNext();
65+
private volatile AtomicInteger currentCount = new AtomicInteger();
66+
private volatile Iterator<SearchHit<T>> currentScrollHits = searchHits.iterator();
67+
private volatile boolean continueScroll = currentScrollHits.hasNext();
6568
private volatile ScrollState scrollState = new ScrollState(searchHits.getScrollId());
6669

6770
@Override
6871
public void close() {
69-
70-
try {
71-
clearScrollConsumer.accept(scrollState.getScrollIds());
72-
} finally {
73-
scrollHits = null;
74-
scrollState = null;
75-
}
72+
clearScrollConsumer.accept(scrollState.getScrollIds());
7673
}
7774

7875
@Override
@@ -99,24 +96,25 @@ public TotalHitsRelation getTotalHitsRelation() {
9996
@Override
10097
public boolean hasNext() {
10198

102-
if (!continueScroll) {
99+
if (!continueScroll || (maxCount > 0 && currentCount.get() >= maxCount)) {
103100
return false;
104101
}
105102

106-
if (!scrollHits.hasNext()) {
103+
if (!currentScrollHits.hasNext()) {
107104
SearchScrollHits<T> nextPage = continueScrollFunction.apply(scrollState.getScrollId());
108-
scrollHits = nextPage.iterator();
105+
currentScrollHits = nextPage.iterator();
109106
scrollState.updateScrollId(nextPage.getScrollId());
110-
continueScroll = scrollHits.hasNext();
107+
continueScroll = currentScrollHits.hasNext();
111108
}
112109

113-
return scrollHits.hasNext();
110+
return currentScrollHits.hasNext();
114111
}
115112

116113
@Override
117114
public SearchHit<T> next() {
118115
if (hasNext()) {
119-
return scrollHits.next();
116+
currentCount.incrementAndGet();
117+
return currentScrollHits.next();
120118
}
121119
throw new NoSuchElementException();
122120
}

src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
import org.springframework.data.elasticsearch.core.geo.GeoPoint;
7878
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
7979
import org.springframework.data.elasticsearch.core.query.*;
80-
import org.springframework.data.util.CloseableIterator;
80+
import org.springframework.data.util.StreamUtils;
8181
import org.springframework.lang.Nullable;
8282

8383
/**
@@ -1298,27 +1298,33 @@ public void shouldReturnResultsWithScanAndScrollForGivenSearchQueryAndClass() {
12981298
assertThat(sampleEntities).hasSize(30);
12991299
}
13001300

1301-
@Test // DATAES-167
1302-
public void shouldReturnResultsWithStreamForGivenCriteriaQuery() {
1301+
@Test // DATAES-167, DATAES-831
1302+
public void shouldReturnAllResultsWithStreamForGivenCriteriaQuery() {
13031303

1304-
// given
1305-
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
1304+
operations.bulkIndex(createSampleEntitiesWithMessage("Test message", 30), index);
1305+
indexOperations.refresh();
1306+
CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria());
1307+
criteriaQuery.setPageable(PageRequest.of(0, 10));
13061308

1307-
// when
1308-
operations.bulkIndex(entities, index);
1309+
long count = StreamUtils
1310+
.createStreamFromIterator(operations.searchForStream(criteriaQuery, SampleEntity.class, index)).count();
1311+
1312+
assertThat(count).isEqualTo(30);
1313+
}
1314+
1315+
@Test // DATAES-831
1316+
void shouldLimitStreamResultToRequestedSize() {
1317+
1318+
operations.bulkIndex(createSampleEntitiesWithMessage("Test message", 30), index);
13091319
indexOperations.refresh();
13101320

1311-
// then
13121321
CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria());
1313-
criteriaQuery.setPageable(PageRequest.of(0, 10));
1322+
criteriaQuery.setMaxResults(10);
13141323

1315-
CloseableIterator<SearchHit<SampleEntity>> stream = operations.searchForStream(criteriaQuery, SampleEntity.class,
1316-
index);
1317-
List<SearchHit<SampleEntity>> sampleEntities = new ArrayList<>();
1318-
while (stream.hasNext()) {
1319-
sampleEntities.add(stream.next());
1320-
}
1321-
assertThat(sampleEntities).hasSize(30);
1324+
long count = StreamUtils
1325+
.createStreamFromIterator(operations.searchForStream(criteriaQuery, SampleEntity.class, index)).count();
1326+
1327+
assertThat(count).isEqualTo(10);
13221328
}
13231329

13241330
private static List<IndexQuery> createSampleEntitiesWithMessage(String message, int numberOfEntities) {
@@ -3128,8 +3134,8 @@ void multiSearchShouldReturnSeqNoPrimaryTerm() {
31283134
operations.refresh(OptimisticEntity.class);
31293135

31303136
List<Query> queries = singletonList(queryForOne(saved.getId()));
3131-
List<SearchHits<OptimisticEntity>> retrievedHits = operations.multiSearch(queries,
3132-
OptimisticEntity.class, operations.getIndexCoordinatesFor(OptimisticEntity.class));
3137+
List<SearchHits<OptimisticEntity>> retrievedHits = operations.multiSearch(queries, OptimisticEntity.class,
3138+
operations.getIndexCoordinatesFor(OptimisticEntity.class));
31333139
OptimisticEntity retrieved = retrievedHits.get(0).getSearchHit(0).getContent();
31343140

31353141
assertThatSeqNoPrimaryTermIsFilled(retrieved);
@@ -3162,8 +3168,7 @@ void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnEnt
31623168
operations.save(forEdit1);
31633169

31643170
forEdit2.setMessage("It'll be great");
3165-
assertThatThrownBy(() -> operations.save(forEdit2))
3166-
.isInstanceOf(OptimisticLockingFailureException.class);
3171+
assertThatThrownBy(() -> operations.save(forEdit2)).isInstanceOf(OptimisticLockingFailureException.class);
31673172
}
31683173

31693174
@Test // DATAES-799
@@ -3179,8 +3184,7 @@ void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnVer
31793184
operations.save(forEdit1);
31803185

31813186
forEdit2.setMessage("It'll be great");
3182-
assertThatThrownBy(() -> operations.save(forEdit2))
3183-
.isInstanceOf(OptimisticLockingFailureException.class);
3187+
assertThatThrownBy(() -> operations.save(forEdit2)).isInstanceOf(OptimisticLockingFailureException.class);
31843188
}
31853189

31863190
@Test // DATAES-799

src/test/java/org/springframework/data/elasticsearch/core/StreamQueriesTest.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.atomic.AtomicBoolean;
2626

2727
import org.junit.jupiter.api.Test;
28+
import org.springframework.data.util.StreamUtils;
2829

2930
/**
3031
* @author Sascha Woo
@@ -45,6 +46,7 @@ public void shouldCallClearScrollOnIteratorClose() {
4546

4647
// when
4748
SearchHitsIterator<String> iterator = StreamQueries.streamResults( //
49+
0, //
4850
searchHits, //
4951
scrollId -> newSearchScrollHits(Collections.emptyList(), scrollId), //
5052
scrollIds -> clearScrollCalled.set(true));
@@ -70,6 +72,7 @@ public void shouldReturnTotalHits() {
7072

7173
// when
7274
SearchHitsIterator<String> iterator = StreamQueries.streamResults( //
75+
0, //
7376
searchHits, //
7477
scrollId -> newSearchScrollHits(Collections.emptyList(), scrollId), //
7578
scrollId -> {});
@@ -90,10 +93,12 @@ void shouldClearAllScrollIds() {
9093
Collections.singletonList(new SearchHit<String>(null, 0, null, null, "one")), "s-2");
9194
SearchScrollHits<String> searchHits4 = newSearchScrollHits(Collections.emptyList(), "s-3");
9295

93-
Iterator<SearchScrollHits<String>> searchScrollHitsIterator = Arrays.asList(searchHits1, searchHits2, searchHits3,searchHits4).iterator();
96+
Iterator<SearchScrollHits<String>> searchScrollHitsIterator = Arrays
97+
.asList(searchHits1, searchHits2, searchHits3, searchHits4).iterator();
9498

9599
List<String> clearedScrollIds = new ArrayList<>();
96100
SearchHitsIterator<String> iterator = StreamQueries.streamResults( //
101+
0, //
97102
searchScrollHitsIterator.next(), //
98103
scrollId -> searchScrollHitsIterator.next(), //
99104
scrollIds -> clearedScrollIds.addAll(scrollIds));
@@ -106,6 +111,56 @@ void shouldClearAllScrollIds() {
106111
assertThat(clearedScrollIds).isEqualTo(Arrays.asList("s-1", "s-2", "s-3"));
107112
}
108113

114+
@Test // DATAES-831
115+
void shouldReturnAllForRequestedSizeOf0() {
116+
117+
SearchScrollHits<String> searchHits1 = newSearchScrollHits(
118+
Collections.singletonList(new SearchHit<String>(null, 0, null, null, "one")), "s-1");
119+
SearchScrollHits<String> searchHits2 = newSearchScrollHits(
120+
Collections.singletonList(new SearchHit<String>(null, 0, null, null, "one")), "s-2");
121+
SearchScrollHits<String> searchHits3 = newSearchScrollHits(
122+
Collections.singletonList(new SearchHit<String>(null, 0, null, null, "one")), "s-2");
123+
SearchScrollHits<String> searchHits4 = newSearchScrollHits(Collections.emptyList(), "s-3");
124+
125+
Iterator<SearchScrollHits<String>> searchScrollHitsIterator = Arrays
126+
.asList(searchHits1, searchHits2, searchHits3, searchHits4).iterator();
127+
128+
SearchHitsIterator<String> iterator = StreamQueries.streamResults( //
129+
0, //
130+
searchScrollHitsIterator.next(), //
131+
scrollId -> searchScrollHitsIterator.next(), //
132+
scrollIds -> {});
133+
134+
long count = StreamUtils.createStreamFromIterator(iterator).count();
135+
136+
assertThat(count).isEqualTo(3);
137+
}
138+
139+
@Test // DATAES-831
140+
void shouldOnlyReturnRequestedCount() {
141+
142+
SearchScrollHits<String> searchHits1 = newSearchScrollHits(
143+
Collections.singletonList(new SearchHit<String>(null, 0, null, null, "one")), "s-1");
144+
SearchScrollHits<String> searchHits2 = newSearchScrollHits(
145+
Collections.singletonList(new SearchHit<String>(null, 0, null, null, "one")), "s-2");
146+
SearchScrollHits<String> searchHits3 = newSearchScrollHits(
147+
Collections.singletonList(new SearchHit<String>(null, 0, null, null, "one")), "s-2");
148+
SearchScrollHits<String> searchHits4 = newSearchScrollHits(Collections.emptyList(), "s-3");
149+
150+
Iterator<SearchScrollHits<String>> searchScrollHitsIterator = Arrays
151+
.asList(searchHits1, searchHits2, searchHits3, searchHits4).iterator();
152+
153+
SearchHitsIterator<String> iterator = StreamQueries.streamResults( //
154+
2, //
155+
searchScrollHitsIterator.next(), //
156+
scrollId -> searchScrollHitsIterator.next(), //
157+
scrollIds -> {});
158+
159+
long count = StreamUtils.createStreamFromIterator(iterator).count();
160+
161+
assertThat(count).isEqualTo(2);
162+
}
163+
109164
private SearchScrollHits<String> newSearchScrollHits(List<SearchHit<String>> hits, String scrollId) {
110165
return new SearchHitsImpl<String>(hits.size(), TotalHitsRelation.EQUAL_TO, 0, scrollId, hits, null);
111166
}

0 commit comments

Comments
 (0)