Skip to content

Commit 9b620b3

Browse files
authored
DATAES-799 - Support optimistic locking for full update scenario using seq_no + primary_term.
Original PR: spring-projects#441
1 parent 853980c commit 9b620b3

29 files changed

+1414
-41
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.springframework.data.elasticsearch.core.query.MoreLikeThisQuery;
5454
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
5555
import org.springframework.data.elasticsearch.core.query.Query;
56+
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
5657
import org.springframework.data.elasticsearch.support.VersionInfo;
5758
import org.springframework.data.mapping.callback.EntityCallbacks;
5859
import org.springframework.data.util.CloseableIterator;
@@ -438,18 +439,40 @@ private Long getEntityVersion(Object entity) {
438439
return null;
439440
}
440441

442+
@Nullable
443+
private SeqNoPrimaryTerm getEntitySeqNoPrimaryTerm(Object entity) {
444+
ElasticsearchPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
445+
ElasticsearchPersistentProperty property = persistentEntity.getSeqNoPrimaryTermProperty();
446+
447+
if (property != null) {
448+
Object seqNoPrimaryTerm = persistentEntity.getPropertyAccessor(entity).getProperty(property);
449+
450+
if (seqNoPrimaryTerm != null && SeqNoPrimaryTerm.class.isAssignableFrom(seqNoPrimaryTerm.getClass())) {
451+
return (SeqNoPrimaryTerm) seqNoPrimaryTerm;
452+
}
453+
}
454+
455+
return null;
456+
}
457+
441458
private <T> IndexQuery getIndexQuery(T entity) {
442459
String id = getEntityId(entity);
443460

444461
if (id != null) {
445462
id = elasticsearchConverter.convertId(id);
446463
}
447464

448-
return new IndexQueryBuilder() //
465+
IndexQueryBuilder builder = new IndexQueryBuilder() //
449466
.withId(id) //
450-
.withVersion(getEntityVersion(entity)) //
451-
.withObject(entity) //
452-
.build();
467+
.withObject(entity);
468+
SeqNoPrimaryTerm seqNoPrimaryTerm = getEntitySeqNoPrimaryTerm(entity);
469+
if (seqNoPrimaryTerm != null) {
470+
builder.withSeqNoPrimaryTerm(seqNoPrimaryTerm);
471+
} else {
472+
// version cannot be used together with seq_no and primary_term
473+
builder.withVersion(getEntityVersion(entity));
474+
}
475+
return builder.build();
453476
}
454477

455478
/**

src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
250250
* @param clazz the type of the object to be returned
251251
* @param index the index from which the object is read.
252252
* @return the found object
253-
* @deprecated since 4.0, use {@link #getById(String, Class, IndexCoordinates)}
253+
* @deprecated since 4.0, use {@link #get(String, Class, IndexCoordinates)}
254254
*/
255255
@Deprecated
256256
@Nullable

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchExceptionTranslator.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.ElasticsearchStatusException;
2424
import org.elasticsearch.common.ValidationException;
25+
import org.elasticsearch.index.engine.VersionConflictEngineException;
26+
import org.elasticsearch.rest.RestStatus;
2527
import org.springframework.dao.DataAccessException;
2628
import org.springframework.dao.DataAccessResourceFailureException;
2729
import org.springframework.dao.DataIntegrityViolationException;
30+
import org.springframework.dao.OptimisticLockingFailureException;
2831
import org.springframework.dao.support.PersistenceExceptionTranslator;
2932
import org.springframework.data.elasticsearch.NoSuchIndexException;
3033
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
@@ -35,6 +38,7 @@
3538
/**
3639
* @author Christoph Strobl
3740
* @author Peter-Josef Meisch
41+
* @author Roman Puchkovskiy
3842
* @since 3.2
3943
*/
4044
public class ElasticsearchExceptionTranslator implements PersistenceExceptionTranslator {
@@ -50,6 +54,12 @@ public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
5054
return new NoSuchIndexException(ObjectUtils.nullSafeToString(elasticsearchException.getMetadata("es.index")),
5155
ex);
5256
}
57+
58+
if (isSeqNoConflict(elasticsearchException)) {
59+
return new OptimisticLockingFailureException("Cannot index a document due to seq_no+primary_term conflict",
60+
elasticsearchException);
61+
}
62+
5363
return new UncategorizedElasticsearchException(ex.getMessage(), ex);
5464
}
5565

@@ -65,6 +75,25 @@ public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
6575
return null;
6676
}
6777

