|
1 | 1 | package org.tron.eventplugin;
|
| 2 | +import com.typesafe.config.Config; |
| 3 | +import com.typesafe.config.ConfigFactory; |
| 4 | +import java.io.File; |
2 | 5 | import org.apache.kafka.clients.producer.*;
|
3 | 6 | import org.slf4j.Logger;
|
4 | 7 | import org.slf4j.LoggerFactory;
|
@@ -94,7 +97,20 @@ private KafkaProducer createProducer(int eventType){
|
94 | 97 | props.put("bootstrap.servers", this.serverAddress);
|
95 | 98 | props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
96 | 99 | props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
97 |
| - |
| 100 | + String defaultConfig = "kafka.config"; |
| 101 | + File configFile = new File(defaultConfig); |
| 102 | + if(configFile.exists()) { |
| 103 | + Config config = ConfigFactory.load(defaultConfig); |
| 104 | + if (config.hasPath("authorization.user") && config.hasPath("authorization.passwd")) { |
| 105 | + String user = config.getString("authorization.user"); |
| 106 | + String passwd = config.getString("authorization.passwd"); |
| 107 | + props.put("security.protocol", "SASL_PLAINTEXT"); |
| 108 | + props.put("sasl.mechanism", "SCRAM-SHA-512"); |
| 109 | + props.put("sasl.jaas.config", |
| 110 | + "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + |
| 111 | + user + "\" password=\"" + passwd + "\""); |
| 112 | + } |
| 113 | + } |
98 | 114 | producer = new KafkaProducer<String, String>(props);
|
99 | 115 |
|
100 | 116 | producerMap.put(eventType, producer);
|
|
0 commit comments