Skip to content

Commit 072fcaa

Browse files
committed
Bulk UDP, closes elastic#2201
A Bulk UDP service is a service listening over UDP for bulk format requests. The idea is to provide a low latency UDP service that allows to easily index data that is not of critical nature. The Bulk UDP service is disabled by default, but can be enabled by setting `bulk.udp.enabled` to `true`. The bulk UDP service performs intenral bulk aggregation of the data and then flushes it based on several parametres: * `bulk.udp.bulk_actions`: The number of actions to flush a bulk after, defaults to `1000`. * `bulk.udp.bulk_size`: The size of the current bulk request to flush the request once exceeded, defaults to `5mb`. * `bulk.udp.flush_interval`: An interval after which the current request is flushed, regarldess of the above limits. Defaults to `5s`. * `bulk.udp.concurrent_requests`: The number on max in flight bulk requests allowed. Defaults to `4`. The network settings allowed are: * `bulk.udp.host`: The host to bind to, defualts to `network.host` which defaults to any. * `bulk.udp.port`: The port to use, defaults to `9700-9800`. Here is an example of how it can be used: > cat bulk.txt { "index" : { "_index" : "test", "_type" : "type1" } } { "field1" : "value1" } { "index" : { "_index" : "test", "_type" : "type1" } } { "field1" : "value1" } > cat bulk.txt | nc -w 0 -u localhost 9700
1 parent ba98b04 commit 072fcaa

File tree

3 files changed

+260
-1
lines changed

3 files changed

