Skip to content

Commit b2ffc23

Browse files
authored
DATAES-738 - Add entity related save methods to DocumentOperations.
Original PR: spring-projects#389
1 parent 0c15eef commit b2ffc23

File tree

8 files changed

+290
-42
lines changed

8 files changed

+290
-42
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package org.springframework.data.elasticsearch.core;
22

33
import java.util.ArrayList;
4+
import java.util.Arrays;
45
import java.util.HashMap;
56
import java.util.Iterator;
67
import java.util.List;
78
import java.util.Map;
9+
import java.util.stream.Collectors;
10+
import java.util.stream.Stream;
811

912
import org.elasticsearch.action.bulk.BulkItemResponse;
1013
import org.elasticsearch.action.bulk.BulkResponse;
@@ -27,10 +30,13 @@
2730
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
2831
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
2932
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
33+
import org.springframework.data.elasticsearch.core.query.IndexQuery;
34+
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
3035
import org.springframework.data.elasticsearch.core.query.MoreLikeThisQuery;
3136
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
3237
import org.springframework.data.elasticsearch.core.query.Query;
3338
import org.springframework.data.util.CloseableIterator;
39+
import org.springframework.data.util.Streamable;
3440
import org.springframework.lang.Nullable;
3541
import org.springframework.util.Assert;
3642

