Using Bulk Processor
The BulkProcessor class offers a simple interface to flush bulk operations automatically based on the number or size
of requests, or after a given period.
To use it, first create a BulkProcessor instance:
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            // Called just before the bulk is executed.
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... }

            // Called after the bulk has executed.
            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... }

            // Called when the bulk failed and raised a Throwable.
            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... }
        })
        .setBulkActions(10000)                               // flush every 10,000 requests
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))  // flush every 1 GB
        .setFlushInterval(TimeValue.timeValueSeconds(5))     // flush every 5 seconds
        .setConcurrentRequests(1)                            // one concurrent request
        .build();
In the example above:
- client is your Elasticsearch client.
- beforeBulk is called just before the bulk is executed. You can, for example, read the number of actions with request.numberOfActions().
- afterBulk with a BulkResponse is called after bulk execution. You can, for example, check whether any requests failed with response.hasFailures().
- afterBulk with a Throwable is called when the bulk failed and raised a Throwable.
- setBulkActions(10000) executes the bulk every 10,000 requests.
- setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) flushes the bulk every 1 GB.
- setFlushInterval(TimeValue.timeValueSeconds(5)) flushes the bulk every 5 seconds, whatever the number of requests.
- setConcurrentRequests(1) sets the number of concurrent requests. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
Then you can simply add your requests to the BulkProcessor:
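import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));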
By default, BulkProcessor:
- sets bulkActions to 1000
- sets bulkSize to 5mb
- does not set flushInterval
- sets concurrentRequests to 1
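To make the interaction between the count and size thresholds concrete, here is a minimal, self-contained sketch of threshold-based flushing. It is a toy model, not the BulkProcessor implementation: the class FlushThresholdSketch and its fields are hypothetical names, documents are plain strings, the size is simply the UTF-8 byte length, and the time-based flushInterval is left out. It assumes a flush is triggered as soon as either threshold is reached.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model of count- and size-based flushing; NOT the Elasticsearch implementation. */
final class FlushThresholdSketch {
    private final int maxActions;   // plays the role of bulkActions
    private final long maxBytes;    // plays the role of bulkSize
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;

    /** Size of every batch that has been flushed so far, in order. */
    final List<Integer> flushedBatchSizes = new ArrayList<>();

    FlushThresholdSketch(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Buffer one document, then flush if either threshold has been reached. */
    void add(String doc) {
        pending.add(doc);
        pendingBytes += doc.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Hand off the buffered documents as one batch and reset the buffer. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushedBatchSizes.add(pending.size());
        pending.clear();
        pendingBytes = 0;
    }

    public static void main(String[] args) {
        // Flush every 3 documents or every 1024 bytes, whichever comes first.
        FlushThresholdSketch sketch = new FlushThresholdSketch(3, 1024);
        for (int i = 0; i < 7; i++) {
            sketch.add("doc-" + i);
        }
        sketch.flush(); // flush the leftover document, as closing would
        System.out.println(sketch.flushedBatchSizes); // prints [3, 3, 1]
    }
}
```

With the defaults listed above, the count threshold (1000 actions) tends to trigger first for small documents, while the size threshold (5mb) tends to trigger first for large ones.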
When all documents have been loaded into the BulkProcessor, close it with either the awaitClose or the close method:
import java.util.concurrent.TimeUnit;
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
or
bulkProcessor.close();
Both methods flush any remaining documents and, if flushInterval was set, cancel any further scheduled flushes.
If concurrent requests were enabled, awaitClose waits up to the specified timeout for all bulk requests to complete.
It returns true if they all complete in time and false if the timeout elapses first.
The close method does not wait for remaining bulk requests to complete and returns immediately.
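The true/false contract of awaitClose can be illustrated with a small stand-alone model. This is a sketch under assumptions, not library code: AwaitCloseSketch and startRequest are hypothetical names, bulk requests are simulated with sleeping threads, and in-flight requests are tracked with a java.util.concurrent.Semaphore holding one permit per allowed concurrent request. Waiting for completion then amounts to trying to acquire every permit within the timeout.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Toy model of awaitClose-style waiting; NOT the Elasticsearch implementation. */
final class AwaitCloseSketch {
    private final int concurrentRequests;
    private final Semaphore inFlight; // one permit per allowed concurrent request

    AwaitCloseSketch(int concurrentRequests) {
        this.concurrentRequests = concurrentRequests;
        this.inFlight = new Semaphore(concurrentRequests);
    }

    /** Start a simulated bulk request that completes after the given delay. */
    void startRequest(long durationMillis) throws InterruptedException {
        inFlight.acquire(); // take a permit while the request is running
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(durationMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inFlight.release(); // request finished, give the permit back
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive for slow requests
        worker.start();
    }

    /** Returns true if all in-flight requests finished within the timeout, false otherwise. */
    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        // All permits are available only when no request is still running.
        if (inFlight.tryAcquire(concurrentRequests, timeout, unit)) {
            inFlight.release(concurrentRequests);
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        AwaitCloseSketch fast = new AwaitCloseSketch(1);
        fast.startRequest(50);
        // The request needs about 50 ms and the timeout is 2 s.
        System.out.println(fast.awaitClose(2, TimeUnit.SECONDS));

        AwaitCloseSketch slow = new AwaitCloseSketch(1);
        slow.startRequest(2000);
        // The request needs about 2 s but the timeout is only 100 ms.
        System.out.println(slow.awaitClose(100, TimeUnit.MILLISECONDS));
    }
}
```

In real code, a false return value from awaitClose is worth logging or handling, since it means some bulk requests may still have been in flight when the wait ended.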