Skip to content

Commit f317399

Browse files
committed
allow to flush bulk processor based on time, and better listener api
1 parent ab49a8c commit f317399

File tree

5 files changed

+153
-15
lines changed

5 files changed

+153
-15
lines changed

src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

Lines changed: 131 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,48 +19,84 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.elasticsearch.ElasticSearchIllegalStateException;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.ActionRequest;
2425
import org.elasticsearch.action.delete.DeleteRequest;
2526
import org.elasticsearch.action.index.IndexRequest;
2627
import org.elasticsearch.client.Client;
28+
import org.elasticsearch.client.internal.InternalClient;
2729
import org.elasticsearch.common.Nullable;
2830
import org.elasticsearch.common.bytes.BytesReference;
2931
import org.elasticsearch.common.unit.ByteSizeUnit;
3032
import org.elasticsearch.common.unit.ByteSizeValue;
33+
import org.elasticsearch.common.unit.TimeValue;
34+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3135

32-
import java.util.concurrent.Semaphore;
36+
import java.util.concurrent.*;
37+
import java.util.concurrent.atomic.AtomicLong;
3338

3439
/**
3540
* A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
36-
* (either based on number of actions, or based on the size), and to easily control the number of concurrent bulk
41+
* (either based on number of actions, based on the size, or time), and to easily control the number of concurrent bulk
3742
* requests allowed to be executed in parallel.
3843
* <p/>
3944
* In order to create a new bulk processor, use the {@link Builder}.
4045
*/
4146
public class BulkProcessor {
4247

48+
/**
49+
* A listener for the execution.
50+
*/
51+
public static interface Listener {
52+
53+
/**
54+
* Callback before the bulk is executed.
55+
*/
56+
void beforeBulk(long executionId, BulkRequest request);
57+
58+
/**
59+
* Callback after a successful execution of bulk request.
60+
*/
61+
void afterBulk(long executionId, BulkRequest request, BulkResponse response);
62+
63+
/**
64+
* Callback after a failed execution of bulk request.
65+
*/
66+
void afterBulk(long executionId, BulkRequest request, Throwable failure);
67+
}
68+
4369
/**
4470
* A builder used to create a build an instance of a bulk processor.
4571
*/
4672
public static class Builder {
4773

4874
private final Client client;
49-
private final ActionListener<BulkResponse> listener;
75+
private final Listener listener;
5076

77+
private String name;
5178
private int concurrentRequests = 1;
5279
private int bulkActions = 1000;
5380
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
81+
private TimeValue flushInterval = null;
5482

5583
/**
5684
* Creates a builder of bulk processor with the client to use and the listener that will be used
5785
* to be notified on the completion of bulk requests.
5886
*/
59-
public Builder(Client client, ActionListener<BulkResponse> listener) {
87+
public Builder(Client client, Listener listener) {
6088
this.client = client;
6189
this.listener = listener;
6290
}
6391

92+
/**
93+
* Sets an optional name to identify this bulk processor.
94+
*/
95+
public Builder setName(String name) {
96+
this.name = name;
97+
return this;
98+
}
99+
64100
/**
65101
* Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single
66102
* request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed
@@ -89,38 +125,87 @@ public Builder setBulkSize(ByteSizeValue bulkSize) {
89125
return this;
90126
}
91127

128+
/**
129+
* Sets a flush interval flushing *any* bulk actions pending if the interval passes. Defaults to not set.
130+
* <p/>
131+
* Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)}
132+
* can be set to <tt>-1</tt> with the flush interval set allowing for complete async processing of bulk actions.
133+
*/
134+
public Builder setFlushInterval(TimeValue flushInterval) {
135+
this.flushInterval = flushInterval;
136+
return this;
137+
}
138+
92139
/**
93140
* Builds a new bulk processor.
94141
*/
95142
public BulkProcessor build() {
96-
return new BulkProcessor(client, listener, concurrentRequests, bulkActions, bulkSize);
143+
return new BulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
97144
}
98145
}
99146

100-
public static Builder builder(Client client, ActionListener<BulkResponse> listener) {
147+
public static Builder builder(Client client, Listener listener) {
101148
return new Builder(client, listener);
102149
}
103150

104151
private final Client client;
105-
private final ActionListener<BulkResponse> listener;
152+
private final Listener listener;
153+
154+
private final String name;
106155

107-
private int concurrentRequests;
156+
private final int concurrentRequests;
108157
private final int bulkActions;
109158
private final int bulkSize;
159+
private final TimeValue flushInterval;
110160

111161
private final Semaphore semaphore;
162+
private final ScheduledThreadPoolExecutor scheduler;
163+
private final ScheduledFuture scheduledFuture;
164+
165+
private final AtomicLong executionIdGen = new AtomicLong();
112166

113167
private BulkRequest bulkRequest;
114168

115-
BulkProcessor(Client client, ActionListener<BulkResponse> listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize) {
169+
private volatile boolean closed = false;
170+
171+
BulkProcessor(Client client, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
116172
this.client = client;
117173
this.listener = listener;
174+
this.name = name;
118175
this.concurrentRequests = concurrentRequests;
119176
this.bulkActions = bulkActions;
120177
this.bulkSize = bulkSize.bytesAsInt();
121178

122179
this.semaphore = new Semaphore(concurrentRequests);
123180
this.bulkRequest = new BulkRequest();
181+
182+
this.flushInterval = flushInterval;
183+
if (flushInterval != null) {
184+
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(((InternalClient) client).settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
185+
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
186+
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
187+
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
188+
} else {
189+
this.scheduler = null;
190+
this.scheduledFuture = null;
191+
}
192+
}
193+
194+
/**
195+
* Closes the processor. If flushing by time is enabled, then its shutdown. Any remaining bulk actions are flushed.
196+
*/
197+
public synchronized void close() {
198+
if (closed) {
199+
return;
200+
}
201+
closed = true;
202+
if (this.scheduledFuture != null) {
203+
this.scheduledFuture.cancel(false);
204+
this.scheduler.shutdown();
205+
}
206+
if (bulkRequest.numberOfActions() > 0) {
207+
execute();
208+
}
124209
}
125210

126211
/**
@@ -155,28 +240,44 @@ public synchronized BulkProcessor add(BytesReference data, boolean contentUnsafe
155240
}
156241

157242
private void executeIfNeeded() {
243+
if (closed) {
244+
throw new ElasticSearchIllegalStateException("bulk process already closed");
245+
}
246+
this.closed = true;
158247
if (!isOverTheLimit()) {
159248
return;
160249
}
250+
execute();
251+
}
252+
253+
// (currently) needs to be executed under a lock
254+
private void execute() {
255+
final BulkRequest bulkRequest = this.bulkRequest;
256+
final long executionId = executionIdGen.incrementAndGet();
257+
258+
this.bulkRequest = new BulkRequest();
259+
161260
if (concurrentRequests == 0) {
162261
// execute in a blocking fashion...
163262
try {
164-
listener.onResponse(client.bulk(bulkRequest).actionGet());
263+
listener.beforeBulk(executionId, bulkRequest);
264+
listener.afterBulk(executionId, bulkRequest, client.bulk(bulkRequest).actionGet());
165265
} catch (Exception e) {
166-
listener.onFailure(e);
266+
listener.afterBulk(executionId, bulkRequest, e);
167267
}
168268
} else {
169269
try {
170270
semaphore.acquire();
171271
} catch (InterruptedException e) {
172-
listener.onFailure(e);
272+
listener.afterBulk(executionId, bulkRequest, e);
173273
return;
174274
}
275+
listener.beforeBulk(executionId, bulkRequest);
175276
client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
176277
@Override
177278
public void onResponse(BulkResponse response) {
178279
try {
179-
listener.onResponse(response);
280+
listener.afterBulk(executionId, bulkRequest, response);
180281
} finally {
181282
semaphore.release();
182283
}
@@ -185,14 +286,13 @@ public void onResponse(BulkResponse response) {
185286
@Override
186287
public void onFailure(Throwable e) {
187288
try {
188-
listener.onFailure(e);
289+
listener.afterBulk(executionId, bulkRequest, e);
189290
} finally {
190291
semaphore.release();
191292
}
192293
}
193294
});
194295
}
195-
bulkRequest = new BulkRequest();
196296
}
197297

198298
private boolean isOverTheLimit() {
@@ -204,4 +304,20 @@ private boolean isOverTheLimit() {
204304
}
205305
return false;
206306
}
307+
308+
class Flush implements Runnable {
309+
310+
@Override
311+
public void run() {
312+
synchronized (BulkProcessor.this) {
313+
if (closed) {
314+
return;
315+
}
316+
if (bulkRequest.numberOfActions() == 0) {
317+
return;
318+
}
319+
execute();
320+
}
321+
}
322+
}
207323
}

src/main/java/org/elasticsearch/client/internal/InternalClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
package org.elasticsearch.client.internal;
2121

2222
import org.elasticsearch.client.Client;
23+
import org.elasticsearch.common.settings.Settings;
2324
import org.elasticsearch.threadpool.ThreadPool;
2425

2526
/**
2627
*
2728
*/
2829
public interface InternalClient extends Client {
2930

31+
Settings settings();
32+
3033
ThreadPool threadPool();
3134
}

src/main/java/org/elasticsearch/client/node/NodeClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
*/
3838
public class NodeClient extends AbstractClient implements InternalClient {
3939

40+
private final Settings settings;
4041
private final ThreadPool threadPool;
4142

4243
private final NodeAdminClient admin;
@@ -45,6 +46,7 @@ public class NodeClient extends AbstractClient implements InternalClient {
4546

4647
@Inject
4748
public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin, Map<GenericAction, TransportAction> actions) {
49+
this.settings = settings;
4850
this.threadPool = threadPool;
4951
this.admin = admin;
5052
MapBuilder<Action, TransportAction> actionsBuilder = new MapBuilder<Action, TransportAction>();
@@ -56,6 +58,11 @@ public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admi
5658
this.actions = actionsBuilder.immutableMap();
5759
}
5860

61+
@Override
62+
public Settings settings() {
63+
return this.settings;
64+
}
65+
5966
@Override
6067
public ThreadPool threadPool() {
6168
return this.threadPool;

src/main/java/org/elasticsearch/client/transport/TransportClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,11 @@ public void close() {
271271
ThreadLocals.clearReferencesThreadLocals();
272272
}
273273

274+
@Override
275+
public Settings settings() {
276+
return this.settings;
277+
}
278+
274279
@Override
275280
public ThreadPool threadPool() {
276281
return internalClient.threadPool();

src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
*/
4141
public class InternalTransportClient extends AbstractClient implements InternalClient {
4242

43+
private final Settings settings;
4344
private final ThreadPool threadPool;
4445

4546
private final TransportClientNodesService nodesService;
@@ -52,6 +53,7 @@ public class InternalTransportClient extends AbstractClient implements InternalC
5253
public InternalTransportClient(Settings settings, ThreadPool threadPool, TransportService transportService,
5354
TransportClientNodesService nodesService, InternalTransportAdminClient adminClient,
5455
Map<String, GenericAction> actions) {
56+
this.settings = settings;
5557
this.threadPool = threadPool;
5658
this.nodesService = nodesService;
5759
this.adminClient = adminClient;
@@ -70,6 +72,11 @@ public void close() {
7072
// nothing to do here
7173
}
7274

75+
@Override
76+
public Settings settings() {
77+
return this.settings;
78+
}
79+
7380
@Override
7481
public ThreadPool threadPool() {
7582
return this.threadPool;

0 commit comments

Comments
 (0)