+260
-1
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to ElasticSearch and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. ElasticSearch licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.bulk.udp;
21+
22+
import org.elasticsearch.common.inject.AbstractModule;
23+
24+
/**
25+
*/
26+
public class BulkUdpModule extends AbstractModule {
27+
28+
@Override
29+
protected void configure() {
30+
bind(BulkUdpService.class).asEagerSingleton();
31+
}
32+
}
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
* Licensed to ElasticSearch and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. ElasticSearch licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.bulk.udp;
21+
22+
import org.elasticsearch.ElasticSearchException;
23+
import org.elasticsearch.action.bulk.BulkProcessor;
24+
import org.elasticsearch.action.bulk.BulkRequest;
25+
import org.elasticsearch.action.bulk.BulkResponse;
26+
import org.elasticsearch.client.Client;
27+
import org.elasticsearch.common.bytes.ChannelBufferBytesReference;
28+
import org.elasticsearch.common.component.AbstractLifecycleComponent;
29+
import org.elasticsearch.common.inject.Inject;
30+
import org.elasticsearch.common.network.NetworkService;
31+
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.common.transport.PortsRange;
33+
import org.elasticsearch.common.unit.ByteSizeUnit;
34+
import org.elasticsearch.common.unit.ByteSizeValue;
35+
import org.elasticsearch.common.unit.TimeValue;
36+
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
37+
import org.jboss.netty.buffer.ChannelBuffer;
38+
import org.jboss.netty.channel.*;
39+
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
40+
41+
import java.io.IOException;
42+
import java.net.BindException;
43+
import java.net.InetAddress;
44+
import java.net.InetSocketAddress;
45+
import java.util.concurrent.Executors;
46+
import java.util.concurrent.atomic.AtomicReference;
47+
48+
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
49+
50+
/**
51+
*/
52+
public class BulkUdpService extends AbstractLifecycleComponent<BulkUdpService> {
53+
54+
private final Client client;
55+
private final NetworkService networkService;
56+
57+
private final boolean enabled;
58+
59+
final String host;
60+
final String port;
61+
62+
final ByteSizeValue receiveBufferSize;
63+
final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;
64+
final int bulkActions;
65+
final ByteSizeValue bulkSize;
66+
final TimeValue flushInterval;
67+
final int concurrentRequests;
68+
69+
private BulkProcessor bulkProcessor;
70+
private ConnectionlessBootstrap bootstrap;
71+
private Channel channel;
72+
73+
@Inject
74+
public BulkUdpService(Settings settings, Client client, NetworkService networkService) {
75+
super(settings);
76+
this.client = client;
77+
this.networkService = networkService;
78+
79+
this.host = componentSettings.get("host");
80+
this.port = componentSettings.get("port", "9700-9800");
81+
82+
this.bulkActions = componentSettings.getAsInt("bulk_actions", 1000);
83+
this.bulkSize = componentSettings.getAsBytesSize("bulk_size", new ByteSizeValue(5, ByteSizeUnit.MB));
84+
this.flushInterval = componentSettings.getAsTime("flush_interval", TimeValue.timeValueSeconds(5));
85+
this.concurrentRequests = componentSettings.getAsInt("concurrent_requests", 4);
86+
87+
this.receiveBufferSize = componentSettings.getAsBytesSize("receive_buffer_size", new ByteSizeValue(10, ByteSizeUnit.MB));
88+
this.receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory(componentSettings.getAsBytesSize("receive_predictor_size", receiveBufferSize).bytesAsInt());
89+
90+
this.enabled = componentSettings.getAsBoolean("enabled", false);
91+
92+
logger.debug("using enabled [{}], host [{}], port [{}], bulk_actions [{}], bulk_size [{}], flush_interval [{}], concurrent_requests [{}]",
93+
enabled, host, port, bulkActions, bulkSize, flushInterval, concurrentRequests);
94+
}
95+
96+
@Override
97+
protected void doStart() throws ElasticSearchException {
98+
if (!enabled) {
99+
return;
100+
}
101+
bulkProcessor = BulkProcessor.builder(client, new BulkListener())
102+
.setBulkActions(bulkActions)
103+
.setBulkSize(bulkSize)
104+
.setFlushInterval(flushInterval)
105+
.setConcurrentRequests(concurrentRequests)
106+
.build();
107+
108+
109+
bootstrap = new ConnectionlessBootstrap(new NioDatagramChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, "bulk_udp_worker"))));
110+
111+
bootstrap.setOption("receiveBufferSize", receiveBufferSize.bytesAsInt());
112+
bootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
113+
114+
// Enable broadcast
115+
bootstrap.setOption("broadcast", "false");
116+
117+
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
118+
@Override
119+
public ChannelPipeline getPipeline() throws Exception {
120+
return Channels.pipeline(new Handler());
121+
}
122+
});
123+
124+
125+
InetAddress hostAddressX;
126+
try {
127+
hostAddressX = networkService.resolveBindHostAddress(host);
128+
} catch (IOException e) {
129+
logger.warn("failed to resolve host {}", e, host);
130+
return;
131+
}
132+
final InetAddress hostAddress = hostAddressX;
133+
134+
PortsRange portsRange = new PortsRange(port);
135+
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
136+
boolean success = portsRange.iterate(new PortsRange.PortCallback() {
137+
@Override
138+
public boolean onPortNumber(int portNumber) {
139+
try {
140+
channel = bootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
141+
} catch (Exception e) {
142+
lastException.set(e);
143+
return false;
144+
}
145+
return true;
146+
}
147+
});
148+
if (!success) {
149+
logger.warn("failed to bind to {}/{}", lastException.get(), hostAddress, port);
150+
return;
151+
}
152+
153+
logger.info("address {}", channel.getLocalAddress());
154+
}
155+
156+
@Override
157+
protected void doStop() throws ElasticSearchException {
158+
if (!enabled) {
159+
return;
160+
}
161+
if (channel != null) {
162+
channel.close().awaitUninterruptibly();
163+
}
164+
if (bootstrap != null) {
165+
bootstrap.releaseExternalResources();
166+
}
167+
bulkProcessor.close();
168+
}
169+
170+
@Override
171+
protected void doClose() throws ElasticSearchException {
172+
}
173+
174+
class Handler extends SimpleChannelUpstreamHandler {
175+
176+
@Override
177+
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
178+
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
179+
try {
180+
bulkProcessor.add(new ChannelBufferBytesReference(buffer), false, null, null);
181+
} catch (Exception e1) {
182+
logger.warn("failed to execute bulk request", e1);
183+
}
184+
}
185+
186+
@Override
187+
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
188+
if (e.getCause() instanceof BindException) {
189+
// ignore, this happens when we retry binding to several ports, its fine if we fail...
190+
return;
191+
}
192+
logger.warn("failure caught", e.getCause());
193+
}
194+
}
195+
196+
class BulkListener implements BulkProcessor.Listener {
197+
198+
@Override
199+
public void beforeBulk(long executionId, BulkRequest request) {
200+
if (logger.isTraceEnabled()) {
201+
logger.trace("[{}] executing [{}]/[{}]", executionId, request.numberOfActions(), new ByteSizeValue(request.estimatedSizeInBytes()));
202+
}
203+
}
204+
205+
@Override
206+
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
207+
if (logger.isTraceEnabled()) {
208+
logger.trace("[{}] executed [{}]/[{}], took [{}]", executionId, request.numberOfActions(), new ByteSizeValue(request.estimatedSizeInBytes()), response.took());
209+
}
210+
if (response.hasFailures()) {
211+
logger.warn("[{}] failed to execute bulk request: {}", executionId, response.buildFailureMessage());
212+
}
213+
}
214+
215+
@Override
216+
public void afterBulk(long executionId, BulkRequest request, Throwable e) {
217+
logger.warn("[{}] failed to execute bulk request", e, executionId);
218+
}
219+
}
220+
}

