diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dd6df5f --- /dev/null +++ b/.gitignore @@ -0,0 +1,14 @@ +build +/.idea/ +/.gradle/ +/lib/ +.settings/ +build/ +.classpath +.project +.DS_Store +*/build +*/logs/ +*.log +/logs +out/ diff --git a/README.md b/README.md index 5affc46..30da8fb 100644 --- a/README.md +++ b/README.md @@ -12,25 +12,28 @@ This is an implementation of Tron eventsubscribe model. 2. Go to eventplugin `cd eventplugin` 3. run `./gradlew build` -* This will produce one plugin zip, named `plugin-kafka-1.0.0.zip`, located in the `eventplugin/build/plugins/` directory. +* This will produce plugin zips, named `plugin-kafka-1.0.0.zip` and `plugin-mongodb-1.0.0.zip`, located in the `eventplugin/build/plugins/` directory. -### Edit **config.conf** of Java-tron, add the following fileds: +### Edit **config.conf** of Java-tron, add the following fileds: ``` event.subscribe = { path = "" // absolute path of plugin server = "" // target server address to receive event triggers - dbconfig="" // dbname|username|password + dbconfig = "" // dbname|username|password, if you want to create indexes for collections when the collections are not exist, you can add version and set it to 2, as dbname|username|password|version topics = [ { triggerName = "block" // block trigger, the value can't be modified enable = false topic = "block" // plugin topic, the value could be modified + solidified = true // if set true, just need solidified block, default is false }, { triggerName = "transaction" enable = false topic = "transaction" + solidified = true + ethCompatible = true // if set true, add transactionIndex, cumulativeEnergyUsed, preCumulativeLogCount, logList, energyUnitPrice, default is false }, { triggerName = "contractevent" @@ -41,6 +44,23 @@ event.subscribe = { triggerName = "contractlog" enable = true topic = "contractlog" + redundancy = true // if set true, contractevent will also be regarded as contractlog + }, + { + triggerName = "solidity" // solidity block trigger(just include solidity block number and timestamp), the value can't be modified + enable = true // the default value is true + topic = "solidity" + }, + { + triggerName = "solidityevent" + enable = false + topic = "solidityevent" + }, + { + triggerName = "soliditylog" + enable = false + topic = "soliditylog" + redundancy = true // if set true, solidityevent will also be regarded as soliditylog } ] @@ -59,13 +79,17 @@ event.subscribe = { ``` - * **path**: is the absolute path of "plugin-kafka-1.0.0.zip" - * **server**: Kafka server address - * **topics**: each event type maps to one Kafka topic, we support four event types subscribing, block, transaction, contractlog and contractevent. - * **dbconfig**: db configuration information for mongodb, if using kafka, delete this one; if using Mongodb, add like that dbname|username|password - * **triggerName**: the trigger type, the value can't be modified. - * **enable**: plugin can receive nothing if the value is false. - * **topic**: the value is the kafka topic to receive events. Make sure it has been created and Kafka process is running + * **path**: is the absolute path of "plugin-kafka-1.0.0.zip" or "plugin-mongodb-1.0.0.zip" + * **server**: Kafka(or MongoDB) server address, the default port is 9092(MongoDB is 27017) + * **dbconfig**: db configuration information for mongodb, if using kafka, delete this one; if using Mongodb, add like that dbname|username|password or dbname|username|password|version if you want to create indexes when init + * **topics**: each event type maps to one Kafka topic(or MongoDB collection), we support seven event types subscribing, block, transaction, contractlog, contractevent, solidity, soliditylog and solidityevent. + **triggerName**: the trigger type, the value can't be modified. + **enable**: plugin can receive nothing if the value is false. + **topic**: the value is the kafka topic to receive events. Make sure it has been created and Kafka process is running + **solidified**: if just need solidified data, just works for block and transaction + **redundancy**: if will also trigger event as log, just works for contractlog and soliditylog + **ethCompatible**: if set to true, will add some fields to transaction: transactionIndex, cumulativeEnergyUsed, preCumulativeLogCount, logList, energyUnitPrice + * **filter**: filter condition for process trigger. **note**: if the server is not 127.0.0.1, pls set some properties in config/server.properties file remove comment and set listeners=PLAINTEXT://:9092 @@ -87,9 +111,6 @@ mv kafka_2.10-0.10.2.2 kafka add "export PATH=$PATH:/usr/local/kafka/bin" to end of /etc/profile source /etc/profile - -kafka-server-start.sh /usr/local/kafka/config/server.properties & - ``` **Note**: make sure the version of Kafka is the same as the version set in build.gradle of eventplugin project.(kafka_2.10-0.10.2.2 kafka) @@ -114,14 +135,20 @@ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partit kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic transaction kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic contractlog kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic contractevent +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic solidity +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic solidityevent +kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic soliditylog ``` **On Linux**: ``` -kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic block +kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic block kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic transaction kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic contractlog kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic contractevent +kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic solidity +kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic solidityevent +kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic soliditylog ``` #### Kafka consumer @@ -132,6 +159,9 @@ kafka-console-consumer --bootstrap-server localhost:9092 --topic block kafka-console-consumer --bootstrap-server localhost:9092 --topic transaction kafka-console-consumer --bootstrap-server localhost:9092 --topic contractlog kafka-console-consumer --bootstrap-server localhost:9092 --topic contractevent +kafka-console-consumer --bootstrap-server localhost:9092 --topic solidity +kafka-console-consumer --bootstrap-server localhost:9092 --topic solidityevent +kafka-console-consumer --bootstrap-server localhost:9092 --topic soliditylog ``` **On Linux**: @@ -140,12 +170,15 @@ kafka-console-consumer.sh --zookeeper localhost:2181 --topic block kafka-console-consumer.sh --zookeeper localhost:2181 --topic transaction kafka-console-consumer.sh --zookeeper localhost:2181 --topic contractlog kafka-console-consumer.sh --zookeeper localhost:2181 --topic contractevent +kafka-console-consumer.sh --zookeeper localhost:2181 --topic solidity +kafka-console-consumer.sh --zookeeper localhost:2181 --topic solidityevent +kafka-console-consumer.sh --zookeeper localhost:2181 --topic soliditylog ``` ### Load plugin in Java-tron * add --es to command line, for example: ``` - java -jar FullNode.jar -p privatekey -c config.conf --es + java -jar FullNode.jar -c config.conf --es ``` diff --git a/api/build.gradle b/api/build.gradle index 76b0804..e69de29 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -1,11 +0,0 @@ -dependencies { - compile (group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}") - - compileOnly (group: 'org.apache.commons', name: 'commons-lang3', version: '3.5') - - testCompile group: 'junit', name: 'junit', version: '4.+' - - compileOnly ("org.projectlombok:lombok:1.16.18") { - exclude group: "org.slf4j" - } -} diff --git a/api/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java b/api/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java index b1d57b5..58fdf4b 100644 --- a/api/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java +++ b/api/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java @@ -22,4 +22,9 @@ public interface IPluginEventListener extends ExtensionPoint { void handleSolidityTrigger(Object trigger); + void handleSolidityLogTrigger(Object trigger); + + void handleSolidityEventTrigger(Object trigger); + + int getPendingSize(); } diff --git a/api/src/main/java/org/tron/common/logsfilter/trigger/Trigger.java b/api/src/main/java/org/tron/common/logsfilter/trigger/Trigger.java index 3e87c85..e58f23a 100644 --- a/api/src/main/java/org/tron/common/logsfilter/trigger/Trigger.java +++ b/api/src/main/java/org/tron/common/logsfilter/trigger/Trigger.java @@ -17,6 +17,8 @@ public class Trigger { public static final int CONTRACTLOG_TRIGGER = 2; public static final int CONTRACTEVENT_TRIGGER = 3; public static final int SOLIDITY_TRIGGER = 4; + public static final int SOLIDITY_EVENT = 5; + public static final int SOLIDITY_LOG = 6; public static final String BLOCK_TRIGGER_NAME = "blockTrigger"; public static final String TRANSACTION_TRIGGER_NAME = "transactionTrigger"; diff --git a/app/build.gradle b/app/build.gradle index d7a8dea..0020ffa 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -3,28 +3,27 @@ apply plugin: 'application' mainClassName = 'org.tron.eventplugin.app.PluginLauncher' dependencies { - compile project(':api') - compile group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}" - compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5' - compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25' - - testCompile group: 'junit', name: 'junit', version: '4.+' - compile "org.projectlombok:lombok:1.16.18" - compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.8.5' - compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.8.5' - compile "com.madgag.spongycastle:core:1.58.0.0" - compile "com.madgag.spongycastle:prov:1.58.0.0" + implementation project(':api') + implementation (group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}") { + exclude group: "org.slf4j" + } + implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.5' + implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.18.3' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.18.3' + implementation "com.madgag.spongycastle:core:1.58.0.0" + implementation "com.madgag.spongycastle:prov:1.58.0.0" } task uberjar(type: Jar, dependsOn: ['compileJava']) { zip64 true - from configurations.runtime.asFileTree.files.collect { + from configurations.runtimeClasspath.asFileTree.files.collect { exclude "META-INF/*.SF" exclude "META-INF/*.DSA" exclude "META-INF/*.RSA" zipTree(it) } - from files(sourceSets.main.output.classesDir) + from files(sourceSets.main.output.classesDirs) from files(sourceSets.main.resources) manifest { attributes 'Main-Class': mainClassName diff --git a/app/src/main/java/org/tron/eventplugin/app/PluginLauncher.java b/app/src/main/java/org/tron/eventplugin/app/PluginLauncher.java index 69909de..19450c3 100644 --- a/app/src/main/java/org/tron/eventplugin/app/PluginLauncher.java +++ b/app/src/main/java/org/tron/eventplugin/app/PluginLauncher.java @@ -69,6 +69,10 @@ protected CompoundPluginDescriptorFinder createPluginDescriptorFinder() { listener.setTopic(Trigger.TRANSACTION_TRIGGER, "transaction"); listener.setTopic(Trigger.CONTRACTEVENT_TRIGGER, "contractevent"); listener.setTopic(Trigger.CONTRACTLOG_TRIGGER, "contractlog"); + listener.setTopic(Trigger.SOLIDITY_TRIGGER, "solidity"); + listener.setTopic(Trigger.SOLIDITY_EVENT, "solidityevent"); + listener.setTopic(Trigger.SOLIDITY_LOG, "soliditylog"); + }); eventListeners.forEach(listener -> { diff --git a/build.gradle b/build.gradle index 9968360..edf5ae0 100644 --- a/build.gradle +++ b/build.gradle @@ -5,10 +5,39 @@ subprojects { mavenLocal() mavenCentral() } + + dependencies { + compileOnly group: 'org.apache.commons', name: 'commons-lang3', version: '3.5' + compileOnly 'org.projectlombok:lombok:1.18.12' + compileOnly group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}" + compileOnly group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.13' + annotationProcessor 'org.projectlombok:lombok:1.18.12' + annotationProcessor group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}" + testCompileOnly 'org.projectlombok:lombok:1.18.12' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.12' + testAnnotationProcessor group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}" + compileOnly group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25' + compileOnly group: 'org.slf4j', name: 'jcl-over-slf4j', version: '1.7.25' + testImplementation group: 'junit', name: 'junit', version: '4.+' + } + + jar { + manifest { + attributes 'Plugin-Id' : 'plungin_a' + attributes 'Plugin-Version' : '1.0.0' + } + } + + task sourcesJar(type: Jar) { + duplicatesStrategy = DuplicatesStrategy.INCLUDE + } + + tasks.withType(AbstractArchiveTask) { + duplicatesStrategy = DuplicatesStrategy.INCLUDE // allow duplicates + } } + // plugin location ext.pluginsDir = rootProject.buildDir.path + '/plugins' task build(dependsOn: [':app:uberjar']) - - diff --git a/gradle.properties b/gradle.properties index 9aad160..af9cd62 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ # PF4J -pf4jVersion=2.5.0 +pf4jVersion=3.10.0 diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 1948b90..5c2d1cf 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index d2c45a4..3994438 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.8-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.4-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/plugins/build.gradle b/plugins/build.gradle index f48b252..fae7cb7 100644 --- a/plugins/build.gradle +++ b/plugins/build.gradle @@ -21,7 +21,7 @@ subprojects { with jar } into('lib') { - from configurations.compile + from configurations.runtimeClasspath } extension('zip') } diff --git a/plugins/kafkaplugin/build.gradle b/plugins/kafkaplugin/build.gradle index 037aea8..280462a 100644 --- a/plugins/kafkaplugin/build.gradle +++ b/plugins/kafkaplugin/build.gradle @@ -1,24 +1,8 @@ dependencies { // compileOnly important!!! We do not want to put the api into the zip file since the main program has it already! compileOnly project(':api') - compileOnly(group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}") { - exclude group: "org.slf4j" - } - compileOnly group: 'org.apache.commons', name: 'commons-lang3', version: '3.5' - testCompile group: 'junit', name: 'junit', version: '4.+' - - compile ("org.apache.kafka:kafka_2.10:0.10.2.2"){ - exclude group: "org.slf4j" - } - compile ("org.apache.kafka:kafka-clients:0.10.2.2"){ - exclude group: "org.slf4j" - } - - compileOnly (group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3') { - exclude group: "org.slf4j" - } - compileOnly ("org.projectlombok:lombok:1.16.18") { + implementation ("org.apache.kafka:kafka-clients:3.9.0"){ exclude group: "org.slf4j" } } diff --git a/plugins/kafkaplugin/out/production/classes/META-INF/extensions.idx b/plugins/kafkaplugin/out/production/classes/META-INF/extensions.idx new file mode 100644 index 0000000..269b4a8 --- /dev/null +++ b/plugins/kafkaplugin/out/production/classes/META-INF/extensions.idx @@ -0,0 +1,2 @@ +# Generated by PF4J +org.tron.eventplugin.KafkaEventListener diff --git a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/Constant.java b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/Constant.java index 7f04a44..0ff4ac9 100644 --- a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/Constant.java +++ b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/Constant.java @@ -6,12 +6,16 @@ public class Constant { public static final int CONTRACTLOG_TRIGGER = 2; public static final int CONTRACTEVENT_TRIGGER = 3; public static final int SOLIDITY_TRIGGER = 4; + public static final int SOLIDITY_EVENT = 5; + public static final int SOLIDITY_LOG = 6; public static final String BLOCK_TRIGGER_NAME = "blockTrigger"; public static final String TRANSACTION_TRIGGER_NAME = "transactionTrigger"; public static final String CONTRACTLOG_TRIGGER_NAME = "contractLogTrigger"; public static final String CONTRACTEVENT_TRIGGER_NAME = "contractEventTrigger"; public static final String SOLIDITY_TRIGGER_NAME = "solidityTrigger"; + public static final String SOLIDITYLOG_TRIGGER_NAME = "solidityLogTrigger"; + public static final String SOLIDITYEVENT_TRIGGER_NAME = "solidityEventTrigger"; private Constant(){} } diff --git a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaEventListener.java b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaEventListener.java index 1fc42ce..65f2ba2 100644 --- a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaEventListener.java +++ b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaEventListener.java @@ -65,6 +65,29 @@ public void handleSolidityTrigger(Object data) { MessageSenderImpl.getInstance().getTriggerQueue().offer(data); } + @Override + public void handleSolidityLogTrigger(Object data) { + if (Objects.isNull(data)){ + return; + } + + MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + } + + @Override + public void handleSolidityEventTrigger(Object data) { + if (Objects.isNull(data)){ + return; + } + + MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + } + + @Override + public int getPendingSize() { + return MessageSenderImpl.getInstance().getTriggerQueue().size(); + } + @Override public void handleContractLogTrigger(Object data) { if (Objects.isNull(data)){ diff --git a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/MessageSenderImpl.java b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/MessageSenderImpl.java index 0768721..31d758b 100644 --- a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/MessageSenderImpl.java +++ b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/MessageSenderImpl.java @@ -25,6 +25,9 @@ public class MessageSenderImpl{ private String contractEventTopic = ""; private String contractLogTopic = ""; private String solidityTopic = ""; + private String solidityLogTopic = ""; + private String solidityEventTopic = ""; + private Thread triggerProcessThread; private boolean isRunTriggerProcessThread = true; @@ -56,6 +59,9 @@ public void init(){ createProducer(Constant.TRANSACTION_TRIGGER); createProducer(Constant.CONTRACTLOG_TRIGGER); createProducer(Constant.CONTRACTEVENT_TRIGGER); + createProducer(Constant.SOLIDITY_TRIGGER); + createProducer(Constant.SOLIDITY_EVENT); + createProducer(Constant.SOLIDITY_LOG); triggerProcessThread = new Thread(triggerProcessLoop); triggerProcessThread.start(); @@ -63,25 +69,24 @@ public void init(){ loaded = true; } - public void setTopic(int triggerType, String topic){ - if (triggerType == Constant.BLOCK_TRIGGER){ + public void setTopic(int triggerType, String topic) { + if (triggerType == Constant.BLOCK_TRIGGER) { blockTopic = topic; - } - else if (triggerType == Constant.TRANSACTION_TRIGGER){ + } else if (triggerType == Constant.TRANSACTION_TRIGGER) { transactionTopic = topic; - } - else if (triggerType == Constant.CONTRACTEVENT_TRIGGER){ + } else if (triggerType == Constant.CONTRACTEVENT_TRIGGER) { contractEventTopic = topic; - } - else if (triggerType == Constant.CONTRACTLOG_TRIGGER){ + } else if (triggerType == Constant.CONTRACTLOG_TRIGGER) { contractLogTopic = topic; - } - else if (triggerType == Constant.SOLIDITY_TRIGGER) { + } else if (triggerType == Constant.SOLIDITY_TRIGGER) { solidityTopic = topic; + } else if (triggerType == Constant.SOLIDITY_EVENT) { + solidityEventTopic = topic; + } else if (triggerType == Constant.SOLIDITY_LOG) { + solidityLogTopic = topic; } } - private KafkaProducer createProducer(int eventType){ KafkaProducer producer = null; @@ -183,7 +188,19 @@ public void handleSolidityTrigger(Object data) { if (Objects.isNull(data) || Objects.isNull(solidityTopic)){ return; } - MessageSenderImpl.getInstance().sendKafkaRecord(Constant.SOLIDITY_TRIGGER, contractEventTopic, data); + MessageSenderImpl.getInstance().sendKafkaRecord(Constant.SOLIDITY_TRIGGER, solidityTopic, data); + } + public void handleSolidityLogTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(solidityLogTopic)){ + return; + } + MessageSenderImpl.getInstance().sendKafkaRecord(Constant.SOLIDITY_LOG, solidityLogTopic, data); + } + public void handleSolidityEventTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(solidityEventTopic)){ + return; + } + MessageSenderImpl.getInstance().sendKafkaRecord(Constant.SOLIDITY_EVENT, solidityEventTopic, data); } private Runnable triggerProcessLoop = @@ -211,6 +228,12 @@ else if (triggerData.contains(Constant.CONTRACTEVENT_TRIGGER_NAME)){ else if (triggerData.contains(Constant.SOLIDITY_TRIGGER_NAME)) { handleSolidityTrigger(triggerData); } + else if (triggerData.contains(Constant.SOLIDITYLOG_TRIGGER_NAME)) { + handleSolidityLogTrigger(triggerData); + } + else if (triggerData.contains(Constant.SOLIDITYEVENT_TRIGGER_NAME)) { + handleSolidityEventTrigger(triggerData); + } } catch (InterruptedException ex) { log.info(ex.getMessage()); Thread.currentThread().interrupt(); diff --git a/plugins/mongodbplugin/build.gradle b/plugins/mongodbplugin/build.gradle index 54b9959..4740fc1 100644 --- a/plugins/mongodbplugin/build.gradle +++ b/plugins/mongodbplugin/build.gradle @@ -1,27 +1,12 @@ dependencies { // compileOnly important!!! We do not want to put the api into the zip file since the main program has it already! compileOnly project(':api') - compileOnly(group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}") { - exclude group: "org.slf4j" - } - compileOnly group: 'org.apache.commons', name: 'commons-lang3', version: '3.5' - testCompile group: 'junit', name: 'junit', version: '4.+' - compileOnly (group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3') { - exclude group: "org.slf4j" - } + compileOnly group: 'com.alibaba', name: 'fastjson', version: '1.2.83' + compileOnly group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.18.3' + compileOnly group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.18.3' - compileOnly ("org.projectlombok:lombok:1.16.18") { + implementation ("org.mongodb:mongo-java-driver:3.12.10"){ exclude group: "org.slf4j" } - - compile ("org.mongodb:mongo-java-driver:3.6.3"){ - exclude group: "org.slf4j" - } - - compileOnly group: 'com.alibaba', name: 'fastjson', version: '1.2.44' - - compileOnly group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.8.5' - compileOnly group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.8.5' - } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/Constant.java b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/Constant.java index 7f04a44..d20528a 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/Constant.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/Constant.java @@ -6,12 +6,16 @@ public class Constant { public static final int CONTRACTLOG_TRIGGER = 2; public static final int CONTRACTEVENT_TRIGGER = 3; public static final int SOLIDITY_TRIGGER = 4; + public static final int SOLIDITY_EVENT_TRIGGER = 5; + public static final int SOLIDITY_LOG_TRIGGER = 6; public static final String BLOCK_TRIGGER_NAME = "blockTrigger"; public static final String TRANSACTION_TRIGGER_NAME = "transactionTrigger"; public static final String CONTRACTLOG_TRIGGER_NAME = "contractLogTrigger"; public static final String CONTRACTEVENT_TRIGGER_NAME = "contractEventTrigger"; public static final String SOLIDITY_TRIGGER_NAME = "solidityTrigger"; + public static final String SOLIDITYLOG_TRIGGER_NAME = "solidityLogTrigger"; + public static final String SOLIDITYEVENT_TRIGGER_NAME = "solidityEventTrigger"; private Constant(){} } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbEventListener.java b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbEventListener.java index c4a224e..444cc84 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbEventListener.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbEventListener.java @@ -82,4 +82,28 @@ public void handleSolidityTrigger(Object data) { MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); } + + @Override + public void handleSolidityLogTrigger(Object data) { + if (Objects.isNull(data)){ + return; + } + + MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + } + + @Override + public void handleSolidityEventTrigger(Object data) { + if (Objects.isNull(data)){ + return; + } + + MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + } + + @Override + public int getPendingSize() { + return MongodbSenderImpl.getInstance().getTriggerQueue().size() + + MongodbSenderImpl.getInstance().getQueue().size(); + } } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java index f8842e2..3db8b0a 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java @@ -1,354 +1,485 @@ package org.tron.eventplugin; -import com.alibaba.fastjson.JSONObject; -import org.pf4j.util.StringUtils; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import org.pf4j.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import org.tron.mongodb.MongoConfig; import org.tron.mongodb.MongoManager; import org.tron.mongodb.MongoTemplate; -public class MongodbSenderImpl{ - private static MongodbSenderImpl instance = null; - private static final Logger log = LoggerFactory.getLogger(MongodbSenderImpl.class); - ExecutorService service = Executors.newFixedThreadPool(8); +public class MongodbSenderImpl { + + private static MongodbSenderImpl instance = null; + private static final Logger log = LoggerFactory.getLogger(MongodbSenderImpl.class); + @Getter + BlockingQueue queue = new LinkedBlockingQueue(); + private ExecutorService service = new ThreadPoolExecutor(8, 8, + 0L, TimeUnit.MILLISECONDS, queue); - private boolean loaded = false; - private BlockingQueue triggerQueue = new LinkedBlockingQueue(); + private boolean loaded = false; + private BlockingQueue triggerQueue = new LinkedBlockingQueue<>(); - private String blockTopic = ""; - private String transactionTopic = ""; - private String contractEventTopic = ""; - private String contractLogTopic = ""; - private String solidityTopic = ""; + private String blockTopic = ""; + private String transactionTopic = ""; + private String contractEventTopic = ""; + private String contractLogTopic = ""; + private String solidityTopic = ""; + private String solidityEventTopic = ""; + private String solidityLogTopic = ""; - private Thread triggerProcessThread; - private boolean isRunTriggerProcessThread = true; + private Thread triggerProcessThread; + private boolean isRunTriggerProcessThread = true; - private MongoManager mongoManager; - private Map mongoTemplateMap; + private MongoManager mongoManager; + private Map mongoTemplateMap; - private String dbName; - private String dbUserName; - private String dbPassword; + private String dbName; + private String dbUserName; + private String dbPassword; + private int version; // 1: no index, 2: has index - private MongoConfig mongoConfig; + private MongoConfig mongoConfig; - public static MongodbSenderImpl getInstance(){ + public static MongodbSenderImpl getInstance() { + if (Objects.isNull(instance)) { + synchronized (MongodbSenderImpl.class) { if (Objects.isNull(instance)) { - synchronized (MongodbSenderImpl.class){ - if (Objects.isNull(instance)){ - instance = new MongodbSenderImpl(); - } - } + instance = new MongodbSenderImpl(); } - - return instance; + } } - public void setDBConfig(String dbConfig){ - if (StringUtils.isNullOrEmpty(dbConfig)){ - return; - } - - String[] params = dbConfig.split("\\|"); - if (params.length != 3){ - return; - } - - dbName = params[0]; - dbUserName = params[1]; - dbPassword = params[2]; + return instance; + } - loadMongoConfig(); + public void setDBConfig(String dbConfig) { + if (StringUtils.isNullOrEmpty(dbConfig)) { + return; } - public void setServerAddress(final String serverAddress){ - if (StringUtils.isNullOrEmpty(serverAddress)){ - return; - } + String[] params = dbConfig.split("\\|"); + if (params.length != 3 && params.length != 4) { + return; + } - String[] params = serverAddress.split(":"); - if (params.length != 2){ - return; - } + dbName = params[0]; + dbUserName = params[1]; + dbPassword = params[2]; + version = 1; - String mongoHostName = ""; - int mongoPort = -1; + if (params.length == 4) { + version = Integer.valueOf(params[3]); + } - try{ - mongoHostName = params[0]; - mongoPort = Integer.valueOf(params[1]); - } - catch (Exception e){ - e.printStackTrace(); - return; - } + loadMongoConfig(); + } - if (Objects.isNull(mongoConfig)){ - mongoConfig = new MongoConfig(); - } + public void setServerAddress(final String serverAddress) { + if (StringUtils.isNullOrEmpty(serverAddress)) { + return; + } - mongoConfig.setHost(mongoHostName); - mongoConfig.setPort(mongoPort); + String[] params = serverAddress.split(":"); + if (params.length != 2) { + return; } - public void init(){ + String mongoHostName = ""; + int mongoPort = -1; - if (loaded){ - return; - } + try { + mongoHostName = params[0]; + mongoPort = Integer.valueOf(params[1]); + } catch (Exception e) { + e.printStackTrace(); + return; + } - if (Objects.isNull(mongoManager)){ - mongoManager = new MongoManager(); - mongoManager.initConfig(mongoConfig); - } + if (Objects.isNull(mongoConfig)) { + mongoConfig = new MongoConfig(); + } - mongoTemplateMap = new HashMap<>(); - createCollections(); + mongoConfig.setHost(mongoHostName); + mongoConfig.setPort(mongoPort); + } - triggerProcessThread = new Thread(triggerProcessLoop); - triggerProcessThread.start(); + public void init() { - loaded = true; + if (loaded) { + return; } - private void createCollections(){ - mongoManager.createCollection(blockTopic); - createMongoTemplate(blockTopic); - - mongoManager.createCollection(transactionTopic); - createMongoTemplate(transactionTopic); + if (Objects.isNull(mongoManager)) { + mongoManager = new MongoManager(); + mongoManager.initConfig(mongoConfig); + } - mongoManager.createCollection(contractLogTopic); - createMongoTemplate(contractLogTopic); + mongoTemplateMap = new HashMap<>(); + createCollections(); + + triggerProcessThread = new Thread(triggerProcessLoop); + triggerProcessThread.start(); + + loaded = true; + } + + private void createCollections() { + if (mongoConfig.enabledIndexes()) { + Map indexOptions = new HashMap<>(); + indexOptions.put("blockNumber", true); + mongoManager.createCollection(blockTopic, indexOptions); + + indexOptions = new HashMap<>(); + indexOptions.put("transactionId", true); + mongoManager.createCollection(transactionTopic, indexOptions); + + indexOptions = new HashMap<>(); + indexOptions.put("latestSolidifiedBlockNumber", true); + mongoManager.createCollection(solidityTopic, indexOptions); + + indexOptions = new HashMap<>(); + indexOptions.put("uniqueId", true); + mongoManager.createCollection(solidityEventTopic, indexOptions); + mongoManager.createCollection(contractEventTopic, indexOptions); + + indexOptions = new HashMap<>(); + indexOptions.put("uniqueId", true); + indexOptions.put("contractAddress", false); + mongoManager.createCollection(solidityLogTopic, indexOptions); + mongoManager.createCollection(contractLogTopic, indexOptions); + } else { + mongoManager.createCollection(blockTopic); + mongoManager.createCollection(transactionTopic); + mongoManager.createCollection(contractLogTopic); + mongoManager.createCollection(contractEventTopic); + mongoManager.createCollection(solidityTopic); + mongoManager.createCollection(solidityEventTopic); + mongoManager.createCollection(solidityLogTopic); + } - mongoManager.createCollection(contractEventTopic); - createMongoTemplate(contractEventTopic); + createMongoTemplate(blockTopic); + createMongoTemplate(transactionTopic); + createMongoTemplate(contractLogTopic); + createMongoTemplate(contractEventTopic); + createMongoTemplate(solidityTopic); + createMongoTemplate(solidityEventTopic); + createMongoTemplate(solidityLogTopic); + } + + private void loadMongoConfig() { + if (Objects.isNull(mongoConfig)) { + mongoConfig = new MongoConfig(); + } - mongoManager.createCollection(solidityTopic); - createMongoTemplate(solidityTopic); + if (StringUtils.isNullOrEmpty(dbName)) { + return; } - private void loadMongoConfig(){ - if (Objects.isNull(mongoConfig)){ - mongoConfig = new MongoConfig(); - } + Properties properties = new Properties(); + + try { + InputStream input = getClass().getClassLoader().getResourceAsStream("mongodb.properties"); + if (Objects.isNull(input)) { + return; + } + properties.load(input); + + int connectionsPerHost = Integer.parseInt(properties.getProperty("mongo.connectionsPerHost")); + int threadsAllowedToBlockForConnectionMultiplie = Integer.parseInt( + properties.getProperty("mongo.threadsAllowedToBlockForConnectionMultiplier")); + + mongoConfig.setDbName(dbName); + mongoConfig.setUsername(dbUserName); + mongoConfig.setPassword(dbPassword); + mongoConfig.setVersion(version); + mongoConfig.setConnectionsPerHost(connectionsPerHost); + mongoConfig.setThreadsAllowedToBlockForConnectionMultiplier( + threadsAllowedToBlockForConnectionMultiplie); + } catch (IOException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + } - if (StringUtils.isNullOrEmpty(dbName)){ - return; - } + private MongoTemplate createMongoTemplate(final String collectionName) { - Properties properties = new Properties(); + MongoTemplate template = mongoTemplateMap.get(collectionName); + if (Objects.nonNull(template)) { + return template; + } - try { - InputStream input = getClass().getClassLoader().getResourceAsStream("mongodb.properties"); - if (Objects.isNull(input)){ - return; - } - properties.load(input); - - int connectionsPerHost = Integer.parseInt(properties.getProperty("mongo.connectionsPerHost")); - int threadsAllowedToBlockForConnectionMultiplie = Integer.parseInt( - properties.getProperty("mongo.threadsAllowedToBlockForConnectionMultiplier")); - - mongoConfig.setDbName(dbName); - mongoConfig.setUsername(dbUserName); - mongoConfig.setPassword(dbPassword); - mongoConfig.setConnectionsPerHost(connectionsPerHost); - mongoConfig.setThreadsAllowedToBlockForConnectionMultiplier(threadsAllowedToBlockForConnectionMultiplie); - } catch (IOException e) { - e.printStackTrace(); - } catch (Exception e){ - e.printStackTrace(); - } + template = new MongoTemplate(mongoManager) { + @Override + protected String collectionName() { + return collectionName; + } + + @Override + protected Class getReferencedClass() { + return null; + } + }; + + mongoTemplateMap.put(collectionName, template); + + return template; + } + + + public void setTopic(int triggerType, String topic) { + if (triggerType == Constant.BLOCK_TRIGGER) { + blockTopic = topic; + } else if (triggerType == Constant.TRANSACTION_TRIGGER) { + transactionTopic = topic; + } else if (triggerType == Constant.CONTRACTEVENT_TRIGGER) { + contractEventTopic = topic; + } else if (triggerType == Constant.CONTRACTLOG_TRIGGER) { + contractLogTopic = topic; + } else if (triggerType == Constant.SOLIDITY_TRIGGER) { + solidityTopic = topic; + } else if (triggerType == Constant.SOLIDITY_EVENT_TRIGGER) { + solidityEventTopic = topic; + } else if (triggerType == Constant.SOLIDITY_LOG_TRIGGER) { + solidityLogTopic = topic; + } else { + return; } + } + + public void close() { + } + + public BlockingQueue getTriggerQueue() { + return triggerQueue; + } + + public void upsertEntityLong(MongoTemplate template, Object data, String indexKey) { + String dataStr = (String) data; + try { + JSONObject jsStr = JSON.parseObject(dataStr); + Long indexValue = jsStr.getLong(indexKey); + if (indexValue != null) { + template.upsertEntity(indexKey, indexValue, dataStr); + } else { + template.addEntity(dataStr); + } + } catch (Exception ex) { + log.error("upsertEntityLong exception happened in parse object ", ex); + } + } + + public void upsertEntityString(MongoTemplate template, Object data, String indexKey) { + String dataStr = (String) data; + try { + JSONObject jsStr = JSON.parseObject(dataStr); + String indexValue = jsStr.getString(indexKey); + if (indexValue != null) { + template.upsertEntity(indexKey, indexValue, dataStr); + } else { + template.addEntity(dataStr); + } + } catch (Exception ex) { + log.error("upsertEntityLong exception happened in parse object ", ex); + } + } - private MongoTemplate createMongoTemplate(final String collectionName){ + public void handleBlockEvent(Object data) { + if (blockTopic == null || blockTopic.length() == 0) { + return; + } - MongoTemplate template = mongoTemplateMap.get(collectionName); - if (Objects.nonNull(template)){ - return template; + MongoTemplate template = mongoTemplateMap.get(blockTopic); + if (Objects.nonNull(template)) { + service.execute(new Runnable() { + @Override + public void run() { + if (mongoConfig.enabledIndexes()) { + upsertEntityLong(template, data, "blockNumber"); + } else { + template.addEntity((String) data); + } } + }); + } + } - template = new MongoTemplate(mongoManager) { - @Override - protected String collectionName() { - return collectionName; - } - - @Override - protected Class getReferencedClass() { - return null; - } - }; - - mongoTemplateMap.put(collectionName, template); + public void handleTransactionTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(transactionTopic)) { + return; + } - return template; + MongoTemplate template = mongoTemplateMap.get(transactionTopic); + if (Objects.nonNull(template)) { + service.execute(new Runnable() { + @Override + public void run() { + if (mongoConfig.enabledIndexes()) { + upsertEntityString(template, data, "transactionId"); + } else { + template.addEntity((String) data); + } + } + }); } + } + public void handleSolidityTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(solidityTopic)) { + return; + } - public void setTopic(int triggerType, String topic){ - if (triggerType == Constant.BLOCK_TRIGGER){ - blockTopic = topic; - } - else if (triggerType == Constant.TRANSACTION_TRIGGER){ - transactionTopic = topic; - } - else if (triggerType == Constant.CONTRACTEVENT_TRIGGER){ - contractEventTopic = topic; - } - else if (triggerType == Constant.CONTRACTLOG_TRIGGER){ - contractLogTopic = topic; - } else if (triggerType == Constant.SOLIDITY_TRIGGER) { - solidityTopic = topic; - } - else { - return; + MongoTemplate template = mongoTemplateMap.get(solidityTopic); + if (Objects.nonNull(template)) { + service.execute(new Runnable() { + @Override + public void run() { + if (mongoConfig.enabledIndexes()) { + upsertEntityLong(template, data, "latestSolidifiedBlockNumber"); + } else { + template.addEntity((String) data); + } } + }); } + } - public void close() { + public void handleInsertContractTrigger(MongoTemplate template, Object data, String indexKey) { + if (mongoConfig.enabledIndexes()) { + upsertEntityString(template, data, indexKey); + } else { + template.addEntity((String) data); } + } - public BlockingQueue getTriggerQueue(){ - return triggerQueue; + // will not delete when removed is set to true + public void handleContractLogTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(contractLogTopic)) { + return; } - public void handleBlockEvent(Object data) { - if (blockTopic == null || blockTopic.length() == 0){ - return; - } - MongoTemplate template = mongoTemplateMap.get(blockTopic); - if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - template.addEntity((String)data); - } - }); + MongoTemplate template = mongoTemplateMap.get(contractLogTopic); + if (Objects.nonNull(template)) { + service.execute(new Runnable() { + @Override + public void run() { + handleInsertContractTrigger(template, data, "uniqueId"); } + }); } + } - public void handleTransactionTrigger(Object data) { - if (Objects.isNull(data) || Objects.isNull(transactionTopic)){ - return; - } + public void handleContractEventTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(contractEventTopic)) { + return; + } - MongoTemplate template = mongoTemplateMap.get(transactionTopic); - if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - template.addEntity((String)data); - } - }); + MongoTemplate template = mongoTemplateMap.get(contractEventTopic); + if (Objects.nonNull(template)) { + service.execute(new Runnable() { + @Override + public void run() { + String dataStr = (String) data; + if (dataStr.contains("\"removed\":true")) { + try { + JSONObject jsStr = JSON.parseObject(dataStr); + String uniqueId = jsStr.getString("uniqueId"); + if (uniqueId != null) { + template.delete("uniqueId", uniqueId); + } + } catch (Exception ex) { + log.error("unknown exception happened in parse object ", ex); + } + } else { + handleInsertContractTrigger(template, data, "uniqueId"); + } } + }); } + } - public void handleSolidityTrigger(Object data) { - if (Objects.isNull(data) || Objects.isNull(solidityTopic)){ - return; - } + public void handleSolidityLogTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(solidityLogTopic)) { + return; + } - MongoTemplate template = mongoTemplateMap.get(solidityTopic); - if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - template.addEntity((String)data); - } - }); + MongoTemplate template = mongoTemplateMap.get(solidityLogTopic); + if (Objects.nonNull(template)) { + service.execute(new Runnable() { + @Override + public void run() { + handleInsertContractTrigger(template, data, "uniqueId"); } + }); } + } - public void handleContractLogTrigger(Object data) { - if (Objects.isNull(data) || Objects.isNull(contractLogTopic)){ - return; - } + public void handleSolidityEventTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(solidityEventTopic)) { + return; + } - MongoTemplate template = mongoTemplateMap.get(contractLogTopic); - if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - template.addEntity((String)data); - } - }); + MongoTemplate template = mongoTemplateMap.get(solidityEventTopic); + if (Objects.nonNull(template)) { + service.execute(new Runnable() { + @Override + public void run() { + handleInsertContractTrigger(template, data, "uniqueId"); } + }); } + } - public void handleContractEventTrigger(Object data) { - if (Objects.isNull(data) || Objects.isNull(contractEventTopic)){ - return; - } + private Runnable triggerProcessLoop = + () -> { + while (isRunTriggerProcessThread) { + try { + String triggerData = (String) triggerQueue.poll(1, TimeUnit.SECONDS); - MongoTemplate template = mongoTemplateMap.get(contractEventTopic); - if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - String dataStr = (String)data; - if (dataStr.contains("\"removed\":true")) { - try { - JSONObject jsStr = JSONObject.parseObject(dataStr); - String uniqueId = jsStr.getString("uniqueId"); - if (uniqueId != null) { - template.delete("uniqueId", uniqueId); - } - } catch (Exception ex) { - log.error("unknown exception happened in parse object ", ex); - } - } else { - template.addEntity(dataStr); - } - } - }); - } - } + if (Objects.isNull(triggerData)) { + continue; + } - private Runnable triggerProcessLoop = - () -> { - while (isRunTriggerProcessThread) { - try { - String triggerData = (String)triggerQueue.poll(1, TimeUnit.SECONDS); - - if (Objects.isNull(triggerData)){ - continue; - } - - if (triggerData.contains(Constant.BLOCK_TRIGGER_NAME)){ - handleBlockEvent(triggerData); - } - else if (triggerData.contains(Constant.TRANSACTION_TRIGGER_NAME)){ - handleTransactionTrigger(triggerData); - } - else if (triggerData.contains(Constant.CONTRACTLOG_TRIGGER_NAME)){ - handleContractLogTrigger(triggerData); - } - else if (triggerData.contains(Constant.CONTRACTEVENT_TRIGGER_NAME)){ - handleContractEventTrigger(triggerData); - } else if (triggerData.contains(Constant.SOLIDITY_TRIGGER_NAME)) { - handleSolidityTrigger(triggerData); - } - } catch (InterruptedException ex) { - log.info(ex.getMessage()); - Thread.currentThread().interrupt(); - } catch (Exception ex) { - log.error("unknown exception happened in process capsule loop", ex); - } catch (Throwable throwable) { - log.error("unknown throwable happened in process capsule loop", throwable); - } - } - }; + if (triggerData.contains(Constant.BLOCK_TRIGGER_NAME)) { + handleBlockEvent(triggerData); + } else if (triggerData.contains(Constant.TRANSACTION_TRIGGER_NAME)) { + handleTransactionTrigger(triggerData); + } else if (triggerData.contains(Constant.CONTRACTLOG_TRIGGER_NAME)) { + handleContractLogTrigger(triggerData); + } else if (triggerData.contains(Constant.CONTRACTEVENT_TRIGGER_NAME)) { + handleContractEventTrigger(triggerData); + } else if (triggerData.contains(Constant.SOLIDITY_TRIGGER_NAME)) { + handleSolidityTrigger(triggerData); + } else if (triggerData.contains(Constant.SOLIDITYLOG_TRIGGER_NAME)) { + handleSolidityLogTrigger(triggerData); + } else if (triggerData.contains(Constant.SOLIDITYEVENT_TRIGGER_NAME)) { + handleSolidityEventTrigger(triggerData); + } + } catch (InterruptedException ex) { + log.info(ex.getMessage()); + Thread.currentThread().interrupt(); + } catch (Exception ex) { + log.error("unknown exception happened in process capsule loop", ex); + } catch (Throwable throwable) { + log.error("unknown throwable happened in process capsule loop", throwable); + } + } + }; } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoConfig.java b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoConfig.java index 72431d1..78d9e98 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoConfig.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoConfig.java @@ -2,68 +2,83 @@ public class MongoConfig { - private String host; - private int port = 27017; - private String dbName; - private String username; - private String password; - private int connectionsPerHost = 10; - private int threadsAllowedToBlockForConnectionMultiplier = 10; - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public String getDbName() { - return dbName; - } - - public void setDbName(String dbName) { - this.dbName = dbName; - } - - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public int getConnectionsPerHost() { - return connectionsPerHost; - } - - public void setConnectionsPerHost(int connectionsPerHost) { - this.connectionsPerHost = connectionsPerHost; - } - - public int getThreadsAllowedToBlockForConnectionMultiplier() { - return threadsAllowedToBlockForConnectionMultiplier; - } - - public void setThreadsAllowedToBlockForConnectionMultiplier(int threadsAllowedToBlockForConnectionMultiplier) { - this.threadsAllowedToBlockForConnectionMultiplier = threadsAllowedToBlockForConnectionMultiplier; - } + private String host; + private int port = 27017; + private String dbName; + private String username; + private String password; + private int version; + private int connectionsPerHost = 10; + private int threadsAllowedToBlockForConnectionMultiplier = 10; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getDbName() { + return dbName; + } + + public void setDbName(String dbName) { + this.dbName = dbName; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getVersion() { + return version; + } + + public void setVersion(int version) { + this.version = version; + } + + public int getConnectionsPerHost() { + return connectionsPerHost; + } + + public void setConnectionsPerHost(int connectionsPerHost) { + this.connectionsPerHost = connectionsPerHost; + } + + public int getThreadsAllowedToBlockForConnectionMultiplier() { + return threadsAllowedToBlockForConnectionMultiplier; + } + + public void setThreadsAllowedToBlockForConnectionMultiplier( + int threadsAllowedToBlockForConnectionMultiplier) { + this.threadsAllowedToBlockForConnectionMultiplier = + threadsAllowedToBlockForConnectionMultiplier; + } + + public boolean enabledIndexes() { + return version == 2; + } } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoManager.java b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoManager.java index aad0e2d..2057a0d 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoManager.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoManager.java @@ -5,67 +5,99 @@ import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import com.mongodb.client.MongoDatabase; -import org.pf4j.util.StringUtils; +import com.mongodb.client.model.IndexOptions; +import com.mongodb.client.model.Indexes; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Objects; +import lombok.extern.slf4j.Slf4j; +import org.pf4j.util.StringUtils; +@Slf4j public class MongoManager { - private MongoClient mongo; - private MongoDatabase db; + private MongoClient mongo; + private MongoDatabase db; - public void initConfig(MongoConfig config) { - int connectionsPerHost = config.getConnectionsPerHost(); - int threadsAllowedToBlockForConnectionMultiplier = config.getThreadsAllowedToBlockForConnectionMultiplier(); - MongoClientOptions options = MongoClientOptions.builder().connectionsPerHost(connectionsPerHost) - .threadsAllowedToBlockForConnectionMultiplier(threadsAllowedToBlockForConnectionMultiplier).build(); + public void initConfig(MongoConfig config) { + int connectionsPerHost = config.getConnectionsPerHost(); + int threadsAllowedToBlockForConnectionMultiplier = + config.getThreadsAllowedToBlockForConnectionMultiplier(); + MongoClientOptions options = MongoClientOptions.builder().connectionsPerHost(connectionsPerHost) + .threadsAllowedToBlockForConnectionMultiplier(threadsAllowedToBlockForConnectionMultiplier) + .build(); - String host = config.getHost(); - int port = config.getPort(); - ServerAddress serverAddress = new ServerAddress(host, port); - List addrs = new ArrayList(); - addrs.add(serverAddress); + String host = config.getHost(); + int port = config.getPort(); + ServerAddress serverAddress = new ServerAddress(host, port); + List addrs = new ArrayList(); + addrs.add(serverAddress); - String username = config.getUsername(); - String password = config.getPassword(); - String databaseName = config.getDbName(); + String username = config.getUsername(); + String password = config.getPassword(); + String databaseName = config.getDbName(); - if (StringUtils.isNullOrEmpty(databaseName)){ - return; - } + if (StringUtils.isNullOrEmpty(databaseName)) { + return; + } - MongoCredential credential = MongoCredential.createScramSha1Credential(username, databaseName, - password.toCharArray()); - List credentials = new ArrayList(); - credentials.add(credential); + MongoCredential credential = MongoCredential.createScramSha1Credential(username, databaseName, + password.toCharArray()); + List credentials = new ArrayList(); + credentials.add(credential); - mongo = new MongoClient(addrs, credential, options); - db = mongo.getDatabase(databaseName); + mongo = new MongoClient(addrs, credential, options); + db = mongo.getDatabase(databaseName); + } + + public void createCollection(String collectionName) { + if (db != null && StringUtils.isNotNullOrEmpty(collectionName)) { + if (Objects.isNull(db.getCollection(collectionName))){ + db.createCollection(collectionName); + } } + } + + public void createCollection(String collectionName, Map indexOptions) { + log.info("[createCollection] collection={} start", collectionName); + + if (db != null && StringUtils.isNotNullOrEmpty(collectionName)) { + List collectionList = new ArrayList<>(); + db.listCollectionNames().into(collectionList); - public void createCollection(String collectionName) { - if (db != null && StringUtils.isNotNullOrEmpty(collectionName)) { - if (Objects.isNull(db.getCollection(collectionName))){ - db.createCollection(collectionName); - } + if (!collectionList.contains(collectionName)) { + db.createCollection(collectionName); + + // create index + if (indexOptions == null) { + return; + } + for (String col : indexOptions.keySet()) { + log.info("create index, col={}", col); + db.getCollection(collectionName).createIndex(Indexes.ascending(col), + new IndexOptions().name(col).unique(indexOptions.get(col))); } + } else { + log.info("[createCollection] collection={} already exists", collectionName); + } } + } - public MongoClient getMongo() { - return mongo; - } + public MongoClient getMongo() { + return mongo; + } - public void setMongo(MongoClient mongo) { - this.mongo = mongo; - } + public void setMongo(MongoClient mongo) { + this.mongo = mongo; + } - public MongoDatabase getDb() { - return db; - } + public MongoDatabase getDb() { + return db; + } - public void setDb(MongoDatabase db) { - this.db = db; - } + public void setDb(MongoDatabase db) { + this.db = db; + } } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoTemplate.java b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoTemplate.java index 7163aaa..4666efc 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoTemplate.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoTemplate.java @@ -1,5 +1,6 @@ package org.tron.mongodb; +import com.mongodb.client.model.ReplaceOptions; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -18,165 +19,174 @@ public abstract class MongoTemplate { - private MongoManager manager; - private MongoCollection collection = null; - - public MongoTemplate(MongoManager manager) { - this.manager = manager; - } - - protected abstract String collectionName(); - - protected abstract Class getReferencedClass(); - - public void add(Document document) { - MongoCollection collection = getCollection(); - collection.insertOne(document); - } - - public void addEntity(String entity) { - MongoCollection collection = getCollection(); - if (Objects.nonNull(collection)){ - collection.insertOne(Converter.jsonStringToDocument(entity)); - } - } - - public void addEntityList(List entities) { - MongoCollection collection = getCollection(); - List documents = new ArrayList(); - if (entities != null && !entities.isEmpty()) { - for (String entity : entities) { - documents.add(Converter.jsonStringToDocument(Converter.objectToJsonString(entity))); - } - } - collection.insertMany(documents); - } - - public void addList(List documents) { - MongoCollection collection = getCollection(); - collection.insertMany(documents); - } - - public long update(String updateColumn, Object updateValue, String whereColumn, Object whereValue) { - MongoCollection collection = getCollection(); - UpdateResult result = collection.updateMany(Filters.eq(whereColumn, whereValue), - new Document("$set", new Document(updateColumn, updateValue))); - return result.getModifiedCount(); - } - - public UpdateResult updateMany(Bson filter, Bson update) { - MongoCollection collection = getCollection(); - UpdateResult result = collection.updateMany(filter, update); - return result; - } - - public long delete(String whereColumn, String whereValue) { - MongoCollection collection = getCollection(); - DeleteResult result = collection.deleteOne(Filters.eq(whereColumn, whereValue)); - return result.getDeletedCount(); - } - - public DeleteResult deleteMany(Bson filter) { - MongoCollection collection = getCollection(); - return collection.deleteMany(filter); - } - - /** - * replace the new document - * - * @param filter - * @param replacement - */ - public void replace(Bson filter, Document replacement) { - MongoCollection collection = getCollection(); - collection.replaceOne(filter, replacement); - } - - public List queryByCondition(Bson filter) { - MongoCollection collection = getCollection(); - List documents = new ArrayList(); - FindIterable iterables = collection.find(filter); - MongoCursor mongoCursor = iterables.iterator(); - while (mongoCursor.hasNext()) { - documents.add(mongoCursor.next()); - } - return documents; - } - - public Document queryOne(String key, String value) { - Bson filter = Filters.eq(key, value); - return this.getCollection().find(filter).first(); - } - - public Document queryOneEntity(String key, String value) { - Bson filter = Filters.eq(key, value); - Document document = this.getCollection().find(filter).first(); - String jsonString = document.toJson(); - return Converter.jsonStringToObject(jsonString, this.getReferencedClass()); - } - - public List queryAll() { - MongoCollection collection = getCollection(); - FindIterable findIterable = collection.find(); - MongoCursor mongoCursor = findIterable.iterator(); - List documents = new ArrayList(); - while (mongoCursor.hasNext()) { - documents.add(mongoCursor.next()); - } - return documents; - } - - public List queryAllEntity() { - MongoCollection collection = getCollection(); - FindIterable findIterable = collection.find(); - List list = getEntityList(findIterable); - return list; - } - - public Pager queryPagerList(Bson filter, int pageIndex, int pageSize) { - MongoCollection collection = getCollection(); - long totalCount = collection.count(filter); - FindIterable findIterable = collection.find().skip((pageIndex - 1) * pageSize).sort(new BasicDBObject()).limit(pageSize); - List resultList = getEntityList(findIterable); - Pager pager = new Pager(resultList, totalCount, pageIndex, pageSize); - return pager; - } - - public MongoManager getManager() { - return manager; - } - - public void setManager(MongoManager manager) { - this.manager = manager; - } - - private List getEntityList(FindIterable findIterable) { - MongoCursor mongoCursor = findIterable.iterator(); - List list = new ArrayList(); - Document document = null; - while (mongoCursor.hasNext()) { - document = mongoCursor.next(); - T object; - try { - object = Converter.jsonStringToObject(document.toJson(), getReferencedClass()); - list.add(object); - } catch (Exception e) { - e.printStackTrace(); - } - } - return list; - } - - private MongoCollection getCollection() { - if (Objects.isNull(manager) || Objects.isNull(manager.getDb())){ - return null; - } - - if (Objects.isNull(collection)) { - collection = manager.getDb().getCollection(collectionName()); - } - - return collection; - } + private MongoManager manager; + private MongoCollection collection = null; + + public MongoTemplate(MongoManager manager) { + this.manager = manager; + } + + protected abstract String collectionName(); + + protected abstract Class getReferencedClass(); + + public void add(Document document) { + MongoCollection mongoCollection = getCollection(); + mongoCollection.insertOne(document); + } + + public void addEntity(String entity) { + MongoCollection mongoCollection = getCollection(); + if (Objects.nonNull(mongoCollection)) { + mongoCollection.insertOne(Converter.jsonStringToDocument(entity)); + } + } + + public void upsertEntity(String indexKey, Object indexValue, String entity) { + MongoCollection mongoCollection = getCollection(); + if (Objects.nonNull(mongoCollection)) { + Bson filter = Filters.eq(indexKey, indexValue); + mongoCollection.replaceOne(filter, Converter.jsonStringToDocument(entity), + new ReplaceOptions().upsert(true)); + } + } + + public void addEntityList(List entities) { + MongoCollection mongoCollection = getCollection(); + List documents = new ArrayList(); + if (entities != null && !entities.isEmpty()) { + for (String entity : entities) { + documents.add(Converter.jsonStringToDocument(Converter.objectToJsonString(entity))); + } + } + mongoCollection.insertMany(documents); + } + + public void addList(List documents) { + MongoCollection mongoCollection = getCollection(); + mongoCollection.insertMany(documents); + } + + public long update(String updateColumn, Object updateValue, String whereColumn, + Object whereValue) { + MongoCollection mongoCollection = getCollection(); + UpdateResult result = mongoCollection.updateMany(Filters.eq(whereColumn, whereValue), + new Document("$set", new Document(updateColumn, updateValue))); + return result.getModifiedCount(); + } + + public UpdateResult updateMany(Bson filter, Bson update) { + MongoCollection mongoCollection = getCollection(); + UpdateResult result = mongoCollection.updateMany(filter, update); + return result; + } + + public long delete(String whereColumn, String whereValue) { + MongoCollection mongoCollection = getCollection(); + DeleteResult result = mongoCollection.deleteOne(Filters.eq(whereColumn, whereValue)); + return result.getDeletedCount(); + } + + public DeleteResult deleteMany(Bson filter) { + MongoCollection mongoCollection = getCollection(); + return mongoCollection.deleteMany(filter); + } + + /** + * replace the new document + */ + public void replace(Bson filter, Document replacement) { + MongoCollection mongoCollection = getCollection(); + mongoCollection.replaceOne(filter, replacement); + } + + public List queryByCondition(Bson filter) { + MongoCollection mongoCollection = getCollection(); + List documents = new ArrayList(); + FindIterable iterables = mongoCollection.find(filter); + MongoCursor mongoCursor = iterables.iterator(); + while (mongoCursor.hasNext()) { + documents.add(mongoCursor.next()); + } + return documents; + } + + public Document queryOne(String key, String value) { + Bson filter = Filters.eq(key, value); + return this.getCollection().find(filter).first(); + } + + public Document queryOneEntity(String key, String value) { + Bson filter = Filters.eq(key, value); + Document document = this.getCollection().find(filter).first(); + String jsonString = document.toJson(); + return Converter.jsonStringToObject(jsonString, this.getReferencedClass()); + } + + public List queryAll() { + MongoCollection mongoCollection = getCollection(); + FindIterable findIterable = mongoCollection.find(); + MongoCursor mongoCursor = findIterable.iterator(); + List documents = new ArrayList(); + while (mongoCursor.hasNext()) { + documents.add(mongoCursor.next()); + } + return documents; + } + + public List queryAllEntity() { + MongoCollection mongoCollection = getCollection(); + FindIterable findIterable = mongoCollection.find(); + List list = getEntityList(findIterable); + return list; + } + + public Pager queryPagerList(Bson filter, int pageIndex, int pageSize) { + MongoCollection mongoCollection = getCollection(); + long totalCount = mongoCollection.countDocuments(filter); + FindIterable findIterable = + mongoCollection.find().skip((pageIndex - 1) * pageSize).sort(new BasicDBObject()) + .limit(pageSize); + List resultList = getEntityList(findIterable); + Pager pager = new Pager(resultList, totalCount, pageIndex, pageSize); + return pager; + } + + public MongoManager getManager() { + return manager; + } + + public void setManager(MongoManager manager) { + this.manager = manager; + } + + private List getEntityList(FindIterable findIterable) { + MongoCursor mongoCursor = findIterable.iterator(); + List list = new ArrayList(); + Document document = null; + while (mongoCursor.hasNext()) { + document = mongoCursor.next(); + T object; + try { + object = Converter.jsonStringToObject(document.toJson(), getReferencedClass()); + list.add(object); + } catch (Exception e) { + e.printStackTrace(); + } + } + return list; + } + + private MongoCollection getCollection() { + if (Objects.isNull(manager) || Objects.isNull(manager.getDb())) { + return null; + } + + if (Objects.isNull(collection)) { + collection = manager.getDb().getCollection(collectionName()); + } + + return collection; + } }