Skip to content

Commit 330d9b0

Browse files
committed
发送消息网络包优化
1 parent 30ba397 commit 330d9b0

File tree

4 files changed

+224
-17
lines changed

4 files changed

+224
-17
lines changed

rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,8 @@ public void registerProcessor() {
334334
sendProcessor.registerSendMessageHook(sendMessageHookList);
335335
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,
336336
this.sendMessageExecutor);
337+
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,
338+
this.sendMessageExecutor);
337339
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
338340
this.sendMessageExecutor);
339341

rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@
1515
*/
1616
package com.alibaba.rocketmq.broker.processor;
1717

18+
import io.netty.channel.ChannelHandlerContext;
19+
20+
import java.net.InetSocketAddress;
21+
import java.net.SocketAddress;
22+
import java.util.List;
23+
import java.util.Random;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
1828
import com.alibaba.rocketmq.broker.BrokerController;
1929
import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
2030
import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
@@ -33,6 +43,7 @@
3343
import com.alibaba.rocketmq.common.protocol.ResponseCode;
3444
import com.alibaba.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
3545
import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
46+
import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
3647
import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
3748
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
3849
import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
@@ -43,14 +54,6 @@
4354
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
4455
import com.alibaba.rocketmq.store.PutMessageResult;
4556
import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
46-
import io.netty.channel.ChannelHandlerContext;
47-
import org.slf4j.Logger;
48-
import org.slf4j.LoggerFactory;
49-
50-
import java.net.InetSocketAddress;
51-
import java.net.SocketAddress;
52-
import java.util.List;
53-
import java.util.Random;
5457

5558

