Skip to content

Commit aa682c9

Browse files
committed
feat(net):P2P message rate limit
1 parent 212a4ec commit aa682c9

File tree

5 files changed

+64
-3
lines changed

5 files changed

+64
-3
lines changed

framework/src/main/java/org/tron/core/net/P2pEventHandlerImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,11 @@ private void processMessage(PeerConnection peer, byte[] data) {
178178
handshakeService.processHelloMessage(peer, (HelloMessage) msg);
179179
break;
180180
case P2P_DISCONNECT:
181-
peer.getChannel().close();
182-
peer.getNodeStatistics()
183-
.nodeDisconnectedRemote(((DisconnectMessage)msg).getReason());
181+
if (peer.getP2pRateLimiter().tryAcquire(type.asByte())) {
182+
peer.getChannel().close();
183+
peer.getNodeStatistics()
184+
.nodeDisconnectedRemote(((DisconnectMessage)msg).getReason());
185+
}
184186
break;
185187
case SYNC_BLOCK_CHAIN:
186188
syncBlockChainMsgHandler.processMessage(peer, msg);
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.tron.core.net;
2+
3+
import com.google.common.cache.Cache;
4+
import com.google.common.cache.CacheBuilder;
5+
import com.google.common.util.concurrent.RateLimiter;
6+
7+
public class P2pRateLimiter {
8+
private final Cache<Byte, RateLimiter> rateLimiters = CacheBuilder.newBuilder()
9+
.maximumSize(256).build();
10+
11+
public void register (Byte type, double rate) {
12+
rateLimiters.put(type, RateLimiter.create(rate));
13+
}
14+
15+
public void acquire (Byte type) {
16+
RateLimiter rateLimiter = rateLimiters.getIfPresent(type);
17+
if (rateLimiter == null) {
18+
return;
19+
}
20+
rateLimiter.acquire();
21+
}
22+
23+
public boolean tryAcquire (Byte type) {
24+
RateLimiter rateLimiter = rateLimiters.getIfPresent(type);
25+
if (rateLimiter == null) {
26+
return true;
27+
}
28+
return rateLimiter.tryAcquire();
29+
}
30+
}

framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
5555

5656
FetchInvDataMessage fetchInvDataMsg = (FetchInvDataMessage) msg;
5757

58+
if (peer.isNeedSyncFromUs() && !peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) {
59+
// Discard messages that exceed the rate limit
60+
logger.warn("{} message from peer {} exceeds the rate limit",
61+
msg.getType(), peer.getInetSocketAddress());
62+
return;
63+
}
64+
5865
check(peer, fetchInvDataMsg);
5966

6067
InventoryType type = fetchInvDataMsg.getInventoryType();
@@ -156,6 +163,10 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr
156163
if (!peer.isNeedSyncFromUs()) {
157164
throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync");
158165
}
166+
if (fetchInvDataMsg.getHashList().size() > 100) {
167+
throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too more blocks, size:"
168+
+ fetchInvDataMsg.getHashList().size());
169+
}
159170
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
160171
long blockNum = new BlockId(hash).getNum();
161172
long minBlockNum =

framework/src/main/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
3131

3232
SyncBlockChainMessage syncBlockChainMessage = (SyncBlockChainMessage) msg;
3333

34+
if (!peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) {
35+
// Discard messages that exceed the rate limit
36+
logger.warn("{} message from peer {} exceeds the rate limit",
37+
msg.getType(), peer.getInetSocketAddress());
38+
return;
39+
}
40+
3441
if (!check(peer, syncBlockChainMessage)) {
3542
peer.disconnect(Protocol.ReasonCode.BAD_PROTOCOL);
3643
return;

framework/src/main/java/org/tron/core/net/peer/PeerConnection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package org.tron.core.net.peer;
22

3+
import static org.tron.core.net.message.MessageTypes.FETCH_INV_DATA;
4+
import static org.tron.core.net.message.MessageTypes.P2P_DISCONNECT;
5+
import static org.tron.core.net.message.MessageTypes.SYNC_BLOCK_CHAIN;
6+
37
import com.google.common.cache.Cache;
48
import com.google.common.cache.CacheBuilder;
59
import com.google.protobuf.ByteString;
@@ -32,7 +36,9 @@
3236
import org.tron.core.config.args.Args;
3337
import org.tron.core.metrics.MetricsKey;
3438
import org.tron.core.metrics.MetricsUtil;
39+
import org.tron.core.net.P2pRateLimiter;
3540
import org.tron.core.net.TronNetDelegate;
41+
import org.tron.core.net.message.MessageTypes;
3642
import org.tron.core.net.message.adv.InventoryMessage;
3743
import org.tron.core.net.message.adv.TransactionsMessage;
3844
import org.tron.core.net.message.base.DisconnectMessage;
@@ -156,6 +162,8 @@ public class PeerConnection {
156162
@Setter
157163
@Getter
158164
private volatile boolean needSyncFromUs = true;
165+
@Getter
166+
private P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();
159167

160168
public void setChannel(Channel channel) {
161169
this.channel = channel;
@@ -164,6 +172,9 @@ public void setChannel(Channel channel) {
164172
}
165173
this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress());
166174
lastInteractiveTime = System.currentTimeMillis();
175+
p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(), 2);
176+
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 1);
177+
p2pRateLimiter.register(P2P_DISCONNECT.asByte(), 1);
167178
}
168179

169180
public void setBlockBothHave(BlockId blockId) {

0 commit comments

Comments
 (0)