Skip to content

Commit 39cb522

Browse files
committed
#490 向指定Consumer发起消费消息请求,运维需要。服务端开发完成。
1 parent 6f5009b commit 39cb522

File tree

2 files changed

+27
-52
lines changed

2 files changed

+27
-52
lines changed

rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public RemotingCommand getConsumerRunningInfo(//
108108
}
109109

110110

111-
public RemotingCommand consumeMessageDirectly(//
111+
public RemotingCommand callClient(//
112112
final Channel channel,//
113113
final RemotingCommand request//
114114
) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {

rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
9191
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
9292
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
93+
import com.alibaba.rocketmq.remoting.CommandCustomHeader;
9394
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
9495
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
9596
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
@@ -228,93 +229,67 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
228229
}
229230

230231

231-
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request)
232-
throws RemotingCommandException {
232+
private RemotingCommand callConsumer(//
233+
final int requestCode,//
234+
final CommandCustomHeader requestHeader, //
235+
final String consumerGroup,//
236+
final String clientId) throws RemotingCommandException {
233237
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
234-
final ConsumeMessageDirectlyResultRequestHeader requestHeader =
235-
(ConsumeMessageDirectlyResultRequestHeader) request
236-
.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
237-
238238
ClientChannelInfo clientChannelInfo =
239-
this.brokerController.getConsumerManager().findChannel(requestHeader.getConsumerGroup(),
240-
requestHeader.getClientId());
239+
this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
241240

242241
if (null == clientChannelInfo) {
243242
response.setCode(ResponseCode.SYSTEM_ERROR);
244-
response.setRemark(String.format("The Consumer <%s> not online", requestHeader.getClientId()));
243+
response.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId));
245244
return response;
246245
}
247246

248247
if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
249248
response.setCode(ResponseCode.SYSTEM_ERROR);
250249
response.setRemark(String.format(
251250
"The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", //
252-
requestHeader.getClientId(),//
251+
clientId,//
253252
MQVersion.getVersionDesc(clientChannelInfo.getVersion())));
254253
return response;
255254
}
256255

257256
try {
258-
RemotingCommand newRequest =
259-
RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader);
257+
RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, requestHeader);
260258
RemotingCommand consumerResponse =
261-
this.brokerController.getBroker2Client().getConsumerRunningInfo(
262-
clientChannelInfo.getChannel(), newRequest);
259+
this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(),
260+
newRequest);
263261
return consumerResponse;
264262
}
265263
catch (Exception e) {
266264
response.setCode(ResponseCode.SYSTEM_ERROR);
267-
response.setRemark(String.format("invoke consumer <%s> Exception: %s",
268-
requestHeader.getClientId(), RemotingHelper.exceptionSimpleDesc(e)));
265+
response.setRemark(String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup,
266+
clientId, RemotingHelper.exceptionSimpleDesc(e)));
269267
return response;
270268
}
271269
}
272270

273271

272+
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request)
273+
throws RemotingCommandException {
274+
final ConsumeMessageDirectlyResultRequestHeader requestHeader =
275+
(ConsumeMessageDirectlyResultRequestHeader) request
276+
.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
277+
return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader,
278+
requestHeader.getConsumerGroup(), requestHeader.getClientId());
279+
}
280+
281+
274282
/**
275283
* 调用Consumer,获取Consumer内存数据结构,为监控以及定位问题
276284
*/
277285
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request)
278286
throws RemotingCommandException {
279-
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
280287
final GetConsumerRunningInfoRequestHeader requestHeader =
281288
(GetConsumerRunningInfoRequestHeader) request
282289
.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
283290

284-
ClientChannelInfo clientChannelInfo =
285-
this.brokerController.getConsumerManager().findChannel(requestHeader.getConsumerGroup(),
286-
requestHeader.getClientId());
287-
288-
if (null == clientChannelInfo) {
289-
response.setCode(ResponseCode.SYSTEM_ERROR);
290-
response.setRemark(String.format("The Consumer <%s> not online", requestHeader.getClientId()));
291-
return response;
292-
}
293-
294-
if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
295-
response.setCode(ResponseCode.SYSTEM_ERROR);
296-
response.setRemark(String.format(
297-
"The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", //
298-
requestHeader.getClientId(),//
299-
MQVersion.getVersionDesc(clientChannelInfo.getVersion())));
300-
return response;
301-
}
302-
303-
try {
304-
RemotingCommand newRequest =
305-
RemotingCommand
306-
.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader);
307-
RemotingCommand consumerResponse =
308-
this.brokerController.getBroker2Client().getConsumerRunningInfo(
309-
clientChannelInfo.getChannel(), newRequest);
310-
return consumerResponse;
311-
}
312-
catch (Exception e) {
313-
response.setCode(ResponseCode.SYSTEM_ERROR);
314-
response.setRemark(String.format("invoke consumer <%s> Exception: %s",
315-
requestHeader.getClientId(), RemotingHelper.exceptionSimpleDesc(e)));
316-
return response;
317-
}
291+
return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader,
292+
requestHeader.getConsumerGroup(), requestHeader.getClientId());
318293
}
319294

320295

0 commit comments

Comments
 (0)