Skip to content

Commit 416daef

Browse files
kevinleturcakonczak
authored andcommitted
DATAES-165 - Add Java 8 Steram support in custom repositories.
1 parent 5764959 commit 416daef

File tree

6 files changed

+186
-0
lines changed

6 files changed

+186
-0
lines changed

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.springframework.data.domain.Page;
2020
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
2121
import org.springframework.data.elasticsearch.core.query.*;
22+
import org.springframework.data.util.CloseableIterator;
2223

2324
import java.util.LinkedList;
2425
import java.util.List;
@@ -209,6 +210,46 @@ public interface ElasticsearchOperations {
209210
*/
210211
<T> FacetedPage<T> queryForPage(StringQuery query, Class<T> clazz, SearchResultMapper mapper);
211212

213+
/**
214+
* Executes the given {@link CriteriaQuery} against elasticsearch and return result as {@link CloseableIterator}.
215+
* <p>
216+
* Returns a {@link CloseableIterator} that wraps an Elasticsearch scroll context that needs to be closed in case of error.
217+
*
218+
* @param <T> element return type
219+
* @param query
220+
* @param clazz
221+
* @return
222+
* @since 1.3
223+
*/
224+
<T> CloseableIterator<T> stream(CriteriaQuery query, Class<T> clazz);
225+
226+
/**
227+
* Executes the given {@link SearchQuery} against elasticsearch and return result as {@link CloseableIterator}.
228+
* <p>
229+
* Returns a {@link CloseableIterator} that wraps an Elasticsearch scroll context that needs to be closed in case of error.
230+
*
231+
* @param <T> element return type
232+
* @param query
233+
* @param clazz
234+
* @return
235+
* @since 1.3
236+
*/
237+
<T> CloseableIterator<T> stream(SearchQuery query, Class<T> clazz);
238+
239+
/**
240+
* Executes the given {@link SearchQuery} against elasticsearch and return result as {@link CloseableIterator} using custom mapper.
241+
* <p>
242+
* Returns a {@link CloseableIterator} that wraps an Elasticsearch scroll context that needs to be closed in case of error.
243+
*
244+
* @param <T> element return type
245+
* @param query
246+
* @param clazz
247+
* @param mapper
248+
* @return
249+
* @since 1.3
250+
*/
251+
<T> CloseableIterator<T> stream(SearchQuery query, Class<T> clazz, SearchResultMapper mapper);
252+
212253
/**
213254
* Execute the criteria query against elasticsearch and return result as {@link List}
214255
*

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
7979
import org.springframework.data.elasticsearch.core.query.*;
8080
import org.springframework.data.mapping.PersistentProperty;
81+
import org.springframework.data.util.CloseableIterator;
8182
import org.springframework.util.Assert;
8283

8384
import java.io.BufferedReader;
@@ -330,6 +331,80 @@ public <T> FacetedPage<T> queryForPage(StringQuery query, Class<T> clazz, Search
330331
return mapper.mapResults(response, clazz, query.getPageable());
331332
}
332333

334+
@Override
335+
public <T> CloseableIterator<T> stream(CriteriaQuery query, Class<T> clazz) {
336+
final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis();
337+
final String initScrollId = scan(query, scrollTimeInMillis, false);
338+
return doStream(initScrollId, scrollTimeInMillis, clazz, resultsMapper);
339+
}
340+
341+
@Override
342+
public <T> CloseableIterator<T> stream(SearchQuery query, Class<T> clazz) {
343+
return stream(query, clazz, resultsMapper);
344+
}
345+
346+
@Override
347+
public <T> CloseableIterator<T> stream(SearchQuery query, final Class<T> clazz, final SearchResultMapper mapper) {
348+
final long scrollTimeInMillis = TimeValue.timeValueMinutes(1).millis();
349+
final String initScrollId = scan(query, scrollTimeInMillis, false);
350+
return doStream(initScrollId, scrollTimeInMillis, clazz, mapper);
351+
}
352+
353+
private <T> CloseableIterator<T> doStream(final String initScrollId, final long scrollTimeInMillis, final Class<T> clazz, final SearchResultMapper mapper) {
354+
return new CloseableIterator<T>() {
355+
356+
/** As we couldn't retrieve single result with scroll, store current hits. */
357+
private volatile Iterator<T> currentHits;
358+
359+
/** The scroll id. */
360+
private volatile String scrollId = initScrollId;
361+
362+
/** If stream is finished (ie: cluster returns no results. */
363+
private volatile boolean finished;
364+
365+
@Override
366+
public void close() {
367+
try {
368+
// Clear scroll on cluster only in case of error (cause elasticsearch auto clear scroll when it's done)
369+
if (!finished && scrollId != null && currentHits != null && currentHits.hasNext()) {
370+
client.prepareClearScroll().addScrollId(scrollId).execute().actionGet();
371+
}
372+
} finally {
373+
currentHits = null;
374+
scrollId = null;
375+
}
376+
}
377+
378+
@Override
379+
public boolean hasNext() {
380+
// Test if stream is finished
381+
if (finished) {
382+
return false;
383+
}
384+
// Test if it remains hits
385+
if (currentHits == null || !currentHits.hasNext()) {
386+
// Do a new request
387+
SearchResponse response = getSearchResponse(client.prepareSearchScroll(scrollId)
388+
.setScroll(TimeValue.timeValueMillis(scrollTimeInMillis)).execute());
389+
// Save hits and scroll id
390+
currentHits = mapper.mapResults(response, clazz, null).iterator();
391+
finished = !currentHits.hasNext();
392+
scrollId = response.getScrollId();
393+
}
394+
return currentHits.hasNext();
395+
}
396+
397+
@Override
398+
public T next() {
399+
if (hasNext()) {
400+
return currentHits.next();
401+
}
402+
throw new NoSuchElementException();
403+
}
404+
405+
};
406+
}
407+
333408
@Override
334409
public <T> long count(CriteriaQuery criteriaQuery, Class<T> clazz) {
335410
QueryBuilder elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(criteriaQuery.getCriteria());

src/main/java/org/springframework/data/elasticsearch/repository/query/ElasticsearchPartQuery.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.springframework.data.repository.query.ParametersParameterAccessor;
2525
import org.springframework.data.repository.query.parser.PartTree;
2626
import org.springframework.util.ClassUtils;
27+
import org.springframework.data.util.CloseableIterator;
28+
import org.springframework.data.util.StreamUtils;
2729

2830
/**
2931
* ElasticsearchPartQuery
@@ -54,6 +56,14 @@ public Object execute(Object[] parameters) {
5456
} else if (queryMethod.isPageQuery()) {
5557
query.setPageable(accessor.getPageable());
5658
return elasticsearchOperations.queryForPage(query, queryMethod.getEntityInformation().getJavaType());
59+
} else if (queryMethod.isStreamQuery()) {
60+
Class<?> entityType = queryMethod.getEntityInformation().getJavaType();
61+
if (query.getPageable() == null) {
62+
query.setPageable(new PageRequest(0, 20));
63+
}
64+
65+
return StreamUtils.createStreamFromIterator((CloseableIterator<Object>) elasticsearchOperations.stream(query, entityType));
66+
5767
} else if (queryMethod.isCollectionQuery()) {
5868
if (accessor.getPageable() == null) {
5969
int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());

src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.springframework.data.elasticsearch.entities.HetroEntity2;
4343
import org.springframework.data.elasticsearch.entities.SampleEntity;
4444
import org.springframework.data.elasticsearch.entities.SampleMappingEntity;
45+
import org.springframework.data.util.CloseableIterator;
4546
import org.springframework.test.context.ContextConfiguration;
4647
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
4748

@@ -896,6 +897,31 @@ public <T> FacetedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pa
896897
assertThat(sampleEntities.size(), is(equalTo(30)));
897898
}
898899

900+
/*
901+
DATAES-167
902+
*/
903+
@Test
904+
public void shouldReturnResultsWithStreamForGivenCriteriaQuery() {
905+
//given
906+
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
907+
// when
908+
elasticsearchTemplate.bulkIndex(entities);
909+
elasticsearchTemplate.refresh(SampleEntity.class, true);
910+
// then
911+
912+
CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria());
913+
criteriaQuery.addIndices(INDEX_NAME);
914+
criteriaQuery.addTypes(TYPE_NAME);
915+
criteriaQuery.setPageable(new PageRequest(0, 10));
916+
917+
CloseableIterator<SampleEntity> stream = elasticsearchTemplate.stream(criteriaQuery, SampleEntity.class);
918+
List<SampleEntity> sampleEntities = new ArrayList<SampleEntity>();
919+
while (stream.hasNext()) {
920+
sampleEntities.add(stream.next());
921+
}
922+
assertThat(sampleEntities.size(), is(equalTo(30)));
923+
}
924+
899925
private static List<IndexQuery> createSampleEntitiesWithMessage(String message, int numberOfEntities) {
900926
List<IndexQuery> indexQueries = new ArrayList<IndexQuery>();
901927
for (int i = 0; i < numberOfEntities; i++) {

src/test/java/org/springframework/data/elasticsearch/repositories/CustomMethodRepositoryTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import static org.hamcrest.Matchers.*;
2020
import static org.junit.Assert.*;
2121

22+
import java.util.ArrayList;
2223
import java.util.Arrays;
2324
import java.util.List;
25+
import java.util.stream.Stream;
2426

2527
import org.junit.Before;
2628
import org.junit.Test;
@@ -641,6 +643,22 @@ public void shouldExecuteCustomMethodWithNearPointAndDistance() {
641643
assertThat(page.getTotalElements(), is(equalTo(1L)));
642644
}
643645

646+
/*
647+
DATAES-165
648+
*/
649+
@Test
650+
public void shouldAllowReturningJava8StreamInCustomQuery() {
651+
// given
652+
List<SampleEntity> entities = createSampleEntities("abc", 30);
653+
repository.save(entities);
654+
655+
// when
656+
Stream<SampleEntity> stream = repository.findByType("abc");
657+
// then
658+
assertThat(stream, is(notNullValue()));
659+
assertThat(stream.count(), is(equalTo(30L)));
660+
}
661+
644662
/*
645663
DATAES-106
646664
*/
@@ -1174,5 +1192,18 @@ public void shouldCountCustomMethodWithNearPointAndDistance() {
11741192
// then
11751193
assertThat(count, is(equalTo(1L)));
11761194
}
1195+
1196+
private List<SampleEntity> createSampleEntities(String type, int numberOfEntities) {
1197+
List<SampleEntity> entities = new ArrayList<SampleEntity>();
1198+
for (int i = 0; i < numberOfEntities; i++) {
1199+
SampleEntity entity = new SampleEntity();
1200+
entity.setId(randomNumeric(numberOfEntities));
1201+
entity.setAvailable(true);
1202+
entity.setMessage("Message");
1203+
entity.setType(type);
1204+
entities.add(entity);
1205+
}
1206+
return entities;
1207+
}
11771208
}
11781209

src/test/java/org/springframework/data/elasticsearch/repositories/custom/SampleCustomMethodRepository.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.elasticsearch.repositories.custom;
1717

1818
import java.util.List;
19+
import java.util.stream.Stream;
1920

2021
import org.springframework.data.domain.Page;
2122
import org.springframework.data.domain.Pageable;
@@ -87,6 +88,8 @@ public interface SampleCustomMethodRepository extends ElasticsearchRepository<Sa
8788

8889
Page<SampleEntity> findByLocationNear(GeoPoint point, String distance, Pageable pageable);
8990

91+
Stream<SampleEntity> findByType(String type);
92+
9093
long countByType(String type);
9194

9295
long countByTypeNot(String type);

0 commit comments

Comments
 (0)