From 7f178238db2647d82e87a6d4a94885c22a15d570 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sun, 24 Mar 2024 17:37:11 +0100 Subject: [PATCH 001/148] Add environment variable to skip repository initialization. Original Pull Request #2878 Closes #2876 --- .../support/SimpleElasticsearchRepository.java | 10 ++++++---- .../support/SimpleReactiveElasticsearchRepository.java | 4 +++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java index 2ed128d2b..64325d33d 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java @@ -82,10 +82,12 @@ public SimpleElasticsearchRepository(ElasticsearchEntityInformation metad this.entityClass = this.entityInformation.getJavaType(); this.indexOperations = operations.indexOps(this.entityClass); - if (shouldCreateIndexAndMapping() && !indexOperations.exists()) { - indexOperations.createWithMapping(); - } else if (shouldAlwaysWriteMapping()) { - indexOperations.putMapping(); + if (!"true".equals(System.getenv("SPRING_DATA_ELASTICSEARCH_SKIP_REPOSITORY_INIT"))) { + if (shouldCreateIndexAndMapping() && !indexOperations.exists()) { + indexOperations.createWithMapping(); + } else if (shouldAlwaysWriteMapping()) { + indexOperations.putMapping(); + } } } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java index 4f3d61ec0..e780c2d27 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java @@ -61,7 +61,9 @@ public SimpleReactiveElasticsearchRepository(ElasticsearchEntityInformation Date: Sun, 24 Mar 2024 18:46:04 +0100 Subject: [PATCH 002/148] Polishing. --- .../support/SimpleElasticsearchRepository.java | 15 ++++++++++----- .../SimpleReactiveElasticsearchRepository.java | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java index 64325d33d..d4993361f 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java @@ -83,11 +83,16 @@ public SimpleElasticsearchRepository(ElasticsearchEntityInformation metad this.indexOperations = operations.indexOps(this.entityClass); if (!"true".equals(System.getenv("SPRING_DATA_ELASTICSEARCH_SKIP_REPOSITORY_INIT"))) { - if (shouldCreateIndexAndMapping() && !indexOperations.exists()) { - indexOperations.createWithMapping(); - } else if (shouldAlwaysWriteMapping()) { - indexOperations.putMapping(); - } + createIndexAndMappingIfNeeded(); + } + } + + public void createIndexAndMappingIfNeeded() { + + if (shouldCreateIndexAndMapping() && !indexOperations.exists()) { + indexOperations.createWithMapping(); + } else if (shouldAlwaysWriteMapping()) { + indexOperations.putMapping(); } } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java index e780c2d27..4e1aaea80 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java @@ -66,7 +66,7 @@ public SimpleReactiveElasticsearchRepository(ElasticsearchEntityInformation Date: Mon, 25 Mar 2024 19:51:30 +0100 Subject: [PATCH 003/148] Nullability annotation cleanup. --- .../modules/ROOT/pages/elasticsearch/object-mapping.adoc | 4 +--- .../elasticsearch/core/AbstractElasticsearchTemplate.java | 6 +----- .../core/AbstractReactiveElasticsearchTemplate.java | 3 --- .../configuration/ElasticsearchConfigurationELCTests.java | 2 -- .../ReactiveElasticsearchConfigurationELCTests.java | 2 -- 5 files changed, 2 insertions(+), 15 deletions(-) diff --git a/src/main/antora/modules/ROOT/pages/elasticsearch/object-mapping.adoc b/src/main/antora/modules/ROOT/pages/elasticsearch/object-mapping.adoc index ab6e98d5c..6ca12728c 100644 --- a/src/main/antora/modules/ROOT/pages/elasticsearch/object-mapping.adoc +++ b/src/main/antora/modules/ROOT/pages/elasticsearch/object-mapping.adoc @@ -192,8 +192,7 @@ public String getProperty() { This annotation can be set on a String property of an entity. This property will not be written to the mapping, it will not be stored in Elasticsearch and its value will not be read from an Elasticsearch document. -After an entity is persisted, for example with a call to `ElasticsearchOperations.save(T entity)`, the entity -returned from that call will contain the name of the index that an entity was saved to in that property. +After an entity is persisted, for example with a call to `ElasticsearchOperations.save(T entity)`, the entity returned from that call will contain the name of the index that an entity was saved to in that property. This is useful when the index name is dynamically set by a bean, or when writing to a write alias. Putting some value into such a property does not set the index into which an entity is stored! @@ -423,7 +422,6 @@ Looking at the `Configuration` from the xref:elasticsearch/object-mapping.adoc#e @Configuration public class Config extends ElasticsearchConfiguration { - @NonNull @Override public ClientConfiguration clientConfiguration() { return ClientConfiguration.builder() // diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java index b6e4d2178..60018096b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java @@ -57,7 +57,6 @@ import org.springframework.data.mapping.callback.EntityCallbacks; import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.util.Streamable; -import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -779,8 +778,7 @@ public T doWith(@Nullable Document document) { } protected interface SearchDocumentResponseCallback { - @NonNull - T doWith(@NonNull SearchDocumentResponse response); + T doWith(SearchDocumentResponse response); } protected class ReadSearchDocumentResponseCallback implements SearchDocumentResponseCallback> { @@ -795,7 +793,6 @@ public ReadSearchDocumentResponseCallback(Class type, IndexCoordinates index) this.type = type; } - @NonNull @Override public SearchHits doWith(SearchDocumentResponse response) { List entities = response.getSearchDocuments().stream().map(delegate::doWith).collect(Collectors.toList()); @@ -816,7 +813,6 @@ public ReadSearchScrollDocumentResponseCallback(Class type, IndexCoordinates this.type = type; } - @NonNull @Override public SearchScrollHits doWith(SearchDocumentResponse response) { List entities = response.getSearchDocuments().stream().map(delegate::doWith).collect(Collectors.toList()); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java index a1be403ef..98aa9a176 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java @@ -55,7 +55,6 @@ import org.springframework.data.elasticsearch.core.suggest.response.Suggest; import org.springframework.data.elasticsearch.support.VersionInfo; import org.springframework.data.mapping.callback.ReactiveEntityCallbacks; -import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -575,7 +574,6 @@ protected interface DocumentCallback { * @param document the document to convert * @return a Mono of the entity */ - @NonNull Mono toEntity(@Nullable Document document); } @@ -593,7 +591,6 @@ public ReadDocumentCallback(EntityReader reader, Class t this.index = index; } - @NonNull public Mono toEntity(@Nullable Document document) { if (document == null) { return Mono.empty(); diff --git a/src/test/java/org/springframework/data/elasticsearch/config/configuration/ElasticsearchConfigurationELCTests.java b/src/test/java/org/springframework/data/elasticsearch/config/configuration/ElasticsearchConfigurationELCTests.java index 38e8567ec..71e1cd3af 100644 --- a/src/test/java/org/springframework/data/elasticsearch/config/configuration/ElasticsearchConfigurationELCTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/config/configuration/ElasticsearchConfigurationELCTests.java @@ -31,7 +31,6 @@ import org.springframework.data.elasticsearch.core.ElasticsearchOperations; import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories; -import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.SpringExtension; @@ -50,7 +49,6 @@ public class ElasticsearchConfigurationELCTests { @EnableElasticsearchRepositories(basePackages = { "org.springframework.data.elasticsearch.config.configuration" }, considerNestedRepositories = true) static class Config extends ElasticsearchConfiguration { - @NonNull @Override public ClientConfiguration clientConfiguration() { return ClientConfiguration.builder() // diff --git a/src/test/java/org/springframework/data/elasticsearch/config/configuration/ReactiveElasticsearchConfigurationELCTests.java b/src/test/java/org/springframework/data/elasticsearch/config/configuration/ReactiveElasticsearchConfigurationELCTests.java index 2a9b22a07..70fafd50e 100644 --- a/src/test/java/org/springframework/data/elasticsearch/config/configuration/ReactiveElasticsearchConfigurationELCTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/config/configuration/ReactiveElasticsearchConfigurationELCTests.java @@ -29,7 +29,6 @@ import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations; import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository; import org.springframework.data.elasticsearch.repository.config.EnableReactiveElasticsearchRepositories; -import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.SpringExtension; @@ -48,7 +47,6 @@ public class ReactiveElasticsearchConfigurationELCTests { considerNestedRepositories = true) static class Config extends ReactiveElasticsearchConfiguration { - @NonNull @Override public ClientConfiguration clientConfiguration() { return ClientConfiguration.builder() // From 496b8d62a462a81bbfa19a1f20d810eb3229408b Mon Sep 17 00:00:00 2001 From: Aouichaoui Youssef <21143371+youssef3wi@users.noreply.github.com> Date: Tue, 26 Mar 2024 19:18:26 +0100 Subject: [PATCH 004/148] Support Delete by query with es parameters. Original Pull Request #2875 Closes #2865 --- .../client/elc/ElasticsearchTemplate.java | 18 + .../elc/ReactiveElasticsearchTemplate.java | 10 + .../client/elc/RequestConverter.java | 73 ++ .../elasticsearch/client/elc/TypeUtils.java | 25 + ...AbstractReactiveElasticsearchTemplate.java | 6 + .../core/DocumentOperations.java | 26 + .../core/ReactiveDocumentOperations.java | 24 + .../elasticsearch/core/query/DeleteQuery.java | 736 ++++++++++++++++++ .../core/query/types/ConflictsType.java | 25 + .../core/query/types/OperatorType.java | 25 + .../core/query/types/package-info.java | 3 + .../core/ElasticsearchIntegrationTests.java | 38 + 12 files changed, 1009 insertions(+) create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/query/DeleteQuery.java create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/query/types/ConflictsType.java create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/query/types/OperatorType.java create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/query/types/package-info.java diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java index e535f857f..20ddf2a0d 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java @@ -54,6 +54,7 @@ import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; +import org.springframework.data.elasticsearch.core.query.DeleteQuery; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.MoreLikeThisQuery; import org.springframework.data.elasticsearch.core.query.Query; @@ -177,6 +178,11 @@ public void bulkUpdate(List queries, BulkOptions bulkOptions, Index doBulkOperation(queries, bulkOptions, index); } + @Override + public ByQueryResponse delete(DeleteQuery query, Class clazz) { + return delete(query, clazz, getIndexCoordinatesFor(clazz)); + } + @Override public ByQueryResponse delete(Query query, Class clazz, IndexCoordinates index) { @@ -190,6 +196,18 @@ public ByQueryResponse delete(Query query, Class clazz, IndexCoordinates inde return responseConverter.byQueryResponse(response); } + @Override + public ByQueryResponse delete(DeleteQuery query, Class clazz, IndexCoordinates index) { + Assert.notNull(query, "query must not be null"); + + DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(), + clazz, index, getRefreshPolicy()); + + DeleteByQueryResponse response = execute(client -> client.deleteByQuery(request)); + + return responseConverter.byQueryResponse(response); + } + @Override public UpdateResponse update(UpdateQuery updateQuery, IndexCoordinates index) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java index e9e7fb981..20b6c038e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java @@ -25,6 +25,7 @@ import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.transport.Version; import co.elastic.clients.transport.endpoints.BooleanResponse; +import org.springframework.data.elasticsearch.core.query.DeleteQuery; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -180,6 +181,15 @@ public Mono delete(Query query, Class entityType, IndexCoord return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse); } + @Override + public Mono delete(DeleteQuery query, Class entityType, IndexCoordinates index) { + Assert.notNull(query, "query must not be null"); + + DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(), + entityType, index, getRefreshPolicy()); + return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse); + } + @Override public Mono get(String id, Class entityType, IndexCoordinates index) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java index be7ff96ff..0865b495c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java @@ -968,6 +968,79 @@ public DeleteByQueryRequest documentDeleteByQueryRequest(Query query, @Nullable }); } + public DeleteByQueryRequest documentDeleteByQueryRequest(DeleteQuery query, @Nullable String routing, Class clazz, + IndexCoordinates index, @Nullable RefreshPolicy refreshPolicy) { + Assert.notNull(query, "query must not be null"); + Assert.notNull(index, "index must not be null"); + + return DeleteByQueryRequest.of(dqb -> { + dqb.index(Arrays.asList(index.getIndexNames())) // + .query(getQuery(query.getQuery(), clazz))// + .refresh(deleteByQueryRefresh(refreshPolicy)) + .requestsPerSecond(query.getRequestsPerSecond()) + .maxDocs(query.getMaxDocs()) + .scroll(time(query.getScroll())) + .scrollSize(query.getScrollSize()); + + if (query.getRouting() != null) { + dqb.routing(query.getRouting()); + } else if (StringUtils.hasText(routing)) { + dqb.routing(routing); + } + + if (query.getQ() != null) { + dqb.q(query.getQ()) + .analyzer(query.getAnalyzer()) + .analyzeWildcard(query.getAnalyzeWildcard()) + .defaultOperator(operator(query.getDefaultOperator())) + .df(query.getDf()) + .lenient(query.getLenient()); + } + + if (query.getExpandWildcards() != null && !query.getExpandWildcards().isEmpty()) { + dqb.expandWildcards(expandWildcards(query.getExpandWildcards())); + } + if (query.getStats() != null && !query.getStats().isEmpty()) { + dqb.stats(query.getStats()); + } + if (query.getSlices() != null) { + dqb.slices(sb -> sb.value(query.getSlices())); + } + if (query.getSort() != null) { + ElasticsearchPersistentEntity persistentEntity = getPersistentEntity(clazz); + List sortOptions = getSortOptions(query.getSort(), persistentEntity); + + if (!sortOptions.isEmpty()) { + dqb.sort( + sortOptions.stream() + .map(sortOption -> { + String order = "asc"; + var sortField = sortOption.field(); + if (sortField.order() != null) { + order = sortField.order().jsonValue(); + } + + return sortField.field() + ":" + order; + }) + .collect(Collectors.toList()) + ); + } + } + dqb.allowNoIndices(query.getAllowNoIndices()) + .conflicts(conflicts(query.getConflicts())) + .ignoreUnavailable(query.getIgnoreUnavailable()) + .preference(query.getPreference()) + .requestCache(query.getRequestCache()) + .searchType(searchType(query.getSearchType())) + .searchTimeout(time(query.getSearchTimeout())) + .terminateAfter(query.getTerminateAfter()) + .timeout(time(query.getTimeout())) + .version(query.getVersion()); + + return dqb; + }); + } + public UpdateRequest documentUpdateRequest(UpdateQuery query, IndexCoordinates index, @Nullable RefreshPolicy refreshPolicy, @Nullable String routing) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/TypeUtils.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/TypeUtils.java index 83bd46dae..98354b267 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/TypeUtils.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/TypeUtils.java @@ -18,6 +18,7 @@ import co.elastic.clients.elasticsearch._types.*; import co.elastic.clients.elasticsearch._types.mapping.FieldType; import co.elastic.clients.elasticsearch._types.mapping.TypeMapping; +import co.elastic.clients.elasticsearch._types.query_dsl.Operator; import co.elastic.clients.elasticsearch.core.search.BoundaryScanner; import co.elastic.clients.elasticsearch.core.search.HighlighterEncoder; import co.elastic.clients.elasticsearch.core.search.HighlighterFragmenter; @@ -46,6 +47,8 @@ import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.RescorerQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; +import org.springframework.data.elasticsearch.core.query.types.ConflictsType; +import org.springframework.data.elasticsearch.core.query.types.OperatorType; import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -500,4 +503,26 @@ static Map paramsMap(Map params) { }); return mappedParams; } + + /** + * Convert a spring-data-elasticsearch operator to an Elasticsearch operator. + * + * @param operator spring-data-elasticsearch operator. + * @return an Elasticsearch Operator. + */ + @Nullable + static Operator operator(@Nullable OperatorType operator) { + return operator != null ? Operator.valueOf(operator.name()) : null; + } + + /** + * Convert a spring-data-elasticsearch {@literal conflicts} to an Elasticsearch {@literal conflicts}. + * + * @param conflicts spring-data-elasticsearch {@literal conflicts}. + * @return an Elasticsearch {@literal conflicts}. + */ + @Nullable + static Conflicts conflicts(@Nullable ConflictsType conflicts) { + return conflicts != null ? Conflicts.valueOf(conflicts.name()) : null; + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java index 98aa9a176..ccd0c440b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java @@ -15,6 +15,7 @@ */ package org.springframework.data.elasticsearch.core; +import org.springframework.data.elasticsearch.core.query.DeleteQuery; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -411,6 +412,11 @@ public Mono delete(String id, IndexCoordinates index) { public Mono delete(Query query, Class entityType) { return delete(query, entityType, getIndexCoordinatesFor(entityType)); } + + @Override + public Mono delete(DeleteQuery query, Class entityType) { + return delete(query, entityType, getIndexCoordinatesFor(entityType)); + } // endregion // region SearchDocument diff --git a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java index 04e147007..a00750d52 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java @@ -21,6 +21,7 @@ import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; +import org.springframework.data.elasticsearch.core.query.DeleteQuery; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.UpdateQuery; @@ -279,9 +280,21 @@ default void bulkUpdate(List queries, IndexCoordinates index) { * {@link org.springframework.data.elasticsearch.annotations.Document} * @return response with detailed information * @since 4.1 + * @deprecated since 5.3.0, use {@link #delete(DeleteQuery, Class)} */ ByQueryResponse delete(Query query, Class clazz); + /** + * Delete all records matching the query. + * + * @param query query defining the objects + * @param clazz The entity class must be annotated with + * {@link org.springframework.data.elasticsearch.annotations.Document} + * @return response with detailed information + * @since 5.3 + */ + ByQueryResponse delete(DeleteQuery query, Class clazz); + /** * Delete all records matching the query. * @@ -290,9 +303,22 @@ default void bulkUpdate(List queries, IndexCoordinates index) { * {@link org.springframework.data.elasticsearch.annotations.Document} * @param index the index from which to delete * @return response with detailed information + * @deprecated since 5.3.0, use {@link #delete(DeleteQuery, Class, IndexCoordinates)} */ ByQueryResponse delete(Query query, Class clazz, IndexCoordinates index); + /** + * Delete all records matching the query. + * + * @param query query defining the objects + * @param clazz The entity class must be annotated with + * {@link org.springframework.data.elasticsearch.annotations.Document} + * @param index the index from which to delete + * @return response with detailed information + * @since 5.3 + */ + ByQueryResponse delete(DeleteQuery query, Class clazz, IndexCoordinates index); + /** * Partially update a document by the given entity. * diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index 5aef1bc2f..f228be0b3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -15,6 +15,7 @@ */ package org.springframework.data.elasticsearch.core; +import org.springframework.data.elasticsearch.core.query.DeleteQuery; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -331,9 +332,20 @@ default Mono bulkUpdate(List queries, IndexCoordinates index) * @param query must not be {@literal null}. * @param entityType must not be {@literal null}. * @return a {@link Mono} emitting the number of the removed documents. + * @deprecated since 5.3.0, use {@link #delete(DeleteQuery, Class)} */ Mono delete(Query query, Class entityType); + /** + * Delete the documents matching the given {@link Query} extracting index from entity metadata. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @return a {@link Mono} emitting the number of the removed documents. + * @since 5.3 + */ + Mono delete(DeleteQuery query, Class entityType); + /** * Delete the documents matching the given {@link Query} extracting index from entity metadata. * @@ -341,9 +353,21 @@ default Mono bulkUpdate(List queries, IndexCoordinates index) * @param entityType must not be {@literal null}. * @param index the target index, must not be {@literal null} * @return a {@link Mono} emitting the number of the removed documents. + * @deprecated since 5.3.0, use {@link #delete(DeleteQuery, Class, IndexCoordinates)} */ Mono delete(Query query, Class entityType, IndexCoordinates index); + /** + * Delete the documents matching the given {@link Query} extracting index from entity metadata. + * + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param index the target index, must not be {@literal null} + * @return a {@link Mono} emitting the number of the removed documents. + * @since 5.3 + */ + Mono delete(DeleteQuery query, Class entityType, IndexCoordinates index); + /** * Partial update of the document. * diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/DeleteQuery.java b/src/main/java/org/springframework/data/elasticsearch/core/query/DeleteQuery.java new file mode 100644 index 000000000..be430d28b --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/DeleteQuery.java @@ -0,0 +1,736 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.query; + +import org.springframework.data.domain.Sort; +import org.springframework.data.elasticsearch.core.query.Query.SearchType; +import org.springframework.data.elasticsearch.core.query.types.ConflictsType; +import org.springframework.data.elasticsearch.core.query.types.OperatorType; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import java.time.Duration; +import java.util.EnumSet; +import java.util.List; + +/** + * Defines a delete request. + * + * @author Aouichaoui Youssef + * @see docs + */ +public class DeleteQuery { + // For Lucene query + /** + * Query in the Lucene query string syntax. + */ + @Nullable + private final String q; + + /** + * If true, wildcard and prefix queries are analyzed. Defaults to false. + * This parameter can only be used when the lucene query {@code q} parameter is specified. + */ + @Nullable + private final Boolean analyzeWildcard; + + /** + * Analyzer to use for the query string. + * This parameter can only be used when the lucene query {@code q} parameter is specified. + */ + @Nullable + private final String analyzer; + + /** + * The default operator for a query string query: {@literal AND} or {@literal OR}. Defaults to {@literal OR}. + * This parameter can only be used when the lucene query {@code q} parameter is specified. + */ + @Nullable + private final OperatorType defaultOperator; + + /** + * Field to be used as the default when no field prefix is specified in the query string. + * This parameter can only be used when the lucene query {@code q} parameter is specified. + *

