Skip to content

Commit 723e11a

Browse files
authored
Merge pull request tronprotocol#1097 from tronprotocol/add_message
Add message
2 parents 0656d35 + f7a6786 commit 723e11a

File tree

19 files changed

+158
-102
lines changed

19 files changed

+158
-102
lines changed

src/main/java/org/tron/common/overlay/discover/node/NodeHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public enum State {
8787
private NodeStatistics nodeStatistics;
8888
private NodeHandler replaceCandidate;
8989
private volatile boolean waitForPong = false;
90+
private volatile boolean waitForNeighbors = false;
9091
private volatile int pingTrials = 3;
9192
private long pingSent;
9293

@@ -204,6 +205,11 @@ public void handlePong(PongMessage msg) {
204205
}
205206

206207
public void handleNeighbours(NeighborsMessage msg) {
208+
if (!waitForNeighbors){
209+
logger.warn("Receive neighbors from {} without send find nodes.", node.getHost());
210+
return;
211+
}
212+
waitForNeighbors = false;
207213
getNodeStatistics().discoverInNeighbours.add();
208214
for (Node n : msg.getNodes()) {
209215
if (!nodeManager.getPublicHomeNode().getHexId().equals(n.getHexId())) {
@@ -269,6 +275,7 @@ public void sendNeighbours(List<Node> neighbours) {
269275
}
270276

271277
public void sendFindNode(byte[] target) {
278+
waitForNeighbors = true;
272279
Message findNode = new FindNodeMessage(nodeManager.getPublicHomeNode(), target);
273280
sendMessage(findNode);
274281
getNodeStatistics().discoverOutFind.add();

src/main/java/org/tron/common/overlay/discover/node/NodeManager.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,6 @@ public void run() {
138138
for (Node node : bootNodes) {
139139
getNodeHandler(node);
140140
}
141-
142-
for (Node node : args.getNodeActive()) {
143-
getNodeHandler(node).getNodeStatistics().setPredefined(true);
144-
}
145141
}
146142
}
147143

src/main/java/org/tron/common/overlay/discover/node/NodeStatistics.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,6 @@ public NodeStatistics(Node node) {
8585
discoverMessageLatency = new SimpleStatter(node.getIdString());
8686
}
8787

88-
private int getSessionReputation() {
89-
return getSessionFairReputation() + (isPredefined ? REPUTATION_PREDEFINED : 0);
90-
}
91-
9288
private int getSessionFairReputation() {
9389
int discoverReput = 0;
9490

@@ -129,7 +125,14 @@ private int getSessionFairReputation() {
129125
}
130126

131127
public int getReputation() {
132-
return isReputationPenalized() ? 0 : persistedReputation / 2 + getSessionReputation();
128+
int score = 0;
129+
if (!isReputationPenalized()){
130+
score += persistedReputation / 2 + getSessionFairReputation();
131+
}
132+
if (isPredefined){
133+
score += REPUTATION_PREDEFINED;
134+
}
135+
return score;
133136
}
134137

135138
public ReasonCode getDisconnectReason() {

src/main/java/org/tron/common/overlay/discover/table/NodeEntry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ public class NodeEntry {
3333
public NodeEntry(Node n) {
3434
this.node = n;
3535
this.ownerId = n.getId();
36-
entryId = n.toString();
36+
entryId = n.getHost();
3737
distance = distance(ownerId, n.getId());
3838
touch();
3939
}
4040

4141
public NodeEntry(byte[] ownerId, Node n) {
4242
this.node = n;
4343
this.ownerId = ownerId;
44-
entryId = n.toString();
44+
entryId = n.getHost();
4545
distance = distance(ownerId, n.getId());
4646
touch();
4747
}

src/main/java/org/tron/common/overlay/discover/table/NodeTable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ public final void initialize() {
5959

6060
public synchronized Node addNode(Node n) {
6161
NodeEntry e = new NodeEntry(node.getId(), n);
62+
if (nodes.contains(e)) {
63+
nodes.forEach(nodeEntry -> {
64+
if (nodeEntry.equals(e)) {
65+
nodeEntry.touch();
66+
}
67+
});
68+
return null;
69+
}
6270
NodeEntry lastSeen = buckets[getBucketId(e)].addNode(e);
6371
if (lastSeen != null) {
6472
return lastSeen.getNode();

src/main/java/org/tron/common/overlay/server/ChannelManager.java

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,25 @@
1-
/*
2-
* Copyright (c) [2016] [ <ether.camp> ]
3-
* This file is part of the ethereumJ library.
4-
*
5-
* The ethereumJ library is free software: you can redistribute it and/or modify
6-
* it under the terms of the GNU Lesser General Public License as published by
7-
* the Free Software Foundation, either version 3 of the License, or
8-
* (at your option) any later version.
9-
*
10-
* The ethereumJ library is distributed in the hope that it will be useful,
11-
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12-
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13-
* GNU Lesser General Public License for more details.
14-
*
15-
* You should have received a copy of the GNU Lesser General Public License
16-
* along with the ethereumJ library. If not, see <http://www.gnu.org/licenses/>.
17-
*/
181
package org.tron.common.overlay.server;
192

203
import static org.tron.protos.Protocol.ReasonCode.DUPLICATE_PEER;
214
import static org.tron.protos.Protocol.ReasonCode.TOO_MANY_PEERS;
5+
import static org.tron.protos.Protocol.ReasonCode.TOO_MANY_PEERS_WITH_SAME_IP;
226
import static org.tron.protos.Protocol.ReasonCode.UNKNOWN;
237

248
import com.google.common.cache.Cache;
259
import com.google.common.cache.CacheBuilder;
2610
import java.net.InetAddress;
11+
import java.net.InetSocketAddress;
2712
import java.util.Collection;
2813
import java.util.Map;
2914
import java.util.concurrent.ConcurrentHashMap;
3015
import java.util.concurrent.TimeUnit;
16+
import lombok.Getter;
3117
import org.slf4j.Logger;
3218
import org.slf4j.LoggerFactory;
3319
import org.springframework.beans.factory.annotation.Autowired;
3420
import org.springframework.stereotype.Component;
3521
import org.tron.common.overlay.client.PeerClient;
22+
import org.tron.common.overlay.discover.node.Node;
3623
import org.tron.core.config.args.Args;
3724
import org.tron.core.db.ByteArrayWrapper;
3825
import org.tron.protos.Protocol.ReasonCode;
@@ -53,9 +40,14 @@ public class ChannelManager {
5340
private Cache<InetAddress, ReasonCode> recentlyDisconnected = CacheBuilder.newBuilder().maximumSize(1000)
5441
.expireAfterWrite(30, TimeUnit.SECONDS).recordStats().build();
5542

43+
@Getter
44+
private Map<InetAddress, Node> trustPeers = new ConcurrentHashMap();
45+
5646
private Args args = Args.getInstance();
5747

58-
private int maxActivePeers = args.getNodeMaxActiveNodes() > 0 ? args.getNodeMaxActiveNodes() : 30;
48+
private int maxActivePeers = args.getNodeMaxActiveNodes();
49+
50+
private int getMaxActivePeersWithSameIp = args.getNodeMaxActiveNodesWithSameIp();
5951

6052
private PeerServer peerServer;
6153

@@ -73,6 +65,11 @@ private ChannelManager(final PeerServer peerServer, final PeerClient peerClient)
7365
new Thread(() -> peerServer.start(Args.getInstance().getNodeListenPort()),
7466
"PeerServerThread").start();
7567
}
68+
69+
for (Node node : args.getPassiveNodes()){
70+
trustPeers.put(new InetSocketAddress(node.getHost(), node.getPort()).getAddress() , node);
71+
}
72+
logger.info("Trust peer size {}", trustPeers.size());
7673
}
7774

7875
public void processDisconnect(Channel channel, ReasonCode reason){
@@ -110,19 +107,26 @@ public void notifyDisconnect(Channel channel) {
110107

111108
public synchronized boolean processPeer(Channel peer) {
112109

113-
if (recentlyDisconnected.getIfPresent(peer) != null){
114-
logger.info("Peer {} recently disconnected.", peer.getInetAddress());
115-
return false;
116-
}
110+
if (!trustPeers.containsKey(peer.getInetAddress())){
111+
if (recentlyDisconnected.getIfPresent(peer) != null){
112+
logger.info("Peer {} recently disconnected.", peer.getInetAddress());
113+
return false;
114+
}
117115

118-
if (badPeers.getIfPresent(peer) != null) {
119-
peer.disconnect(peer.getNodeStatistics().getDisconnectReason());
120-
return false;
121-
}
116+
if (badPeers.getIfPresent(peer) != null) {
117+
peer.disconnect(peer.getNodeStatistics().getDisconnectReason());
118+
return false;
119+
}
122120

123-
if (!peer.isActive() && activePeers.size() >= maxActivePeers) {
124-
peer.disconnect(TOO_MANY_PEERS);
125-
return false;
121+
if (!peer.isActive() && activePeers.size() >= maxActivePeers) {
122+
peer.disconnect(TOO_MANY_PEERS);
123+
return false;
124+
}
125+
126+
if (getConnectionNum(peer.getInetAddress()) >= getMaxActivePeersWithSameIp){
127+
peer.disconnect(TOO_MANY_PEERS_WITH_SAME_IP);
128+
return false;
129+
}
126130
}
127131

128132
if (activePeers.containsKey(peer.getNodeIdWrapper())) {
@@ -140,6 +144,16 @@ public synchronized boolean processPeer(Channel peer) {
140144
return true;
141145
}
142146

147+
public int getConnectionNum(InetAddress inetAddress){
148+
int cnt = 0;
149+
for (Channel channel: activePeers.values()){
150+
if (channel.getInetAddress().equals(inetAddress)){
151+
cnt++;
152+
}
153+
}
154+
return cnt;
155+
}
156+
143157
public Collection<Channel> getActivePeers() {
144158
return activePeers.values();
145159
}

src/main/java/org/tron/common/overlay/server/HandshakeHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.netty.buffer.ByteBuf;
2121
import io.netty.channel.ChannelHandlerContext;
2222
import io.netty.handler.codec.ByteToMessageDecoder;
23+
import java.net.InetAddress;
2324
import java.net.InetSocketAddress;
2425
import java.util.Arrays;
2526
import java.util.List;
@@ -125,7 +126,8 @@ private void sendHelloMsg(ChannelHandlerContext ctx, long time){
125126
private void handleHelloMsg(ChannelHandlerContext ctx, HelloMessage msg) {
126127
if (remoteId.length != 64) {
127128
channel.initNode(msg.getFrom().getId(), msg.getFrom().getPort());
128-
if (!syncPool.isCanConnect()) {
129+
InetAddress address = ((InetSocketAddress)ctx.channel().remoteAddress()).getAddress();
130+
if (!channelManager.getTrustPeers().keySet().contains(address) && !syncPool.isCanConnect()) {
129131
channel.disconnect(ReasonCode.TOO_MANY_PEERS);
130132
return;
131133
}

src/main/java/org/tron/common/overlay/server/SyncPool.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.google.common.cache.Cache;
2121
import com.google.common.cache.CacheBuilder;
22+
import com.google.common.collect.Lists;
2223
import java.net.InetAddress;
2324
import java.util.ArrayList;
2425
import java.util.Collections;
@@ -31,8 +32,6 @@
3132
import java.util.concurrent.TimeUnit;
3233
import java.util.concurrent.atomic.AtomicInteger;
3334
import java.util.function.Predicate;
34-
35-
import com.google.common.collect.Lists;
3635
import org.slf4j.Logger;
3736
import org.slf4j.LoggerFactory;
3837
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,6 +41,7 @@
4241
import org.tron.common.overlay.discover.node.Node;
4342
import org.tron.common.overlay.discover.node.NodeHandler;
4443
import org.tron.common.overlay.discover.node.NodeManager;
44+
import org.tron.common.overlay.discover.node.NodeStatistics;
4545
import org.tron.core.config.args.Args;
4646
import org.tron.core.net.peer.PeerConnection;
4747
import org.tron.core.net.peer.PeerConnectionDelegate;
@@ -52,8 +52,10 @@ public class SyncPool {
5252
public static final Logger logger = LoggerFactory.getLogger("SyncPool");
5353

5454
private static final double factor = 0.4;
55+
private static final double activeFactor = 0.2;
5556

56-
private final List<PeerConnection> activePeers = Collections.synchronizedList(new ArrayList<PeerConnection>());
57+
private final List<PeerConnection> activePeers = Collections
58+
.synchronizedList(new ArrayList<PeerConnection>());
5759
private final AtomicInteger passivePeersCount = new AtomicInteger(0);
5860
private final AtomicInteger activePeersCount = new AtomicInteger(0);
5961

@@ -72,7 +74,9 @@ public class SyncPool {
7274

7375
private Args args = Args.getInstance();
7476

75-
private int maxActiveNodes = args.getNodeMaxActiveNodes() > 0 ? args.getNodeMaxActiveNodes() : 30;
77+
private int maxActiveNodes = args.getNodeMaxActiveNodes();
78+
79+
private int getMaxActivePeersWithSameIp = args.getNodeMaxActiveNodesWithSameIp();
7680

7781
private ScheduledExecutorService poolLoopExecutor = Executors.newSingleThreadScheduledExecutor();
7882

@@ -87,6 +91,10 @@ public void init(PeerConnectionDelegate peerDel) {
8791

8892
peerClient = ctx.getBean(PeerClient.class);
8993

94+
for (Node node : args.getActiveNodes()) {
95+
nodeManager.getNodeHandler(node).getNodeStatistics().setPredefined(true);
96+
}
97+
9098
poolLoopExecutor.scheduleWithFixedDelay(() -> {
9199
try {
92100
fillUp();
@@ -104,8 +112,11 @@ public void init(PeerConnectionDelegate peerDel) {
104112
}
105113

106114
private void fillUp() {
107-
int lackSize = (int) (maxActiveNodes * factor) - activePeers.size();
108-
if(lackSize <= 0) return;
115+
int lackSize = Math.max((int) (maxActiveNodes * factor) - activePeers.size(),
116+
(int) (maxActiveNodes * activeFactor - activePeersCount.get()));
117+
if (lackSize <= 0) {
118+
return;
119+
}
109120

110121
final Set<String> nodesInUse = new HashSet<>();
111122
channelManager.getActivePeers().forEach(channel -> nodesInUse.add(channel.getPeerId()));
@@ -189,7 +200,7 @@ public synchronized void onDisconnect(Channel peer) {
189200
}
190201

191202
public boolean isCanConnect() {
192-
if (activePeers.size() >= maxActiveNodes) {
203+
if (passivePeersCount.get() >= maxActiveNodes * (1 - activeFactor)) {
193204
return false;
194205
}
195206
return true;
@@ -220,15 +231,22 @@ public boolean test(NodeHandler handler) {
220231
return false;
221232
}
222233

234+
if (nodesInUse != null && nodesInUse.contains(handler.getNode().getHexId())) {
235+
return false;
236+
}
237+
238+
if (handler.getNodeStatistics().getReputation() >= NodeStatistics.REPUTATION_PREDEFINED){
239+
return true;
240+
}
241+
223242
InetAddress inetAddress = handler.getInetSocketAddress().getAddress();
224243
if (channelManager.getRecentlyDisconnected().getIfPresent(inetAddress) != null) {
225244
return false;
226245
}
227246
if (channelManager.getBadPeers().getIfPresent(inetAddress) != null) {
228247
return false;
229248
}
230-
231-
if (nodesInUse != null && nodesInUse.contains(handler.getNode().getHexId())) {
249+
if (channelManager.getConnectionNum(inetAddress) >= getMaxActivePeersWithSameIp){
232250
return false;
233251
}
234252

src/main/java/org/tron/common/utils/Sha256Hash.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ public class Sha256Hash implements Serializable, Comparable<Sha256Hash> {
4545

4646
private final byte[] bytes;
4747

48-
private long blockNum;
49-
50-
5148
private byte[] generateBlockId(long blockNum, Sha256Hash blockHash) {
5249
byte[] numBytes = Longs.toByteArray(blockNum);
5350
byte[] hash = blockHash.getBytes();
@@ -62,22 +59,16 @@ private byte[] generateBlockId(long blockNum, byte[] blockHash) {
6259
return hash;
6360
}
6461

65-
public long getBlockNum(){
66-
return blockNum;
67-
}
68-
6962
public Sha256Hash(long num, byte[] hash) {
7063
byte[] rawHashBytes = this.generateBlockId(num, hash);
7164
checkArgument(rawHashBytes.length == LENGTH);
7265
this.bytes = rawHashBytes;
73-
this.blockNum = num;
7466
}
7567

7668
public Sha256Hash(long num, Sha256Hash hash) {
7769
byte[] rawHashBytes = this.generateBlockId(num, hash);
7870
checkArgument(rawHashBytes.length == LENGTH);
7971
this.bytes = rawHashBytes;
80-
this.blockNum = num;
8172
}
8273

8374
/**

0 commit comments

Comments
 (0)