Skip to content

Commit a84c254

Browse files
committed
添加服务端消息轨迹
1 parent 241f708 commit a84c254

File tree

13 files changed

+655
-149
lines changed

13 files changed

+655
-149
lines changed

rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,46 +15,22 @@
1515
*/
1616
package com.alibaba.rocketmq.broker;
1717

18-
import java.io.IOException;
19-
import java.util.Properties;
20-
import java.util.concurrent.BlockingQueue;
21-
import java.util.concurrent.ExecutorService;
22-
import java.util.concurrent.Executors;
23-
import java.util.concurrent.LinkedBlockingQueue;
24-
import java.util.concurrent.ScheduledExecutorService;
25-
import java.util.concurrent.ThreadPoolExecutor;
26-
import java.util.concurrent.TimeUnit;
27-
28-
import org.slf4j.Logger;
29-
import org.slf4j.LoggerFactory;
30-
31-
import com.alibaba.rocketmq.broker.client.ClientHousekeepingService;
32-
import com.alibaba.rocketmq.broker.client.ConsumerIdsChangeListener;
33-
import com.alibaba.rocketmq.broker.client.ConsumerManager;
34-
import com.alibaba.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
35-
import com.alibaba.rocketmq.broker.client.ProducerManager;
18+
import com.alibaba.rocketmq.broker.client.*;
3619
import com.alibaba.rocketmq.broker.client.net.Broker2Client;
3720
import com.alibaba.rocketmq.broker.client.rebalance.RebalanceLockManager;
3821
import com.alibaba.rocketmq.broker.filtersrv.FilterServerManager;
3922
import com.alibaba.rocketmq.broker.longpolling.PullRequestHoldService;
23+
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
24+
import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
4025
import com.alibaba.rocketmq.broker.offset.ConsumerOffsetManager;
4126
import com.alibaba.rocketmq.broker.out.BrokerOuterAPI;
42-
import com.alibaba.rocketmq.broker.processor.AdminBrokerProcessor;
43-
import com.alibaba.rocketmq.broker.processor.ClientManageProcessor;
44-
import com.alibaba.rocketmq.broker.processor.EndTransactionProcessor;
45-
import com.alibaba.rocketmq.broker.processor.PullMessageProcessor;
46-
import com.alibaba.rocketmq.broker.processor.QueryMessageProcessor;
47-
import com.alibaba.rocketmq.broker.processor.SendMessageProcessor;
27+
import com.alibaba.rocketmq.broker.processor.*;
4828
import com.alibaba.rocketmq.broker.slave.SlaveSynchronize;
4929
import com.alibaba.rocketmq.broker.stats.BrokerStats;
5030
import com.alibaba.rocketmq.broker.stats.BrokerStatsManager;
5131
import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
5232
import com.alibaba.rocketmq.broker.topic.TopicConfigManager;
53-
import com.alibaba.rocketmq.common.BrokerConfig;
54-
import com.alibaba.rocketmq.common.DataVersion;
55-
import com.alibaba.rocketmq.common.MixAll;
56-
import com.alibaba.rocketmq.common.ThreadFactoryImpl;
57-
import com.alibaba.rocketmq.common.UtilAll;
33+
import com.alibaba.rocketmq.common.*;
5834
import com.alibaba.rocketmq.common.constant.LoggerName;
5935
import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
6036
import com.alibaba.rocketmq.common.protocol.RequestCode;
@@ -69,6 +45,14 @@
6945
import com.alibaba.rocketmq.store.MessageStore;
7046
import com.alibaba.rocketmq.store.config.BrokerRole;
7147
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
48+
import org.slf4j.Logger;
49+
import org.slf4j.LoggerFactory;
50+
51+
import java.io.IOException;
52+
import java.util.ArrayList;
53+
import java.util.List;
54+
import java.util.Properties;
55+
import java.util.concurrent.*;
7256

7357

