Skip to content

Commit 0890048

Browse files
committed
add consensus
2 parents bc289ef + 24cdcce commit 0890048

24 files changed

+769
-233
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ dependencies {
7777
compile group: 'net.jcip', name: 'jcip-annotations', version: '1.0'
7878

7979
compile group: 'org.fusesource.jansi', name: 'jansi', version: '1.16'
80+
// https://mvnrepository.com/artifact/com.alibaba/fastjson
81+
compile group: 'com.alibaba', name: 'fastjson', version: '1.2.44'
8082
}
8183

8284
tasks.matching { it instanceof Test }.all {

src/main/java/org/tron/command/Cli.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@
1515
package org.tron.command;
1616

1717
import org.tron.peer.Peer;
18-
1918
import java.util.Arrays;
2019
import java.util.Scanner;
2120

2221
public class Cli {
2322
public Cli() {
2423
}
25-
2624
public void run(Peer peer) {
2725
Scanner in = new Scanner(System.in);
2826

@@ -57,7 +55,7 @@ public void run(Peer peer) {
5755
new ConsensusCommand().server();
5856
break;
5957
case "getmessage":
60-
new ConsensusCommand().getClient(cmdParameters);
58+
new ConsensusCommand().getClient(peer,cmdParameters);
6159
break;
6260
case "putmessage":
6361
new ConsensusCommand().putClient(cmdParameters);
@@ -66,6 +64,11 @@ public void run(Peer peer) {
6664
case "quit":
6765
case "bye":
6866
new ExitCommand().execute(peer, cmdParameters);
67+
case "put":
68+
new ConsensusCommand().execute(peer, cmdParameters);
69+
break;
70+
case "loadblock":
71+
new ConsensusCommand().loadBlock(peer);
6972
break;
7073
case "help":
7174
default:

src/main/java/org/tron/command/ConsensusCommand.java

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,25 @@
1414
*/
1515
package org.tron.command;
1616

17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
1719
import org.tron.consensus.client.Client;
20+
import org.tron.consensus.client.MessageType;
1821
import org.tron.consensus.server.Server;
22+
import org.tron.core.TransactionUtils;
23+
import org.tron.example.Tron;
24+
import org.tron.overlay.message.Message;
25+
import org.tron.overlay.message.Type;
26+
import org.tron.peer.Peer;
27+
import org.tron.protos.core.TronTransaction;
28+
import org.tron.utils.ByteArray;
1929

2030
import static org.fusesource.jansi.Ansi.ansi;
2131

22-
public class ConsensusCommand {
32+
public class ConsensusCommand extends Command {
2333

24-
public ConsensusCommand() {
25-
26-
}
34+
private static final Logger logger = LoggerFactory.getLogger
35+
("consensus-command");
2736

2837
public void server() {
2938
Server.serverRun();
@@ -33,8 +42,19 @@ public void putClient(String[] args) {
3342
Client.putMessage(args);
3443
}
3544

36-
public void getClient(String[] args) {
37-
Client.getMessage(args[0]);
45+
public void getClient(Peer peer,String[] args) {
46+
//Client.getMessage(args[0]);
47+
if (Tron.getPeer().getType().equals(Peer.PEER_SERVER)) {
48+
Client.getMessage(peer,MessageType.TRANSACTION);
49+
Client.getMessage(peer,MessageType.BLOCK);
50+
}else{
51+
Client.getMessage(peer,MessageType.BLOCK);
52+
}
53+
54+
}
55+
public void loadBlock(Peer peer){
56+
//System.out.println("BlockChain loading ...");
57+
//Client.loadBlock(peer);
3858
}
3959

4060
public void usage() {
@@ -77,4 +97,48 @@ public void usage() {
7797
System.out.println("");
7898
}
7999

100+
@Override
101+
public void execute(Peer peer, String[] parameters) {
102+
if (check(parameters)) {
103+
String to = parameters[0];
104+
long amount = Long.valueOf(parameters[1]);
105+
TronTransaction.Transaction transaction = TransactionUtils
106+
.newTransaction(peer.getWallet(), to, amount, peer.getUTXOSet());
107+
108+
if (transaction != null) {
109+
Message message = new Message(ByteArray.toHexString
110+
(transaction.toByteArray()), Type.TRANSACTION);
111+
Client.putMessage1(message);
112+
}
113+
}
114+
}
115+
116+
@Override
117+
public boolean check(String[] parameters) {
118+
if (parameters.length < 2) {
119+
logger.error("missing parameter");
120+
return false;
121+
}
122+
123+
if (parameters[0].length() != 40) {
124+
logger.error("address invalid");
125+
return false;
126+
}
127+
128+
129+
long amount = 0;
130+
try {
131+
amount = Long.valueOf(parameters[1]);
132+
} catch (NumberFormatException e) {
133+
logger.error("amount invalid");
134+
return false;
135+
}
136+
137+
if (amount < 0) {
138+
logger.error("amount required a positive number");
139+
return false;
140+
}
141+
142+
return true;
143+
}
80144
}

src/main/java/org/tron/consensus/client/Client.java

Lines changed: 143 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
import io.atomix.catalyst.transport.netty.NettyTransport;
1919
import io.atomix.copycat.client.ConnectionStrategies;
2020
import io.atomix.copycat.client.CopycatClient;
21-
import org.apache.kafka.clients.consumer.ConsumerRecord;
22-
import org.apache.kafka.clients.consumer.ConsumerRecords;
2321
import org.tron.consensus.common.GetQuery;
2422
import org.tron.consensus.common.PutCommand;
25-
import org.tron.overlay.kafka.ConsumerWorker;
23+
import org.tron.overlay.message.Message;
24+
import org.tron.overlay.message.Type;
25+
import org.tron.peer.Peer;
2626

2727
import java.net.InetAddress;
2828
import java.net.UnknownHostException;
@@ -45,6 +45,11 @@ public class Client{
4545
client.serializer().register(PutCommand.class);
4646
client.serializer().register(GetQuery.class);
4747

48+
/*Collection<Address> cluster = Arrays.asList(
49+
new Address("192.168.0.109", 5000)
50+
);
51+
CompletableFuture<CopycatClient> future = client.connect(cluster);
52+
future.join();*/
4853
InetAddress localhost = null;
4954
try {
5055
localhost = InetAddress.getLocalHost();
@@ -71,28 +76,143 @@ public static void putMessage(String[] args) {
7176
}
7277

7378
public static void getMessage1(String key) {
74-
75-
client.submit(new GetQuery(key)).thenAccept(result -> {
76-
System.out.println("Consensus " + key + " is: " + result);
77-
});
78-
// Thread thread = new Thread(() -> {
79-
// while (true) {
80-
// try {
81-
// client.submit(new GetQuery(key)).thenAccept(result -> {
82-
// System.out.println("Consensus " + key + " is: " +
83-
// result);
84-
// });
85-
// Thread.sleep(5000);
86-
// } catch (InterruptedException e) {
87-
// e.printStackTrace();
88-
// }
89-
// }
90-
// });
91-
//thread.start();
92-
}
93-
public static void getMessage(String key) {
9479
Object result = client.submit(new GetQuery(key)).join();
9580
System.out.println("Consensus " + key + " is: " + result);
81+
}
82+
83+
public static void putMessage1(Message message) {
84+
if (message.getType() == Type.TRANSACTION) {
85+
/*
86+
System.out.println("transaction:" + message.getType().toString()
87+
+ "; type: " + message.getMessage().getClass().getSimpleName
88+
() + "; message: " + message.getMessage()); */
89+
client.submit(new PutCommand("transaction", message.getMessage()));
90+
client.submit(new PutCommand("time", System.currentTimeMillis()));
91+
System.out.println("transaction: consensus success");
92+
}
93+
94+
if (message.getType() == Type.BLOCK) {
95+
/*
96+
System.out.println("block:" + message.getType().toString()
97+
+ "; type: " + message.getMessage().getClass().getSimpleName
98+
() + "; message:" + message.getMessage());*/
99+
100+
//client.submit(new PutCommand("block", message.getMessage()));
101+
//System.out.println("Block: consensus success");
102+
103+
int i = 1;
104+
boolean f = true;
105+
while(f){
106+
String block_key = "block" + i;
107+
Object block = client.submit(new GetQuery(block_key)).join();
108+
try {
109+
if (!(block == null)) {
110+
f =true;
111+
i = i+1;
112+
}else {
113+
client.submit(new PutCommand(block_key, message.getMessage()));
114+
System.out.println("Block: consensus success");
115+
f = false;
116+
}
117+
} catch (NullPointerException e) {
118+
e.printStackTrace();
119+
System.out.println("object == null");
120+
}
121+
}
122+
}
123+
}
124+
125+
public static void getMessage(Peer peer,String key) {
126+
final String[] preMessage = {null};
127+
final String[] preTime = {null};
128+
if (key.equals("transaction")) {
129+
Thread thread = new Thread(() -> {
130+
while(true){
131+
Object time = client.submit(new GetQuery("time")).join();
132+
if(!time.toString().equals(preTime[0])) {
133+
client.submit(new GetQuery(key)).thenAccept(transaction
134+
-> {
135+
//System.out.println("Consensus " + key + " is: " + result);
136+
//System.out.println("type: " + result.getClass().getSimpleName());
137+
peer.addReceiveTransaction(String.valueOf(transaction));
138+
});
139+
preTime[0] = time.toString();
140+
}else {
141+
preTime[0] = preTime[0];
142+
}
143+
try {
144+
Thread.sleep(3000);
145+
}catch (Exception e){
146+
e.printStackTrace();
147+
}
148+
}
149+
});
150+
thread.start();
151+
}
96152

153+
if (key.equals("block")) {
154+
Thread thread = new Thread(() -> {
155+
while(true){
156+
int i = 1;
157+
boolean f = true;
158+
String block_key;
159+
while(f){
160+
block_key = "block" + i;
161+
Object block = client.submit(new GetQuery(block_key)).join();
162+
try {
163+
if (!(block == null)) {
164+
f =true;
165+
i = i+1;
166+
}else {
167+
f = false;
168+
}
169+
} catch (NullPointerException e) {
170+
e.printStackTrace();
171+
}
172+
}
173+
174+
i = i-1;
175+
String finalBlock_key = "block" + i;
176+
client.submit(new GetQuery(finalBlock_key)).thenAccept(block -> {
177+
/*System.out.println("Consensus " + key + " is: " +
178+
block);*/
179+
if (!String.valueOf(block).equals(preMessage[0])) {
180+
peer.addReceiveBlock(String.valueOf(block));
181+
preMessage[0] = String.valueOf(block);
182+
}else {
183+
preMessage[0] = preMessage[0];
184+
}
185+
});
186+
try {
187+
Thread.sleep(3000);
188+
} catch (Exception e) {
189+
e.printStackTrace();
190+
}
191+
}
192+
});
193+
thread.start();
194+
}
195+
}
196+
public static void loadBlock(Peer peer) {
197+
int i = 2;
198+
final boolean[] f = {true};
199+
while (f[0]) {
200+
String block_key = "block" + i;
201+
client.submit(new GetQuery(block_key)).thenAccept((Object block) -> {
202+
if (!(block == null)) {
203+
peer.addReceiveBlock(String.valueOf
204+
(block));
205+
f[0] = true;
206+
} else{
207+
f[0] = false;
208+
}
209+
});
210+
i++;
211+
try {
212+
Thread.sleep(3000);
213+
} catch (InterruptedException e) {
214+
e.printStackTrace();
215+
}
216+
}
97217
}
98218
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* java-tron is free software: you can redistribute it and/or modify
3+
* it under the terms of the GNU General Public License as published by
4+
* the Free Software Foundation, either version 3 of the License, or
5+
* (at your option) any later version.
6+
*
7+
* java-tron is distributed in the hope that it will be useful,
8+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
9+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10+
* GNU General Public License for more details.
11+
*
12+
* You should have received a copy of the GNU General Public License
13+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
14+
*/
15+
package org.tron.consensus.client;
16+
17+
public class MessageType {
18+
public final static String TRANSACTION = "transaction";
19+
public final static String BLOCK = "block";
20+
}

src/main/java/org/tron/consensus/server/Server.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,7 @@ public static void serverRun() {
3636
localhost = InetAddress.getLocalHost();
3737
System.out.println("Server localhost: " + localhost.getHostAddress
3838
());
39-
4039
Address address = new Address(localhost.getHostAddress(), 5000);
41-
4240
CopycatServer server = CopycatServer.builder(address)
4341
.withStateMachine(MapstateMachine::new)
4442
.withTransport(NettyTransport.builder()
@@ -57,20 +55,12 @@ public static void serverRun() {
5755
future.join();
5856

5957
//Collection<Address> cluster = Collections.singleton(new Address
60-
// ("192.16.50.129", 5000));
58+
// ("192.168.0.100", 5000));
6159
//server.join(cluster).join();
6260

6361
System.out.println("Server xxd: " + server.cluster().members());
64-
6562
CopycatServer.State state = server.state();
6663
System.out.println("Server state: " + state);
67-
server.onStateChange(state1 -> {
68-
if (state == CopycatServer.State.LEADER) {
69-
System.out.println("Server state: " + state);
70-
}
71-
});
72-
server.context();
73-
7464
} catch (UnknownHostException e) {
7565
e.printStackTrace();
7666
}

0 commit comments

Comments
 (0)