@@ -83,6 +89,68 @@ public IndexOperations getIndexOperations() {
8389
// endregion
8490

8591
// region DocumentOperations
92+
93+
@Override
94+
public <T> T save(T entity) {
95+
96+
Assert.notNull(entity, "entity must not be null");
97+
98+
return save(entity, getIndexCoordinatesFor(entity.getClass()));
99+
}
100+
101+
@Override
102+
public <T> T save(T entity, IndexCoordinates index) {
103+
104+
Assert.notNull(entity, "entity must not be null");
105+
Assert.notNull(index, "index must not be null");
106+
107+
index(getIndexQuery(entity), index);
108+
return entity;
109+
}
110+
111+
@Override
112+
public <T> Iterable<T> save(Iterable<T> entities) {
113+
114+
Assert.notNull(entities, "entities must not be null");
115+
116+
Iterator<T> iterator = entities.iterator();
117+
if (iterator.hasNext()) {
118+
return save(entities, getIndexCoordinatesFor(iterator.next().getClass()));
119+
}
120+
121+
return entities;
122+
}
123+
124+
@Override
125+
public <T> Iterable<T> save(Iterable<T> entities, IndexCoordinates index) {
126+
127+
Assert.notNull(entities, "entities must not be null");
128+
Assert.notNull(index, "index must not be null");
129+
130+
List<IndexQuery> indexQueries = Streamable.of(entities).stream().map(this::getIndexQuery)
131+
.collect(Collectors.toList());
132+
133+
if (!indexQueries.isEmpty()) {
134+
List<String> ids = bulkIndex(indexQueries, index);
135+
Iterator<String> idIterator = ids.iterator();
136+
entities.forEach(entity -> {
137+
setPersistentEntityId(entity, idIterator.next());
138+
});
139+
}
140+
141+
return entities;
142+
}
143+
144+
@Override
145+
public <T> Iterable<T> save(T... entities) {
146+
return save(Arrays.asList(entities));
147+
}
148+
149+
@Override
150+
public <T> Iterable<T> save(IndexCoordinates index, T... entities) {
151+
return save(Arrays.asList(entities), index);
152+
}
153+
86154
@Override
87155
public void delete(Query query, Class<?> clazz, IndexCoordinates index) {
88156

@@ -197,7 +265,11 @@ public IndexCoordinates getIndexCoordinatesFor(Class<?> clazz) {
197265
return getRequiredPersistentEntity(clazz).getIndexCoordinates();
198266
}
199267

200-
protected void checkForBulkOperationFailure(BulkResponse bulkResponse) {
268+
/**
269+
* @param bulkResponse
270+
* @return the list of the item id's
271+
*/
272+
protected List<String> checkForBulkOperationFailure(BulkResponse bulkResponse) {
201273

202274
if (bulkResponse.hasFailures()) {
203275
Map<String, String> failedDocuments = new HashMap<>();
@@ -211,6 +283,8 @@ protected void checkForBulkOperationFailure(BulkResponse bulkResponse) {
211283
+ failedDocuments + ']',
212284
failedDocuments);
213285
}
286+
287+
return Stream.of(bulkResponse.getItems()).map(BulkItemResponse::getId).collect(Collectors.toList());
214288
}
215289

216290
protected void setPersistentEntityId(Object entity, String id) {
@@ -227,5 +301,39 @@ protected void setPersistentEntityId(Object entity, String id) {
227301
ElasticsearchPersistentEntity<?> getRequiredPersistentEntity(Class<?> clazz) {
228302
return elasticsearchConverter.getMappingContext().getRequiredPersistentEntity(clazz);
229303
}
304+
305+
@Nullable
306+
private String getEntityId(Object entity) {
307+
ElasticsearchPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
308+
ElasticsearchPersistentProperty idProperty = persistentEntity.getIdProperty();
309+
310+
if (idProperty != null) {
311+
return stringIdRepresentation(persistentEntity.getPropertyAccessor(entity).getProperty(idProperty));
312+
}
313+
314+
return null;
315+
}
316+
317+
@Nullable
318+
private Long getEntityVersion(Object entity) {
319+
ElasticsearchPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
320+
ElasticsearchPersistentProperty versionProperty = persistentEntity.getVersionProperty();
321+
322+
if (versionProperty != null) {
323+
Object version = persistentEntity.getPropertyAccessor(entity).getProperty(versionProperty);
324+
325+
if (version != null && Long.class.isAssignableFrom(version.getClass())) {
326+
return ((Long) version);
327+
}
328+
}
329+
330+
return null;
331+
}
332+
333+
private <T> IndexQuery getIndexQuery(T entity) {
334+
return new IndexQueryBuilder().withObject(entity).withId(getEntityId(entity)).withVersion(getEntityVersion(entity))
335+
.build();
336+
}
337+
230338
// endregion
231339
}

src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.data.elasticsearch.core.query.IndexQuery;
2626
import org.springframework.data.elasticsearch.core.query.Query;
2727
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
28+
import org.springframework.lang.Nullable;
2829

2930
/**
3031
* The operations for the
@@ -35,6 +36,63 @@
3536
*/
3637
public interface DocumentOperations {
3738

39+
/**
40+
* Saves an entity to the index specified in the entity's Document annotation
41+
*
42+
* @param entity the entity to save, must not be {@literal null}
43+
* @param <T> the entity type
44+
* @return the saved entity
45+
*/
46+
<T> T save(T entity);
47+
48+
/**
49+
* Saves an entity to the index specified in the entity's Document annotation
50+
*
51+
* @param entity the entity to save, must not be {@literal null}
52+
* @param index the index to save the entity in, must not be {@literal null}
53+
* @param <T> the entity type
54+
* @return the saved entity
55+
*/
56+
<T> T save(T entity, IndexCoordinates index);
57+
58+
/**
59+
* saves the given entities to the index retrieved from the entities' Document annotation
60+
*
61+
* @param entities must not be {@literal null}
62+
* @param <T> the entity type
63+
* @return the saved entites
64+
*/
65+
<T> Iterable<T> save(Iterable<T> entities);
66+
67+
/**
68+
* saves the given entities to the given index
69+
*
70+
* @param entities must not be {@literal null}
71+
* @param index the idnex to save the entities in, must not be {@literal null}
72+
* @param <T> the entity type
73+
* @return the saved entites
74+
*/
75+
<T> Iterable<T> save(Iterable<T> entities, IndexCoordinates index);
76+
77+
/**
78+
* saves the given entities to the index retrieved from the entities' Document annotation
79+
*
80+
* @param entities must not be {@literal null}
81+
* @param <T> the entity type
82+
* @return the saved entites as Iterable
83+
*/
84+
<T> Iterable<T> save(T... entities);
85+
86+
/**
87+
* saves the given entities to the given index.
88+
*
89+
* @param index the idnex to save the entities in, must not be {@literal null}
90+
* @param entities must not be {@literal null}
91+
* @param <T> the entity type
92+
* @return the saved entites as Iterable
93+
*/
94+
<T> Iterable<T> save(IndexCoordinates index, T... entities);
95+
3896
/**
3997
* Index an object. Will do save or update.
4098
*
@@ -52,6 +110,7 @@ public interface DocumentOperations {
52110
* @param index the index from which the object is read.
53111
* @return the found object
54112
*/
113+
@Nullable
55114
<T> T get(GetQuery query, Class<T> clazz, IndexCoordinates index);
56115

57116
/**
@@ -68,18 +127,20 @@ public interface DocumentOperations {
68127
* Bulk index all objects. Will do save or update.
69128
*
70129
* @param queries the queries to execute in bulk
130+
* @return the ids of the indexed objects
71131
*/
72-
default void bulkIndex(List<IndexQuery> queries, IndexCoordinates index) {
73-
bulkIndex(queries, BulkOptions.defaultOptions(), index);
132+
default List<String> bulkIndex(List<IndexQuery> queries, IndexCoordinates index) {
133+
return bulkIndex(queries, BulkOptions.defaultOptions(), index);
74134
}
75135

76136
/**
77137
* Bulk index all objects. Will do save or update.
78-
*
138+
*
79139
* @param queries the queries to execute in bulk
80140
* @param bulkOptions options to be added to the bulk request
141+
* @return the ids of the indexed objects
81142
*/
82-
void bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index);
143+
List<String> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index);
83144

84145
/**
85146
* Bulk update all objects. Will do update.

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
import java.util.List;
1919
import java.util.Map;
20+
import java.util.Objects;
2021

2122
import org.elasticsearch.cluster.metadata.AliasMetaData;
2223
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
2324
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
2425
import org.springframework.data.elasticsearch.core.query.AliasQuery;
26+
import org.springframework.lang.Nullable;
2527

2628
/**
2729
* ElasticsearchOperations. Since 4.0 this interface only contains common helper functions, the other methods have been
@@ -312,4 +314,18 @@ default void refresh(Class<?> clazz) {
312314
getIndexOperations().refresh(clazz);
313315
}
314316
// endregion
317+
318+
// region helper
319+
/**
320+
* gets the String representation for an id.
321+
*
322+
* @param id
323+
* @return
324+
* @since 4.0
325+
*/
326+
@Nullable
327+
default String stringIdRepresentation(@Nullable Object id) {
328+
return Objects.toString(id, null);
329+
}
330+
// endregion
315331
}

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ public String index(IndexQuery query, IndexCoordinates index) {
127127
}
128128

129129
@Override
130+
@Nullable
130131
public <T> T get(GetQuery query, Class<T> clazz, IndexCoordinates index) {
131132
GetRequest request = requestFactory.getRequest(query, index);
132133
try {
@@ -153,12 +154,12 @@ public <T> List<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index)
153154
}
154155

155156
@Override
156-
public void bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
157+
public List<String> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
157158

158159
Assert.notNull(queries, "List of IndexQuery must not be null");
159160
Assert.notNull(bulkOptions, "BulkOptions must not be null");
160161

161-
doBulkOperation(queries, bulkOptions, index);
162+
return doBulkOperation(queries, bulkOptions, index);
162163
}
163164

164165
@Override
@@ -200,10 +201,10 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
200201
}
201202
}
202203