7458
/**
@@ -346,7 +330,8 @@ public void registerProcessor() {
346330
/**
347331
* SendMessageProcessor
348332
*/
349-
NettyRequestProcessor sendProcessor = new SendMessageProcessor(this);
333+
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
334+
sendProcessor.registerSendMessageHook(sendMessageHookList);
350335
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,
351336
this.sendMessageExecutor);
352337
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
@@ -357,6 +342,7 @@ public void registerProcessor() {
357342
*/
358343
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor,
359344
this.pullMessageExecutor);
345+
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
360346

361347
/**
362348
* QueryMessageProcessor
@@ -387,8 +373,9 @@ public void registerProcessor() {
387373
/**
388374
* Default
389375
*/
390-
this.remotingServer
391-
.registerDefaultProcessor(new AdminBrokerProcessor(this), this.adminBrokerExecutor);
376+
AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
377+
adminProcessor.registerConsumeMessageHook(this.consumeMessageHookList);
378+
this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
392379
}
393380

394381

@@ -765,6 +752,24 @@ public void run() {
765752
}, 5, TimeUnit.MINUTES);
766753
}
767754

755+
// 注册发送消息轨迹 hook
756+
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
757+
758+
759+
public void registerSendMessageHook(final SendMessageHook hook) {
760+
this.sendMessageHookList.add(hook);
761+
log.info("register SendMessageHook Hook, {}", hook.hookName());
762+
}
763+
764+
// 注册消费消息轨迹 hook
765+
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
766+
767+
768+
public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
769+
this.consumeMessageHookList.add(hook);
770+
log.info("register ConsumeMessageHook Hook, {}", hook.hookName());
771+
}
772+
768773

769774
public void registerServerRPCHook(RPCHook rpcHook) {
770775
getRemotingServer().registerRPCHook(rpcHook);
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/**
2+
* Copyright (C) 2010-2013 Alibaba Group Holding Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.alibaba.rocketmq.broker.mqtrace;
17+
18+
import java.util.Map;
19+
20+
21+
public class ConsumeMessageContext {
22+
private String consumerGroup;
23+
private String topic;
24+
private Integer queueId;
25+
private String clientHost;
26+
private String storeHost;
27+
private Map<Long, String> messageIds;
28+
private boolean success;
29+
private String status;
30+
31+
32+
public String getConsumerGroup() {
33+
return consumerGroup;
34+
}
35+
36+
37+
public void setConsumerGroup(String consumerGroup) {
38+
this.consumerGroup = consumerGroup;
39+
}
40+
41+
42+
public String getTopic() {
43+
return topic;
44+
}
45+
46+
47+
public void setTopic(String topic) {
48+
this.topic = topic;
49+
}
50+
51+
52+
public Integer getQueueId() {
53+
return queueId;
54+
}
55+
56+
57+
public void setQueueId(Integer queueId) {
58+
this.queueId = queueId;
59+
}
60+
61+
62+
public String getClientHost() {
63+
return clientHost;
64+
}
65+
66+
67+
public void setClientHost(String clientHost) {
68+
this.clientHost = clientHost;
69+
}
70+
71+
72+
public Map<Long, String> getMessageIds() {
73+
return messageIds;
74+
}
75+
76+
77+
public void setMessageIds(Map<Long, String> messageIds) {
78+
this.messageIds = messageIds;
79+
}
80+
81+
82+
public boolean isSuccess() {
83+
return success;
84+
}
85+
86+
87+
public void setSuccess(boolean success) {
88+
this.success = success;
89+
}
90+
91+
92+
public String getStatus() {
93+
return status;
94+
}
95+
96+
97+
public void setStatus(String status) {
98+
this.status = status;
99+
}
100+
101+
102+
public String getStoreHost() {
103+
return storeHost;
104+
}
105+
106+
107+
public void setStoreHost(String storeHost) {
108+
this.storeHost = storeHost;
109+
}
110+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright (C) 2010-2013 Alibaba Group Holding Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.alibaba.rocketmq.broker.mqtrace;
17+
18+
19+
20+
public interface ConsumeMessageHook {
21+
public String hookName();
22+
23+
24+
public void consumeMessageBefore(final ConsumeMessageContext context);
25+
26+
27+
public void consumeMessageAfter(final ConsumeMessageContext context);
28+
}

0 commit comments

Comments
 (0)