1
1
package org .tron .eventplugin ;
2
2
import org .pf4j .util .StringUtils ;
3
+ import java .util .concurrent .ExecutorService ;
4
+ import java .util .concurrent .Executors ;
5
+
3
6
import org .slf4j .Logger ;
4
7
import org .slf4j .LoggerFactory ;
5
8
import java .util .*;
6
9
import java .util .concurrent .BlockingQueue ;
7
10
import java .util .concurrent .LinkedBlockingQueue ;
8
11
import java .util .concurrent .TimeUnit ;
9
12
import com .fasterxml .jackson .databind .ObjectMapper ;
13
+ import org .springframework .util .concurrent .ListenableFuture ;
10
14
import org .tron .mongodb .MongoConfig ;
11
15
import org .tron .mongodb .MongoManager ;
12
16
import org .tron .mongodb .MongoTemplate ;
13
17
14
18
public class MongodbSenderImpl {
15
19
private static MongodbSenderImpl instance = null ;
16
20
private static final Logger log = LoggerFactory .getLogger (MongodbSenderImpl .class );
21
+ ExecutorService service = Executors .newFixedThreadPool (8 );
22
+ List <ListenableFuture <?>> futures = new ArrayList <>();
17
23
18
24
private String serverAddress = "" ;
19
25
private boolean loaded = false ;
@@ -157,11 +163,14 @@ public void handleBlockEvent(Object data) {
157
163
if (blockTopic == null || blockTopic .length () == 0 ){
158
164
return ;
159
165
}
160
-
161
166
MongoTemplate template = mongoTemplateMap .get (blockTopic );
162
- if (Objects .nonNull (template )){
163
- template .addEntity ((String )data );
164
- System .out .println ("handleBlockEvent " + data );
167
+ if (Objects .nonNull (template )) {
168
+ service .execute (new Runnable () {
169
+ @ Override
170
+ public void run () {
171
+ template .addEntity ((String )data );
172
+ }
173
+ });
165
174
}
166
175
}
167
176
@@ -170,9 +179,15 @@ public void handleTransactionTrigger(Object data) {
170
179
return ;
171
180
}
172
181
182
+
173
183
MongoTemplate template = mongoTemplateMap .get (transactionTopic );
174
- if (Objects .nonNull (template )){
175
- template .addEntity ((String )data );
184
+ if (Objects .nonNull (template )) {
185
+ service .execute (new Runnable () {
186
+ @ Override
187
+ public void run () {
188
+ template .addEntity ((String )data );
189
+ }
190
+ });
176
191
}
177
192
}
178
193
@@ -182,8 +197,13 @@ public void handleContractLogTrigger(Object data) {
182
197
}
183
198
184
199
MongoTemplate template = mongoTemplateMap .get (contractLogTopic );
185
- if (Objects .nonNull (template )){
186
- template .addEntity ((String )data );
200
+ if (Objects .nonNull (template )) {
201
+ service .execute (new Runnable () {
202
+ @ Override
203
+ public void run () {
204
+ template .addEntity ((String )data );
205
+ }
206
+ });
187
207
}
188
208
}
189
209
@@ -193,8 +213,13 @@ public void handleContractEventTrigger(Object data) {
193
213
}
194
214
195
215
MongoTemplate template = mongoTemplateMap .get (contractEventTopic );
196
- if (Objects .nonNull (template )){
197
- template .addEntity ((String )data );
216
+ if (Objects .nonNull (template )) {
217
+ service .execute (new Runnable () {
218
+ @ Override
219
+ public void run () {
220
+ template .addEntity ((String )data );
221
+ }
222
+ });
198
223
}
199
224
}
200
225
0 commit comments