src/main/java/org/elasticsearch/node/internal/InternalNode.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.elasticsearch.ElasticSearchException;
2323
import org.elasticsearch.Version;
2424
import org.elasticsearch.action.ActionModule;
25+
import org.elasticsearch.bulk.udp.BulkUdpModule;
26+
import org.elasticsearch.bulk.udp.BulkUdpService;
2527
import org.elasticsearch.cache.NodeCache;
2628
import org.elasticsearch.cache.NodeCacheModule;
2729
import org.elasticsearch.client.Client;
@@ -148,6 +150,7 @@ public InternalNode(Settings pSettings, boolean loadConfigSettings) throws Elast
148150
modules.add(new MonitorModule(settings));
149151
modules.add(new GatewayModule(settings));
150152
modules.add(new NodeClientModule());
153+
modules.add(new BulkUdpModule());
151154

152155
injector = modules.createInjector();
153156

@@ -197,6 +200,7 @@ public Node start() {
197200
if (settings.getAsBoolean("http.enabled", true)) {
198201
injector.getInstance(HttpServer.class).start();
199202
}
203+
injector.getInstance(BulkUdpService.class).start();
200204
injector.getInstance(JmxService.class).connectAndRegister(discoService.nodeDescription(), injector.getInstance(NetworkService.class));
201205

202206
logger.info("{{}}[{}]: started", Version.CURRENT, JvmInfo.jvmInfo().pid());
@@ -212,6 +216,7 @@ public Node stop() {
212216
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
213217
logger.info("{{}}[{}]: stopping ...", Version.CURRENT, JvmInfo.jvmInfo().pid());
214218

219+
injector.getInstance(BulkUdpService.class).stop();
215220
if (settings.getAsBoolean("http.enabled", true)) {
216221
injector.getInstance(HttpServer.class).stop();
217222
}
@@ -261,7 +266,9 @@ public void close() {
261266
logger.info("{{}}[{}]: closing ...", Version.CURRENT, JvmInfo.jvmInfo().pid());
262267

263268
StopWatch stopWatch = new StopWatch("node_close");
264-
stopWatch.start("http");
269+
stopWatch.start("bulk.udp");
270+
injector.getInstance(BulkUdpService.class).close();
271+
stopWatch.stop().start("http");
265272
if (settings.getAsBoolean("http.enabled", true)) {
266273
injector.getInstance(HttpServer.class).close();
267274
}

0 commit comments

Comments
 (0)