Skip to content

Commit 36ff6c9

Browse files
committed
add a lock not allowing connecting to nodes while shutting down
1 parent 333293b commit 36ff6c9

File tree

1 file changed

+48
-30
lines changed

1 file changed

+48
-30
lines changed

src/main/java/org/elasticsearch/transport/netty/NettyTransport.java

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
import java.util.concurrent.TimeUnit;
6666
import java.util.concurrent.atomic.AtomicInteger;
6767
import java.util.concurrent.atomic.AtomicReference;
68+
import java.util.concurrent.locks.ReadWriteLock;
69+
import java.util.concurrent.locks.ReentrantReadWriteLock;
6870

6971
import static org.elasticsearch.common.network.NetworkService.TcpSettings.*;
7072
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
@@ -138,6 +140,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
138140
private volatile BoundTransportAddress boundAddress;
139141

140142
private final Object[] connectMutex;
143+
// this lock is here to make sure we close this transport and disconnect all the client nodes
144+
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
145+
private final ReadWriteLock globalLock = new ReentrantReadWriteLock();
141146

142147
public NettyTransport(ThreadPool threadPool) {
143148
this(EMPTY_SETTINGS, threadPool, new NetworkService(EMPTY_SETTINGS));
@@ -367,6 +372,7 @@ protected void doStop() throws ElasticSearchException {
367372
threadPool.generic().execute(new Runnable() {
368373
@Override
369374
public void run() {
375+
globalLock.writeLock().lock();
370376
try {
371377
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
372378
NodeChannels nodeChannels = it.next();
@@ -403,6 +409,7 @@ public void run() {
403409
clientBootstrap = null;
404410
}
405411
} finally {
412+
globalLock.writeLock().unlock();
406413
latch.countDown();
407414
}
408415
}
@@ -535,46 +542,57 @@ public void connectToNode(DiscoveryNode node) {
535542

536543
public void connectToNode(DiscoveryNode node, boolean light) {
537544
if (!lifecycle.started()) {
538-
throw new ElasticSearchIllegalStateException("Can't add nodes to a stopped transport");
545+
throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport");
539546
}
540547
if (node == null) {
541-
throw new ConnectTransportException(null, "Can't connect to a null node");
548+
throw new ConnectTransportException(null, "can't connect to a null node");
542549
}
543-
synchronized (connectLock(node.id())) {
544-
try {
545-
NodeChannels nodeChannels = connectedNodes.get(node);
546-
if (nodeChannels != null) {
547-
return;
550+
globalLock.readLock().lock();
551+
try {
552+
if (!lifecycle.started()) {
553+
throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport");
554+
}
555+
synchronized (connectLock(node.id())) {
556+
if (!lifecycle.started()) {
557+
throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport");
548558
}
559+
try {
560+
NodeChannels nodeChannels = connectedNodes.get(node);
561+
if (nodeChannels != null) {
562+
return;
563+
}
549564

550-
if (light) {
551-
nodeChannels = connectToChannelsLight(node);
552-
} else {
553-
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]);
554-
try {
555-
connectToChannels(nodeChannels, node);
556-
} catch (Exception e) {
557-
nodeChannels.close();
558-
throw e;
565+
if (light) {
566+
nodeChannels = connectToChannelsLight(node);
567+
} else {
568+
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]);
569+
try {
570+
connectToChannels(nodeChannels, node);
571+
} catch (Exception e) {
572+
nodeChannels.close();
573+
throw e;
574+
}
559575
}
560-
}
561576

562-
NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels);
563-
if (existing != null) {
564-
// we are already connected to a node, close this ones
565-
nodeChannels.close();
566-
} else {
567-
if (logger.isDebugEnabled()) {
568-
logger.debug("connected to node [{}]", node);
577+
NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels);
578+
if (existing != null) {
579+
// we are already connected to a node, close this ones
580+
nodeChannels.close();
581+
} else {
582+
if (logger.isDebugEnabled()) {
583+
logger.debug("connected to node [{}]", node);
584+
}
585+
transportServiceAdapter.raiseNodeConnected(node);
569586
}
570-
transportServiceAdapter.raiseNodeConnected(node);
571-
}
572587

573-
} catch (ConnectTransportException e) {
574-
throw e;
575-
} catch (Exception e) {
576-
throw new ConnectTransportException(node, "General node connection failure", e);
588+
} catch (ConnectTransportException e) {
589+
throw e;
590+
} catch (Exception e) {
591+
throw new ConnectTransportException(node, "General node connection failure", e);
592+
}
577593
}
594+
} finally {
595+
globalLock.readLock().unlock();
578596
}
579597
}
580598

0 commit comments

Comments
 (0)