5659
/**
@@ -79,14 +82,27 @@ public SendMessageProcessor(final BrokerController brokerController) {
7982
@Override
8083
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
8184
throws RemotingCommandException {
85+
SendMessageRequestHeaderV2 requestHeaderV2 = null;
86+
8287
switch (request.getCode()) {
88+
case RequestCode.SEND_MESSAGE_V2:
89+
requestHeaderV2 =
90+
(SendMessageRequestHeaderV2) request
91+
.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
8392
case RequestCode.SEND_MESSAGE:
8493
SendMessageContext context = null;
85-
// 消息轨迹:记录到达 broker 的消息
86-
if (this.hasSendMessageHook()) {
87-
final SendMessageRequestHeader requestHeader =
94+
SendMessageRequestHeader requestHeader = null;
95+
96+
if (null == requestHeaderV2) {
97+
requestHeader =
8898
(SendMessageRequestHeader) request
8999
.decodeCommandCustomHeader(SendMessageRequestHeader.class);
100+
}
101+
else {
102+
requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
103+
}
104+
// 消息轨迹:记录到达 broker 的消息
105+
if (this.hasSendMessageHook()) {
90106
context = new SendMessageContext();
91107
context.setProducerGroup(requestHeader.getProducerGroup());
92108
context.setTopic(requestHeader.getTopic());
@@ -96,7 +112,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
96112
this.executeSendMessageHookBefore(context);
97113
}
98114

99-
final RemotingCommand response = this.sendMessage(ctx, request);
115+
final RemotingCommand response = this.sendMessage(ctx, request, requestHeader);
100116

101117
// 消息轨迹:记录发送成功的消息
102118
if (this.hasSendMessageHook()) {
@@ -162,7 +178,7 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin
162178
// 检查topic权限
163179
if (!PermName.isWriteable(topicConfig.getPerm())) {
164180
response.setCode(ResponseCode.NO_PERMISSION);
165-
response.setRemark("the topic[" + newTopic + "] sending message is forbidden");
181+
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
166182
return response;
167183
}
168184

@@ -278,14 +294,12 @@ private String diskUtil() {
278294
}
279295

280296

281-
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request)
282-
throws RemotingCommandException {
297+
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request,
298+
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
283299
final RemotingCommand response =
284300
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
285301
final SendMessageResponseHeader responseHeader =
286302
(SendMessageResponseHeader) response.readCustomHeader();
287-
final SendMessageRequestHeader requestHeader =
288-
(SendMessageRequestHeader) request.decodeCommandCustomHeader(SendMessageRequestHeader.class);
289303

290304
// 由于有直接返回的逻辑,所以必须要设置
291305
response.setOpaque(request.getOpaque());

rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,7 @@ public class RequestCode {
151151
// 通过Broker直接向某个Consumer发送一条消息,并立刻消费,返回结果给broker,再返回给调用方
152152
// 2014-08-11 Add By shijia
153153
public static final int CONSUME_MESSAGE_DIRECTLY = 308;
154+
155+
// Broker 发送消息,优化网络数据包
156+
public static final int SEND_MESSAGE_V2 = 309;
154157
}
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package com.alibaba.rocketmq.common.protocol.header;
2+
3+
import com.alibaba.rocketmq.remoting.CommandCustomHeader;
4+
import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
5+
import com.alibaba.rocketmq.remoting.annotation.CFNullable;
6+
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
7+
8+
9+
/**
10+
* 为减少网络传输数量准备
11+
*
12+
* @author shijia.wxr<[email protected]>
13+
*/
14+
public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
15+
@CFNotNull
16+
private String a;// producerGroup;
17+
@CFNotNull
18+
private String b;// topic;
19+
@CFNotNull
20+
private String c;// defaultTopic;
21+
@CFNotNull
22+
private Integer d;// defaultTopicQueueNums;
23+
@CFNotNull
24+
private Integer e;// queueId;
25+
@CFNotNull
26+
private Integer f;// sysFlag;
27+
@CFNotNull
28+
private Long g;// bornTimestamp;
29+
@CFNotNull
30+
private Integer h;// flag;
31+
@CFNullable
32+
private String i;// properties;
33+
@CFNullable
34+
private Integer j;// reconsumeTimes;
35+
@CFNullable
36+
private boolean k;// unitMode = false;
37+
38+
39+
@Override
40+
public void checkFields() throws RemotingCommandException {
41+
}
42+
43+
44+
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(
45+
final SendMessageRequestHeaderV2 v2) {
46+
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
47+
v1.setProducerGroup(v2.a);
48+
v1.setTopic(v2.b);
49+
v1.setDefaultTopic(v2.c);
50+
v1.setDefaultTopicQueueNums(v2.d);
51+
v1.setQueueId(v2.e);
52+
v1.setSysFlag(v2.f);
53+
v1.setBornTimestamp(v2.g);
54+
v1.setFlag(v2.h);
55+
v1.setProperties(v2.i);
56+
v1.setReconsumeTimes(v2.j);
57+
v1.setUnitMode(v2.k);
58+
return v1;
59+
}
60+
61+
62+
public static SendMessageRequestHeaderV2 createSendMessageRequestHeaderV2(
63+
final SendMessageRequestHeader v1) {
64+
SendMessageRequestHeaderV2 v2 = new SendMessageRequestHeaderV2();
65+
v2.a = v1.getProducerGroup();
66+
v2.b = v1.getTopic();
67+
v2.c = v1.getDefaultTopic();
68+
v2.d = v1.getDefaultTopicQueueNums();
69+
v2.e = v1.getQueueId();
70+
v2.f = v1.getSysFlag();
71+
v2.g = v1.getBornTimestamp();
72+
v2.h = v1.getFlag();
73+
v2.i = v1.getProperties();
74+
v2.j = v1.getReconsumeTimes();
75+
v2.k = v1.isUnitMode();
76+
return v2;
77+
}
78+
79+
80+
public String getA() {
81+
return a;
82+
}
83+
84+
85+
public void setA(String a) {
86+
this.a = a;
87+
}
88+
89+
90+
public String getB() {
91+
return b;
92+
}
93+
94+
95+
public void setB(String b) {
96+
this.b = b;
97+
}
98+
99+
100+
public String getC() {
101+
return c;
102+
}
103+
104+
105+
public void setC(String c) {
106+
this.c = c;
107+
}
108+
109+
110+
public Integer getD() {
111+
return d;
112+
}
113+
114+
115+
public void setD(Integer d) {
116+
this.d = d;
117+
}
118+
119+
120+
public Integer getE() {
121+
return e;
122+
}
123+
124+
125+
public void setE(Integer e) {
126+
this.e = e;
127+
}
128+
129+
130+
public Integer getF() {
131+
return f;
132+
}
133+
134+
135+
public void setF(Integer f) {
136+
this.f = f;
137+
}
138+
139+
140+
public Long getG() {
141+
return g;
142+
}
143+
144+
145+
public void setG(Long g) {
146+
this.g = g;
147+
}
148+
149+
150+
public Integer getH() {
151+
return h;
152+
}
153+
154+
155+
public void setH(Integer h) {
156+
this.h = h;
157+
}
158+
159+
160+
public String getI() {
161+
return i;
162+
}
163+
164+
165+
public void setI(String i) {
166+
this.i = i;
167+
}
168+
169+
170+
public Integer getJ() {
171+
return j;
172+
}
173+
174+
175+
public void setJ(Integer j) {
176+
this.j = j;
177+
}
178+
179+
180+
public boolean isK() {
181+
return k;
182+
}
183+
184+
185+
public void setK(boolean k) {
186+
this.k = k;
187+
}
188+
}

0 commit comments

Comments
 (0)