Skip to content

Commit d34a30a

Browse files
committed
Bulk API: Improve memory usage when executing large bulk requests, closes elastic#724.
1 parent ab53303 commit d34a30a

File tree

7 files changed

+83
-237
lines changed

7 files changed

+83
-237
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 66 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,9 @@
3535
import org.elasticsearch.cluster.metadata.MappingMetaData;
3636
import org.elasticsearch.cluster.routing.ShardIterator;
3737
import org.elasticsearch.common.Strings;
38-
import org.elasticsearch.common.collect.Sets;
3938
import org.elasticsearch.common.inject.Inject;
4039
import org.elasticsearch.common.settings.Settings;
4140
import org.elasticsearch.index.engine.Engine;
42-
import org.elasticsearch.index.engine.EngineException;
4341
import org.elasticsearch.index.mapper.DocumentMapper;
4442
import org.elasticsearch.index.mapper.MapperService;
4543
import org.elasticsearch.index.mapper.ParsedDocument;
@@ -54,7 +52,6 @@
5452
import org.elasticsearch.transport.TransportService;
5553

5654
import java.io.IOException;
57-
import java.util.Set;
5855

5956
/**
6057
* Performs the index operation.
@@ -106,11 +103,14 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
106103
}
107104

108105
@Override protected PrimaryResponse<BulkShardResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
109-
IndexShard indexShard = indexShard(shardRequest);
110106
final BulkShardRequest request = shardRequest.request;
111-
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
107+
IndexShard indexShard = indexShard(shardRequest);
108+
112109
Engine.Operation[] ops = new Engine.Operation[request.items().length];
113-
for (int i = 0; i < ops.length; i++) {
110+
111+
112+
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
113+
for (int i = 0; i < request.items().length; i++) {
114114
BulkItemRequest item = request.items()[i];
115115
if (item.request() instanceof IndexRequest) {
116116
IndexRequest indexRequest = (IndexRequest) item.request();
@@ -126,11 +126,39 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
126126

127127
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
128128
.routing(indexRequest.routing()).parent(indexRequest.parent());
129+
long version;
130+
ParsedDocument doc;
131+
Engine.Operation op;
129132
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
130-
ops[i] = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
133+
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
134+
doc = indexShard.index(index);
135+
version = index.version();
136+
op = index;
131137
} else {
132-
ops[i] = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
138+
Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
139+
doc = indexShard.create(create);
140+
version = create.version();
141+
op = create;
142+
}
143+
// update the version on request so it will happen on the replicas
144+
indexRequest.version(version);
145+
146+
// update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added
147+
if (doc.mappersAdded()) {
148+
updateMappingOnMaster(indexRequest);
149+
}
150+
151+
// if we are going to percolate, then we need to keep this op for the postPrimary operation
152+
if (Strings.hasLength(indexRequest.percolate())) {
153+
if (ops == null) {
154+
ops = new Engine.Operation[request.items().length];
155+
}
156+
ops[i] = op;
133157
}
158+
159+
// add the response
160+
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
161+
new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version));
134162
} catch (Exception e) {
135163
if (logger.isDebugEnabled()) {
136164
logger.debug("[" + shardRequest.request.index() + "][" + shardRequest.shardId + "]" + ": Failed to execute bulk item (index) [" + indexRequest + "]", e);
@@ -141,7 +169,14 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
141169
} else if (item.request() instanceof DeleteRequest) {
142170
DeleteRequest deleteRequest = (DeleteRequest) item.request();
143171
try {
144-
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
172+
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).origin(Engine.Operation.Origin.PRIMARY);
173+
indexShard.delete(delete);
174+
// update the request with teh version so it will go to the replicas
175+
deleteRequest.version(delete.version());
176+
177+
// add the response
178+
responses[i] = new BulkItemResponse(item.id(), "delete",
179+
new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound()));
145180
} catch (Exception e) {
146181
if (logger.isDebugEnabled()) {
147182
logger.debug("[" + shardRequest.request.index() + "][" + shardRequest.shardId + "]" + ": Failed to execute bulk item (delete) [" + deleteRequest + "]", e);
@@ -152,61 +187,11 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
152187
}
153188
}
154189

155-
EngineException[] failures = indexShard.bulk(new Engine.Bulk(ops).refresh(request.refresh()));
156-
// process failures and mappings
157-
Set<String> processedTypes = Sets.newHashSet();
158-
for (int i = 0; i < ops.length; i++) {
159-
// failed to parse, already set the failure, skip
160-
if (ops[i] == null) {
161-
continue;
162-
}
163-
164-
BulkItemRequest item = request.items()[i];
165-
if (item.request() instanceof IndexRequest) {
166-
IndexRequest indexRequest = (IndexRequest) item.request();
167-
long version;
168-
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
169-
Engine.Index engineIndex = (Engine.Index) ops[i];
170-
version = engineIndex.version();
171-
if (!processedTypes.contains(engineIndex.type())) {
172-
processedTypes.add(engineIndex.type());
173-
ParsedDocument doc = engineIndex.parsedDoc();
174-
if (doc.mappersAdded()) {
175-
updateMappingOnMaster(indexRequest);
176-
}
177-
}
178-
} else {
179-
Engine.Create engineCreate = (Engine.Create) ops[i];
180-
version = engineCreate.version();
181-
if (!processedTypes.contains(engineCreate.type())) {
182-
processedTypes.add(engineCreate.type());
183-
ParsedDocument doc = engineCreate.parsedDoc();
184-
if (doc.mappersAdded()) {
185-
updateMappingOnMaster(indexRequest);
186-
}
187-
}
188-
}
189-
// update the version on request so it will happen on the replicas
190-
indexRequest.version(version);
191-
if (failures != null && failures[i] != null) {
192-
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
193-
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(failures[i])));
194-
} else {
195-
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
196-
new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version));
197-
}
198-
} else if (item.request() instanceof DeleteRequest) {
199-
DeleteRequest deleteRequest = (DeleteRequest) item.request();
200-
Engine.Delete engineDelete = (Engine.Delete) ops[i];
201-
// update the version on request so it will happen on the replicas
202-
deleteRequest.version(engineDelete.version());
203-
if (failures != null && failures[i] != null) {
204-
responses[i] = new BulkItemResponse(item.id(), "delete",
205-
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(failures[i])));
206-
} else {
207-
responses[i] = new BulkItemResponse(item.id(), "delete",
208-
new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), engineDelete.version(), engineDelete.notFound()));
209-
}
190+
if (request.refresh()) {
191+
try {
192+
indexShard.refresh(new Engine.Refresh(false));
193+
} catch (Exception e) {
194+
// ignore
210195
}
211196
}
212197
BulkShardResponse response = new BulkShardResponse(new ShardId(request.index(), request.shardId()), responses);
@@ -223,6 +208,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
223208
// failure, continue
224209
continue;
225210
}
211+
if (ops[i] == null) {
212+
continue; // failed
213+
}
226214
if (itemRequest.request() instanceof IndexRequest) {
227215
IndexRequest indexRequest = (IndexRequest) itemRequest.request();
228216
if (!Strings.hasLength(indexRequest.percolate())) {
@@ -247,33 +235,41 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
247235
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
248236
IndexShard indexShard = indexShard(shardRequest);
249237
final BulkShardRequest request = shardRequest.request;
250-
Engine.Operation[] ops = new Engine.Operation[request.items().length];
251-
for (int i = 0; i < ops.length; i++) {
238+
for (int i = 0; i < request.items().length; i++) {
252239
BulkItemRequest item = request.items()[i];
253240
if (item.request() instanceof IndexRequest) {
254241
IndexRequest indexRequest = (IndexRequest) item.request();
255242
try {
256243
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
257244
.routing(indexRequest.routing()).parent(indexRequest.parent());
258245
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
259-
ops[i] = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
246+
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
247+
indexShard.index(index);
260248
} else {
261-
ops[i] = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
249+
Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
250+
indexShard.create(create);
262251
}
263252
} catch (Exception e) {
264253
// ignore, we are on backup
265254
}
266255
} else if (item.request() instanceof DeleteRequest) {
267256
DeleteRequest deleteRequest = (DeleteRequest) item.request();
268257
try {
269-
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).origin(Engine.Operation.Origin.REPLICA);
258+
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).origin(Engine.Operation.Origin.REPLICA);
259+
indexShard.delete(delete);
270260
} catch (Exception e) {
271261
// ignore, we are on backup
272262
}
273263
}
274264
}
275265

276-
indexShard.bulk(new Engine.Bulk(ops).refresh(request.refresh()));
266+
if (request.refresh()) {
267+
try {
268+
indexShard.refresh(new Engine.Refresh(false));
269+
} catch (Exception e) {
270+
// ignore
271+
}
272+
}
277273
}
278274

279275
private void updateMappingOnMaster(final IndexRequest request) {

modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
5454
*/
5555
void start() throws EngineException;
5656