203-
private void doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
204+
private List<String> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
204205
BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index);
205206
try {
206-
checkForBulkOperationFailure(client.bulk(bulkRequest, RequestOptions.DEFAULT));
207+
return checkForBulkOperationFailure(client.bulk(bulkRequest, RequestOptions.DEFAULT));
207208
} catch (IOException e) {
208209
throw new ElasticsearchException("Error while bulk for request: " + bulkRequest.toString(), e);
209210
}
@@ -212,7 +213,7 @@ private void doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoor
212213

213214
// region SearchOperations
214215
@Override
215-
public long count(Query query,@Nullable Class<?> clazz, IndexCoordinates index) {
216+
public long count(Query query, @Nullable Class<?> clazz, IndexCoordinates index) {
216217

217218
Assert.notNull(query, "query must not be null");
218219
Assert.notNull(index, "index must not be null");

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ public String index(IndexQuery query, IndexCoordinates index) {
127127
}
128128

129129
@Override
130+
@Nullable
130131
public <T> T get(GetQuery query, Class<T> clazz, IndexCoordinates index) {
131132
GetRequestBuilder getRequestBuilder = requestFactory.getRequestBuilder(client, query, index);
132133
GetResponse response = getRequestBuilder.execute().actionGet();
@@ -145,12 +146,12 @@ public <T> List<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index)
145146
}
146147

147148
@Override
148-
public void bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
149+
public List<String> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
149150

150151
Assert.notNull(queries, "List of IndexQuery must not be null");
151152
Assert.notNull(bulkOptions, "BulkOptions must not be null");
152153

153-
doBulkOperation(queries, bulkOptions, index);
154+
return doBulkOperation(queries, bulkOptions, index);
154155
}
155156

156157
@Override
@@ -178,9 +179,9 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
178179
return updateRequestBuilder.execute().actionGet();
179180
}
180181

181-
private void doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
182+
private List<String> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
182183
BulkRequestBuilder bulkRequest = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index);
183-
checkForBulkOperationFailure(bulkRequest.execute().actionGet());
184+
return checkForBulkOperationFailure(bulkRequest.execute().actionGet());
184185
}
185186
// endregion
186187

0 commit comments

Comments
 (0)