Skip to content

Commit 7501c19

Browse files
authored
DATAES-785 - Various entity callbacks implementation improvements.
Original PR: spring-projects#431
1 parent 5019793 commit 7501c19

18 files changed

+415
-233
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -463,60 +463,60 @@ private <T> IndexQuery getIndexQuery(T entity) {
463463
// endregion
464464

465465
// region Entity callbacks
466-
protected <T> T maybeCallbackBeforeConvert(T entity) {
466+
protected <T> T maybeCallbackBeforeConvert(T entity, IndexCoordinates index) {
467467

468468
if (entityCallbacks != null) {
469-
return entityCallbacks.callback(BeforeConvertCallback.class, entity);
469+
return entityCallbacks.callback(BeforeConvertCallback.class, entity, index);
470470
}
471471

472472
return entity;
473473
}
474474

475-
protected void maybeCallbackBeforeConvertWithQuery(Object query) {
475+
protected void maybeCallbackBeforeConvertWithQuery(Object query, IndexCoordinates index) {
476476

477477
if (query instanceof IndexQuery) {
478478
IndexQuery indexQuery = (IndexQuery) query;
479479
Object queryObject = indexQuery.getObject();
480480

481481
if (queryObject != null) {
482-
queryObject = maybeCallbackBeforeConvert(queryObject);
482+
queryObject = maybeCallbackBeforeConvert(queryObject, index);
483483
indexQuery.setObject(queryObject);
484484
}
485485
}
486486
}
487487

488488
// this can be called with either a List<IndexQuery> or a List<UpdateQuery>; these query classes
489489
// don't have a common base class, therefore the List<?> argument
490-
protected void maybeCallbackBeforeConvertWithQueries(List<?> queries) {
491-
queries.forEach(this::maybeCallbackBeforeConvertWithQuery);
490+
protected void maybeCallbackBeforeConvertWithQueries(List<?> queries, IndexCoordinates index) {
491+
queries.forEach(query -> maybeCallbackBeforeConvertWithQuery(query, index));
492492
}
493493

494-
protected <T> T maybeCallbackAfterSave(T entity) {
494+
protected <T> T maybeCallbackAfterSave(T entity, IndexCoordinates index) {
495495

496496
if (entityCallbacks != null) {
497-
return entityCallbacks.callback(AfterSaveCallback.class, entity);
497+
return entityCallbacks.callback(AfterSaveCallback.class, entity, index);
498498
}
499499

500500
return entity;
501501
}
502502

503-
protected void maybeCallbackAfterSaveWithQuery(Object query) {
503+
protected void maybeCallbackAfterSaveWithQuery(Object query, IndexCoordinates index) {
504504

505505
if (query instanceof IndexQuery) {
506506
IndexQuery indexQuery = (IndexQuery) query;
507507
Object queryObject = indexQuery.getObject();
508508

509509
if (queryObject != null) {
510-
queryObject = maybeCallbackAfterSave(queryObject);
510+
queryObject = maybeCallbackAfterSave(queryObject, index);
511511
indexQuery.setObject(queryObject);
512512
}
513513
}
514514
}
515515

516516
// this can be called with either a List<IndexQuery> or a List<UpdateQuery>; these query classes
517517
// don't have a common base class, therefore the List<?> argument
518-
protected void maybeCallbackAfterSaveWithQueries(List<?> queries) {
519-
queries.forEach(this::maybeCallbackAfterSaveWithQuery);
518+
protected void maybeCallbackAfterSaveWithQueries(List<?> queries, IndexCoordinates index) {
519+
queries.forEach(query -> maybeCallbackAfterSaveWithQuery(query, index));
520520
}
521521

522522
protected <T> T maybeCallbackAfterConvert(T entity, Document document, IndexCoordinates index) {

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public IndexOperations indexOps(IndexCoordinates index) {
136136
@Override
137137
public String index(IndexQuery query, IndexCoordinates index) {
138138

139-
maybeCallbackBeforeConvertWithQuery(query);
139+
maybeCallbackBeforeConvertWithQuery(query, index);
140140

141141
IndexRequest request = requestFactory.indexRequest(query, index);
142142
String documentId = execute(client -> client.index(request, RequestOptions.DEFAULT).getId());
@@ -147,7 +147,7 @@ public String index(IndexQuery query, IndexCoordinates index) {
147147
setPersistentEntityId(queryObject, documentId);
148148
}
149149

150-
maybeCallbackAfterSaveWithQuery(query);
150+
maybeCallbackAfterSaveWithQuery(query, index);
151151

152152
return documentId;
153153
}
@@ -232,11 +232,11 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
232232
}
233233

234234
private List<String> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
235-
maybeCallbackBeforeConvertWithQueries(queries);
235+
maybeCallbackBeforeConvertWithQueries(queries, index);
236236
BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index);
237237
List<String> ids = checkForBulkOperationFailure(
238238
execute(client -> client.bulk(bulkRequest, RequestOptions.DEFAULT)));
239-
maybeCallbackAfterSaveWithQueries(queries);
239+
maybeCallbackAfterSaveWithQueries(queries, index);
240240
return ids;
241241
}
242242
// endregion

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void setSearchTimeout(String searchTimeout) {
142142
@Override
143143
public String index(IndexQuery query, IndexCoordinates index) {
144144

145-
maybeCallbackBeforeConvertWithQuery(query);
145+
maybeCallbackBeforeConvertWithQuery(query, index);
146146

147147
IndexRequestBuilder indexRequestBuilder = requestFactory.indexRequestBuilder(client, query, index);
148148
String documentId = indexRequestBuilder.execute().actionGet().getId();
@@ -153,7 +153,7 @@ public String index(IndexQuery query, IndexCoordinates index) {
153153
setPersistentEntityId(queryObject, documentId);
154154
}
155155

156-
maybeCallbackAfterSaveWithQuery(query);
156+
maybeCallbackAfterSaveWithQuery(query, index);
157157

158158
return documentId;
159159
}
@@ -196,7 +196,7 @@ public List<String> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions,
196196

197197
List<String> ids = doBulkOperation(queries, bulkOptions, index);
198198

199-
maybeCallbackAfterSaveWithQueries(queries);
199+
maybeCallbackAfterSaveWithQueries(queries, index);
200200

201201
return ids;
202202
}
@@ -245,7 +245,7 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
245245
}
246246

247247
private List<String> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
248-
maybeCallbackBeforeConvertWithQueries(queries);
248+
maybeCallbackBeforeConvertWithQueries(queries, index);
249249
BulkRequestBuilder bulkRequest = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index);
250250
return checkForBulkOperationFailure(bulkRequest.execute().actionGet());
251251
}

src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

Lines changed: 73 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919

2020
import reactor.core.publisher.Flux;
2121
import reactor.core.publisher.Mono;
22+
import reactor.util.function.Tuple2;
2223

2324
import java.util.ArrayList;
2425
import java.util.Collection;
2526
import java.util.Collections;
2627
import java.util.HashMap;
27-
import java.util.Iterator;
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.stream.Collectors;
@@ -187,12 +187,15 @@ public <T> Mono<T> save(T entity, IndexCoordinates index) {
187187

188188
Assert.notNull(entity, "Entity must not be null!");
189189

190-
AdaptibleEntity<T> adaptableEntity = operations.forEntity(entity, converter.getConversionService());
191-
192-
return doIndex(entity, adaptableEntity, index) //
190+
return maybeCallBeforeConvert(entity, index)
191+
.flatMap(entityAfterBeforeConversionCallback -> doIndex(entityAfterBeforeConversionCallback, index)) //
193192
.map(it -> {
194-
return adaptableEntity.populateIdIfNecessary(it.getId());
195-
}).flatMap(this::maybeCallAfterSave);
193+
T savedEntity = it.getT1();
194+
IndexResponse indexResponse = it.getT2();
195+
AdaptibleEntity<T> adaptableEntity = operations.forEntity(savedEntity, converter.getConversionService());
196+
return adaptableEntity.populateIdIfNecessary(indexResponse.getId());
197+
})
198+
.flatMap(saved -> maybeCallAfterSave(saved, index));
196199
}
197200

198201
@Override
@@ -201,32 +204,35 @@ public <T> Mono<T> save(T entity) {
201204
}
202205

203206
@Override
204-
public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entities, IndexCoordinates index) {
205-
206-
Assert.notNull(entities, "Entities must not be null!");
207-
208-
return entities.flatMapMany(entityList -> {
209-
210-
List<AdaptibleEntity<? extends T>> adaptibleEntities = entityList.stream() //
211-
.map(e -> operations.forEntity(e, converter.getConversionService())) //
212-
.collect(Collectors.toList());
207+
public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPublisher, IndexCoordinates index) {
208+
209+
Assert.notNull(entitiesPublisher, "Entities must not be null!");
210+
211+
return entitiesPublisher
212+
.flatMapMany(entities -> {
213+
return Flux.fromIterable(entities) //
214+
.concatMap(entity -> maybeCallBeforeConvert(entity, index));
215+
})
216+
.collectList()
217+
.map(Entities::new)
218+
.flatMapMany(entities -> {
219+
if (entities.isEmpty()) {
220+
return Flux.empty();
221+
}
213222

214-
if (adaptibleEntities.isEmpty()) {
215-
return Flux.empty();
216-
}
223+
return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index) //
224+
.index()
225+
.flatMap(indexAndResponse -> {
226+
T savedEntity = entities.entityAt(indexAndResponse.getT1());
227+
BulkItemResponse bulkItemResponse = indexAndResponse.getT2();
217228

218-
Iterator<AdaptibleEntity<? extends T>> iterator = adaptibleEntities.iterator();
219-
List<IndexQuery> indexRequests = adaptibleEntities.stream() //
220-
.map(e -> getIndexQuery(e.getBean(), e)) //
221-
.collect(Collectors.toList());
222-
return doBulkOperation(indexRequests, BulkOptions.defaultOptions(), index) //
223-
.flatMap(bulkItemResponse -> {
229+
AdaptibleEntity<T> adaptibleEntity = operations.forEntity(savedEntity,
230+
converter.getConversionService());
231+
adaptibleEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId());
224232

225-
AdaptibleEntity<? extends T> mappedEntity = iterator.next();
226-
mappedEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId());
227-
return maybeCallAfterSave(mappedEntity.getBean());
228-
});
229-
});
233+
return maybeCallAfterSave(savedEntity, index);
234+
});
235+
});
230236
}
231237

