Skip to content

Commit b207fee

Browse files
committed
mongodb support
1 parent 9ef21d9 commit b207fee

File tree

16 files changed

+742
-1
lines changed

16 files changed

+742
-1
lines changed

app/src/main/java/org/tron/eventplugin/app/PluginLauncher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class PluginLauncher {
3434
private static final Logger logger = LoggerFactory.getLogger(PluginLauncher.class);
3535

3636
public static void main(String[] args) {
37-
String path = "/home/java-tron/plugin-kafka-1.0.0.zip";
37+
String path = "/Users/tron/sourcecode/eventplugin/build/plugins/plugin-mongodb-1.0.0.zip";
3838

3939
File dir = new File(path);
4040
// create the plugin manager

plugins/mongodbplugin/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
build

plugins/mongodbplugin/build.gradle

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
dependencies {
2+
// compileOnly important!!! We do not want to put the api into the zip file since the main program has it already!
3+
compileOnly project(':api')
4+
compileOnly(group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}") {
5+
exclude group: "org.slf4j"
6+
}
7+
compileOnly group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
8+
testCompile group: 'junit', name: 'junit', version: '4.+'
9+
10+
compileOnly (group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3') {
11+
exclude group: "org.slf4j"
12+
}
13+
14+
compileOnly ("org.projectlombok:lombok:1.16.18") {
15+
exclude group: "org.slf4j"
16+
}
17+
18+
compile ("org.springframework.data:spring-data-mongodb:2.0.0.RELEASE") {
19+
exclude group: "org.slf4j"
20+
}
21+
compile ("org.mongodb:mongo-java-driver:3.1.0"){
22+
exclude group: "org.slf4j"
23+
}
24+
25+
compileOnly group: 'com.alibaba', name: 'fastjson', version: '1.2.44'
26+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
version=1.0.0
2+
3+
pluginId=mongodb
4+
pluginClass=org.tron.eventplugin.MongodbLogFilterPlugin
5+
pluginProvider=Tron
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.tron.eventplugin;
2+
3+
public class Constant {
4+
public static final int BLOCK_TRIGGER = 0;
5+
public static final int TRANSACTION_TRIGGER = 1;
6+
public static final int CONTRACTLOG_TRIGGER = 2;
7+
public static final int CONTRACTEVENT_TRIGGER = 3;
8+
9+
public static final String BLOCK_TRIGGER_NAME = "blockTrigger";
10+
public static final String TRANSACTION_TRIGGER_NAME = "transactionTrigger";
11+
public static final String CONTRACTLOG_TRIGGER_NAME = "contractLogTrigger";
12+
public static final String CONTRACTEVENT_TRIGGER_NAME = "contractEventTrigger";
13+
14+
private Constant(){}
15+
}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package org.tron.eventplugin;
2+
import org.slf4j.Logger;
3+
import org.slf4j.LoggerFactory;
4+
5+
import java.text.SimpleDateFormat;
6+
import java.util.*;
7+
import java.util.concurrent.BlockingQueue;
8+
import java.util.concurrent.LinkedBlockingQueue;
9+
import java.util.concurrent.TimeUnit;
10+
11+
public class MessageSenderImpl{
12+
private static MessageSenderImpl instance = null;
13+
private static final Logger log = LoggerFactory.getLogger(MessageSenderImpl.class);
14+
15+
private String serverAddress = "";
16+
private boolean loaded = false;
17+
18+
private BlockingQueue<Object> triggerQueue = new LinkedBlockingQueue();
19+
20+
private String blockTopic = "";
21+
private String transactionTopic = "";
22+
private String contractEventTopic = "";
23+
private String contractLogTopic = "";
24+
25+
private Thread triggerProcessThread;
26+
private boolean isRunTriggerProcessThread = true;
27+
28+
29+
public static MessageSenderImpl getInstance(){
30+
if (Objects.isNull(instance)) {
31+
synchronized (MessageSenderImpl.class){
32+
if (Objects.isNull(instance)){
33+
instance = new MessageSenderImpl();
34+
}
35+
}
36+
}
37+
38+
return instance;
39+
}
40+
41+
public void setServerAddress(String address){
42+
this.serverAddress = address;
43+
}
44+
45+
public void init(){
46+
47+
if (loaded){
48+
return;
49+
}
50+
51+
triggerProcessThread = new Thread(triggerProcessLoop);
52+
triggerProcessThread.start();
53+
54+
loaded = true;
55+
}
56+
57+
public void setTopic(int triggerType, String topic){
58+
if (triggerType == Constant.BLOCK_TRIGGER){
59+
blockTopic = topic;
60+
}
61+
else if (triggerType == Constant.TRANSACTION_TRIGGER){
62+
transactionTopic = topic;
63+
}
64+
else if (triggerType == Constant.CONTRACTEVENT_TRIGGER){
65+
contractEventTopic = topic;
66+
}
67+
else if (triggerType == Constant.CONTRACTLOG_TRIGGER){
68+
contractLogTopic = topic;
69+
}
70+
}
71+
72+
private void printTimestamp(String data){
73+
Date date = new Date();
74+
SimpleDateFormat ft = new SimpleDateFormat("hh:mm:ss:SSS");
75+
System.out.println(ft.format(date) + ": " + data);
76+
}
77+
78+
79+
public void close() {
80+
}
81+
82+
public void insertDBRecord(int eventType, String eventTopic, Object data){
83+
System.out.println("insertDBRecord: " + eventType + ", " + eventTopic + ", " + data);
84+
}
85+
86+
public BlockingQueue<Object> getTriggerQueue(){
87+
return triggerQueue;
88+
}
89+
90+
public void handleBlockEvent(Object data) {
91+
if (blockTopic == null || blockTopic.length() == 0){
92+
return;
93+
}
94+
95+
MessageSenderImpl.getInstance().insertDBRecord(Constant.BLOCK_TRIGGER, blockTopic, data);
96+
}
97+
98+
public void handleTransactionTrigger(Object data) {
99+
if (Objects.isNull(data) || Objects.isNull(transactionTopic)){
100+
return;
101+
}
102+
103+
MessageSenderImpl.getInstance().insertDBRecord(Constant.TRANSACTION_TRIGGER, transactionTopic, data);
104+
}
105+
106+
public void handleContractLogTrigger(Object data) {
107+
if (Objects.isNull(data) || Objects.isNull(contractLogTopic)){
108+
return;
109+
}
110+
111+
MessageSenderImpl.getInstance().insertDBRecord(Constant.CONTRACTLOG_TRIGGER, contractLogTopic, data);
112+
}
113+
114+
public void handleContractEventTrigger(Object data) {
115+
if (Objects.isNull(data) || Objects.isNull(contractEventTopic)){
116+
return;
117+
}
118+
119+
MessageSenderImpl.getInstance().insertDBRecord(Constant.CONTRACTEVENT_TRIGGER, contractEventTopic, data);
120+
}
121+
122+
private Runnable triggerProcessLoop =
123+
() -> {
124+
while (isRunTriggerProcessThread) {
125+
try {
126+
String triggerData = (String)triggerQueue.poll(1, TimeUnit.SECONDS);
127+
128+
if (Objects.isNull(triggerData)){
129+
continue;
130+
}
131+
132+
if (triggerData.contains(Constant.BLOCK_TRIGGER_NAME)){
133+
handleBlockEvent(triggerData);
134+
}
135+
else if (triggerData.contains(Constant.TRANSACTION_TRIGGER_NAME)){
136+
handleTransactionTrigger(triggerData);
137+
}
138+
else if (triggerData.contains(Constant.CONTRACTLOG_TRIGGER_NAME)){
139+
handleContractLogTrigger(triggerData);
140+
}
141+
else if (triggerData.contains(Constant.CONTRACTEVENT_TRIGGER_NAME)){
142+
handleContractEventTrigger(triggerData);
143+
}
144+
} catch (InterruptedException ex) {
145+
log.info(ex.getMessage());
146+
Thread.currentThread().interrupt();
147+
} catch (Exception ex) {
148+
log.error("unknown exception happened in process capsule loop", ex);
149+
} catch (Throwable throwable) {
150+
log.error("unknown throwable happened in process capsule loop", throwable);
151+
}
152+
}
153+
};
154+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package org.tron.eventplugin;
2+
3+
import org.pf4j.Extension;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.tron.common.logsfilter.IPluginEventListener;
7+
import java.util.Objects;
8+
9+
@Extension
10+
public class MongodbEventListener implements IPluginEventListener {
11+
12+
private static final Logger log = LoggerFactory.getLogger(MongodbEventListener.class);
13+
14+
@Override
15+
public void setServerAddress(String address) {
16+
17+
if (Objects.isNull(address) || address.length() == 0){
18+
return;
19+
}
20+
21+
MessageSenderImpl.getInstance().setServerAddress(address);
22+
23+
// MessageSenderImpl should never init until server address is set
24+
MessageSenderImpl.getInstance().init();
25+
}
26+
27+
@Override
28+
public void setTopic(int eventType, String topic) {
29+
MessageSenderImpl.getInstance().setTopic(eventType, topic);
30+
}
31+
32+
@Override
33+
public void handleBlockEvent(Object data) {
34+
35+
if (Objects.isNull(data)){
36+
return;
37+
}
38+
39+
MessageSenderImpl.getInstance().getTriggerQueue().offer((String)data);
40+
}
41+
42+
@Override
43+
public void handleTransactionTrigger(Object data) {
44+
if (Objects.isNull(data)){
45+
return;
46+
}
47+
48+
MessageSenderImpl.getInstance().getTriggerQueue().offer((String)data);
49+
}
50+
51+
@Override
52+
public void handleContractLogTrigger(Object data) {
53+
if (Objects.isNull(data)){
54+
return;
55+
}
56+
57+
MessageSenderImpl.getInstance().getTriggerQueue().offer((String)data);
58+
}
59+
60+
@Override
61+
public void handleContractEventTrigger(Object data) {
62+
if (Objects.isNull(data)){
63+
return;
64+
}
65+
66+
MessageSenderImpl.getInstance().getTriggerQueue().offer((String)data);
67+
}
68+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.tron.eventplugin;
2+
3+
import org.pf4j.Plugin;
4+
import org.pf4j.PluginWrapper;
5+
6+
public class MongodbLogFilterPlugin extends Plugin {
7+
8+
public MongodbLogFilterPlugin(PluginWrapper wrapper) {
9+
super(wrapper);
10+
}
11+
12+
@Override
13+
public void start() {
14+
}
15+
16+
@Override
17+
public void stop() {
18+
MessageSenderImpl.getInstance().close();
19+
}
20+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.tron.orm.mongo;
2+
3+
import org.tron.orm.mongo.entity.EventLogEntity;
4+
5+
import java.util.List;
6+
7+
/**
8+
* 事件日志MongoDB基础接口
9+
*
10+
* @param <T>
11+
*/
12+
public interface EventLogMongoDao<T> extends MongoBaseDao {
13+
public List<EventLogEntity> findAll(String contractAddress, String collectionName);
14+
15+
public EventLogEntity findOne(String contractAddress, String collectionName);
16+
17+
public List<EventLogEntity> findAll(String contractAddress, String entryName, String collectionName);
18+
19+
public EventLogEntity findOne(String contractAddress, String entryName, String collectionName);
20+
21+
public List<EventLogEntity> findAll(String contractAddress, String entryName, long blockNumber, String collectionName);
22+
23+
public EventLogEntity findOne(String contractAddress, String entryName, long blockNumber, String collectionName);
24+
25+
public List<EventLogEntity> findAllByTransactionId(String transactionId, String collectionName);
26+
27+
public EventLogEntity findOneByTransactionId(String transactionId, String collectionName);
28+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.tron.orm.mongo;
2+
3+
/**
4+
* MongoDB
5+
*
6+
* @param <T>
7+
*/
8+
public interface MongoBaseDao<T> {
9+
10+
public void insert(T object, String collectionName);
11+
12+
}

0 commit comments

Comments
 (0)