Skip to content

Commit dea1df4

Browse files
committed
#490 向指定Consumer发起消费消息请求,运维需要。客户端开发完成。
1 parent 8f10dfd commit dea1df4

File tree

6 files changed

+109
-2
lines changed

6 files changed

+109
-2
lines changed

rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import com.alibaba.rocketmq.common.protocol.RequestCode;
5757
import com.alibaba.rocketmq.common.protocol.ResponseCode;
5858
import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
59+
import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
5960
import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
6061
import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
6162
import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody;
@@ -69,6 +70,7 @@
6970
import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody;
7071
import com.alibaba.rocketmq.common.protocol.body.TopicList;
7172
import com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody;
73+
import com.alibaba.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
7274
import com.alibaba.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
7375
import com.alibaba.rocketmq.common.protocol.header.CreateTopicRequestHeader;
7476
import com.alibaba.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
@@ -2152,6 +2154,41 @@ public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String cons
21522154
}
21532155

21542156

2157+
/**
2158+
* 通过调用Broker,向指定Consumer发送某条消息,并返回消费结果
2159+
*/
2160+
public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, //
2161+
String consumerGroup, //
2162+
String clientId, //
2163+
String msgId, //
2164+
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
2165+
ConsumeMessageDirectlyResultRequestHeader requestHeader =
2166+
new ConsumeMessageDirectlyResultRequestHeader();
2167+
requestHeader.setConsumerGroup(consumerGroup);
2168+
requestHeader.setClientId(clientId);
2169+
2170+
RemotingCommand request =
2171+
RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader);
2172+
2173+
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
2174+
assert response != null;
2175+
switch (response.getCode()) {
2176+
case ResponseCode.SUCCESS: {
2177+
byte[] body = response.getBody();
2178+
if (body != null) {
2179+
ConsumeMessageDirectlyResult info =
2180+
ConsumeMessageDirectlyResult.decode(body, ConsumeMessageDirectlyResult.class);
2181+
return info;
2182+
}
2183+
}
2184+
default:
2185+
break;
2186+
}
2187+
2188+
throw new MQClientException(response.getCode(), response.getRemark());
2189+
}
2190+
2191+
21552192
public String getProjectGroupPrefix() {
21562193
return projectGroupPrefix;
21572194
}

rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package com.alibaba.rocketmq.common.protocol.body;
22

3-
public class ConsumeMessageDirectlyResult {
3+
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
4+
5+
6+
public class ConsumeMessageDirectlyResult extends RemotingSerializable {
47
private boolean order = false;
58
private boolean autoCommit = true;
69
private CMResult consumeResult;
@@ -56,4 +59,12 @@ public long getSpentTimeMills() {
5659
public void setSpentTimeMills(long spentTimeMills) {
5760
this.spentTimeMills = spentTimeMills;
5861
}
62+
63+
64+
@Override
65+
public String toString() {
66+
return "ConsumeMessageDirectlyResult [order=" + order + ", autoCommit=" + autoCommit
67+
+ ", consumeResult=" + consumeResult + ", remark=" + remark + ", spentTimeMills="
68+
+ spentTimeMills + "]";
69+
}
5970
}

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.alibaba.rocketmq.common.message.MessageQueue;
3535
import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
3636
import com.alibaba.rocketmq.common.protocol.body.ConsumeByWho;
37+
import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
3738
import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
3839
import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
3940
import com.alibaba.rocketmq.common.protocol.body.GroupList;
@@ -409,4 +410,12 @@ public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String c
409410
throws RemotingException, MQClientException, InterruptedException {
410411
return defaultMQAdminExtImpl.getConsumerRunningInfo(consumerGroup, clientId);
411412
}
413+
414+
415+
@Override
416+
public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId,
417+
String msgId) throws RemotingException, MQClientException, InterruptedException,
418+
MQBrokerException {
419+
return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, msgId);
420+
}
412421
}

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import com.alibaba.rocketmq.common.protocol.ResponseCode;
5151
import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
5252
import com.alibaba.rocketmq.common.protocol.body.ConsumeByWho;
53+
import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
5354
import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
5455
import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
5556
import com.alibaba.rocketmq.common.protocol.body.GroupList;
@@ -62,6 +63,7 @@
6263
import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
6364
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
6465
import com.alibaba.rocketmq.remoting.RPCHook;
66+
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
6567
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
6668
import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
6769
import com.alibaba.rocketmq.remoting.exception.RemotingException;
@@ -714,4 +716,15 @@ public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String c
714716
}
715717
return null;
716718
}
719+
720+
721+
@Override
722+
public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId,
723+
String msgId) throws RemotingException, MQClientException, InterruptedException,
724+
MQBrokerException {
725+
MessageExt msg = this.viewMessage(msgId);
726+
727+
return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(
728+
RemotingUtil.socketAddress2String(msg.getStoreHost()), consumerGroup, clientId, msgId, 10000);
729+
}
717730
}

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/MQAdminExt.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.alibaba.rocketmq.common.message.MessageQueue;
3232
import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
3333
import com.alibaba.rocketmq.common.protocol.body.ConsumeByWho;
34+
import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
3435
import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
3536
import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
3637
import com.alibaba.rocketmq.common.protocol.body.GroupList;
@@ -567,4 +568,22 @@ public boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConne
567568
*/
568569
public ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup, final String clientId)
569570
throws RemotingException, MQClientException, InterruptedException;
571+
572+
573+
/**
574+
* 向指定Consumer发送某条消息
575+
*
576+
* @param consumerGroup
577+
* @param clientId
578+
* @param msgId
579+
* @return
580+
* @throws InterruptedException
581+
* @throws MQClientException
582+
* @throws RemotingException
583+
* @throws MQBrokerException
584+
*/
585+
public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, //
586+
String clientId, //
587+
String msgId) throws RemotingException, MQClientException, InterruptedException,
588+
MQBrokerException;
570589
}

rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.alibaba.rocketmq.client.exception.MQClientException;
2929
import com.alibaba.rocketmq.common.UtilAll;
3030
import com.alibaba.rocketmq.common.message.MessageExt;
31+
import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
3132
import com.alibaba.rocketmq.remoting.RPCHook;
3233
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
3334
import com.alibaba.rocketmq.remoting.exception.RemotingException;
@@ -61,6 +62,14 @@ public Options buildCommandlineOptions(Options options) {
6162
opt.setRequired(true);
6263
options.addOption(opt);
6364

65+
opt = new Option("g", "consumerGroup", true, "consumer group name");
66+
opt.setRequired(false);
67+
options.addOption(opt);
68+
69+
opt = new Option("d", "clientId", true, "The consumer's client id");
70+
opt.setRequired(false);
71+
options.addOption(opt);
72+
6473
return options;
6574
}
6675

@@ -153,7 +162,16 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
153162

154163
try {
155164
final String msgId = commandLine.getOptionValue('i').trim();
156-
queryById(defaultMQAdminExt, msgId);
165+
if (commandLine.hasOption('g') && commandLine.hasOption('d')) {
166+
final String consumerGroup = commandLine.getOptionValue('g').trim();
167+
final String clientId = commandLine.getOptionValue('d').trim();
168+
ConsumeMessageDirectlyResult result =
169+
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId);
170+
System.out.println(result);
171+
}
172+
else {
173+
queryById(defaultMQAdminExt, msgId);
174+
}
157175
}
158176
catch (Exception e) {
159177
e.printStackTrace();

0 commit comments

Comments
 (0)