+ * e.g: {@code {"query":{"prefix":{"user.name":{"value":"es"}}}} } + */ + @Nullable + private final String df; + + /** + * If a query contains errors related to the format of the data being entered, they will be disregarded unless specified otherwise. + * By default, this feature is turned off. + */ + @Nullable + private final Boolean lenient; + + // For ES query + + /** + * An error will occur if the condition is {@code false} and any of the following are true: a wildcard expression, + * an index alias, or the {@literal _all value} only targets missing or closed indices. + * By default, this is set to {@code true}. + */ + @Nullable + private final Boolean allowNoIndices; + + /** + * Define the types of conflicts that occur when a query encounters version conflicts: abort or proceed. + * Defaults to abort. + */ + @Nullable + private final ConflictsType conflicts; + + /** + * Type of index that wildcard patterns can match. + * Defaults to {@literal open}. + */ + @Nullable + private final EnumSet expandWildcards; + + /** + * An error occurs if it is directed at an index that is missing or closed when it is {@code false}. + * By default, this is set to {@code false}. + */ + @Nullable + private final Boolean ignoreUnavailable; + + /** + * Maximum number of documents to process. + * Defaults to all documents. + */ + @Nullable + private final Long maxDocs; + + /** + * Specifies the node or shard the operation should be performed on. + */ + @Nullable + private final String preference; + + /** + * Use the request cache when it is {@code true}. + * By default, use the index-level setting. + */ + @Nullable + private final Boolean requestCache; + + /** + * Refreshes all shards involved in the deleting by query after the request completes when it is {@code true}. + * By default, this is set to {@code false}. + */ + @Nullable + private final Boolean refresh; + + /** + * Limited this request to a certain number of sub-requests per second. + * By default, this is set to {@code -1} (no throttle). + */ + @Nullable + private final Float requestsPerSecond; + + /** + * Custom value used to route operations to a specific shard. + */ + @Nullable + private final String routing; + + /** + * Period to retain the search context for scrolling. + */ + @Nullable + private final Duration scroll; + + /** + * Size of the scroll request that powers the operation. + * By default, this is set to {@code 1000}. + */ + @Nullable + private final Long scrollSize; + + /** + * The type of the search operation. + */ + @Nullable + private final SearchType searchType; + + /** + * Explicit timeout for each search request. + * By default, this is set to no timeout. + */ + @Nullable + private final Duration searchTimeout; + + /** + * The number of slices this task should be divided into. + * By default, this is set to {@code 1} meaning the task isn’t sliced into subtasks. + */ + @Nullable + private final Integer slices; + + /** + * Sort search results in a specific order. + */ + @Nullable + private final Sort sort; + + /** + * Specific {@code tag} of the request for logging and statistical purposes. + */ + @Nullable + private final List stats; + + /** + * The Maximum number of documents that can be collected for each shard. + * If a query exceeds this limit, Elasticsearch will stop the query. + */ + @Nullable + private final Long terminateAfter; + + /** + * Period each deletion request waits for active shards. + * By default, this is set to {@code 1m} (one minute). + */ + @Nullable + private final Duration timeout; + + /** + * Returns the document version as part of a hit. + */ + @Nullable + private final Boolean version; + + // Body + /** + * Query that specifies the documents to delete. + */ + private final Query query; + + public static Builder builder(Query query) { + return new Builder(query); + } + + private DeleteQuery(Builder builder) { + this.q = builder.luceneQuery; + this.analyzeWildcard = builder.analyzeWildcard; + this.analyzer = builder.analyzer; + this.defaultOperator = builder.defaultOperator; + this.df = builder.defaultField; + this.lenient = builder.lenient; + + this.allowNoIndices = builder.allowNoIndices; + this.conflicts = builder.conflicts; + this.expandWildcards = builder.expandWildcards; + this.ignoreUnavailable = builder.ignoreUnavailable; + this.maxDocs = builder.maxDocs; + this.preference = builder.preference; + this.requestCache = builder.requestCache; + this.refresh = builder.refresh; + this.requestsPerSecond = builder.requestsPerSecond; + this.routing = builder.routing; + this.scroll = builder.scrollTime; + this.scrollSize = builder.scrollSize; + this.searchType = builder.searchType; + this.searchTimeout = builder.searchTimeout; + this.slices = builder.slices; + this.sort = builder.sort; + this.stats = builder.stats; + this.terminateAfter = builder.terminateAfter; + this.timeout = builder.timeout; + this.version = builder.version; + + this.query = builder.query; + } + + @Nullable + public String getQ() { + return q; + } + + @Nullable + public Boolean getAnalyzeWildcard() { + return analyzeWildcard; + } + + @Nullable + public String getAnalyzer() { + return analyzer; + } + + @Nullable + public OperatorType getDefaultOperator() { + return defaultOperator; + } + + @Nullable + public String getDf() { + return df; + } + + @Nullable + public Boolean getLenient() { + return lenient; + } + + @Nullable + public Boolean getAllowNoIndices() { + return allowNoIndices; + } + + @Nullable + public ConflictsType getConflicts() { + return conflicts; + } + + @Nullable + public EnumSet getExpandWildcards() { + return expandWildcards; + } + + @Nullable + public Boolean getIgnoreUnavailable() { + return ignoreUnavailable; + } + + @Nullable + public Long getMaxDocs() { + return maxDocs; + } + + @Nullable + public String getPreference() { + return preference; + } + + @Nullable + public Boolean getRequestCache() { + return requestCache; + } + + @Nullable + public Boolean getRefresh() { + return refresh; + } + + @Nullable + public Float getRequestsPerSecond() { + return requestsPerSecond; + } + + @Nullable + public String getRouting() { + return routing; + } + + @Nullable + public Duration getScroll() { + return scroll; + } + + @Nullable + public Long getScrollSize() { + return scrollSize; + } + + @Nullable + public SearchType getSearchType() { + return searchType; + } + + @Nullable + public Duration getSearchTimeout() { + return searchTimeout; + } + + @Nullable + public Integer getSlices() { + return slices; + } + + @Nullable + public Sort getSort() { + return sort; + } + + @Nullable + public List getStats() { + return stats; + } + + @Nullable + public Long getTerminateAfter() { + return terminateAfter; + } + + @Nullable + public Duration getTimeout() { + return timeout; + } + + @Nullable + public Boolean getVersion() { + return version; + } + + @Nullable + public Query getQuery() { + return query; + } + + public static final class Builder { + // For Lucene query + @Nullable + private String luceneQuery; + @Nullable + private Boolean analyzeWildcard; + @Nullable + private String analyzer; + @Nullable + private OperatorType defaultOperator; + @Nullable + private String defaultField; + @Nullable + private Boolean lenient; + + // For ES query + @Nullable + private Boolean allowNoIndices; + @Nullable + private ConflictsType conflicts; + @Nullable + private EnumSet expandWildcards; + @Nullable + private Boolean ignoreUnavailable; + @Nullable + private Long maxDocs; + @Nullable + private String preference; + @Nullable + private Boolean requestCache; + @Nullable + private Boolean refresh; + @Nullable + private Float requestsPerSecond; + @Nullable + private String routing; + @Nullable + private Duration scrollTime; + @Nullable + private Long scrollSize; + @Nullable + private SearchType searchType; + @Nullable + private Duration searchTimeout; + @Nullable + private Integer slices; + @Nullable + private Sort sort; + @Nullable + private List stats; + @Nullable + private Long terminateAfter; + @Nullable + private Duration timeout; + @Nullable + private Boolean version; + + // Body + private final Query query; + + private Builder(Query query) { + Assert.notNull(query, "query must not be null"); + + this.query = query; + } + + /** + * Query in the Lucene query string syntax. + */ + public Builder withLuceneQuery(@Nullable String luceneQuery) { + this.luceneQuery = luceneQuery; + + return this; + } + + /** + * If true, wildcard and prefix queries are analyzed. Defaults to false. + * This parameter can only be used when the lucene query {@code q} parameter is specified. + */ + public Builder withAnalyzeWildcard(@Nullable Boolean analyzeWildcard) { + this.analyzeWildcard = analyzeWildcard; + + return this; + } + + /** + * Analyzer to use for the query string. + * This parameter can only be used when the lucene query {@code q} parameter is specified. + */ + public Builder withAnalyzer(@Nullable String analyzer) { + this.analyzer = analyzer; + + return this; + } + + /** + * The default operator for a query string query: {@literal AND} or {@literal OR}. Defaults to {@literal OR}. + * This parameter can only be used when the lucene query {@code q} parameter is specified. + */ + public Builder withDefaultOperator(@Nullable OperatorType defaultOperator) { + this.defaultOperator = defaultOperator; + + return this; + } + + /** + * Field to be used as the default when no field prefix is specified in the query string. + * This parameter can only be used when the lucene query {@code q} parameter is specified. + *