78+
private boolean isSeqNoConflict(ElasticsearchException exception) {
79+
80+
if (exception instanceof ElasticsearchStatusException) {
81+
ElasticsearchStatusException statusException = (ElasticsearchStatusException) exception;
82+
return statusException.status() == RestStatus.CONFLICT
83+
&& statusException.getMessage() != null
84+
&& statusException.getMessage().contains("type=version_conflict_engine_exception")
85+
&& statusException.getMessage().contains("version conflict, required seqNo");
86+
}
87+
88+
if (exception instanceof VersionConflictEngineException) {
89+
VersionConflictEngineException versionConflictEngineException = (VersionConflictEngineException) exception;
90+
return versionConflictEngineException.getMessage() != null
91+
&& versionConflictEngineException.getMessage().contains("version conflict, required seqNo");
92+
}
93+
94+
return false;
95+
}
96+
6897
private boolean indexAvailable(ElasticsearchException ex) {
6998

7099
List<String> metadata = ex.getMetadata("es.index_uuid");

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.get.GetResponse;
2828
import org.elasticsearch.action.get.MultiGetRequestBuilder;
2929
import org.elasticsearch.action.index.IndexRequestBuilder;
30+
import org.elasticsearch.action.index.IndexResponse;
3031
import org.elasticsearch.action.search.MultiSearchRequest;
3132
import org.elasticsearch.action.search.MultiSearchResponse;
3233
import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -89,6 +90,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
8990
private Client client;
9091
@Nullable private String searchTimeout;
9192

93+
private final ElasticsearchExceptionTranslator exceptionTranslator = new ElasticsearchExceptionTranslator();
94+
9295
// region Initialization
9396
public ElasticsearchTemplate(Client client) {
9497
this.client = client;
@@ -145,7 +148,14 @@ public String index(IndexQuery query, IndexCoordinates index) {
145148
maybeCallbackBeforeConvertWithQuery(query, index);
146149

147150
IndexRequestBuilder indexRequestBuilder = requestFactory.indexRequestBuilder(client, query, index);
148-
String documentId = indexRequestBuilder.execute().actionGet().getId();
151+
ActionFuture<IndexResponse> future = indexRequestBuilder.execute();
152+
IndexResponse response;
153+
try {
154+
response = future.actionGet();
155+
} catch (RuntimeException e) {
156+
throw translateException(e);
157+
}
158+
String documentId = response.getId();
149159

150160
// We should call this because we are not going through a mapper.
151161
Object queryObject = query.getObject();
@@ -360,4 +370,22 @@ public Client getClient() {
360370
return client;
361371
}
362372
// endregion
373+
374+
/**
375+
* translates an Exception if possible. Exceptions that are no {@link RuntimeException}s are wrapped in a
376+
* RuntimeException
377+
*
378+
* @param exception the Exception to map
379+
* @return the potentially translated RuntimeException.
380+
* @since 4.0
381+
*/
382+
private RuntimeException translateException(Exception exception) {
383+
384+
RuntimeException runtimeException = exception instanceof RuntimeException ? (RuntimeException) exception
385+
: new RuntimeException(exception.getMessage(), exception);
386+
RuntimeException potentiallyTranslatedException = exceptionTranslator
387+
.translateExceptionIfPossible(runtimeException);
388+
389+
return potentiallyTranslatedException != null ? potentiallyTranslatedException : runtimeException;
390+
}
363391
}

src/main/java/org/springframework/data/elasticsearch/core/EntityOperations.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
2222
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
2323
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
24+
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
2425
import org.springframework.data.mapping.IdentifierAccessor;
2526
import org.springframework.data.mapping.PersistentPropertyAccessor;
2627
import org.springframework.data.mapping.context.MappingContext;
@@ -35,6 +36,7 @@
3536
* @author Mark Paluch
3637
* @author Christoph Strobl
3738
* @author Peter-Josef Meisch
39+
* @author Roman Puchkovskiy
3840
* @since 3.2
3941
*/
4042
class EntityOperations {
@@ -256,6 +258,21 @@ interface AdaptibleEntity<T> extends Entity<T> {
256258
@Override
257259
@Nullable
258260
Number getVersion();
261+
262+
/**
263+
* Returns whether there is a property with type SeqNoPrimaryTerm in this entity.
264+
*
265+
* @return true if there is SeqNoPrimaryTerm property
266+
* @since 4.0
267+
*/
268+
boolean hasSeqNoPrimaryTerm();
269+
270+
/**
271+
* Returns SeqNoPropertyTerm for this entity.
272+
*
273+
* @return SeqNoPrimaryTerm, may be {@literal null}
274+
*/
275+
@Nullable SeqNoPrimaryTerm getSeqNoPrimaryTerm();
259276
}
260277

261278
/**
@@ -333,6 +350,16 @@ public Number getVersion() {
333350
return null;
334351
}
335352

353+
@Override
354+
public boolean hasSeqNoPrimaryTerm() {
355+
return false;
356+
}
357+
358+
@Override
359+
public SeqNoPrimaryTerm getSeqNoPrimaryTerm() {
360+
return null;
361+
}
362+
336363
/*
337364
* (non-Javadoc)
338365
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#incrementVersion()
@@ -588,6 +615,19 @@ public Number getVersion() {
588615
return propertyAccessor.getProperty(versionProperty, Number.class);
589616
}
590617

618+
@Override
619+
public boolean hasSeqNoPrimaryTerm() {
620+
return entity.hasSeqNoPrimaryTermProperty();
621+
}
622+
623+
@Override
624+
public SeqNoPrimaryTerm getSeqNoPrimaryTerm() {
625+
626+
ElasticsearchPersistentProperty seqNoPrimaryTermProperty = entity.getRequiredSeqNoPrimaryTermProperty();
627+
628+
return propertyAccessor.getProperty(seqNoPrimaryTermProperty, SeqNoPrimaryTerm.class);
629+
}
630+
591631
/*
592632
* (non-Javadoc)
593633
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#initializeVersionProperty()

src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.springframework.data.elasticsearch.core.query.IndexQuery;
8585
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
8686
import org.springframework.data.elasticsearch.core.query.Query;
87+
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
8788
import org.springframework.data.elasticsearch.core.query.StringQuery;
8889
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
8990
import org.springframework.data.elasticsearch.support.VersionInfo;
@@ -347,7 +348,19 @@ private IndexRequest getIndexRequest(Object value, AdaptibleEntity<?> entity, In
347348

348349
request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE);
349350

350-
if (entity.isVersionedEntity()) {
351+
boolean usingSeqNo = false;
352+
if (entity.hasSeqNoPrimaryTerm()) {
353+
SeqNoPrimaryTerm seqNoPrimaryTerm = entity.getSeqNoPrimaryTerm();
354+
355+
if (seqNoPrimaryTerm != null) {
356+
request.setIfSeqNo(seqNoPrimaryTerm.getSequenceNumber());
357+
request.setIfPrimaryTerm(seqNoPrimaryTerm.getPrimaryTerm());
358+
usingSeqNo = true;
359+
}
360+
}
361+
362+
// seq_no and version are incompatible in the same request
363+
if (!usingSeqNo && entity.isVersionedEntity()) {
351364

352365
Number version = entity.getVersion();
353366

@@ -356,6 +369,7 @@ private IndexRequest getIndexRequest(Object value, AdaptibleEntity<?> entity, In
356369
request.versionType(EXTERNAL);
357370
}
358371
}
372+
359373
return request;
360374
}
361375

src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
*
8484
* @author Peter-Josef Meisch
8585
* @author Sascha Woo
86+
* @author Roman Puchkovskiy
8687
* @since 4.0
8788
*/
8889
class RequestFactory {
@@ -342,6 +343,13 @@ public IndexRequest indexRequest(IndexQuery query, IndexCoordinates index) {
342343
indexRequest.versionType(versionType);
343344
}
344345

346+
if (query.getSeqNo() != null) {
347+
indexRequest.setIfSeqNo(query.getSeqNo());
348+
}
349+
if (query.getPrimaryTerm() != null) {
350+
indexRequest.setIfPrimaryTerm(query.getPrimaryTerm());
351+
}
352+
345353
return indexRequest;
346354
}
347355

@@ -374,6 +382,13 @@ public IndexRequestBuilder indexRequestBuilder(Client client, IndexQuery query,
374382
indexRequestBuilder.setVersionType(versionType);
375383
}
376384

385+
if (query.getSeqNo() != null) {
386+
indexRequestBuilder.setIfSeqNo(query.getSeqNo());
387+
}
388+
if (query.getPrimaryTerm() != null) {
389+
indexRequestBuilder.setIfPrimaryTerm(query.getPrimaryTerm());
390+
}
391+
377392
return indexRequestBuilder;
378393
}
379394

@@ -618,6 +633,9 @@ private SearchRequest prepareSearchRequest(Query query, @Nullable Class<?> clazz
618633
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
619634
sourceBuilder.version(true);
620635
sourceBuilder.trackScores(query.getTrackScores());
636+
if (hasSeqNoPrimaryTermProperty(clazz)) {
637+
sourceBuilder.seqNoAndPrimaryTerm(true);
638+
}
621639

622640
if (query.getSourceFilter() != null) {
623641
SourceFilter sourceFilter = query.getSourceFilter();
@@ -681,7 +699,20 @@ private SearchRequest prepareSearchRequest(Query query, @Nullable Class<?> clazz
681699
return request;
682700
}
683701

684-
@SuppressWarnings("unchecked")
702+
private boolean hasSeqNoPrimaryTermProperty(@Nullable Class<?> entityClass) {
703+
704+
if (entityClass == null) {
705+
return false;
706+
}
707+
708+
if (!elasticsearchConverter.getMappingContext().hasPersistentEntityFor(entityClass)) {
709+
return false;
710+
}
711+
712+
ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext().getRequiredPersistentEntity(entityClass);
713+
return entity.hasSeqNoPrimaryTermProperty();
714+
}
715+
685716
public PutMappingRequest putMappingRequest(IndexCoordinates index, Document mapping) {
686717
PutMappingRequest request = new PutMappingRequest(index.getIndexName());
687718
request.source(mapping);
@@ -784,6 +815,9 @@ private SearchRequestBuilder prepareSearchRequestBuilder(Query query, Client cli
784815
.setSearchType(query.getSearchType()) //
785816
.setVersion(true) //
786817
.setTrackScores(query.getTrackScores());
818+
if (hasSeqNoPrimaryTermProperty(clazz)) {
819+
searchRequestBuilder.seqNoAndPrimaryTerm(true);
820+
}
787821

788822
if (query.getSourceFilter() != null) {
789823
SourceFilter sourceFilter = query.getSourceFilter();

0 commit comments

Comments
 (0)