Skip to content

feat(net): disconnect from inactive nodes if necessary #5924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions chainbase/src/main/java/org/tron/core/ChainBaseManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ public class ChainBaseManager {
@Setter
private long lowestBlockNum = -1; // except num = 0.

@Getter
@Setter
private long latestSaveBlockTime;

// for test only
public List<ByteString> getWitnesses() {
return witnessScheduleStore.getActiveWitnesses();
Expand Down Expand Up @@ -381,6 +385,7 @@ private void init() {
this.lowestBlockNum = this.blockIndexStore.getLimitNumber(1, 1).stream()
.map(BlockId::getNum).findFirst().orElse(0L);
this.nodeType = getLowestBlockNum() > 1 ? NodeType.LITE : NodeType.FULL;
this.latestSaveBlockTime = System.currentTimeMillis();
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ public class CommonParameter {
public boolean isOpenFullTcpDisconnect;
@Getter
@Setter
public int inactiveThreshold;
@Getter
@Setter
public boolean nodeDetectEnable;
@Getter
@Setter
Expand Down
2 changes: 2 additions & 0 deletions common/src/main/java/org/tron/core/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ public class Constant {

public static final String NODE_IS_OPEN_FULL_TCP_DISCONNECT = "node.isOpenFullTcpDisconnect";

public static final String NODE_INACTIVE_THRESHOLD = "node.inactiveThreshold";

public static final String NODE_DETECT_ENABLE = "node.nodeDetectEnable";

public static final String NODE_MAX_TRANSACTION_PENDING_SIZE = "node.maxTransactionPendingSize";
Expand Down
4 changes: 4 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public static void clearParam() {
PARAMETER.receiveTcpMinDataLength = 2048;
PARAMETER.isOpenFullTcpDisconnect = false;
PARAMETER.nodeDetectEnable = false;
PARAMETER.inactiveThreshold = 600;
PARAMETER.supportConstant = false;
PARAMETER.debug = false;
PARAMETER.minTimeRatio = 0.0;
Expand Down Expand Up @@ -845,6 +846,9 @@ public static void setParam(final String[] args, final String confFileName) {
PARAMETER.nodeDetectEnable = config.hasPath(Constant.NODE_DETECT_ENABLE)
&& config.getBoolean(Constant.NODE_DETECT_ENABLE);

PARAMETER.inactiveThreshold = config.hasPath(Constant.NODE_INACTIVE_THRESHOLD)
? config.getInt(Constant.NODE_INACTIVE_THRESHOLD) : 600;

PARAMETER.maxTransactionPendingSize = config.hasPath(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE)
? config.getInt(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE) : 2000;

Expand Down
1 change: 1 addition & 0 deletions framework/src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,7 @@ public void updateDynamicProperties(BlockCapsule block) {
(chainBaseManager.getDynamicPropertiesStore().getLatestBlockHeaderNumber()
- chainBaseManager.getDynamicPropertiesStore().getLatestSolidifiedBlockNum()
+ 1));
chainBaseManager.setLatestSaveBlockTime(System.currentTimeMillis());
Metrics.gaugeSet(MetricKeys.Gauge.HEADER_HEIGHT, block.getNum());
Metrics.gaugeSet(MetricKeys.Gauge.HEADER_TIME, block.getTimeStamp());
}
Expand Down
24 changes: 24 additions & 0 deletions framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.tron.core.net.message.PbftMessageFactory;
import org.tron.core.net.message.TronMessage;
import org.tron.core.net.message.TronMessageFactory;
import org.tron.core.net.message.adv.FetchInvDataMessage;
import org.tron.core.net.message.adv.InventoryMessage;
import org.tron.core.net.message.base.DisconnectMessage;
import org.tron.core.net.message.handshake.HelloMessage;
Expand All @@ -38,6 +39,7 @@
import org.tron.p2p.P2pEventHandler;
import org.tron.p2p.connection.Channel;
import org.tron.protos.Protocol;
import org.tron.protos.Protocol.Inventory.InventoryType;
import org.tron.protos.Protocol.ReasonCode;

@Slf4j(topic = "net")
Expand Down Expand Up @@ -205,6 +207,7 @@ private void processMessage(PeerConnection peer, byte[] data) {
default:
throw new P2pException(P2pException.TypeEnum.NO_SUCH_MESSAGE, msg.getType().toString());
}
updateLastActiveTime(peer, msg);
} catch (Exception e) {
processException(peer, msg, e);
} finally {
Expand All @@ -220,6 +223,27 @@ private void processMessage(PeerConnection peer, byte[] data) {
}
}

private void updateLastActiveTime(PeerConnection peer, TronMessage msg) {
MessageTypes type = msg.getType();

boolean flag = false;
switch (type) {
case SYNC_BLOCK_CHAIN:
case BLOCK_CHAIN_INVENTORY:
case BLOCK:
flag = true;
break;
case FETCH_INV_DATA:
flag = ((FetchInvDataMessage) msg).getInventoryType().equals(InventoryType.BLOCK);
break;
default:
break;
}
if (flag) {
peer.setLastActiveTime(System.currentTimeMillis());
}
}

private void processException(PeerConnection peer, TronMessage msg, Exception ex) {
Protocol.ReasonCode code;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.tron.core.net.peer.PeerStatusCheck;
import org.tron.core.net.service.adv.AdvService;
import org.tron.core.net.service.effective.EffectiveCheckService;
import org.tron.core.net.service.effective.ResilienceService;
import org.tron.core.net.service.fetchblock.FetchBlockService;
import org.tron.core.net.service.nodepersist.NodePersistService;
import org.tron.core.net.service.relay.RelayService;
Expand Down Expand Up @@ -50,6 +51,9 @@ public class TronNetService {
@Autowired
private PeerStatusCheck peerStatusCheck;

@Autowired
private ResilienceService resilienceService;

@Autowired
private TransactionsMsgHandler transactionsMsgHandler;

Expand Down Expand Up @@ -88,6 +92,7 @@ public void start() {
advService.init();
syncService.init();
peerStatusCheck.init();
resilienceService.init();
transactionsMsgHandler.init();
fetchBlockService.init();
nodePersistService.init();
Expand All @@ -110,6 +115,7 @@ public void close() {
nodePersistService.close();
advService.close();
syncService.close();
resilienceService.close();
peerStatusCheck.close();
transactionsMsgHandler.close();
fetchBlockService.close();
Expand Down Expand Up @@ -177,7 +183,7 @@ private P2pConfig updateConfig(P2pConfig config) {
config.setMaxConnectionsWithSameIp(parameter.getMaxConnectionsWithSameIp());
config.setPort(parameter.getNodeListenPort());
config.setNetworkId(parameter.getNodeP2pVersion());
config.setDisconnectionPolicyEnable(parameter.isOpenFullTcpDisconnect());
config.setDisconnectionPolicyEnable(false);
config.setNodeDetectEnable(parameter.isNodeDetectEnable());
config.setDiscoverEnable(parameter.isNodeDiscoveryEnable());
if (StringUtils.isEmpty(config.getIp()) && hasIpv4Stack(NetUtil.getAllLocalAddress())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public void processMessage(PeerConnection peer, TronMessage msg) {
Item item = new Item(id, type);
peer.getAdvInvReceive().put(item, System.currentTimeMillis());
advService.addInv(item);
if (type.equals(InventoryType.BLOCK) && peer.getAdvInvSpread().getIfPresent(item) == null) {
peer.setLastActiveTime(System.currentTimeMillis());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
peer.disconnect(Protocol.ReasonCode.BAD_PROTOCOL);
return;
}

long remainNum = 0;

List<BlockId> summaryChainIds = syncBlockChainMessage.getBlockIds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public class PeerConnection {
@Setter
private ByteString address;

@Getter
@Setter
private volatile long lastActiveTime;

@Getter
@Setter
private TronState tronState = TronState.INIT;
Expand Down Expand Up @@ -159,6 +163,7 @@ public void setChannel(Channel channel) {
this.isRelayPeer = true;
}
this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress());
lastActiveTime = System.currentTimeMillis();
}

public void setBlockBothHave(BlockId blockId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package org.tron.core.net.service.effective;

import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.core.ChainBaseManager;
import org.tron.core.config.args.Args;
import org.tron.core.net.TronNetDelegate;
import org.tron.core.net.peer.PeerConnection;
import org.tron.protos.Protocol.ReasonCode;

@Slf4j(topic = "net")
@Component
public class ResilienceService {

private static final long inactiveThreshold =
CommonParameter.getInstance().getInactiveThreshold() * 1000L;
public static final long blockNotChangeThreshold = 90 * 1000L;

//when node is isolated, retention percent peers will not be disconnected
public static final double retentionPercent = 0.8;
private static final int initialDelay = 300;
private static final String esName = "resilience-service";
private final ScheduledExecutorService executor = ExecutorServiceManager
.newSingleThreadScheduledExecutor(esName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that you have added 3 tasks to be scheduled and executed in the same executor, the scheduling between different tasks will be affected, right? Have you considered splitting the tasks into different executors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any task only consume several milliseconds at most. There is no need to split them in different executors.


@Autowired
private TronNetDelegate tronNetDelegate;

@Autowired
private ChainBaseManager chainBaseManager;

public void init() {
if (Args.getInstance().isOpenFullTcpDisconnect) {
executor.scheduleWithFixedDelay(() -> {
try {
disconnectRandom();
} catch (Exception e) {
logger.error("DisconnectRandom node failed", e);
}
}, initialDelay, 60, TimeUnit.SECONDS);
} else {
logger.info("OpenFullTcpDisconnect is disabled");
}

executor.scheduleWithFixedDelay(() -> {
try {
disconnectLan();
} catch (Exception e) {
logger.error("DisconnectLan node failed", e);
}
}, initialDelay, 10, TimeUnit.SECONDS);

executor.scheduleWithFixedDelay(() -> {
try {
disconnectIsolated2();
} catch (Exception e) {
logger.error("DisconnectIsolated node failed", e);
}
}, initialDelay, 30, TimeUnit.SECONDS);
}

private void disconnectRandom() {
int peerSize = tronNetDelegate.getActivePeer().size();
if (peerSize >= CommonParameter.getInstance().getMaxConnections()) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> !peer.getChannel().isTrustPeer())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change the original logic, not excluding active connections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original logic don't have the inactiveThreshold restrict, it has many conditates. The number of peers whose inactive time is more inactiveThreshold is very small, it has very little condidates, so there is no need to exclude active connections.

.collect(Collectors.toList());
if (!peers.isEmpty()) {
int index = new Random().nextInt(peers.size());
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION);
}
}
}

private void disconnectLan() {
if (isLanNode()) {
// disconnect from the node that has keep inactive for more than inactiveThreshold
// and its lastActiveTime is smallest
int peerSize = tronNetDelegate.getActivePeer().size();
if (peerSize >= CommonParameter.getInstance().getMinConnections()) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> !peer.getChannel().isTrustPeer())
.collect(Collectors.toList());
Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
}
}
}

private void disconnectIsolated2() {
if (isIsolateLand2()) {
logger.info("Node is isolated, try to disconnect from peers");
int peerSize = tronNetDelegate.getActivePeer().size();

//disconnect from the node whose lastActiveTime is smallest
if (peerSize >= CommonParameter.getInstance().getMinActiveConnections()) {
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> peer.getChannel().isActive())
.collect(Collectors.toList());

Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
}

//disconnect from some passive nodes, make sure retention nodes' num <= 0.8 * maxConnection,
//so new peers can come in
peerSize = tronNetDelegate.getActivePeer().size();
int threshold = (int) (CommonParameter.getInstance().getMaxConnections() * retentionPercent);
if (peerSize > threshold) {
int disconnectSize = peerSize - threshold;
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> !peer.getChannel().isActive())
.collect(Collectors.toList());
try {
peers.sort(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo));
} catch (Exception e) {
logger.warn("Sort disconnectIsolated2 peers failed: {}", e.getMessage());
return;
}

if (peers.size() > disconnectSize) {
peers = peers.subList(0, disconnectSize);
}
peers.forEach(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
}
}
}

private Optional<PeerConnection> getEarliestPeer(List<PeerConnection> pees) {
Optional<PeerConnection> one = Optional.empty();
try {
one = pees.stream()
.min(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo));
} catch (Exception e) {
logger.warn("Get earliest peer failed: {}", e.getMessage());
}
return one;
}

private boolean isLanNode() {
int peerSize = tronNetDelegate.getActivePeer().size();
int activePeerSize = (int) tronNetDelegate.getActivePeer().stream()
.filter(peer -> peer.getChannel().isActive())
.count();
return peerSize > 0 && peerSize == activePeerSize;
}

private boolean isIsolateLand2() {
int advPeerCount = (int) tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
.count();
long diff = System.currentTimeMillis() - chainBaseManager.getLatestSaveBlockTime();
return advPeerCount >= 1 && diff >= blockNotChangeThreshold;
}

private void disconnectFromPeer(PeerConnection peer, ReasonCode reasonCode) {
int inactiveSeconds = (int) ((System.currentTimeMillis() - peer.getLastActiveTime()) / 1000);
logger.info("Disconnect from peer {}, inactive seconds {}", peer.getInetSocketAddress(),
inactiveSeconds);
peer.disconnect(reasonCode);
}

public void close() {
ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName);
}
}
1 change: 1 addition & 0 deletions framework/src/main/resources/config-localtest.conf
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ node {

# check the peer data transfer ,disconnect factor
isOpenFullTcpDisconnect = true
inactiveThreshold = 600 //seconds

p2p {
version = 333 # 11111: mainnet; 20180622: testnet
Expand Down
1 change: 1 addition & 0 deletions framework/src/main/resources/config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ node {
minParticipationRate = 15

isOpenFullTcpDisconnect = false
inactiveThreshold = 600 //seconds

p2p {
version = 11111 # 11111: mainnet; 20180622: testnet
Expand Down
Loading