Skip to content

Commit 6f5009b

Browse files
committed
#490 向指定Consumer发起消费消息请求,运维需要。服务端开发完成。
1 parent 9da7c53 commit 6f5009b

File tree

4 files changed

+176
-11
lines changed

4 files changed

+176
-11
lines changed

rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ public RemotingCommand getConsumerRunningInfo(//
108108
}
109109

110110

111+
public RemotingCommand consumeMessageDirectly(//
112+
final Channel channel,//
113+
final RemotingCommand request//
114+
) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
115+
return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
116+
}
117+
118+
111119
/**
112120
* Broker主动通知Consumer,Id列表发生变化,Oneway
113121
*/

rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 101 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,23 @@
1515
*/
1616
package com.alibaba.rocketmq.broker.processor;
1717

18+
import io.netty.channel.Channel;
19+
import io.netty.channel.ChannelHandlerContext;
20+
21+
import java.io.UnsupportedEncodingException;
22+
import java.net.InetSocketAddress;
23+
import java.net.SocketAddress;
24+
import java.util.HashMap;
25+
import java.util.HashSet;
26+
import java.util.Iterator;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.Properties;
30+
import java.util.Set;
31+
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
1835
import com.alibaba.rocketmq.broker.BrokerController;
1936
import com.alibaba.rocketmq.broker.client.ClientChannelInfo;
2037
import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
@@ -31,8 +48,44 @@
3148
import com.alibaba.rocketmq.common.message.MessageQueue;
3249
import com.alibaba.rocketmq.common.protocol.RequestCode;
3350
import com.alibaba.rocketmq.common.protocol.ResponseCode;
34-
import com.alibaba.rocketmq.common.protocol.body.*;
35-
import com.alibaba.rocketmq.common.protocol.header.*;
51+
import com.alibaba.rocketmq.common.protocol.body.Connection;
52+
import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
53+
import com.alibaba.rocketmq.common.protocol.body.GroupList;
54+
import com.alibaba.rocketmq.common.protocol.body.KVTable;
55+
import com.alibaba.rocketmq.common.protocol.body.LockBatchRequestBody;
56+
import com.alibaba.rocketmq.common.protocol.body.LockBatchResponseBody;
57+
import com.alibaba.rocketmq.common.protocol.body.ProducerConnection;
58+
import com.alibaba.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
59+
import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan;
60+
import com.alibaba.rocketmq.common.protocol.body.TopicList;
61+
import com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody;
62+
import com.alibaba.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
63+
import com.alibaba.rocketmq.common.protocol.header.CreateTopicRequestHeader;
64+
import com.alibaba.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
65+
import com.alibaba.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
66+
import com.alibaba.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader;
67+
import com.alibaba.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader;
68+
import com.alibaba.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
69+
import com.alibaba.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
70+
import com.alibaba.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
71+
import com.alibaba.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
72+
import com.alibaba.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
73+
import com.alibaba.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
74+
import com.alibaba.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
75+
import com.alibaba.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
76+
import com.alibaba.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
77+
import com.alibaba.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
78+
import com.alibaba.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
79+
import com.alibaba.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
80+
import com.alibaba.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
81+
import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
82+
import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
83+
import com.alibaba.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
84+
import com.alibaba.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
85+
import com.alibaba.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
86+
import com.alibaba.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
87+
import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
88+
import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
3689
import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
3790
import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
3891
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
@@ -43,15 +96,6 @@
4396
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
4497
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
4598
import com.alibaba.rocketmq.store.DefaultMessageStore;
46-
import io.netty.channel.Channel;
47-
import io.netty.channel.ChannelHandlerContext;
48-
import org.slf4j.Logger;
49-
import org.slf4j.LoggerFactory;
50-
51-
import java.io.UnsupportedEncodingException;
52-
import java.net.InetSocketAddress;
53-
import java.net.SocketAddress;
54-
import java.util.*;
5599

56100