232238
@Override
@@ -332,13 +338,12 @@ protected Mono<Boolean> doExists(GetRequest request) {
332338
.onErrorReturn(NoSuchIndexException.class, false);
333339
}
334340

335-
private Mono<IndexResponse> doIndex(Object value, AdaptibleEntity<?> entity, IndexCoordinates index) {
341+
private <T> Mono<Tuple2<T, IndexResponse>> doIndex(T entity, IndexCoordinates index) {
336342

337-
return maybeCallBeforeConvert(value).flatMap(it -> {
338-
IndexRequest request = getIndexRequest(value, entity, index);
339-
request = prepareIndexRequest(value, request);
340-
return doIndex(request);
341-
});
343+
AdaptibleEntity<?> adaptibleEntity = operations.forEntity(entity, converter.getConversionService());
344+
IndexRequest request = getIndexRequest(entity, adaptibleEntity, index);
345+
request = prepareIndexRequest(entity, request);
346+
return Mono.just(entity).zipWith(doIndex(request));
342347
}
343348

344349
private IndexRequest getIndexRequest(Object value, AdaptibleEntity<?> entity, IndexCoordinates index) {
@@ -361,7 +366,9 @@ private IndexRequest getIndexRequest(Object value, AdaptibleEntity<?> entity, In
361366
return request;
362367
}
363368

364-
private IndexQuery getIndexQuery(Object value, AdaptibleEntity<?> entity) {
369+
private IndexQuery getIndexQuery(Object value) {
370+
AdaptibleEntity<?> entity = operations.forEntity(value, converter.getConversionService());
371+
365372
Object id = entity.getId();
366373
IndexQuery query = new IndexQuery();
367374
if (id != null) {
@@ -912,19 +919,19 @@ private RuntimeException translateException(Throwable throwable) {
912919
}
913920

914921
// region callbacks
915-
protected <T> Mono<T> maybeCallBeforeConvert(T entity) {
922+
protected <T> Mono<T> maybeCallBeforeConvert(T entity, IndexCoordinates index) {
916923

917924
if (null != entityCallbacks) {
918-
return entityCallbacks.callback(ReactiveBeforeConvertCallback.class, entity);
925+
return entityCallbacks.callback(ReactiveBeforeConvertCallback.class, entity, index);
919926
}
920927

921928
return Mono.just(entity);
922929
}
923930

924-
protected <T> Mono<T> maybeCallAfterSave(T entity) {
931+
protected <T> Mono<T> maybeCallAfterSave(T entity, IndexCoordinates index) {
925932

926933
if (null != entityCallbacks) {
927-
return entityCallbacks.callback(ReactiveAfterSaveCallback.class, entity);
934+
return entityCallbacks.callback(ReactiveAfterSaveCallback.class, entity, index);
928935
}
929936

930937
return Mono.just(entity);
@@ -995,4 +1002,30 @@ public Mono<SearchHit<T>> doWith(SearchDocument response) {
9951002
.map(entity -> SearchHitMapping.mappingFor(type, converter.getMappingContext()).mapHit(response, entity));
9961003
}
9971004
}
1005+
1006+
private class Entities<T> {
1007+
private final List<T> entities;
1008+
1009+
private Entities(List<T> entities) {
1010+
Assert.notNull(entities, "entities cannot be null");
1011+
1012+
this.entities = entities;
1013+
}
1014+
1015+
private boolean isEmpty() {
1016+
return entities.isEmpty();
1017+
}
1018+
1019+
private List<IndexQuery> indexQueries() {
1020+
return entities.stream()
1021+
.map(ReactiveElasticsearchTemplate.this::getIndexQuery)
1022+
.collect(Collectors.toList());
1023+
}
1024+
1025+
private T entityAt(long index) {
1026+
// it's safe to cast to int because the original indexed colleciton was fitting in memory
1027+
int intIndex = (int) index;
1028+
return entities.get(intIndex);
1029+
}
1030+
}
9981031
}

src/main/java/org/springframework/data/elasticsearch/core/event/AfterSaveCallback.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.elasticsearch.core.event;
1717

18+
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
1819
import org.springframework.data.mapping.callback.EntityCallback;
1920
import org.springframework.data.mapping.callback.EntityCallbacks;
2021

@@ -33,7 +34,8 @@ public interface AfterSaveCallback<T> extends EntityCallback<T> {
3334
* the domain object.
3435
*
3536
* @param entity the domain object that was saved.
37+
* @param index must not be {@literal null}.
3638
* @return the domain object that was persisted.
3739
*/
38-
T onAfterSave(T entity);
40+
T onAfterSave(T entity, IndexCoordinates index);
3941
}

src/main/java/org/springframework/data/elasticsearch/core/event/AuditingEntityCallback.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
import org.springframework.beans.factory.ObjectFactory;
1919
import org.springframework.core.Ordered;
2020
import org.springframework.data.auditing.IsNewAwareAuditingHandler;
21+
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
2122
import org.springframework.data.mapping.callback.EntityCallback;
2223
import org.springframework.util.Assert;
2324

2425
/**
2526
* {@link EntityCallback} to populate auditing related fields on an entity about to be saved.
2627
*
2728
* @author Peter-Josef Meisch
29+
* @author Roman Puchkovskiy
2830
* @since 4.0
2931
*/
3032
public class AuditingEntityCallback implements BeforeConvertCallback<Object>, Ordered {
@@ -45,7 +47,7 @@ public AuditingEntityCallback(ObjectFactory<IsNewAwareAuditingHandler> auditingH
4547
}
4648

4749
@Override
48-
public Object onBeforeConvert(Object entity) {
50+
public Object onBeforeConvert(Object entity, IndexCoordinates index) {
4951
return auditingHandlerFactory.getObject().markAudited(entity);
5052
}
5153

src/main/java/org/springframework/data/elasticsearch/core/event/BeforeConvertCallback.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
*/
1616
package org.springframework.data.elasticsearch.core.event;
1717

18+
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
1819
import org.springframework.data.mapping.callback.EntityCallback;
1920

2021
/**
2122
* Callback being invoked before a domain object is converted to be persisted.
2223
*
2324
* @author Peter-Josef Meisch
25+
* @author Roman Puchkovskiy
2426
* @since 4.0
2527
*/
2628
@FunctionalInterface
@@ -31,7 +33,8 @@ public interface BeforeConvertCallback<T> extends EntityCallback<T> {
3133
* the domain entity class.
3234
*
3335
* @param entity the entity being converted
36+
* @param index must not be {@literal null}.
3437
* @return the entity to be converted
3538
*/
36-
T onBeforeConvert(T entity);
39+
T onBeforeConvert(T entity, IndexCoordinates index);
3740
}

0 commit comments

Comments
 (0)