Skip to content

Commit 1e32f42

Browse files
committed
重构过滤服务器,为diamond存储过滤类做准备
1 parent a41c36d commit 1e32f42

File tree

4 files changed

+97
-34
lines changed

4 files changed

+97
-34
lines changed

rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/FiltersrvStartup.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
* @since 2014-4-10
4848
*/
4949
public class FiltersrvStartup {
50+
public static Logger log;
51+
5052

5153
public static Options buildCommandlineOptions(final Options options) {
5254
Option opt = new Option("c", "configFile", true, "Filter server config properties file");
@@ -62,11 +64,29 @@ public static Options buildCommandlineOptions(final Options options) {
6264

6365

6466
public static void main(String[] args) {
65-
main0(args);
67+
start(createController(args));
6668
}
6769

6870

69-
public static FiltersrvController main0(String[] args) {
71+
public static FiltersrvController start(FiltersrvController controller) {
72+
// 启动服务
73+
try {
74+
controller.start();
75+
}
76+
catch (Exception e) {
77+
e.printStackTrace();
78+
System.exit(-1);
79+
}
80+
81+
String tip = "The Filter Server boot success, " + controller.localAddr();
82+
log.info(tip);
83+
System.out.println(tip);
84+
85+
return controller;
86+
}
87+
88+
89+
public static FiltersrvController createController(String[] args) {
7090
System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion));
7191

7292
// Socket发送缓冲区大小
@@ -140,7 +160,7 @@ public static FiltersrvController main0(String[] args) {
140160
configurator.setContext(lc);
141161
lc.reset();
142162
configurator.doConfigure(filtersrvConfig.getRocketmqHome() + "/conf/logback_filtersrv.xml");
143-
final Logger log = LoggerFactory.getLogger(LoggerName.FiltersrvLoggerName);
163+
log = LoggerFactory.getLogger(LoggerName.FiltersrvLoggerName);
144164

145165
// 初始化服务控制对象
146166
final FiltersrvController controller =
@@ -171,13 +191,6 @@ public void run() {
171191
}
172192
}, "ShutdownHook"));
173193

174-
// 启动服务
175-
controller.start();
176-
177-
String tip = "The Filter Server boot success, " + controller.localAddr();
178-
log.info(tip);
179-
System.out.println(tip);
180-
181194
return controller;
182195
}
183196
catch (Throwable e) {
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.alibaba.rocketmq.filtersrv.filter;
2+
3+
public interface FilterClassFetchMethod {
4+
public String fetch(final String topic, final String consumerGroup, final String className);
5+
}

rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/filter/FilterClassManager.java

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
import com.alibaba.rocketmq.common.UtilAll;
1616
import com.alibaba.rocketmq.common.constant.LoggerName;
1717
import com.alibaba.rocketmq.common.filter.MessageFilter;
18-
import com.alibaba.rocketmq.common.utils.HttpTinyClient;
19-
import com.alibaba.rocketmq.common.utils.HttpTinyClient.HttpResult;
2018
import com.alibaba.rocketmq.filtersrv.FiltersrvController;
2119

2220

@@ -35,9 +33,14 @@ public class FilterClassManager {
3533
private final ScheduledExecutorService scheduledExecutorService = Executors
3634
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread"));
3735

36+
private FilterClassFetchMethod filterClassFetchMethod;
37+
3838

3939
public FilterClassManager(FiltersrvController filtersrvController) {
4040
this.filtersrvController = filtersrvController;
41+
this.filterClassFetchMethod =
42+
new HttpFilterClassFetchMethod(this.filtersrvController.getFiltersrvConfig()
43+
.getFilterClassRepertoryUrl());
4144
}
4245

4346

@@ -65,28 +68,22 @@ private void fetchClassFromRemoteHost() {
6568
try {
6669
Entry<String, FilterClassInfo> next = it.next();
6770
FilterClassInfo filterClassInfo = next.getValue();
68-
69-
String url = this.filtersrvController.getFiltersrvConfig().getFilterClassRepertoryUrl();
70-
url += "/";
71-
url += filterClassInfo.getClassName();
72-
url += ".java";
73-
74-
HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", 5000);
75-
if (200 == result.code) {
76-
String responseStr = result.content;
77-
byte[] filterSourceBinary = responseStr.getBytes("UTF-8");
78-
int classCRC = UtilAll.crc32(responseStr.getBytes("UTF-8"));
79-
if (classCRC != filterClassInfo.getClassCRC()) {
80-
String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
81-
Class<?> newClass =
82-
DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);
83-
Object newInstance = newClass.newInstance();
84-
filterClassInfo.setMessageFilter((MessageFilter) newInstance);
85-
filterClassInfo.setClassCRC(classCRC);
86-
87-
log.info("fetch Remote class File OK, {} {} {}", next.getKey(),
88-
filterClassInfo.getClassName(), url);
89-
}
71+
String[] topicAndGroup = next.getKey().split("@");
72+
String responseStr =
73+
this.filterClassFetchMethod.fetch(topicAndGroup[0], topicAndGroup[1],
74+
filterClassInfo.getClassName());
75+
byte[] filterSourceBinary = responseStr.getBytes("UTF-8");
76+
int classCRC = UtilAll.crc32(responseStr.getBytes("UTF-8"));
77+
if (classCRC != filterClassInfo.getClassCRC()) {
78+
String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
79+
Class<?> newClass =
80+
DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource);
81+
Object newInstance = newClass.newInstance();
82+
filterClassInfo.setMessageFilter((MessageFilter) newInstance);
83+
filterClassInfo.setClassCRC(classCRC);
84+
85+
log.info("fetch Remote class File OK, {} {}", next.getKey(),
86+
filterClassInfo.getClassName());
9087
}
9188
}
9289
catch (Exception e) {
@@ -158,4 +155,14 @@ public boolean registerFilterClass(final String consumerGroup, final String topi
158155
public FilterClassInfo findFilterClass(final String consumerGroup, final String topic) {
159156
return this.filterClassTable.get(buildKey(consumerGroup, topic));
160157
}
158+
159+
160+
public FilterClassFetchMethod getFilterClassFetchMethod() {
161+
return filterClassFetchMethod;
162+
}
163+
164+
165+
public void setFilterClassFetchMethod(FilterClassFetchMethod filterClassFetchMethod) {
166+
this.filterClassFetchMethod = filterClassFetchMethod;
167+
}
161168
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.alibaba.rocketmq.filtersrv.filter;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import com.alibaba.rocketmq.common.constant.LoggerName;
7+
import com.alibaba.rocketmq.common.utils.HttpTinyClient;
8+
import com.alibaba.rocketmq.common.utils.HttpTinyClient.HttpResult;
9+
10+
11+
public class HttpFilterClassFetchMethod implements FilterClassFetchMethod {
12+
private static final Logger log = LoggerFactory.getLogger(LoggerName.FiltersrvLoggerName);
13+
private final String url;
14+
15+
16+
public HttpFilterClassFetchMethod(String url) {
17+
this.url = url;
18+
}
19+
20+
21+
@Override
22+
public String fetch(String topic, String consumerGroup, String className) {
23+
String thisUrl = String.format("%s/%s.java", this.url, className);
24+
25+
try {
26+
HttpResult result = HttpTinyClient.httpGet(thisUrl, null, null, "UTF-8", 5000);
27+
if (200 == result.code) {
28+
return result.content;
29+
}
30+
}
31+
catch (Exception e) {
32+
log.error(
33+
String.format("call <%s> exception, Topic: %s Group: %s", thisUrl, topic, consumerGroup), e);
34+
}
35+
36+
return null;
37+
}
38+
}

0 commit comments

Comments
 (0)