Skip to content

Commit c66cd00

Browse files
committed
#491 增加消息id被谁消费api,扩展id查询命令
1 parent bf1ad71 commit c66cd00

File tree

4 files changed

+19
-10
lines changed

4 files changed

+19
-10
lines changed

rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/Producer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public static void main(String[] args) throws MQClientException, InterruptedExce
3030
* 因为服务器会回查这个Group下的任意一个Producer
3131
*/
3232
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
33-
producer.setNamesrvAddr("10.235.170.7:9877");
33+
//producer.setNamesrvAddr("10.235.170.7:9877");
3434
/**
3535
* Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
3636
* 注意:切记不可以在每次发送消息时,都调用start方法

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -422,9 +422,9 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
422422

423423

424424
@Override
425-
public List<MessageTrack> messageTrackDetail(MessageExt msg) {
426-
// TODO Auto-generated method stub
427-
return null;
425+
public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException,
426+
InterruptedException, MQBrokerException {
427+
return this.defaultMQAdminExtImpl.messageTrackDetail(msg);
428428
}
429429

430430
}

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import com.alibaba.rocketmq.common.admin.OffsetWrapper;
4646
import com.alibaba.rocketmq.common.admin.RollbackStats;
4747
import com.alibaba.rocketmq.common.admin.TopicStatsTable;
48-
import com.alibaba.rocketmq.common.filter.FilterAPI;
4948
import com.alibaba.rocketmq.common.help.FAQUrl;
5049
import com.alibaba.rocketmq.common.message.MessageExt;
5150
import com.alibaba.rocketmq.common.message.MessageQueue;
@@ -770,7 +769,6 @@ public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingExce
770769
GroupList groupList = this.queryTopicConsumeByWho(msg.getTopic());
771770

772771
for (String group : groupList.getGroupList()) {
773-
774772
// 查询连接
775773
MessageTrack mt = new MessageTrack();
776774
mt.setConsumerGroup(group);
@@ -813,8 +811,10 @@ public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingExce
813811
catch (Exception e) {
814812
mt.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
815813
}
814+
815+
result.add(mt);
816816
}
817817

818-
return null;
818+
return result;
819819
}
820820
}

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,18 @@ public static void queryById(final DefaultMQAdminExt admin, final String msgId)
154154
bodyTmpFilePath//
155155
);
156156

157-
List<MessageTrack> mtdList = admin.messageTrackDetail(msg);
158-
for (MessageTrack mt : mtdList) {
159-
System.out.println(mt);
157+
try {
158+
List<MessageTrack> mtdList = admin.messageTrackDetail(msg);
159+
if (mtdList.isEmpty()) {
160+
System.out.println("No Consumer");
161+
}
162+
else {
163+
for (MessageTrack mt : mtdList) {
164+
System.out.println(mt);
165+
}
166+
}
167+
}
168+
catch (Exception e) {
160169
}
161170
}
162171

0 commit comments

Comments
 (0)