Skip to content

Commit 5764959

Browse files
kevinleturcakonczak
authored andcommitted
DATAES-167 - Add scan method for CriteriaQuery.
1 parent c9ffc44 commit 5764959

File tree

3 files changed

+178
-10
lines changed

3 files changed

+178
-10
lines changed

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,16 @@ public interface ElasticsearchOperations {
439439
*/
440440
<T> void refresh(Class<T> clazz, boolean waitForOperation);
441441

442+
/**
443+
* Returns scroll id for criteria query
444+
*
445+
* @param query
446+
* @param scrollTimeInMillis
447+
* @param noFields
448+
* @return
449+
*/
450+
String scan(CriteriaQuery query, long scrollTimeInMillis, boolean noFields);
451+
442452
/**
443453
* Returns scroll id for scan query
444454
*

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -594,29 +594,58 @@ public <T> void delete(CriteriaQuery criteriaQuery, Class<T> clazz) {
594594
delete(deleteQuery, clazz);
595595
}
596596

597+
@Override
598+
public String scan(CriteriaQuery criteriaQuery, long scrollTimeInMillis, boolean noFields) {
599+
Assert.notNull(criteriaQuery.getIndices(), "No index defined for Query");
600+
Assert.notNull(criteriaQuery.getTypes(), "No type define for Query");
601+
Assert.notNull(criteriaQuery.getPageable(), "Query.pageable is required for scan & scroll");
602+
603+
QueryBuilder elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(criteriaQuery.getCriteria());
604+
FilterBuilder elasticsearchFilter = new CriteriaFilterProcessor().createFilterFromCriteria(criteriaQuery.getCriteria());
605+
SearchRequestBuilder requestBuilder = prepareScan(criteriaQuery, scrollTimeInMillis, noFields);
606+
607+
if (elasticsearchQuery != null) {
608+
requestBuilder.setQuery(elasticsearchQuery);
609+
} else {
610+
requestBuilder.setQuery(QueryBuilders.matchAllQuery());
611+
}
612+
613+
if (elasticsearchFilter != null) {
614+
requestBuilder.setPostFilter(elasticsearchFilter);
615+
}
616+
617+
return getSearchResponse(requestBuilder.execute()).getScrollId();
618+
}
619+
597620
@Override
598621
public String scan(SearchQuery searchQuery, long scrollTimeInMillis, boolean noFields) {
599622
Assert.notNull(searchQuery.getIndices(), "No index defined for Query");
600623
Assert.notNull(searchQuery.getTypes(), "No type define for Query");
601624
Assert.notNull(searchQuery.getPageable(), "Query.pageable is required for scan & scroll");
602625

603-
SearchRequestBuilder requestBuilder = client.prepareSearch(toArray(searchQuery.getIndices())).setSearchType(SCAN)
604-
.setQuery(searchQuery.getQuery()).setTypes(toArray(searchQuery.getTypes()))
605-
.setScroll(TimeValue.timeValueMillis(scrollTimeInMillis)).setFrom(0)
606-
.setSize(searchQuery.getPageable().getPageSize());
626+
SearchRequestBuilder requestBuilder = prepareScan(searchQuery, scrollTimeInMillis, noFields);
607627

608628
if (searchQuery.getFilter() != null) {
609629
requestBuilder.setPostFilter(searchQuery.getFilter());
610630
}
611631

612-
if (isNotEmpty(searchQuery.getFields())) {
613-
requestBuilder.addFields(toArray(searchQuery.getFields()));
632+
return getSearchResponse(requestBuilder.setQuery(searchQuery.getQuery()).execute()).getScrollId();
633+
}
634+
635+
private SearchRequestBuilder prepareScan(Query query, long scrollTimeInMillis, boolean noFields) {
636+
SearchRequestBuilder requestBuilder = client.prepareSearch(toArray(query.getIndices())).setSearchType(SCAN)
637+
.setTypes(toArray(query.getTypes()))
638+
.setScroll(TimeValue.timeValueMillis(scrollTimeInMillis)).setFrom(0)
639+
.setSize(query.getPageable().getPageSize());
640+
641+
if (isNotEmpty(query.getFields())) {
642+
requestBuilder.addFields(toArray(query.getFields()));
614643
}
615644

616645
if (noFields) {
617646
requestBuilder.setNoFields();
618647
}
619-
return getSearchResponse(requestBuilder.execute()).getScrollId();
648+
return requestBuilder;
620649
}
621650

622651
@Override

src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java

Lines changed: 132 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -647,8 +647,39 @@ public void shouldReturnSimilarResultsGivenMoreLikeThisQuery() {
647647
assertThat(sampleEntities.getContent(), hasItem(sampleEntity));
648648
}
649649

650+
/*
651+
DATAES-167
652+
*/
653+
@Test
654+
public void shouldReturnResultsWithScanAndScrollForGivenCriteriaQuery() {
655+
//given
656+
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
657+
// when
658+
elasticsearchTemplate.bulkIndex(entities);
659+
elasticsearchTemplate.refresh(SampleEntity.class, true);
660+
// then
661+
662+
CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria());
663+
criteriaQuery.addIndices(INDEX_NAME);
664+
criteriaQuery.addTypes(TYPE_NAME);
665+
criteriaQuery.setPageable(new PageRequest(0, 10));
666+
667+
String scrollId = elasticsearchTemplate.scan(criteriaQuery, 1000, false);
668+
List<SampleEntity> sampleEntities = new ArrayList<SampleEntity>();
669+
boolean hasRecords = true;
670+
while (hasRecords) {
671+
Page<SampleEntity> page = elasticsearchTemplate.scroll(scrollId, 5000L, SampleEntity.class);
672+
if (page.hasContent()) {
673+
sampleEntities.addAll(page.getContent());
674+
} else {
675+
hasRecords = false;
676+
}
677+
}
678+
assertThat(sampleEntities.size(), is(equalTo(30)));
679+
}
680+
650681
@Test
651-
public void shouldReturnResultsWithScanAndScroll() {
682+
public void shouldReturnResultsWithScanAndScrollForGivenSearchQuery() {
652683
//given
653684
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
654685
// when
@@ -673,11 +704,60 @@ public void shouldReturnResultsWithScanAndScroll() {
673704
assertThat(sampleEntities.size(), is(equalTo(30)));
674705
}
675706

707+
/*
708+
DATAES-167
709+
*/
710+
@Test
711+
public void shouldReturnResultsWithScanAndScrollForSpecifiedFieldsForCriteriaCriteria() {
712+
//given
713+
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
714+
// when
715+
elasticsearchTemplate.bulkIndex(entities);
716+
elasticsearchTemplate.refresh(SampleEntity.class, true);
717+
// then
718+
719+
CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria());
720+
criteriaQuery.addIndices(INDEX_NAME);
721+
criteriaQuery.addTypes(TYPE_NAME);
722+
criteriaQuery.addFields("message");
723+
criteriaQuery.setPageable(new PageRequest(0, 10));
724+
725+
String scrollId = elasticsearchTemplate.scan(criteriaQuery, 5000, false);
726+
List<SampleEntity> sampleEntities = new ArrayList<SampleEntity>();
727+
boolean hasRecords = true;
728+
while (hasRecords) {
729+
Page<SampleEntity> page = elasticsearchTemplate.scroll(scrollId, 5000L, new SearchResultMapper() {
730+
@Override
731+
public <T> FacetedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
732+
List<SampleEntity> result = new ArrayList<SampleEntity>();
733+
for (SearchHit searchHit : response.getHits()) {
734+
String message = searchHit.getFields().get("message").getValue();
735+
SampleEntity sampleEntity = new SampleEntity();
736+
sampleEntity.setId(searchHit.getId());
737+
sampleEntity.setMessage(message);
738+
result.add(sampleEntity);
739+
}
740+
741+
if (result.size() > 0) {
742+
return new FacetedPageImpl<T>((List<T>) result);
743+
}
744+
return null;
745+
}
746+
});
747+
if (page != null) {
748+
sampleEntities.addAll(page.getContent());
749+
} else {
750+
hasRecords = false;
751+
}
752+
}
753+
assertThat(sampleEntities.size(), is(equalTo(30)));
754+
}
755+
676756
/*
677757
DATAES-84
678758
*/
679759
@Test
680-
public void shouldReturnResultsWithScanAndScrollForSpecifiedFields() {
760+
public void shouldReturnResultsWithScanAndScrollForSpecifiedFieldsForSearchCriteria() {
681761
//given
682762
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
683763
// when
@@ -723,8 +803,57 @@ public <T> FacetedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pa
723803
assertThat(sampleEntities.size(), is(equalTo(30)));
724804
}
725805

806+
/*
807+
DATAES-167
808+
*/
809+
@Test
810+
public void shouldReturnResultsForScanAndScrollWithCustomResultMapperForGivenCriteriaQuery() {
811+
//given
812+
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
813+
// when
814+
elasticsearchTemplate.bulkIndex(entities);
815+
elasticsearchTemplate.refresh(SampleEntity.class, true);
816+
// then
817+
818+
CriteriaQuery criteriaQuery = new CriteriaQuery(new Criteria());
819+
criteriaQuery.addIndices(INDEX_NAME);
820+
criteriaQuery.addTypes(TYPE_NAME);
821+
criteriaQuery.setPageable(new PageRequest(0, 10));
822+
823+
String scrollId = elasticsearchTemplate.scan(criteriaQuery, 1000, false);
824+
List<SampleEntity> sampleEntities = new ArrayList<SampleEntity>();
825+
boolean hasRecords = true;
826+
while (hasRecords) {
827+
Page<SampleEntity> page = elasticsearchTemplate.scroll(scrollId, 5000L, new SearchResultMapper() {
828+
@Override
829+
public <T> FacetedPage<T> mapResults(SearchResponse response, Class<T> clazz, Pageable pageable) {
830+
List<SampleEntity> chunk = new ArrayList<SampleEntity>();
831+
for (SearchHit searchHit : response.getHits()) {
832+
if (response.getHits().getHits().length <= 0) {
833+
return null;
834+
}
835+
SampleEntity user = new SampleEntity();
836+
user.setId(searchHit.getId());
837+
user.setMessage((String) searchHit.getSource().get("message"));
838+
chunk.add(user);
839+
}
840+
if (chunk.size() > 0) {
841+
return new FacetedPageImpl<T>((List<T>) chunk);
842+
}
843+
return null;
844+
}
845+
});
846+
if (page != null) {
847+
sampleEntities.addAll(page.getContent());
848+
} else {
849+
hasRecords = false;
850+
}
851+
}
852+
assertThat(sampleEntities.size(), is(equalTo(30)));
853+
}
854+
726855
@Test
727-
public void shouldReturnResultsForScanAndScrollWithCustomResultMapper() {
856+
public void shouldReturnResultsForScanAndScrollWithCustomResultMapperForGivenSearchQuery() {
728857
//given
729858
List<IndexQuery> entities = createSampleEntitiesWithMessage("Test message", 30);
730859
// when

0 commit comments

Comments
 (0)