Skip to content

Commit 8f10dfd

Browse files
committed
#490 向指定Consumer发起消费消息请求,运维需要。服务端开发完成。
1 parent 3c7c1a8 commit 8f10dfd

File tree

3 files changed

+34
-8
lines changed

3 files changed

+34
-8
lines changed

rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.UnsupportedEncodingException;
2222
import java.net.InetSocketAddress;
2323
import java.net.SocketAddress;
24+
import java.net.UnknownHostException;
2425
import java.util.HashMap;
2526
import java.util.HashSet;
2627
import java.util.Iterator;
@@ -45,6 +46,8 @@
4546
import com.alibaba.rocketmq.common.admin.TopicOffset;
4647
import com.alibaba.rocketmq.common.admin.TopicStatsTable;
4748
import com.alibaba.rocketmq.common.constant.LoggerName;
49+
import com.alibaba.rocketmq.common.message.MessageDecoder;
50+
import com.alibaba.rocketmq.common.message.MessageId;
4851
import com.alibaba.rocketmq.common.message.MessageQueue;
4952
import com.alibaba.rocketmq.common.protocol.RequestCode;
5053
import com.alibaba.rocketmq.common.protocol.ResponseCode;
@@ -90,13 +93,13 @@
9093
import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
9194
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
9295
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
93-
import com.alibaba.rocketmq.remoting.CommandCustomHeader;
9496
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
9597
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
9698
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
9799
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
98100
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
99101
import com.alibaba.rocketmq.store.DefaultMessageStore;
102+
import com.alibaba.rocketmq.store.SelectMapedBufferResult;
100103

101104

102105
/**
@@ -231,7 +234,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
231234

232235
private RemotingCommand callConsumer(//
233236
final int requestCode,//
234-
final CommandCustomHeader requestHeader, //
237+
final RemotingCommand request, //
235238
final String consumerGroup,//
236239
final String clientId) throws RemotingCommandException {
237240
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -254,7 +257,10 @@ private RemotingCommand callConsumer(//
254257
}
255258

256259
try {
257-
RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, requestHeader);
260+
RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);
261+
newRequest.setExtFields(request.getExtFields());
262+
newRequest.setBody(request.getBody());
263+
258264
RemotingCommand consumerResponse =
259265
this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(),
260266
newRequest);
@@ -274,7 +280,27 @@ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, Remoti
274280
final ConsumeMessageDirectlyResultRequestHeader requestHeader =
275281
(ConsumeMessageDirectlyResultRequestHeader) request
276282
.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
277-
return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader,
283+
284+
request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());
285+
SelectMapedBufferResult selectMapedBufferResult = null;
286+
try {
287+
MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());
288+
selectMapedBufferResult =
289+
this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset());
290+
291+
byte[] body = new byte[selectMapedBufferResult.getSize()];
292+
selectMapedBufferResult.getByteBuffer().get(body);
293+
request.setBody(body);
294+
}
295+
catch (UnknownHostException e) {
296+
}
297+
finally {
298+
if (selectMapedBufferResult != null) {
299+
selectMapedBufferResult.release();
300+
}
301+
}
302+
303+
return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request,
278304
requestHeader.getConsumerGroup(), requestHeader.getClientId());
279305
}
280306

@@ -288,7 +314,7 @@ private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, Remoti
288314
(GetConsumerRunningInfoRequestHeader) request
289315
.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
290316

291-
return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader,
317+
return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request,
292318
requestHeader.getConsumerGroup(), requestHeader.getClientId());
293319
}
294320

rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ public class ConsumeMessageDirectlyResultRequestHeader implements CommandCustomH
1010
@CFNotNull
1111
private String consumerGroup;
1212
@CFNullable
13-
private String brokerName;
14-
@CFNullable
1513
private String clientId;
1614
@CFNullable
1715
private String msgId;
16+
@CFNullable
17+
private String brokerName;
1818

1919

2020
@Override

rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ private static void setCmdVersion(RemotingCommand cmd) {
134134
}
135135

136136

137-
private void makeCustomHeaderToNet() {
137+
public void makeCustomHeaderToNet() {
138138
if (this.customHeader != null) {
139139
Field[] fields = this.customHeader.getClass().getDeclaredFields();
140140
if (null == this.extFields) {

0 commit comments

Comments
 (0)