Skip to content

Commit 7e0bc0e

Browse files
committed
#491 增加消息id被谁消费api,扩展id查询命令
1 parent e3ce0cc commit 7e0bc0e

File tree

4 files changed

+21
-11
lines changed

4 files changed

+21
-11
lines changed

rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ public static void main(String[] args) throws InterruptedException, MQClientExce
3737
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
3838
* 注意:ConsumerGroupName需要由应用来保证唯一
3939
*/
40-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_222");
41-
// consumer.setNamesrvAddr("10.235.170.7:9877");
40+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_001");
41+
// consumer.setNamesrvAddr("10.235.169.72:9876");
4242
// consumer.setNamesrvAddr("127.0.0.1:9876");
4343

4444
/**
@@ -66,8 +66,7 @@ public static void main(String[] args) throws InterruptedException, MQClientExce
6666
@Override
6767
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
6868
ConsumeConcurrentlyContext context) {
69-
// System.out.println(Thread.currentThread().getName() +
70-
// " Receive New Messages: " + msgs);
69+
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
7170

7271
MessageExt msg = msgs.get(0);
7372
if (msg.getTopic().equals("TopicTest1")) {

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -748,10 +748,12 @@ public boolean consumed(final MessageExt msg, final String group) throws Remotin
748748
MessageQueue mq = next.getKey();
749749
if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
750750
BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
751-
String addr = brokerData.getBrokerAddrs().get(0);
752-
if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
753-
if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
754-
return true;
751+
if (brokerData != null) {
752+
String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
753+
if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
754+
if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
755+
return true;
756+
}
755757
}
756758
}
757759
}
@@ -791,7 +793,9 @@ public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingExce
791793
Entry<String, SubscriptionData> next = it.next();
792794
if (next.getKey().equals(msg.getTopic())) {
793795
if (next.getValue().getTagsSet().contains(msg.getTags()) //
794-
|| next.getValue().getTagsSet().contains("*")) {
796+
|| next.getValue().getTagsSet().contains("*")//
797+
|| next.getValue().getTagsSet().isEmpty()//
798+
) {
795799

796800
}
797801
else {
@@ -801,7 +805,7 @@ public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingExce
801805
}
802806
}
803807
else {
804-
mt.setTrackType(TrackType.SUBSCRIBED_AND_NOT_CONSUME);
808+
mt.setTrackType(TrackType.SUBSCRIBED_AND_NOT_CONSUME_YET);
805809
}
806810
break;
807811
default:

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/api/TrackType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public enum TrackType {
88
// 订阅了,但是是PULL,结果未知
99
SUBSCRIBED_BUT_PULL,
1010
// 订阅了,但是没有消费(Offset小)
11-
SUBSCRIBED_AND_NOT_CONSUME,
11+
SUBSCRIBED_AND_NOT_CONSUME_YET,
1212
// 未知异常
1313
UNKNOW_EXCEPTION,
1414
}

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
2+
23
* Copyright (C) 2010-2013 Alibaba Group Holding Limited
34
*
45
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -35,6 +36,7 @@
3536
import com.alibaba.rocketmq.remoting.exception.RemotingException;
3637
import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
3738
import com.alibaba.rocketmq.tools.admin.api.MessageTrack;
39+
import com.alibaba.rocketmq.tools.command.MQAdminStartup;
3840
import com.alibaba.rocketmq.tools.command.SubCommand;
3941

4042

@@ -218,4 +220,9 @@ private static String createBodyFile(MessageExt msg) throws IOException {
218220
dos.close();
219221
}
220222
}
223+
224+
225+
public static void main(String[] args) {
226+
MQAdminStartup.main(new String[] { "queryMsgById", "-i", "0AEBAA0500002AE5000002A693AE8961" });
227+
}
221228
}

0 commit comments

Comments
 (0)