Skip to content

Commit 08648ec

Browse files
committed
Transport Client: Adding more nodes causes more scheduled reconnect tasks, closes elastic#1062.
1 parent 40bbb87 commit 08648ec

File tree

1 file changed

+30
-20
lines changed

1 file changed

+30
-20
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@
3333
import org.elasticsearch.common.transport.TransportAddress;
3434
import org.elasticsearch.common.unit.TimeValue;
3535
import org.elasticsearch.threadpool.ThreadPool;
36-
import org.elasticsearch.transport.*;
36+
import org.elasticsearch.transport.BaseTransportResponseHandler;
37+
import org.elasticsearch.transport.ConnectTransportException;
38+
import org.elasticsearch.transport.FutureTransportResponseHandler;
39+
import org.elasticsearch.transport.TransportException;
40+
import org.elasticsearch.transport.TransportService;
3741

3842
import java.util.HashSet;
3943
import java.util.Iterator;
@@ -66,7 +70,7 @@ public class TransportClientNodesService extends AbstractComponent {
6670

6771
private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();
6872

69-
private final Runnable nodesSampler;
73+
private final NodeSampler nodesSampler;
7074

7175
private volatile ScheduledFuture nodesSamplerFuture;
7276

@@ -88,11 +92,11 @@ public class TransportClientNodesService extends AbstractComponent {
8892
}
8993

9094
if (componentSettings.getAsBoolean("sniff", false)) {
91-
this.nodesSampler = new ScheduledSniffNodesSampler();
95+
this.nodesSampler = new SniffNodesSampler();
9296
} else {
93-
this.nodesSampler = new ScheduledConnectNodeSampler();
97+
this.nodesSampler = new SimpleNodeSampler();
9498
}
95-
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, nodesSampler);
99+
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, new ScheduledNodeSampler());
96100

97101
// we want the transport service to throw connect exceptions, so we can retry
98102
transportService.throwConnectException(true);
@@ -115,7 +119,7 @@ public TransportClientNodesService addTransportAddress(TransportAddress transpor
115119
ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder();
116120
listedNodes = builder.addAll(listedNodes).add(new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress)).build();
117121
}
118-
nodesSampler.run();
122+
nodesSampler.sample();
119123
return this;
120124
}
121125

@@ -129,7 +133,7 @@ public TransportClientNodesService removeTransportAddress(TransportAddress trans
129133
}
130134
listedNodes = builder.build();
131135
}
132-
nodesSampler.run();
136+
nodesSampler.sample();
133137
return this;
134138
}
135139

@@ -157,8 +161,22 @@ public void close() {
157161
transportService.disconnectFromNode(listedNode);
158162
}
159163

160-
private class ScheduledConnectNodeSampler implements Runnable {
161-
@Override public synchronized void run() {
164+
interface NodeSampler {
165+
void sample();
166+
}
167+
168+
class ScheduledNodeSampler implements Runnable {
169+
@Override public void run() {
170+
nodesSampler.sample();
171+
if (!closed) {
172+
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
173+
}
174+
}
175+
}
176+
177+
class SimpleNodeSampler implements NodeSampler {
178+
179+
@Override public synchronized void sample() {
162180
if (closed) {
163181
return;
164182
}
@@ -188,16 +206,12 @@ private class ScheduledConnectNodeSampler implements Runnable {
188206
}
189207
}
190208
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
191-
192-
if (!closed) {
193-
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
194-
}
195209
}
196210
}
197211

198-
private class ScheduledSniffNodesSampler implements Runnable {
212+
class SniffNodesSampler implements NodeSampler {
199213

200-
@Override public synchronized void run() {
214+
@Override public synchronized void sample() {
201215
if (closed) {
202216
return;
203217
}
@@ -256,7 +270,7 @@ private class ScheduledSniffNodesSampler implements Runnable {
256270
}
257271
}
258272
// now, make sure we are connected to all the updated nodes
259-
for (Iterator<DiscoveryNode> it = newNodes.iterator(); it.hasNext();) {
273+
for (Iterator<DiscoveryNode> it = newNodes.iterator(); it.hasNext(); ) {
260274
DiscoveryNode node = it.next();
261275
try {
262276
transportService.connectToNode(node);
@@ -266,10 +280,6 @@ private class ScheduledSniffNodesSampler implements Runnable {
266280
}
267281
}
268282
nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
269-
270-
if (!closed) {
271-
nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.CACHED, this);
272-
}
273283
}
274284
}
275285

0 commit comments

Comments
 (0)