Skip to content

Commit bf1ad71

Browse files
committed
#491 增加消息id被谁消费api,扩展id查询命令
1 parent dea1df4 commit bf1ad71

File tree

6 files changed

+182
-0
lines changed

6 files changed

+182
-0
lines changed

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import com.alibaba.rocketmq.remoting.exception.RemotingException;
5151
import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
5252
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
53+
import com.alibaba.rocketmq.tools.admin.api.MessageTrack;
5354

5455

5556
/**
@@ -418,4 +419,12 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
418419
MQBrokerException {
419420
return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, msgId);
420421
}
422+
423+
424+
@Override
425+
public List<MessageTrack> messageTrackDetail(MessageExt msg) {
426+
// TODO Auto-generated method stub
427+
return null;
428+
}
429+
421430
}

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import java.util.Collections;
2222
import java.util.HashMap;
2323
import java.util.HashSet;
24+
import java.util.Iterator;
2425
import java.util.List;
2526
import java.util.Map;
27+
import java.util.Map.Entry;
2628
import java.util.Properties;
2729
import java.util.Set;
2830

@@ -43,6 +45,7 @@
4345
import com.alibaba.rocketmq.common.admin.OffsetWrapper;
4446
import com.alibaba.rocketmq.common.admin.RollbackStats;
4547
import com.alibaba.rocketmq.common.admin.TopicStatsTable;
48+
import com.alibaba.rocketmq.common.filter.FilterAPI;
4649
import com.alibaba.rocketmq.common.help.FAQUrl;
4750
import com.alibaba.rocketmq.common.message.MessageExt;
4851
import com.alibaba.rocketmq.common.message.MessageQueue;
@@ -59,16 +62,20 @@
5962
import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan;
6063
import com.alibaba.rocketmq.common.protocol.body.TopicList;
6164
import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
65+
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
6266
import com.alibaba.rocketmq.common.protocol.route.BrokerData;
6367
import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
6468
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
6569
import com.alibaba.rocketmq.remoting.RPCHook;
70+
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
6671
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
6772
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
6873
import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
6974
import com.alibaba.rocketmq.remoting.exception.RemotingException;
7075
import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
7176
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
77+
import com.alibaba.rocketmq.tools.admin.api.MessageTrack;
78+
import com.alibaba.rocketmq.tools.admin.api.TrackType;
7279

7380

7481
/**
@@ -727,4 +734,87 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
727734
return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(
728735
RemotingUtil.socketAddress2String(msg.getStoreHost()), consumerGroup, clientId, msgId, 10000);
729736
}
737+
738+
739+
public boolean consumed(final MessageExt msg, final String group) throws RemotingException,
740+
MQClientException, InterruptedException, MQBrokerException {
741+
// 查询消费进度
742+
ConsumeStats cstats = this.examineConsumeStats(group);
743+
744+
ClusterInfo ci = this.examineBrokerClusterInfo();
745+
746+
Iterator<Entry<MessageQueue, OffsetWrapper>> it = cstats.getOffsetTable().entrySet().iterator();
747+
while (it.hasNext()) {
748+
Entry<MessageQueue, OffsetWrapper> next = it.next();
749+
MessageQueue mq = next.getKey();
750+
if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
751+
BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
752+
String addr = brokerData.getBrokerAddrs().get(0);
753+
if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
754+
if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
755+
return true;
756+
}
757+
}
758+
}
759+
}
760+
761+
return false;
762+
}
763+
764+
765+
@Override
766+
public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException,
767+
InterruptedException, MQBrokerException {
768+
List<MessageTrack> result = new ArrayList<MessageTrack>();
769+
770+
GroupList groupList = this.queryTopicConsumeByWho(msg.getTopic());
771+
772+
for (String group : groupList.getGroupList()) {
773+
774+
// 查询连接
775+
MessageTrack mt = new MessageTrack();
776+
mt.setConsumerGroup(group);
777+
mt.setTrackType(TrackType.UNKNOW_EXCEPTION);
778+
try {
779+
ConsumerConnection cc = this.examineConsumerConnectionInfo(group);
780+
switch (cc.getConsumeType()) {
781+
case CONSUME_ACTIVELY:
782+
mt.setTrackType(TrackType.SUBSCRIBED_BUT_PULL);
783+
break;
784+
case CONSUME_PASSIVELY:
785+
boolean ifConsumed = this.consumed(msg, group);
786+
if (ifConsumed) {
787+
mt.setTrackType(TrackType.SUBSCRIBED_AND_CONSUMED);
788+
789+
// 查看订阅关系是否匹配
790+
Iterator<Entry<String, SubscriptionData>> it =
791+
cc.getSubscriptionTable().entrySet().iterator();
792+
while (it.hasNext()) {
793+
Entry<String, SubscriptionData> next = it.next();
794+
if (next.getKey().equals(msg.getTopic())) {
795+
if (next.getValue().getTagsSet().contains(msg.getTags()) //
796+
|| next.getValue().getTagsSet().contains("*")) {
797+
798+
}
799+
else {
800+
mt.setTrackType(TrackType.SUBSCRIBED_BUT_FILTERD);
801+
}
802+
}
803+
}
804+
}
805+
else {
806+
mt.setTrackType(TrackType.SUBSCRIBED_AND_NOT_CONSUME);
807+
}
808+
break;
809+
default:
810+
break;
811+
}
812+
}
813+
catch (Exception e) {
814+
mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
815+
}
816+
}
817+
818+
return null;
819+
}
730820
}

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.alibaba.rocketmq.common.admin.ConsumeStats;
2929
import com.alibaba.rocketmq.common.admin.RollbackStats;
3030
import com.alibaba.rocketmq.common.admin.TopicStatsTable;
31+
import com.alibaba.rocketmq.common.message.MessageExt;
3132
import com.alibaba.rocketmq.common.message.MessageQueue;
3233
import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
3334
import com.alibaba.rocketmq.common.protocol.body.ConsumeByWho;
@@ -46,6 +47,7 @@
4647
import com.alibaba.rocketmq.remoting.exception.RemotingException;
4748
import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
4849
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
50+
import com.alibaba.rocketmq.tools.admin.api.MessageTrack;
4951

5052

5153
/**
@@ -586,4 +588,18 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
586588
String clientId, //
587589
String msgId) throws RemotingException, MQClientException, InterruptedException,
588590
MQBrokerException;
591+
592+
593+
/**
594+
* 查询消息被谁消费了
595+
*
596+
* @param msg
597+
* @return
598+
* @throws RemotingException
599+
* @throws MQClientException
600+
* @throws InterruptedException
601+
* @throws MQBrokerException
602+
*/
603+
public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException,
604+
InterruptedException, MQBrokerException;
589605
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.alibaba.rocketmq.tools.admin.api;
2+
3+
4+
5+
public class MessageTrack {
6+
private String consumerGroup;
7+
private TrackType trackType;
8+
private String exceptionDesc;
9+
10+
11+
public String getConsumerGroup() {
12+
return consumerGroup;
13+
}
14+
15+
16+
public void setConsumerGroup(String consumerGroup) {
17+
this.consumerGroup = consumerGroup;
18+
}
19+
20+
21+
public TrackType getTrackType() {
22+
return trackType;
23+
}
24+
25+
26+
public void setTrackType(TrackType trackType) {
27+
this.trackType = trackType;
28+
}
29+
30+
31+
public String getExceptionDesc() {
32+
return exceptionDesc;
33+
}
34+
35+
36+
public void setExceptionDesc(String exceptionDesc) {
37+
this.exceptionDesc = exceptionDesc;
38+
}
39+
40+
41+
@Override
42+
public String toString() {
43+
return "MessageTrack [consumerGroup=" + consumerGroup + ", trackType=" + trackType
44+
+ ", exceptionDesc=" + exceptionDesc + "]";
45+
}
46+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.alibaba.rocketmq.tools.admin.api;
2+
3+
public enum TrackType {
4+
// 订阅了,而且消费了(Offset越过了)
5+
SUBSCRIBED_AND_CONSUMED,
6+
// 订阅了,但是被过滤掉了
7+
SUBSCRIBED_BUT_FILTERD,
8+
// 订阅了,但是是PULL,结果未知
9+
SUBSCRIBED_BUT_PULL,
10+
// 订阅了,但是没有消费(Offset小)
11+
SUBSCRIBED_AND_NOT_CONSUME,
12+
// 未知异常
13+
UNKNOW_EXCEPTION,
14+
}

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.File;
2020
import java.io.FileOutputStream;
2121
import java.io.IOException;
22+
import java.util.List;
2223

2324
import org.apache.commons.cli.CommandLine;
2425
import org.apache.commons.cli.Option;
@@ -33,6 +34,7 @@
3334
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
3435
import com.alibaba.rocketmq.remoting.exception.RemotingException;
3536
import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
37+
import com.alibaba.rocketmq.tools.admin.api.MessageTrack;
3638
import com.alibaba.rocketmq.tools.command.SubCommand;
3739

3840

@@ -151,6 +153,11 @@ public static void queryById(final DefaultMQAdminExt admin, final String msgId)
151153
"Message Body Path:",//
152154
bodyTmpFilePath//
153155
);
156+
157+
List<MessageTrack> mtdList = admin.messageTrackDetail(msg);
158+
for (MessageTrack mt : mtdList) {
159+
System.out.println(mt);
160+
}
154161
}
155162

156163

0 commit comments

Comments
 (0)