Skip to content

Commit 01fe2be

Browse files
committed
improve threading usage on discovery and fd
1 parent 3178b84 commit 01fe2be

File tree

4 files changed

+36
-8
lines changed

4 files changed

+36
-8
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,13 @@ private void handleTransportDisconnect(DiscoveryNode node) {
208208
}
209209

210210
private void notifyDisconnectedFromMaster() {
211-
for (Listener listener : listeners) {
212-
listener.onDisconnectedFromMaster();
213-
}
211+
threadPool.cached().execute(new Runnable() {
212+
@Override public void run() {
213+
for (Listener listener : listeners) {
214+
listener.onDisconnectedFromMaster();
215+
}
216+
}
217+
});
214218
}
215219

216220
private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
@@ -297,6 +301,10 @@ public void stop() {
297301
}
298302
}
299303
}
304+
305+
@Override public boolean spawn() {
306+
return false; // no need to spawn, we hardly do anything
307+
}
300308
});
301309
}
302310
}

modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,14 @@ private void handleTransportDisconnect(DiscoveryNode node) {
176176
}
177177
}
178178

179-
private void notifyNodeFailure(DiscoveryNode node, String reason) {
180-
for (Listener listener : listeners) {
181-
listener.onNodeFailure(node, reason);
182-
}
179+
private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
180+
threadPool.cached().execute(new Runnable() {
181+
@Override public void run() {
182+
for (Listener listener : listeners) {
183+
listener.onNodeFailure(node, reason);
184+
}
185+
}
186+
});
183187
}
184188

185189
private class SendPingRequest implements Runnable {
@@ -232,6 +236,10 @@ private SendPingRequest(DiscoveryNode node) {
232236
}
233237
}
234238
}
239+
240+
@Override public boolean spawn() {
241+
return false; // no need to spawn, we hardly do anything
242+
}
235243
});
236244
}
237245
}

modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPing.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,10 @@ class MulticastPingResponseRequestHandler extends BaseTransportRequestHandler<Mu
262262
}
263263
channel.sendResponse(VoidStreamable.INSTANCE);
264264
}
265+
266+
@Override public boolean spawn() {
267+
return false;
268+
}
265269
}
266270

267271
static class MulticastPingResponse implements Streamable {
@@ -340,7 +344,7 @@ public void stop() {
340344

341345
if (!transportService.nodeConnected(requestingNode)) {
342346
// do the connect and send on a thread pool
343-
threadPool.execute(new Runnable() {
347+
threadPool.cached().execute(new Runnable() {
344348
@Override public void run() {
345349
// connect to the node if possible
346350
try {

modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,10 @@ private void sendPings(int id, TimeValue timeout, boolean wait) {
240240
logger.warn("failed to send ping to [{}]", exp, node);
241241
}
242242
}
243+
244+
@Override public boolean spawn() {
245+
return false;
246+
}
243247
});
244248
}
245249
if (wait) {
@@ -282,6 +286,10 @@ class UnicastPingRequestHandler extends BaseTransportRequestHandler<UnicastPingR
282286
@Override public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception {
283287
channel.sendResponse(handlePingRequest(request));
284288
}
289+
290+
@Override public boolean spawn() {
291+
return false;
292+
}
285293
}
286294

287295
static class UnicastPingRequest implements Streamable {

0 commit comments

Comments
 (0)