1
1
package org .tron .eventplugin ;
2
2
import org .pf4j .util .StringUtils ;
3
+
4
+ import java .io .IOException ;
5
+ import java .io .InputStream ;
3
6
import java .util .concurrent .ExecutorService ;
4
7
import java .util .concurrent .Executors ;
5
8
9
12
import java .util .concurrent .BlockingQueue ;
10
13
import java .util .concurrent .LinkedBlockingQueue ;
11
14
import java .util .concurrent .TimeUnit ;
12
- import com .fasterxml .jackson .databind .ObjectMapper ;
13
- import org .springframework .util .concurrent .ListenableFuture ;
14
15
import org .tron .mongodb .MongoConfig ;
15
16
import org .tron .mongodb .MongoManager ;
16
17
import org .tron .mongodb .MongoTemplate ;
@@ -19,11 +20,8 @@ public class MongodbSenderImpl{
19
20
private static MongodbSenderImpl instance = null ;
20
21
private static final Logger log = LoggerFactory .getLogger (MongodbSenderImpl .class );
21
22
ExecutorService service = Executors .newFixedThreadPool (8 );
22
- List <ListenableFuture <?>> futures = new ArrayList <>();
23
23
24
- private String serverAddress = "" ;
25
24
private boolean loaded = false ;
26
-
27
25
private BlockingQueue <Object > triggerQueue = new LinkedBlockingQueue ();
28
26
29
27
private String blockTopic = "" ;
@@ -37,8 +35,7 @@ public class MongodbSenderImpl{
37
35
private MongoManager mongoManager ;
38
36
private Map <String , MongoTemplate > mongoTemplateMap ;
39
37
40
- private ObjectMapper mapper = new ObjectMapper ();
41
-
38
+ private MongoConfig mongoConfig ;
42
39
43
40
public static MongodbSenderImpl getInstance (){
44
41
if (Objects .isNull (instance )) {
@@ -52,8 +49,41 @@ public static MongodbSenderImpl getInstance(){
52
49
return instance ;
53
50
}
54
51
55
- public void setServerAddress (final String address ){
56
- initMongoConfig (address );
52
+ public void setServerAddress (final String serverAddress ){
53
+ if (StringUtils .isNullOrEmpty (serverAddress )){
54
+ return ;
55
+ }
56
+
57
+ String [] params = serverAddress .split (":" );
58
+ if (params .length != 2 ){
59
+ return ;
60
+ }
61
+
62
+ String mongoHostName = "" ;
63
+ int mongoPort = -1 ;
64
+
65
+ try {
66
+ mongoHostName = params [0 ];
67
+ mongoPort = Integer .valueOf (params [1 ]);
68
+ }
69
+ catch (Exception e ){
70
+ e .printStackTrace ();
71
+ return ;
72
+ }
73
+
74
+ if (Objects .isNull (mongoConfig )){
75
+ mongoConfig = new MongoConfig ();
76
+ }
77
+
78
+ mongoConfig .setHost (mongoHostName );
79
+ mongoConfig .setPort (mongoPort );
80
+
81
+ loadMongoConfig ();
82
+
83
+ if (Objects .isNull (mongoManager )){
84
+ mongoManager = new MongoManager ();
85
+ mongoManager .initConfig (mongoConfig );
86
+ }
57
87
}
58
88
59
89
public void init (){
@@ -66,44 +96,44 @@ public void init(){
66
96
triggerProcessThread .start ();
67
97
68
98
mongoTemplateMap = new HashMap <>();
69
-
70
99
loaded = true ;
71
100
}
72
101
73
- private void initMongoConfig (String serverAddress ){
74
- if (StringUtils .isNullOrEmpty (serverAddress )){
75
- return ;
76
- }
102
+ private void loadMongoConfig (){
77
103
78
- String [] params = serverAddress .split (":" );
79
- if (params .length != 2 ){
104
+ if (Objects .isNull (mongoConfig )){
80
105
return ;
81
106
}
82
107
83
- String hostName = "" ;
84
- int port = -1 ;
108
+ Properties properties = new Properties ();
85
109
86
- try {
87
- hostName = params [0 ];
88
- port = Integer .valueOf (params [1 ]);
89
- }
90
- catch (Exception e ){
91
- e .printStackTrace ();
92
- return ;
93
- }
110
+ try {
111
+ InputStream input = getClass ().getClassLoader ().getResourceAsStream ("mongodb.properties" );
112
+ if (Objects .isNull (input )){
113
+ return ;
114
+ }
115
+ properties .load (input );
94
116
95
- MongoConfig config = new MongoConfig ();
96
- config .setDbName ("eventlog" );
97
- config .setHost (hostName );
98
- config .setPort (port );
117
+ String dbName = properties .getProperty ("mongo.dbname" );
118
+ String userName = properties .getProperty ("mongo.username" );
119
+ String password = properties .getProperty ("mongo.password" );
99
120
100
- config .setUsername ("tron" );
101
- config .setPassword ("123456" );
121
+ int connectionsPerHost = Integer .parseInt (properties .getProperty ("mongo.connectionsPerHost" ));
122
+ int threadsAllowedToBlockForConnectionMultiplie = Integer .parseInt (
123
+ properties .getProperty ("mongo.threadsAllowedToBlockForConnectionMultiplier" ));
102
124
103
- if (Objects .isNull (mongoManager )){
104
- mongoManager = new MongoManager ();
105
- mongoManager .initConfig (config );
125
+ mongoConfig .setDbName (dbName );
126
+ mongoConfig .setUsername (userName );
127
+ mongoConfig .setPassword (password );
128
+
129
+ mongoConfig .setConnectionsPerHost (connectionsPerHost );
130
+ mongoConfig .setThreadsAllowedToBlockForConnectionMultiplier (threadsAllowedToBlockForConnectionMultiplie );
131
+ } catch (IOException e ) {
132
+ e .printStackTrace ();
133
+ } catch (Exception e ){
134
+ e .printStackTrace ();
106
135
}
136
+
107
137
}
108
138
109
139
private MongoTemplate createMongoTemplate (final String collectionName ){
0 commit comments