+ * e.g: {@code {"query":{"prefix":{"user.name":{"value":"es"}}}} } + */ + public Builder withDefaultField(@Nullable String defaultField) { + this.defaultField = defaultField; + + return this; + } + + /** + * If a query contains errors related to the format of the data being entered, they will be disregarded unless specified otherwise. + * By default, this feature is turned off. + */ + public Builder withLenient(@Nullable Boolean lenient) { + this.lenient = lenient; + + return this; + } + + /** + * An error will occur if the condition is {@code false} and any of the following are true: a wildcard expression, + * an index alias, or the {@literal _all value} only targets missing or closed indices. + * By default, this is set to {@code true}. + */ + public Builder withAllowNoIndices(@Nullable Boolean allowNoIndices) { + this.allowNoIndices = allowNoIndices; + + return this; + } + + /** + * Define the types of conflicts that occur when a query encounters version conflicts: abort or proceed. + * Defaults to abort. + */ + public Builder withConflicts(@Nullable ConflictsType conflicts) { + this.conflicts = conflicts; + + return this; + } + + /** + * Type of index that wildcard patterns can match. + * Defaults to {@literal open}. + */ + public Builder setExpandWildcards(@Nullable EnumSet expandWildcards) { + this.expandWildcards = expandWildcards; + + return this; + } + + /** + * An error occurs if it is directed at an index that is missing or closed when it is {@code false}. + * By default, this is set to {@code false}. + */ + public Builder withIgnoreUnavailable(@Nullable Boolean ignoreUnavailable) { + this.ignoreUnavailable = ignoreUnavailable; + + return this; + } + + /** + * Maximum number of documents to process. + * Defaults to all documents. + */ + public Builder withMaxDocs(@Nullable Long maxDocs) { + this.maxDocs = maxDocs; + + return this; + } + + /** + * Specifies the node or shard the operation should be performed on. + */ + public Builder withPreference(@Nullable String preference) { + this.preference = preference; + + return this; + } + + /** + * Use the request cache when it is {@code true}. + * By default, use the index-level setting. + */ + public Builder withRequestCache(@Nullable Boolean requestCache) { + this.requestCache = requestCache; + + return this; + } + + /** + * Refreshes all shards involved in the deleting by query after the request completes when it is {@code true}. + * By default, this is set to {@code false}. + */ + public Builder withRefresh(@Nullable Boolean refresh) { + this.refresh = refresh; + + return this; + } + + /** + * Limited this request to a certain number of sub-requests per second. + * By default, this is set to {@code -1} (no throttle). + */ + public Builder withRequestsPerSecond(@Nullable Float requestsPerSecond) { + this.requestsPerSecond = requestsPerSecond; + + return this; + } + + /** + * Custom value used to route operations to a specific shard. + */ + public Builder withRouting(@Nullable String routing) { + this.routing = routing; + + return this; + } + + /** + * Period to retain the search context for scrolling. + */ + public Builder withScrollTime(@Nullable Duration scrollTime) { + this.scrollTime = scrollTime; + + return this; + } + + /** + * Size of the scroll request that powers the operation. + * By default, this is set to {@code 1000}. + */ + public Builder withScrollSize(@Nullable Long scrollSize) { + this.scrollSize = scrollSize; + + return this; + } + + /** + * The type of the search operation. + */ + public Builder withSearchType(@Nullable SearchType searchType) { + this.searchType = searchType; + + return this; + } + + /** + * Explicit timeout for each search request. + * By default, this is set to no timeout. + */ + public Builder withSearchTimeout(@Nullable Duration searchTimeout) { + this.searchTimeout = searchTimeout; + + return this; + } + + /** + * The number of slices this task should be divided into. + * By default, this is set to {@code 1} meaning the task isn’t sliced into subtasks. + */ + public Builder withSlices(@Nullable Integer slices) { + this.slices = slices; + + return this; + } + + /** + * Sort search results in a specific order. + */ + public Builder withSort(@Nullable Sort sort) { + this.sort = sort; + + return this; + } + + /** + * Specific {@code tag} of the request for logging and statistical purposes. + */ + public Builder withStats(@Nullable List stats) { + this.stats = stats; + + return this; + } + + /** + * The Maximum number of documents that can be collected for each shard. + * If a query exceeds this limit, Elasticsearch will stop the query. + */ + public Builder withTerminateAfter(@Nullable Long terminateAfter) { + this.terminateAfter = terminateAfter; + + return this; + } + + /** + * Period each deletion request waits for active shards. + * By default, this is set to {@code 1m} (one minute). + */ + public Builder withTimeout(@Nullable Duration timeout) { + this.timeout = timeout; + + return this; + } + + /** + * Returns the document version as part of a hit. + */ + public Builder withVersion(@Nullable Boolean version) { + this.version = version; + + return this; + } + + public DeleteQuery build() { + if (luceneQuery == null) { + if (defaultField != null) { + throw new IllegalArgumentException("When defining the df parameter, you must include the Lucene query."); + } + if (analyzer != null) { + throw new IllegalArgumentException("When defining the analyzer parameter, you must include the Lucene query."); + } + if (analyzeWildcard != null) { + throw new IllegalArgumentException("When defining the analyzeWildcard parameter, you must include the Lucene query."); + } + if (defaultOperator != null) { + throw new IllegalArgumentException("When defining the defaultOperator parameter, you must include the Lucene query."); + } + if (lenient != null) { + throw new IllegalArgumentException("When defining the lenient parameter, you must include the Lucene query."); + } + } + + return new DeleteQuery(this); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/types/ConflictsType.java b/src/main/java/org/springframework/data/elasticsearch/core/query/types/ConflictsType.java new file mode 100644 index 000000000..c9a4a6548 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/types/ConflictsType.java @@ -0,0 +1,25 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.query.types; + +/** + * Define the types of conflicts that occur when a query encounters version conflicts. + * + * @author Aouichaoui Youssef + */ +public enum ConflictsType { + Abort, Proceed +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/types/OperatorType.java b/src/main/java/org/springframework/data/elasticsearch/core/query/types/OperatorType.java new file mode 100644 index 000000000..2e79249f8 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/types/OperatorType.java @@ -0,0 +1,25 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.query.types; + +/** + * Define the default operator for a query string query. + * + * @author Aouichaoui Youssef + */ +public enum OperatorType { + And, Or +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/types/package-info.java b/src/main/java/org/springframework/data/elasticsearch/core/query/types/package-info.java new file mode 100644 index 000000000..a75996934 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/types/package-info.java @@ -0,0 +1,3 @@ +@org.springframework.lang.NonNullApi +@org.springframework.lang.NonNullFields +package org.springframework.data.elasticsearch.core.query.types; diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java index 6a807e415..5fcb77fdc 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java @@ -3776,6 +3776,44 @@ void shouldThrowVersionConflictExceptionWhenSavingInvalidVersion() { }).isInstanceOf(VersionConflictException.class); } + @Test // GH-2865 + public void shouldDeleteDocumentForGivenQueryUsingParameters() { + // Given + String documentId = nextIdAsString(); + SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message") + .version(System.currentTimeMillis()).build(); + + IndexQuery indexQuery = getIndexQuery(sampleEntity); + String indexName = indexNameProvider.indexName(); + + operations.index(indexQuery, IndexCoordinates.of(indexName)); + + // When + final Query query = getTermQuery("id", documentId); + final DeleteQuery deleteQuery = DeleteQuery.builder(query).withSlices(2).build(); + ByQueryResponse result = operations.delete(deleteQuery, SampleEntity.class, IndexCoordinates.of(indexName)); + + // Then + assertThat(result.getDeleted()).isEqualTo(1); + SearchHits searchHits = operations.search(query, SampleEntity.class, + IndexCoordinates.of(indexName)); + assertThat(searchHits.getTotalHits()).isEqualTo(0); + } + + @Test + public void shouldDeleteDocumentForGivenQueryAndUnavailableIndex() { + // Given + String indexName = UUID.randomUUID().toString(); + + // When + final Query query = operations.matchAllQuery(); + final DeleteQuery deleteQuery = DeleteQuery.builder(query).withIgnoreUnavailable(true).build(); + ByQueryResponse result = operations.delete(deleteQuery, SampleEntity.class, IndexCoordinates.of(indexName)); + + // Then + assertThat(result.getDeleted()).isEqualTo(0); + } + // region entities @Document(indexName = "#{@indexNameProvider.indexName()}") @Setting(shards = 1, replicas = 0, refreshInterval = "-1") From c96423d5ba1ac4f1390bb569a819e33090180164 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Tue, 26 Mar 2024 19:53:14 +0100 Subject: [PATCH 005/148] Polishing. --- .../client/elc/ElasticsearchTemplate.java | 23 +- .../elc/ReactiveElasticsearchTemplate.java | 1125 +++++++------- .../client/elc/RequestConverter.java | 9 +- .../elasticsearch/client/elc/TypeUtils.java | 44 +- ...AbstractReactiveElasticsearchTemplate.java | 2 +- .../core/DocumentOperations.java | 4 +- .../core/ReactiveDocumentOperations.java | 2 +- .../elasticsearch/core/query/DeleteQuery.java | 1348 ++++++++--------- .../core/query/types/ConflictsType.java | 3 +- .../core/query/types/OperatorType.java | 3 +- .../core/ElasticsearchIntegrationTests.java | 31 +- 11 files changed, 1263 insertions(+), 1331 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java index 20ddf2a0d..444d744ab 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java @@ -51,15 +51,7 @@ import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder; -import org.springframework.data.elasticsearch.core.query.BulkOptions; -import org.springframework.data.elasticsearch.core.query.ByQueryResponse; -import org.springframework.data.elasticsearch.core.query.DeleteQuery; -import org.springframework.data.elasticsearch.core.query.IndexQuery; -import org.springframework.data.elasticsearch.core.query.MoreLikeThisQuery; -import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.SearchTemplateQuery; -import org.springframework.data.elasticsearch.core.query.UpdateQuery; +import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; @@ -517,18 +509,19 @@ public List> multiSearch(List queries, List> multiSearch(List multiSearchQueryParameters, boolean isSearchTemplateQuery) { - return isSearchTemplateQuery ? - doMultiTemplateSearch(multiSearchQueryParameters.stream() - .map(p -> new MultiSearchTemplateQueryParameter((SearchTemplateQuery) p.query, p.clazz, p.index)) - .toList()) + return isSearchTemplateQuery ? doMultiTemplateSearch(multiSearchQueryParameters.stream() + .map(p -> new MultiSearchTemplateQueryParameter((SearchTemplateQuery) p.query, p.clazz, p.index)) + .toList()) : doMultiSearch(multiSearchQueryParameters); } - private List> doMultiTemplateSearch(List mSearchTemplateQueryParameters) { + private List> doMultiTemplateSearch( + List mSearchTemplateQueryParameters) { MsearchTemplateRequest request = requestConverter.searchMsearchTemplateRequest(mSearchTemplateQueryParameters, routingResolver.getRouting()); - MsearchTemplateResponse response = execute(client -> client.msearchTemplate(request, EntityAsMap.class)); + MsearchTemplateResponse response = execute( + client -> client.msearchTemplate(request, EntityAsMap.class)); List> responseItems = response.responses(); Assert.isTrue(mSearchTemplateQueryParameters.size() == responseItems.size(), diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java index 20b6c038e..12f1a59ee 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java @@ -25,7 +25,6 @@ import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.transport.Version; import co.elastic.clients.transport.endpoints.BooleanResponse; -import org.springframework.data.elasticsearch.core.query.DeleteQuery; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -62,6 +61,7 @@ import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; +import org.springframework.data.elasticsearch.core.query.DeleteQuery; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.SearchTemplateQuery; import org.springframework.data.elasticsearch.core.query.UpdateQuery; @@ -85,584 +85,583 @@ */ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearchTemplate { - private static final Log LOGGER = LogFactory.getLog(ReactiveElasticsearchTemplate.class); - - private final ReactiveElasticsearchClient client; - private final RequestConverter requestConverter; - private final ResponseConverter responseConverter; - private final JsonpMapper jsonpMapper; - private final ElasticsearchExceptionTranslator exceptionTranslator; - - public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client, ElasticsearchConverter converter) { - super(converter); - - Assert.notNull(client, "client must not be null"); - - this.client = client; - this.jsonpMapper = client._transport().jsonpMapper(); - requestConverter = new RequestConverter(converter, jsonpMapper); - responseConverter = new ResponseConverter(jsonpMapper); - exceptionTranslator = new ElasticsearchExceptionTranslator(jsonpMapper); - } - - // region Document operations - @Override - protected Mono> doIndex(T entity, IndexCoordinates index) { - - IndexRequest indexRequest = requestConverter.documentIndexRequest(getIndexQuery(entity), index, - getRefreshPolicy()); - return Mono.just(entity) // - .zipWith(// - Mono.from(execute(client -> client.index(indexRequest))) // - .map(indexResponse -> new IndexResponseMetaData(indexResponse.id(), // - indexResponse.index(), // - indexResponse.seqNo(), // - indexResponse.primaryTerm(), // - indexResponse.version() // - ))); - } - - @Override - public Flux saveAll(Mono> entitiesPublisher, IndexCoordinates index) { - - Assert.notNull(entitiesPublisher, "entitiesPublisher must not be null!"); - - return entitiesPublisher // - .flatMapMany(entities -> Flux.fromIterable(entities) // - .concatMap(entity -> maybeCallbackBeforeConvert(entity, index)) // - ).collectList() // - .map(Entities::new) // - .flatMapMany(entities -> { - - if (entities.isEmpty()) { - return Flux.empty(); - } - - return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index)// - .index() // - .flatMap(indexAndResponse -> { - T savedEntity = entities.entityAt(indexAndResponse.getT1()); - BulkResponseItem response = indexAndResponse.getT2(); - var updatedEntity = entityOperations.updateIndexedObject( - savedEntity, new IndexedObjectInformation( // - response.id(), // - response.index(), // - response.seqNo(), // - response.primaryTerm(), // - response.version()), - converter, - routingResolver); - return maybeCallbackAfterSave(updatedEntity, index); - }); - }); - } - - @Override - protected Mono doExists(String id, IndexCoordinates index) { - - Assert.notNull(id, "id must not be null"); - Assert.notNull(index, "index must not be null"); - - ExistsRequest existsRequest = requestConverter.documentExistsRequest(id, routingResolver.getRouting(), index); - - return Mono.from(execute( - ((ClientCallback>) client -> client.exists(existsRequest)))) - .map(BooleanResponse::value) // - .onErrorReturn(NoSuchIndexException.class, false); - } - - @Override - public Mono delete(Query query, Class entityType, IndexCoordinates index) { - - Assert.notNull(query, "query must not be null"); - - DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(), - entityType, index, getRefreshPolicy()); - return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse); - } + private static final Log LOGGER = LogFactory.getLog(ReactiveElasticsearchTemplate.class); + + private final ReactiveElasticsearchClient client; + private final RequestConverter requestConverter; + private final ResponseConverter responseConverter; + private final JsonpMapper jsonpMapper; + private final ElasticsearchExceptionTranslator exceptionTranslator; + + public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client, ElasticsearchConverter converter) { + super(converter); + + Assert.notNull(client, "client must not be null"); + + this.client = client; + this.jsonpMapper = client._transport().jsonpMapper(); + requestConverter = new RequestConverter(converter, jsonpMapper); + responseConverter = new ResponseConverter(jsonpMapper); + exceptionTranslator = new ElasticsearchExceptionTranslator(jsonpMapper); + } + + // region Document operations + @Override + protected Mono> doIndex(T entity, IndexCoordinates index) { + + IndexRequest indexRequest = requestConverter.documentIndexRequest(getIndexQuery(entity), index, + getRefreshPolicy()); + return Mono.just(entity) // + .zipWith(// + Mono.from(execute(client -> client.index(indexRequest))) // + .map(indexResponse -> new IndexResponseMetaData(indexResponse.id(), // + indexResponse.index(), // + indexResponse.seqNo(), // + indexResponse.primaryTerm(), // + indexResponse.version() // + ))); + } + + @Override + public Flux saveAll(Mono> entitiesPublisher, IndexCoordinates index) { + + Assert.notNull(entitiesPublisher, "entitiesPublisher must not be null!"); + + return entitiesPublisher // + .flatMapMany(entities -> Flux.fromIterable(entities) // + .concatMap(entity -> maybeCallbackBeforeConvert(entity, index)) // + ).collectList() // + .map(Entities::new) // + .flatMapMany(entities -> { + + if (entities.isEmpty()) { + return Flux.empty(); + } + + return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index)// + .index() // + .flatMap(indexAndResponse -> { + T savedEntity = entities.entityAt(indexAndResponse.getT1()); + BulkResponseItem response = indexAndResponse.getT2(); + var updatedEntity = entityOperations.updateIndexedObject( + savedEntity, new IndexedObjectInformation( // + response.id(), // + response.index(), // + response.seqNo(), // + response.primaryTerm(), // + response.version()), + converter, + routingResolver); + return maybeCallbackAfterSave(updatedEntity, index); + }); + }); + } + + @Override + protected Mono doExists(String id, IndexCoordinates index) { + + Assert.notNull(id, "id must not be null"); + Assert.notNull(index, "index must not be null"); + + ExistsRequest existsRequest = requestConverter.documentExistsRequest(id, routingResolver.getRouting(), index); + + return Mono.from(execute( + ((ClientCallback>) client -> client.exists(existsRequest)))) + .map(BooleanResponse::value) // + .onErrorReturn(NoSuchIndexException.class, false); + } + + @Override + public Mono delete(Query query, Class entityType, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + + DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(), + entityType, index, getRefreshPolicy()); + return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse); + } - @Override - public Mono delete(DeleteQuery query, Class entityType, IndexCoordinates index) { - Assert.notNull(query, "query must not be null"); - - DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(), - entityType, index, getRefreshPolicy()); - return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse); - } - - @Override - public Mono get(String id, Class entityType, IndexCoordinates index) { - - Assert.notNull(id, "id must not be null"); - Assert.notNull(entityType, "entityType must not be null"); - Assert.notNull(index, "index must not be null"); - - GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index); - - Mono> getResponse = Mono - .from(execute(client -> client.get(getRequest, EntityAsMap.class))); - - ReadDocumentCallback callback = new ReadDocumentCallback<>(converter, entityType, index); - return getResponse.flatMap(response -> callback.toEntity(DocumentAdapters.from(response))); - } - - @Override - public Mono reindex(ReindexRequest reindexRequest) { - - Assert.notNull(reindexRequest, "reindexRequest must not be null"); - - co.elastic.clients.elasticsearch.core.ReindexRequest reindexRequestES = requestConverter.reindex(reindexRequest, - true); - - return Mono.from(execute( // - client -> client.reindex(reindexRequestES))).map(responseConverter::reindexResponse); - } - - @Override - public Mono submitReindex(ReindexRequest reindexRequest) { - - Assert.notNull(reindexRequest, "reindexRequest must not be null"); - - co.elastic.clients.elasticsearch.core.ReindexRequest reindexRequestES = requestConverter.reindex(reindexRequest, - false); - - return Mono.from(execute( // - client -> client.reindex(reindexRequestES))) - .flatMap(response -> (response.task() == null) - ? Mono.error( - new UnsupportedBackendOperation("ElasticsearchClient did not return a task id on submit request")) - : Mono.just(response.task())); - } + @Override + public Mono delete(DeleteQuery query, Class entityType, IndexCoordinates index) { + Assert.notNull(query, "query must not be null"); + + DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(), + entityType, index, getRefreshPolicy()); + return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse); + } + + @Override + public Mono get(String id, Class entityType, IndexCoordinates index) { + + Assert.notNull(id, "id must not be null"); + Assert.notNull(entityType, "entityType must not be null"); + Assert.notNull(index, "index must not be null"); + + GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index); + + Mono> getResponse = Mono + .from(execute(client -> client.get(getRequest, EntityAsMap.class))); + + ReadDocumentCallback callback = new ReadDocumentCallback<>(converter, entityType, index); + return getResponse.flatMap(response -> callback.toEntity(DocumentAdapters.from(response))); + } + + @Override + public Mono reindex(ReindexRequest reindexRequest) { + + Assert.notNull(reindexRequest, "reindexRequest must not be null"); + + co.elastic.clients.elasticsearch.core.ReindexRequest reindexRequestES = requestConverter.reindex(reindexRequest, + true); + + return Mono.from(execute( // + client -> client.reindex(reindexRequestES))).map(responseConverter::reindexResponse); + } + + @Override + public Mono submitReindex(ReindexRequest reindexRequest) { + + Assert.notNull(reindexRequest, "reindexRequest must not be null"); + + co.elastic.clients.elasticsearch.core.ReindexRequest reindexRequestES = requestConverter.reindex(reindexRequest, + false); + + return Mono.from(execute( // + client -> client.reindex(reindexRequestES))) + .flatMap(response -> (response.task() == null) + ? Mono.error( + new UnsupportedBackendOperation("ElasticsearchClient did not return a task id on submit request")) + : Mono.just(response.task())); + } - @Override - public Mono update(UpdateQuery updateQuery, IndexCoordinates index) { - - Assert.notNull(updateQuery, "UpdateQuery must not be null"); - Assert.notNull(index, "Index must not be null"); - - UpdateRequest request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(), - routingResolver.getRouting()); - - return Mono.from(execute(client -> client.update(request, Document.class))).flatMap(response -> { - UpdateResponse.Result result = result(response.result()); - return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result)); - }); - } + @Override + public Mono update(UpdateQuery updateQuery, IndexCoordinates index) { + + Assert.notNull(updateQuery, "UpdateQuery must not be null"); + Assert.notNull(index, "Index must not be null"); + + UpdateRequest request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(), + routingResolver.getRouting()); + + return Mono.from(execute(client -> client.update(request, Document.class))).flatMap(response -> { + UpdateResponse.Result result = result(response.result()); + return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result)); + }); + } - @Override - public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { - throw new UnsupportedOperationException("not implemented"); - } + @Override + public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { + throw new UnsupportedOperationException("not implemented"); + } - @Override - public Mono bulkUpdate(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + @Override + public Mono bulkUpdate(List queries, BulkOptions bulkOptions, IndexCoordinates index) { - Assert.notNull(queries, "List of UpdateQuery must not be null"); - Assert.notNull(bulkOptions, "BulkOptions must not be null"); - Assert.notNull(index, "Index must not be null"); + Assert.notNull(queries, "List of UpdateQuery must not be null"); + Assert.notNull(bulkOptions, "BulkOptions must not be null"); + Assert.notNull(index, "Index must not be null"); - return doBulkOperation(queries, bulkOptions, index).then(); - } + return doBulkOperation(queries, bulkOptions, index).then(); + } - private Flux doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + private Flux doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { - BulkRequest bulkRequest = requestConverter.documentBulkRequest(queries, bulkOptions, index, getRefreshPolicy()); - return client.bulk(bulkRequest) - .onErrorMap(e -> new UncategorizedElasticsearchException("Error executing bulk request", e)) - .flatMap(this::checkForBulkOperationFailure) // - .flatMapMany(response -> Flux.fromIterable(response.items())); + BulkRequest bulkRequest = requestConverter.documentBulkRequest(queries, bulkOptions, index, getRefreshPolicy()); + return client.bulk(bulkRequest) + .onErrorMap(e -> new UncategorizedElasticsearchException("Error executing bulk request", e)) + .flatMap(this::checkForBulkOperationFailure) // + .flatMapMany(response -> Flux.fromIterable(response.items())); - } + } - private Mono checkForBulkOperationFailure(BulkResponse bulkResponse) { + private Mono checkForBulkOperationFailure(BulkResponse bulkResponse) { - if (bulkResponse.errors()) { - Map failedDocuments = new HashMap<>(); + if (bulkResponse.errors()) { + Map failedDocuments = new HashMap<>(); - for (BulkResponseItem item : bulkResponse.items()) { + for (BulkResponseItem item : bulkResponse.items()) { - if (item.error() != null) { - failedDocuments.put(item.id(), new BulkFailureException.FailureDetails(item.status(), item.error().reason())); - } - } - BulkFailureException exception = new BulkFailureException( - "Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" - + failedDocuments + ']', - failedDocuments); - return Mono.error(exception); - } else { - return Mono.just(bulkResponse); - } - } + if (item.error() != null) { + failedDocuments.put(item.id(), new BulkFailureException.FailureDetails(item.status(), item.error().reason())); + } + } + BulkFailureException exception = new BulkFailureException( + "Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" + + failedDocuments + ']', + failedDocuments); + return Mono.error(exception); + } else { + return Mono.just(bulkResponse); + } + } - @Override - protected Mono doDeleteById(String id, @Nullable String routing, IndexCoordinates index) { - - Assert.notNull(id, "id must not be null"); - Assert.notNull(index, "index must not be null"); - - return Mono.defer(() -> { - DeleteRequest deleteRequest = requestConverter.documentDeleteRequest(id, routing, index, getRefreshPolicy()); - return doDelete(deleteRequest); - }); - } - - private Mono doDelete(DeleteRequest request) { - - return Mono.from(execute(client -> client.delete(request))) // - .flatMap(deleteResponse -> { - if (deleteResponse.result() == Result.NotFound) { - return Mono.empty(); - } - return Mono.just(deleteResponse.id()); - }).onErrorResume(NoSuchIndexException.class, it -> Mono.empty()); - } - - @Override - public Flux> multiGet(Query query, Class clazz, IndexCoordinates index) { - - Assert.notNull(query, "query must not be null"); - Assert.notNull(clazz, "clazz must not be null"); - - MgetRequest request = requestConverter.documentMgetRequest(query, clazz, index); - - ReadDocumentCallback callback = new ReadDocumentCallback<>(converter, clazz, index); - - Publisher> response = execute(client -> client.mget(request, EntityAsMap.class)); - - return Mono.from(response)// - .flatMapMany(it -> Flux.fromIterable(DocumentAdapters.from(it))) // - .flatMap(multiGetItem -> { - if (multiGetItem.isFailed()) { - return Mono.just(MultiGetItem.of(null, multiGetItem.getFailure())); - } else { - return callback.toEntity(multiGetItem.getItem()) // - .map(t -> MultiGetItem.of(t, multiGetItem.getFailure())); - } - }); - } - - // endregion - - @Override - protected ReactiveElasticsearchTemplate doCopy() { - return new ReactiveElasticsearchTemplate(client, converter); - } - - // region search operations - - @Override - protected Flux doFind(Query query, Class clazz, IndexCoordinates index) { - - Assert.notNull(query, "query must not be null"); - Assert.notNull(clazz, "clazz must not be null"); - Assert.notNull(index, "index must not be null"); - - if (query instanceof SearchTemplateQuery searchTemplateQuery) { - return Flux.defer(() -> doSearch(searchTemplateQuery, clazz, index)); - } else { - return Flux.defer(() -> { - boolean queryIsUnbounded = !(query.getPageable().isPaged() || query.isLimiting()); - return queryIsUnbounded ? doFindUnbounded(query, clazz, index) : doFindBounded(query, clazz, index); - }); - } - } - - private Flux doFindUnbounded(Query query, Class clazz, IndexCoordinates index) { - - if (query instanceof BaseQuery baseQuery) { - var pitKeepAlive = Duration.ofMinutes(5); - // setup functions for Flux.usingWhen() - Mono resourceSupplier = openPointInTime(index, pitKeepAlive, true) - .map(pit -> new PitSearchAfter(baseQuery, pit)); - - Function> asyncComplete = this::cleanupPit; - - BiFunction> asyncError = (psa, ex) -> { - if (LOGGER.isErrorEnabled()) { - LOGGER.error("Error during pit/search_after", ex); - } - return cleanupPit(psa); - }; - - Function> asyncCancel = psa -> { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("pit/search_after was cancelled"); - } - return cleanupPit(psa); - }; - - Function>> resourceClosure = psa -> { - - baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive)); - baseQuery.addSort(Sort.by("_shard_doc")); - SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, routingResolver.getRouting(), - clazz, index, false, true); - - return Mono.from(execute(client -> client.search(firstSearchRequest, EntityAsMap.class))) - .expand(entityAsMapSearchResponse -> { - - var hits = entityAsMapSearchResponse.hits().hits(); - if (CollectionUtils.isEmpty(hits)) { - return Mono.empty(); - } - - List sortOptions = hits.get(hits.size() - 1).sort().stream().map(TypeUtils::toObject) - .collect(Collectors.toList()); - baseQuery.setSearchAfter(sortOptions); - SearchRequest followSearchRequest = requestConverter.searchRequest(baseQuery, - routingResolver.getRouting(), clazz, index, false, true); - return Mono.from(execute(client -> client.search(followSearchRequest, EntityAsMap.class))); - }); - - }; - - Flux> searchResponses = Flux.usingWhen(resourceSupplier, resourceClosure, asyncComplete, - asyncError, asyncCancel); - return searchResponses.flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) - .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); - } else { - return Flux.error(new IllegalArgumentException("Query must be derived from BaseQuery")); - } - } - - private Publisher cleanupPit(PitSearchAfter psa) { - var baseQuery = psa.getBaseQuery(); - baseQuery.setPointInTime(null); - baseQuery.setSearchAfter(null); - baseQuery.setSort(psa.getSort()); - var pit = psa.getPit(); - return StringUtils.hasText(pit) ? closePointInTime(pit) : Mono.empty(); - } - - static private class PitSearchAfter { - private final BaseQuery baseQuery; - @Nullable - private final Sort sort; - private final String pit; - - PitSearchAfter(BaseQuery baseQuery, String pit) { - this.baseQuery = baseQuery; - this.sort = baseQuery.getSort(); - this.pit = pit; - } - - public BaseQuery getBaseQuery() { - return baseQuery; - } - - @Nullable - public Sort getSort() { - return sort; - } - - public String getPit() { - return pit; - } - } - - @Override - protected Mono doCount(Query query, Class entityType, IndexCoordinates index) { - - Assert.notNull(query, "query must not be null"); - Assert.notNull(index, "index must not be null"); + @Override + protected Mono doDeleteById(String id, @Nullable String routing, IndexCoordinates index) { + + Assert.notNull(id, "id must not be null"); + Assert.notNull(index, "index must not be null"); + + return Mono.defer(() -> { + DeleteRequest deleteRequest = requestConverter.documentDeleteRequest(id, routing, index, getRefreshPolicy()); + return doDelete(deleteRequest); + }); + } + + private Mono doDelete(DeleteRequest request) { + + return Mono.from(execute(client -> client.delete(request))) // + .flatMap(deleteResponse -> { + if (deleteResponse.result() == Result.NotFound) { + return Mono.empty(); + } + return Mono.just(deleteResponse.id()); + }).onErrorResume(NoSuchIndexException.class, it -> Mono.empty()); + } + + @Override + public Flux> multiGet(Query query, Class clazz, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(clazz, "clazz must not be null"); + + MgetRequest request = requestConverter.documentMgetRequest(query, clazz, index); + + ReadDocumentCallback callback = new ReadDocumentCallback<>(converter, clazz, index); + + Publisher> response = execute(client -> client.mget(request, EntityAsMap.class)); + + return Mono.from(response)// + .flatMapMany(it -> Flux.fromIterable(DocumentAdapters.from(it))) // + .flatMap(multiGetItem -> { + if (multiGetItem.isFailed()) { + return Mono.just(MultiGetItem.of(null, multiGetItem.getFailure())); + } else { + return callback.toEntity(multiGetItem.getItem()) // + .map(t -> MultiGetItem.of(t, multiGetItem.getFailure())); + } + }); + } + + // endregion + + @Override + protected ReactiveElasticsearchTemplate doCopy() { + return new ReactiveElasticsearchTemplate(client, converter); + } + + // region search operations + + @Override + protected Flux doFind(Query query, Class clazz, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(clazz, "clazz must not be null"); + Assert.notNull(index, "index must not be null"); + + if (query instanceof SearchTemplateQuery searchTemplateQuery) { + return Flux.defer(() -> doSearch(searchTemplateQuery, clazz, index)); + } else { + return Flux.defer(() -> { + boolean queryIsUnbounded = !(query.getPageable().isPaged() || query.isLimiting()); + return queryIsUnbounded ? doFindUnbounded(query, clazz, index) : doFindBounded(query, clazz, index); + }); + } + } + + private Flux doFindUnbounded(Query query, Class clazz, IndexCoordinates index) { + + if (query instanceof BaseQuery baseQuery) { + var pitKeepAlive = Duration.ofMinutes(5); + // setup functions for Flux.usingWhen() + Mono resourceSupplier = openPointInTime(index, pitKeepAlive, true) + .map(pit -> new PitSearchAfter(baseQuery, pit)); + + Function> asyncComplete = this::cleanupPit; + + BiFunction> asyncError = (psa, ex) -> { + if (LOGGER.isErrorEnabled()) { + LOGGER.error("Error during pit/search_after", ex); + } + return cleanupPit(psa); + }; + + Function> asyncCancel = psa -> { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("pit/search_after was cancelled"); + } + return cleanupPit(psa); + }; + + Function>> resourceClosure = psa -> { + + baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive)); + baseQuery.addSort(Sort.by("_shard_doc")); + SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, routingResolver.getRouting(), + clazz, index, false, true); + + return Mono.from(execute(client -> client.search(firstSearchRequest, EntityAsMap.class))) + .expand(entityAsMapSearchResponse -> { + + var hits = entityAsMapSearchResponse.hits().hits(); + if (CollectionUtils.isEmpty(hits)) { + return Mono.empty(); + } + + List sortOptions = hits.get(hits.size() - 1).sort().stream().map(TypeUtils::toObject) + .collect(Collectors.toList()); + baseQuery.setSearchAfter(sortOptions); + SearchRequest followSearchRequest = requestConverter.searchRequest(baseQuery, + routingResolver.getRouting(), clazz, index, false, true); + return Mono.from(execute(client -> client.search(followSearchRequest, EntityAsMap.class))); + }); + + }; + + Flux> searchResponses = Flux.usingWhen(resourceSupplier, resourceClosure, asyncComplete, + asyncError, asyncCancel); + return searchResponses.flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) + .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); + } else { + return Flux.error(new IllegalArgumentException("Query must be derived from BaseQuery")); + } + } + + private Publisher cleanupPit(PitSearchAfter psa) { + var baseQuery = psa.getBaseQuery(); + baseQuery.setPointInTime(null); + baseQuery.setSearchAfter(null); + baseQuery.setSort(psa.getSort()); + var pit = psa.getPit(); + return StringUtils.hasText(pit) ? closePointInTime(pit) : Mono.empty(); + } + + static private class PitSearchAfter { + private final BaseQuery baseQuery; + @Nullable private final Sort sort; + private final String pit; + + PitSearchAfter(BaseQuery baseQuery, String pit) { + this.baseQuery = baseQuery; + this.sort = baseQuery.getSort(); + this.pit = pit; + } + + public BaseQuery getBaseQuery() { + return baseQuery; + } + + @Nullable + public Sort getSort() { + return sort; + } + + public String getPit() { + return pit; + } + } + + @Override + protected Mono doCount(Query query, Class entityType, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(index, "index must not be null"); - SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), entityType, index, - true); + SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), entityType, index, + true); - return Mono.from(execute(client -> client.search(searchRequest, EntityAsMap.class))) - .map(searchResponse -> searchResponse.hits().total() != null ? searchResponse.hits().total().value() : 0L); - } - - private Flux doFindBounded(Query query, Class clazz, IndexCoordinates index) { + return Mono.from(execute(client -> client.search(searchRequest, EntityAsMap.class))) + .map(searchResponse -> searchResponse.hits().total() != null ? searchResponse.hits().total().value() : 0L); + } + + private Flux doFindBounded(Query query, Class clazz, IndexCoordinates index) { - SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), clazz, index, - false, false); - - return Mono.from(execute(client -> client.search(searchRequest, EntityAsMap.class))) // - .flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) // - .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); - } - - private Flux doSearch(SearchTemplateQuery query, Class clazz, IndexCoordinates index) { - - var request = requestConverter.searchTemplate(query, routingResolver.getRouting(), index); - - return Mono.from(execute(client -> client.searchTemplate(request, EntityAsMap.class))) // - .flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) // - .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); - } - - @Override - protected Mono doFindForResponse(Query query, Class clazz, IndexCoordinates index) { - - Assert.notNull(query, "query must not be null"); - Assert.notNull(index, "index must not be null"); - - SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), clazz, index, - false); - - // noinspection unchecked - SearchDocumentCallback callback = new ReadSearchDocumentCallback<>((Class) clazz, index); - SearchDocumentResponse.EntityCreator entityCreator = searchDocument -> callback.toEntity(searchDocument) - .toFuture(); - - return Mono.from(execute(client -> client.search(searchRequest, EntityAsMap.class))) - .map(searchResponse -> SearchDocumentResponseBuilder.from(searchResponse, entityCreator, jsonpMapper)); - } - - @Override - public Flux> aggregate(Query query, Class entityType, IndexCoordinates index) { - - return doFindForResponse(query, entityType, index).flatMapMany(searchDocumentResponse -> { - ElasticsearchAggregations aggregations = (ElasticsearchAggregations) searchDocumentResponse.getAggregations(); - return aggregations == null ? Flux.empty() : Flux.fromIterable(aggregations.aggregations()); - }); - } - - @Override - public Mono openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) { - - Assert.notNull(index, "index must not be null"); - Assert.notNull(keepAlive, "keepAlive must not be null"); - Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null"); - - var request = requestConverter.searchOpenPointInTimeRequest(index, keepAlive, ignoreUnavailable); - return Mono.from(execute(client -> client.openPointInTime(request))).map(OpenPointInTimeResponse::id); - } - - @Override - public Mono closePointInTime(String pit) { - - Assert.notNull(pit, "pit must not be null"); - - ClosePointInTimeRequest request = requestConverter.searchClosePointInTime(pit); - return Mono.from(execute(client -> client.closePointInTime(request))).map(ClosePointInTimeResponse::succeeded); - } - - // endregion - - // region script operations - @Override - public Mono putScript(Script script) { - - Assert.notNull(script, "script must not be null"); - - var request = requestConverter.scriptPut(script); - return Mono.from(execute(client -> client.putScript(request))).map(PutScriptResponse::acknowledged); - } - - @Override - public Mono