57101
/**
@@ -173,6 +217,9 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
173217

174218
case RequestCode.GET_CONSUMER_RUNNING_INFO:
175219
return this.getConsumerRunningInfo(ctx, request);
220+
221+
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
222+
return this.consumeMessageDirectly(ctx, request);
176223
default:
177224
break;
178225
}
@@ -181,6 +228,49 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
181228
}
182229

183230

231+
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request)
232+
throws RemotingCommandException {
233+
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
234+
final ConsumeMessageDirectlyResultRequestHeader requestHeader =
235+
(ConsumeMessageDirectlyResultRequestHeader) request
236+
.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
237+
238+
ClientChannelInfo clientChannelInfo =
239+
this.brokerController.getConsumerManager().findChannel(requestHeader.getConsumerGroup(),
240+
requestHeader.getClientId());
241+
242+
if (null == clientChannelInfo) {
243+
response.setCode(ResponseCode.SYSTEM_ERROR);
244+
response.setRemark(String.format("The Consumer <%s> not online", requestHeader.getClientId()));
245+
return response;
246+
}
247+
248+
if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
249+
response.setCode(ResponseCode.SYSTEM_ERROR);
250+
response.setRemark(String.format(
251+
"The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", //
252+
requestHeader.getClientId(),//
253+
MQVersion.getVersionDesc(clientChannelInfo.getVersion())));
254+
return response;
255+
}
256+
257+
try {
258+
RemotingCommand newRequest =
259+
RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader);
260+
RemotingCommand consumerResponse =
261+
this.brokerController.getBroker2Client().getConsumerRunningInfo(
262+
clientChannelInfo.getChannel(), newRequest);
263+
return consumerResponse;
264+
}
265+
catch (Exception e) {
266+
response.setCode(ResponseCode.SYSTEM_ERROR);
267+
response.setRemark(String.format("invoke consumer <%s> Exception: %s",
268+
requestHeader.getClientId(), RemotingHelper.exceptionSimpleDesc(e)));
269+
return response;
270+
}
271+
}
272+
273+
184274
/**
185275
* 调用Consumer,获取Consumer内存数据结构,为监控以及定位问题
186276
*/

rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,8 @@ public class RequestCode {
147147
// 通过Broker查询Consumer内存数据
148148
// 2014-07-19 Add By shijia
149149
public static final int GET_CONSUMER_RUNNING_INFO = 307;
150+
151+
// 通过Broker直接向某个Consumer发送一条消息,并立刻消费,返回结果给broker,再返回给调用方
152+
// 2014-08-11 Add By shijia
153+
public static final int CONSUME_MESSAGE_DIRECTLY = 308;
150154
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.alibaba.rocketmq.common.protocol.header;
2+
3+
import com.alibaba.rocketmq.remoting.CommandCustomHeader;
4+
import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
5+
import com.alibaba.rocketmq.remoting.annotation.CFNullable;
6+
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
7+
8+
9+
public class ConsumeMessageDirectlyResultRequestHeader implements CommandCustomHeader {
10+
@CFNotNull
11+
private String consumerGroup;
12+
@CFNullable
13+
private String brokerName;
14+
@CFNullable
15+
private String clientId;
16+
@CFNullable
17+
private String msgId;
18+
19+
20+
@Override
21+
public void checkFields() throws RemotingCommandException {
22+
}
23+
24+
25+
public String getConsumerGroup() {
26+
return consumerGroup;
27+
}
28+
29+
30+
public void setConsumerGroup(String consumerGroup) {
31+
this.consumerGroup = consumerGroup;
32+
}
33+
34+
35+
public String getBrokerName() {
36+
return brokerName;
37+
}
38+
39+
40+
public void setBrokerName(String brokerName) {
41+
this.brokerName = brokerName;
42+
}
43+
44+
45+
public String getClientId() {
46+
return clientId;
47+
}
48+
49+
50+
public void setClientId(String clientId) {
51+
this.clientId = clientId;
52+
}
53+
54+
55+
public String getMsgId() {
56+
return msgId;
57+
}
58+
59+
60+
public void setMsgId(String msgId) {
61+
this.msgId = msgId;
62+
}
63+
}

0 commit comments

Comments
 (0)