57-
EngineException[] bulk(Bulk bulk) throws EngineException;
58-
5957
void create(Create create) throws EngineException;
6058

6159
void index(Index index) throws EngineException;
@@ -264,28 +262,6 @@ static enum Origin {
264262
Origin origin();
265263
}
266264

267-
static class Bulk {
268-
private final Operation[] ops;
269-
private boolean refresh;
270-
271-
public Bulk(Operation[] ops) {
272-
this.ops = ops;
273-
}
274-
275-
public Operation[] ops() {
276-
return this.ops;
277-
}
278-
279-
public boolean refresh() {
280-
return refresh;
281-
}
282-
283-
public Bulk refresh(boolean refresh) {
284-
this.refresh = refresh;
285-
return this;
286-
}
287-
}
288-
289265
static class Create implements Operation {
290266
private final Term uid;
291267
private final ParsedDocument doc;

modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -219,77 +219,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
219219
return refreshInterval;
220220
}
221221

222-
@Override public EngineException[] bulk(Bulk bulk) throws EngineException {
223-
EngineException[] failures = null;
224-
rwl.readLock().lock();
225-
try {
226-
IndexWriter writer = this.indexWriter;
227-
if (writer == null) {
228-
throw new EngineClosedException(shardId);
229-
}
230-
for (int i = 0; i < bulk.ops().length; i++) {
231-
Operation op = bulk.ops()[i];
232-
if (op == null) {
233-
continue;
234-
}
235-
try {
236-
switch (op.opType()) {
237-
case CREATE:
238-
Create create = (Create) op;
239-
innerCreate(create, writer);
240-
break;
241-
case INDEX:
242-
Index index = (Index) op;
243-
innerIndex(index, writer);
244-
break;
245-
case DELETE:
246-
Delete delete = (Delete) op;
247-
innerDelete(delete, writer);
248-
break;
249-
}
250-
} catch (Exception e) {
251-
if (failures == null) {
252-
failures = new EngineException[bulk.ops().length];
253-
}
254-
switch (op.opType()) {
255-
case CREATE:
256-
if (e instanceof EngineException) {
257-
failures[i] = (EngineException) e;
258-
} else {
259-
failures[i] = new CreateFailedEngineException(shardId, (Create) op, e);
260-
}
261-
break;
262-
case INDEX:
263-
if (e instanceof EngineException) {
264-
failures[i] = (EngineException) e;
265-
} else {
266-
failures[i] = new IndexFailedEngineException(shardId, (Index) op, e);
267-
}
268-
break;
269-
case DELETE:
270-
if (e instanceof EngineException) {
271-
failures[i] = (EngineException) e;
272-
} else {
273-
failures[i] = new DeleteFailedEngineException(shardId, (Delete) op, e);
274-
}
275-
break;
276-
}
277-
}
278-
}
279-
dirty = true;
280-
if (bulk.refresh()) {
281-
try {
282-
refresh(new Refresh(false));
283-
} catch (Exception e) {
284-
//ignore
285-
}
286-
}
287-
} finally {
288-
rwl.readLock().unlock();
289-
}
290-
return failures;
291-
}
292-
293222
@Override public void create(Create create) throws EngineException {
294223
rwl.readLock().lock();
295224
try {

modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ public interface IndexShard extends IndexShardComponent {
6262

6363
void delete(Engine.Delete delete) throws ElasticSearchException;
6464

65-
EngineException[] bulk(Engine.Bulk bulk) throws ElasticSearchException;
66-
6765
void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;
6866

6967
byte[] get(String type, String id) throws ElasticSearchException;

modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -303,42 +303,6 @@ public InternalIndexShard start(String reason) throws IndexShardStartedException
303303
engine.delete(delete);
304304
}
305305

306-
@Override public EngineException[] bulk(Engine.Bulk bulk) throws ElasticSearchException {
307-
writeAllowed();
308-
if (listeners != null) {
309-
for (int i = 0; i < bulk.ops().length; i++) {
310-
Engine.Operation op = bulk.ops()[i];
311-
if (op == null) {
312-
continue;
313-
}
314-
switch (op.opType()) {
315-
case CREATE:
316-
Engine.Create create = (Engine.Create) op;
317-
for (OperationListener listener : listeners) {
318-
bulk.ops()[i] = listener.beforeCreate(create);
319-
}
320-
break;
321-
case INDEX:
322-
Engine.Index index = (Engine.Index) op;
323-
for (OperationListener listener : listeners) {
324-
bulk.ops()[i] = listener.beforeIndex(index);
325-
}
326-
break;
327-
case DELETE:
328-
Engine.Delete delete = (Engine.Delete) op;
329-
for (OperationListener listener : listeners) {
330-
bulk.ops()[i] = listener.beforeDelete(delete);
331-
}
332-
break;
333-
}
334-
}
335-
}
336-
if (logger.isTraceEnabled()) {
337-
logger.trace("bulk, items [{}]", bulk.ops().length);
338-
}
339-
return engine.bulk(bulk);
340-
}
341-
342306
@Override public void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
343307
writeAllowed();
344308
if (types == null) {

0 commit comments

Comments
 (0)