-
Notifications
You must be signed in to change notification settings - Fork 1.5k
feat(net): disconnect from inactive nodes if necessary #5924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3b3b668
e670386
4791cdf
735bccb
cae19ea
a718006
4f234be
14fda8f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
package org.tron.core.net.service.effective; | ||
|
||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.Random; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.Collectors; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.stereotype.Component; | ||
import org.tron.common.es.ExecutorServiceManager; | ||
import org.tron.common.parameter.CommonParameter; | ||
import org.tron.core.ChainBaseManager; | ||
import org.tron.core.config.args.Args; | ||
import org.tron.core.net.TronNetDelegate; | ||
import org.tron.core.net.peer.PeerConnection; | ||
import org.tron.protos.Protocol.ReasonCode; | ||
|
||
@Slf4j(topic = "net") | ||
@Component | ||
public class ResilienceService { | ||
|
||
private static final long inactiveThreshold = | ||
CommonParameter.getInstance().getInactiveThreshold() * 1000L; | ||
public static final long blockNotChangeThreshold = 90 * 1000L; | ||
|
||
//when node is isolated, retention percent peers will not be disconnected | ||
public static final double retentionPercent = 0.8; | ||
private static final int initialDelay = 300; | ||
private static final String esName = "resilience-service"; | ||
private final ScheduledExecutorService executor = ExecutorServiceManager | ||
.newSingleThreadScheduledExecutor(esName); | ||
|
||
@Autowired | ||
private TronNetDelegate tronNetDelegate; | ||
|
||
@Autowired | ||
private ChainBaseManager chainBaseManager; | ||
|
||
public void init() { | ||
if (Args.getInstance().isOpenFullTcpDisconnect) { | ||
executor.scheduleWithFixedDelay(() -> { | ||
try { | ||
disconnectRandom(); | ||
} catch (Exception e) { | ||
logger.error("DisconnectRandom node failed", e); | ||
} | ||
}, initialDelay, 60, TimeUnit.SECONDS); | ||
} else { | ||
logger.info("OpenFullTcpDisconnect is disabled"); | ||
} | ||
|
||
executor.scheduleWithFixedDelay(() -> { | ||
try { | ||
disconnectLan(); | ||
} catch (Exception e) { | ||
logger.error("DisconnectLan node failed", e); | ||
} | ||
}, initialDelay, 10, TimeUnit.SECONDS); | ||
|
||
executor.scheduleWithFixedDelay(() -> { | ||
try { | ||
disconnectIsolated2(); | ||
} catch (Exception e) { | ||
logger.error("DisconnectIsolated node failed", e); | ||
} | ||
}, initialDelay, 30, TimeUnit.SECONDS); | ||
} | ||
|
||
private void disconnectRandom() { | ||
int peerSize = tronNetDelegate.getActivePeer().size(); | ||
if (peerSize >= CommonParameter.getInstance().getMaxConnections()) { | ||
long now = System.currentTimeMillis(); | ||
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream() | ||
.filter(peer -> !peer.isDisconnect()) | ||
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold) | ||
.filter(peer -> !peer.getChannel().isTrustPeer()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why change the original logic, not excluding active connections. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Original logic don't have the inactiveThreshold restrict, it has many conditates. The number of peers whose inactive time is more inactiveThreshold is very small, it has very little condidates, so there is no need to exclude active connections. |
||
.collect(Collectors.toList()); | ||
if (!peers.isEmpty()) { | ||
int index = new Random().nextInt(peers.size()); | ||
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION); | ||
} | ||
} | ||
} | ||
|
||
private void disconnectLan() { | ||
if (isLanNode()) { | ||
// disconnect from the node that has keep inactive for more than inactiveThreshold | ||
// and its lastActiveTime is smallest | ||
int peerSize = tronNetDelegate.getActivePeer().size(); | ||
if (peerSize >= CommonParameter.getInstance().getMinConnections()) { | ||
long now = System.currentTimeMillis(); | ||
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream() | ||
.filter(peer -> !peer.isDisconnect()) | ||
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold) | ||
.filter(peer -> !peer.getChannel().isTrustPeer()) | ||
.collect(Collectors.toList()); | ||
Optional<PeerConnection> one = getEarliestPeer(peers); | ||
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL)); | ||
} | ||
} | ||
} | ||
|
||
private void disconnectIsolated2() { | ||
if (isIsolateLand2()) { | ||
logger.info("Node is isolated, try to disconnect from peers"); | ||
int peerSize = tronNetDelegate.getActivePeer().size(); | ||
|
||
//disconnect from the node whose lastActiveTime is smallest | ||
if (peerSize >= CommonParameter.getInstance().getMinActiveConnections()) { | ||
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream() | ||
.filter(peer -> !peer.isDisconnect()) | ||
.filter(peer -> !peer.getChannel().isTrustPeer()) | ||
.filter(peer -> peer.getChannel().isActive()) | ||
.collect(Collectors.toList()); | ||
|
||
Optional<PeerConnection> one = getEarliestPeer(peers); | ||
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL)); | ||
} | ||
|
||
//disconnect from some passive nodes, make sure retention nodes' num <= 0.8 * maxConnection, | ||
//so new peers can come in | ||
peerSize = tronNetDelegate.getActivePeer().size(); | ||
int threshold = (int) (CommonParameter.getInstance().getMaxConnections() * retentionPercent); | ||
if (peerSize > threshold) { | ||
int disconnectSize = peerSize - threshold; | ||
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream() | ||
.filter(peer -> !peer.isDisconnect()) | ||
.filter(peer -> !peer.getChannel().isTrustPeer()) | ||
.filter(peer -> !peer.getChannel().isActive()) | ||
.collect(Collectors.toList()); | ||
try { | ||
peers.sort(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo)); | ||
} catch (Exception e) { | ||
logger.warn("Sort disconnectIsolated2 peers failed: {}", e.getMessage()); | ||
return; | ||
} | ||
|
||
if (peers.size() > disconnectSize) { | ||
peers = peers.subList(0, disconnectSize); | ||
} | ||
peers.forEach(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL)); | ||
} | ||
} | ||
} | ||
|
||
private Optional<PeerConnection> getEarliestPeer(List<PeerConnection> pees) { | ||
Optional<PeerConnection> one = Optional.empty(); | ||
try { | ||
one = pees.stream() | ||
.min(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo)); | ||
} catch (Exception e) { | ||
logger.warn("Get earliest peer failed: {}", e.getMessage()); | ||
} | ||
return one; | ||
} | ||
|
||
private boolean isLanNode() { | ||
int peerSize = tronNetDelegate.getActivePeer().size(); | ||
int activePeerSize = (int) tronNetDelegate.getActivePeer().stream() | ||
.filter(peer -> peer.getChannel().isActive()) | ||
.count(); | ||
return peerSize > 0 && peerSize == activePeerSize; | ||
} | ||
|
||
private boolean isIsolateLand2() { | ||
int advPeerCount = (int) tronNetDelegate.getActivePeer().stream() | ||
.filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs()) | ||
.count(); | ||
long diff = System.currentTimeMillis() - chainBaseManager.getLatestSaveBlockTime(); | ||
return advPeerCount >= 1 && diff >= blockNotChangeThreshold; | ||
} | ||
|
||
private void disconnectFromPeer(PeerConnection peer, ReasonCode reasonCode) { | ||
int inactiveSeconds = (int) ((System.currentTimeMillis() - peer.getLastActiveTime()) / 1000); | ||
logger.info("Disconnect from peer {}, inactive seconds {}", peer.getInetSocketAddress(), | ||
inactiveSeconds); | ||
peer.disconnect(reasonCode); | ||
} | ||
|
||
public void close() { | ||
ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that you have added 3 tasks to be scheduled and executed in the same executor, the scheduling between different tasks will be affected, right? Have you considered splitting the tasks into different executors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any task only consume several milliseconds at most. There is no need to split them in different executors.