Skip to content

Commit b3b639c

Browse files
committed
parse mongodb conf and insert data to collections
1 parent 8e2a09b commit b3b639c

17 files changed

+711
-567
lines changed

plugins/mongodbplugin/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ dependencies {
1818
compile ("org.springframework.data:spring-data-mongodb:2.0.0.RELEASE") {
1919
exclude group: "org.slf4j"
2020
}
21-
compile ("org.mongodb:mongo-java-driver:3.1.0"){
21+
compile ("org.mongodb:mongo-java-driver:3.6.3"){
2222
exclude group: "org.slf4j"
2323
}
2424

plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbEventListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44
import org.slf4j.Logger;
55
import org.slf4j.LoggerFactory;
66
import org.tron.common.logsfilter.IPluginEventListener;
7+
import org.tron.mongodb.MongoConfig;
8+
import org.tron.mongodb.MongoManager;
9+
import org.tron.mongodb.MongoTemplate;
10+
11+
import java.util.Map;
712
import java.util.Objects;
813

914
@Extension

plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java

Lines changed: 66 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
package org.tron.eventplugin;
22
import org.slf4j.Logger;
33
import org.slf4j.LoggerFactory;
4-
import org.springframework.beans.factory.annotation.Autowired;
5-
import org.tron.common.logsfilter.trigger.BlockLogTrigger;
6-
import org.tron.common.logsfilter.trigger.ContractEventTrigger;
7-
import org.tron.common.logsfilter.trigger.ContractLogTrigger;
8-
import org.tron.common.logsfilter.trigger.TransactionLogTrigger;
9-
import org.tron.orm.service.impl.EventLogServiceImpl;
10-
11-
import java.io.IOException;
124
import java.util.*;
135
import java.util.concurrent.BlockingQueue;
146
import java.util.concurrent.LinkedBlockingQueue;
157
import java.util.concurrent.TimeUnit;
168
import com.fasterxml.jackson.databind.ObjectMapper;
9+
import org.tron.mongodb.MongoConfig;
10+
import org.tron.mongodb.MongoManager;
11+
import org.tron.mongodb.MongoTemplate;
1712

1813
public class MongodbSenderImpl{
1914
private static MongodbSenderImpl instance = null;
@@ -32,10 +27,10 @@ public class MongodbSenderImpl{
3227
private Thread triggerProcessThread;
3328
private boolean isRunTriggerProcessThread = true;
3429

35-
private ObjectMapper mapper = new ObjectMapper();
30+
private MongoManager mongoManager;
31+
private Map<String, MongoTemplate> mongoTemplateMap;
3632

37-
@Autowired
38-
private EventLogServiceImpl eventLogService;
33+
private ObjectMapper mapper = new ObjectMapper();
3934

4035

4136
public static MongodbSenderImpl getInstance(){
@@ -60,14 +55,53 @@ public void init(){
6055
return;
6156
}
6257

63-
eventLogService = new EventLogServiceImpl();
64-
6558
triggerProcessThread = new Thread(triggerProcessLoop);
6659
triggerProcessThread.start();
67-
6860
loaded = true;
61+
62+
initMongoConfig();
63+
64+
}
65+
66+
private void initMongoConfig(){
67+
mongoManager = new MongoManager();
68+
mongoTemplateMap = new HashMap<>();
69+
70+
MongoConfig config = new MongoConfig();
71+
config.setDbName("eventlog");
72+
config.setHost("127.0.0.1");
73+
config.setPort(27017);
74+
config.setUsername("tron");
75+
config.setPassword("123456");
76+
77+
mongoManager.initConfig(config);
78+
}
79+
80+
private MongoTemplate createMongoTemplate(final String collectionName){
81+
82+
MongoTemplate template = mongoTemplateMap.get(collectionName);
83+
if (Objects.nonNull(template)){
84+
return template;
85+
}
86+
87+
template = new MongoTemplate(mongoManager) {
88+
@Override
89+
protected String collectionName() {
90+
return collectionName;
91+
}
92+
93+
@Override
94+
protected <T> Class<T> getReferencedClass() {
95+
return null;
96+
}
97+
};
98+
99+
mongoTemplateMap.put(collectionName, template);
100+
101+
return template;
69102
}
70103

104+
71105
public void setTopic(int triggerType, String topic){
72106
if (triggerType == Constant.BLOCK_TRIGGER){
73107
blockTopic = topic;
@@ -81,6 +115,12 @@ else if (triggerType == Constant.CONTRACTEVENT_TRIGGER){
81115
else if (triggerType == Constant.CONTRACTLOG_TRIGGER){
82116
contractLogTopic = topic;
83117
}
118+
else {
119+
return;
120+
}
121+
122+
mongoManager.createCollection(topic);
123+
createMongoTemplate(topic);
84124
}
85125

86126
public void close() {
@@ -95,61 +135,43 @@ public void handleBlockEvent(Object data) {
95135
return;
96136
}
97137

98-
BlockLogTrigger trigger = new BlockLogTrigger();
99-
100-
try {
101-
trigger = mapper.readValue((String)data, BlockLogTrigger.class);
102-
} catch (IOException e) {
103-
log.error("{}", e);
138+
MongoTemplate template = mongoTemplateMap.get(blockTopic);
139+
if (Objects.nonNull(template)){
140+
template.addEntity((String)data);
104141
}
105-
eventLogService.insertBlockTrigger(trigger);
106142
}
107143

108144
public void handleTransactionTrigger(Object data) {
109145
if (Objects.isNull(data) || Objects.isNull(transactionTopic)){
110146
return;
111147
}
112148

113-
TransactionLogTrigger trigger = new TransactionLogTrigger();
114-
115-
try {
116-
trigger = mapper.readValue((String)data, TransactionLogTrigger.class);
117-
} catch (IOException e) {
118-
log.error("{}", e);
149+
MongoTemplate template = mongoTemplateMap.get(transactionTopic);
150+
if (Objects.nonNull(template)){
151+
template.addEntity((String)data);
119152
}
120-
eventLogService.insertTransactionTrigger(trigger);
121-
122153
}
123154

124155
public void handleContractLogTrigger(Object data) {
125156
if (Objects.isNull(data) || Objects.isNull(contractLogTopic)){
126157
return;
127158
}
128159

129-
ContractLogTrigger trigger = new ContractLogTrigger();
130-
131-
try {
132-
trigger = mapper.readValue((String)data, ContractLogTrigger.class);
133-
} catch (IOException e) {
134-
log.error("{}", e);
160+
MongoTemplate template = mongoTemplateMap.get(contractLogTopic);
161+
if (Objects.nonNull(template)){
162+
template.addEntity((String)data);
135163
}
136-
137-
eventLogService.insertContractLogTrigger(trigger);
138164
}
139165

140166
public void handleContractEventTrigger(Object data) {
141167
if (Objects.isNull(data) || Objects.isNull(contractEventTopic)){
142168
return;
143169
}
144170

145-
ContractEventTrigger trigger = new ContractEventTrigger();
146-
147-
try {
148-
trigger = mapper.readValue((String)data, ContractEventTrigger.class);
149-
} catch (IOException e) {
150-
log.error("{}", e);
171+
MongoTemplate template = mongoTemplateMap.get(contractEventTopic);
172+
if (Objects.nonNull(template)){
173+
template.addEntity((String)data);
151174
}
152-
eventLogService.insertContractEventTrigger(trigger);
153175
}
154176

155177
private Runnable triggerProcessLoop =
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.tron.mongodb;
2+
3+
import java.io.Serializable;
4+
5+
public class BaseEntity implements Serializable {
6+
7+
private Object _id;
8+
9+
public Object get_id() {
10+
return _id;
11+
}
12+
13+
public void set_id(Object _id) {
14+
this._id = _id;
15+
}
16+
17+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package org.tron.mongodb;
2+
3+
public class MongoConfig {
4+
5+
private String host;
6+
private int port = 27017;
7+
private String dbName;
8+
private String username;
9+
private String password;
10+
private int connectionsPerHost = 10;
11+
private int threadsAllowedToBlockForConnectionMultiplier = 10;
12+
13+
public String getHost() {
14+
return host;
15+
}
16+
17+
public void setHost(String host) {
18+
this.host = host;
19+
}
20+
21+
public int getPort() {
22+
return port;
23+
}
24+
25+
public void setPort(int port) {
26+
this.port = port;
27+
}
28+
29+
public String getDbName() {
30+
return dbName;
31+
}
32+
33+
public void setDbName(String dbName) {
34+
this.dbName = dbName;
35+
}
36+
37+
public String getUsername() {
38+
return username;
39+
}
40+
41+
public void setUsername(String username) {
42+
this.username = username;
43+
}
44+
45+
public String getPassword() {
46+
return password;
47+
}
48+
49+
public void setPassword(String password) {
50+
this.password = password;
51+
}
52+
53+
public int getConnectionsPerHost() {
54+
return connectionsPerHost;
55+
}
56+
57+
public void setConnectionsPerHost(int connectionsPerHost) {
58+
this.connectionsPerHost = connectionsPerHost;
59+
}
60+
61+
public int getThreadsAllowedToBlockForConnectionMultiplier() {
62+
return threadsAllowedToBlockForConnectionMultiplier;
63+
}
64+
65+
public void setThreadsAllowedToBlockForConnectionMultiplier(int threadsAllowedToBlockForConnectionMultiplier) {
66+
this.threadsAllowedToBlockForConnectionMultiplier = threadsAllowedToBlockForConnectionMultiplier;
67+
}
68+
69+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package org.tron.mongodb;
2+
3+
import com.mongodb.MongoClient;
4+
import com.mongodb.MongoClientOptions;
5+
import com.mongodb.MongoCredential;
6+
import com.mongodb.ServerAddress;
7+
import com.mongodb.client.MongoDatabase;
8+
import org.pf4j.util.StringUtils;
9+
import org.tron.common.logsfilter.trigger.BlockLogTrigger;
10+
import org.tron.eventplugin.Constant;
11+
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import java.util.Objects;
15+
16+
public class MongoManager {
17+
18+
private MongoClient mongo;
19+
private MongoDatabase db;
20+
21+
public void initConfig(MongoConfig config) {
22+
int connectionsPerHost = config.getConnectionsPerHost();
23+
int threadsAllowedToBlockForConnectionMultiplier = config.getThreadsAllowedToBlockForConnectionMultiplier();
24+
MongoClientOptions options = MongoClientOptions.builder().connectionsPerHost(connectionsPerHost)
25+
.threadsAllowedToBlockForConnectionMultiplier(threadsAllowedToBlockForConnectionMultiplier).build();
26+
27+
String host = config.getHost();
28+
int port = config.getPort();
29+
ServerAddress serverAddress = new ServerAddress(host, port);
30+
List<ServerAddress> addrs = new ArrayList<ServerAddress>();
31+
addrs.add(serverAddress);
32+
33+
String username = config.getUsername();
34+
String password = config.getPassword();
35+
String databaseName = config.getDbName();
36+
MongoCredential credential = MongoCredential.createScramSha1Credential(username, databaseName,
37+
password.toCharArray());
38+
List<MongoCredential> credentials = new ArrayList<MongoCredential>();
39+
credentials.add(credential);
40+
41+
mongo = new MongoClient(addrs, credential, options);
42+
db = mongo.getDatabase(databaseName);
43+
}
44+
45+
public void createCollection(String collectionName) {
46+
if (db != null && StringUtils.isNotNullOrEmpty(collectionName)) {
47+
if (Objects.isNull(db.getCollection(collectionName))){
48+
db.createCollection(collectionName);
49+
}
50+
}
51+
}
52+
53+
public MongoClient getMongo() {
54+
return mongo;
55+
}
56+
57+
public void setMongo(MongoClient mongo) {
58+
this.mongo = mongo;
59+
}
60+
61+
public MongoDatabase getDb() {
62+
return db;
63+
}
64+
65+
public void setDb(MongoDatabase db) {
66+
this.db = db;
67+
}
68+
69+
}

0 commit comments

Comments
 (0)