Skip to content

Commit 7917526

Browse files
committed
Merge branch 'authorize' into develop
Conflicts: pom.xml rocketmq-broker/pom.xml rocketmq-client/pom.xml rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java rocketmq-common/pom.xml rocketmq-example/pom.xml rocketmq-filtersrv/pom.xml rocketmq-namesrv/pom.xml rocketmq-remoting/pom.xml rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java rocketmq-research/pom.xml rocketmq-srvutil/pom.xml rocketmq-store/pom.xml rocketmq-tools/pom.xml rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExtImpl.java rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/command/offset/GetConsumerStatusCommand.java
2 parents cb33cea + 3376081 commit 7917526

File tree

52 files changed

+419
-328
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+419
-328
lines changed

pom.xml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
<url>https://github.com/alibaba/rocketmq</url>
2424
<description>https://github.com/alibaba/RocketMQ/blob/develop/README.md</description>
2525

26+
2627
<modules>
2728
<module>rocketmq-client</module>
2829
<module>rocketmq-common</module>
@@ -65,11 +66,11 @@
6566
</license>
6667
</licenses>
6768

68-
<scm>
69-
<url>http://gitlab.alibaba-inc.com/middleware/rocketmq.git</url>
70-
<connection>scm:git:http://gitlab.alibaba-inc.com/middleware/rocketmq.git</connection>
71-
<developerConnection>scm:git:http://gitlab.alibaba-inc.com/middleware/rocketmq.git</developerConnection>
72-
</scm>
69+
<scm>
70+
<url>http://gitlab.alibaba-inc.com/middleware/rocketmq.git</url>
71+
<connection>scm:git:http://gitlab.alibaba-inc.com/middleware/rocketmq.git</connection>
72+
<developerConnection>scm:git:http://gitlab.alibaba-inc.com/middleware/rocketmq.git</developerConnection>
73+
</scm>
7374

7475
<properties>
7576
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
6060
import com.alibaba.rocketmq.common.protocol.RequestCode;
6161
import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
62+
import com.alibaba.rocketmq.remoting.RPCHook;
6263
import com.alibaba.rocketmq.remoting.RemotingServer;
6364
import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
6465
import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer;
@@ -763,4 +764,14 @@ public void run() {
763764
}
764765
}, 5, TimeUnit.MINUTES);
765766
}
767+
768+
769+
public void registerServerRPCHook(RPCHook rpcHook) {
770+
getRemotingServer().registerRPCHook(rpcHook);
771+
}
772+
773+
774+
public void registerClientRPCHook(RPCHook rpcHook) {
775+
this.getBrokerOuterAPI().registerRPCHook(rpcHook);
776+
}
766777
}

rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class BrokerStartup {
5555
public static Properties properties = null;
5656
public static CommandLine commandLine = null;
5757
public static String configFile = null;
58+
public static Logger log;
5859

5960

6061
public static Options buildCommandlineOptions(final Options options) {
@@ -75,11 +76,11 @@ public static Options buildCommandlineOptions(final Options options) {
7576

7677

7778
public static void main(String[] args) {
78-
main0(args);
79+
start(createBrokerController(args));
7980
}
8081

8182

82-
public static BrokerController main0(String[] args) {
83+
public static BrokerController createBrokerController(String[] args) {
8384
System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion));
8485

8586
// Socket发送缓冲区大小
@@ -206,7 +207,7 @@ else if (commandLine.hasOption('m')) {
206207
configurator.setContext(lc);
207208
lc.reset();
208209
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
209-
final Logger log = LoggerFactory.getLogger(LoggerName.BrokerLoggerName);
210+
log = LoggerFactory.getLogger(LoggerName.BrokerLoggerName);
210211

211212
// 打印启动参数
212213
MixAll.printObjectProperties(log, brokerConfig);
@@ -246,14 +247,27 @@ public void run() {
246247
}
247248
}, "ShutdownHook"));
248249

250+
return controller;
251+
}
252+
catch (Throwable e) {
253+
e.printStackTrace();
254+
System.exit(-1);
255+
}
256+
257+
return null;
258+
}
259+
260+
261+
public static BrokerController start(BrokerController controller) {
262+
try {
249263
// 启动服务控制对象
250264
controller.start();
251265
String tip =
252266
"The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
253267
+ controller.getBrokerAddr() + "] boot success.";
254268

255-
if (null != brokerConfig.getNamesrvAddr()) {
256-
tip += " and name server is " + brokerConfig.getNamesrvAddr();
269+
if (null != controller.getBrokerConfig().getNamesrvAddr()) {
270+
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
257271
}
258272

259273
log.info(tip);

rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,13 @@
1616
package com.alibaba.rocketmq.broker.out;
1717

1818
import java.util.ArrayList;
19-
import java.util.HashMap;
2019
import java.util.List;
2120

2221
import org.slf4j.Logger;
2322
import org.slf4j.LoggerFactory;
2423

2524
import com.alibaba.rocketmq.client.exception.MQBrokerException;
2625
import com.alibaba.rocketmq.common.MixAll;
27-
import com.alibaba.rocketmq.common.SessionCredentials;
2826
import com.alibaba.rocketmq.common.constant.LoggerName;
2927
import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
3028
import com.alibaba.rocketmq.common.namesrv.TopAddressing;
@@ -61,37 +59,15 @@ public class BrokerOuterAPI {
6159
private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR);
6260
private String nameSrvAddr = null;
6361

64-
// 客户端授权
65-
private volatile SessionCredentials sessionCredentials = new SessionCredentials();
6662

67-
class RPCHookImpl implements RPCHook {
68-
@Override
69-
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
70-
BrokerOuterAPI.this.attachSessionCredentials(request);
71-
}
72-
73-
74-
@Override
75-
public void doAfterResponse(RemotingCommand request, RemotingCommand response) {
76-
}
77-
}
78-
79-
80-
private void attachSessionCredentials(final RemotingCommand cmd) {
81-
SessionCredentials tmp = this.sessionCredentials;
82-
if (tmp != null) {
83-
if (tmp.getAccessKey() != null && tmp.getSecretKey() != null) {
84-
HashMap<String, String> extFields = new HashMap<String, String>();
85-
extFields.put(SessionCredentials.AccessKey, tmp.getAccessKey());
86-
cmd.setExtFields(extFields);
87-
}
88-
}
63+
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
64+
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
65+
this.remotingClient.registerRPCHook(rpcHook);
8966
}
9067

9168

9269
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
93-
this.remotingClient = new NettyRemotingClient(nettyClientConfig);
94-
this.remotingClient.registerRPCHook(new RPCHookImpl());
70+
this(nettyClientConfig, null);
9571
}
9672

9773

@@ -359,4 +335,9 @@ public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr)
359335

360336
throw new MQBrokerException(response.getCode(), response.getRemark());
361337
}
338+
339+
340+
public void registerRPCHook(RPCHook rpcHook) {
341+
remotingClient.registerRPCHook(rpcHook);
342+
}
362343
}

rocketmq-client/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23
<parent>
34
<groupId>com.alibaba.rocketmq</groupId>
45
<artifactId>rocketmq-all</artifactId>

rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.alibaba.rocketmq.common.message.MessageExt;
3030
import com.alibaba.rocketmq.common.message.MessageQueue;
3131
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
32+
import com.alibaba.rocketmq.remoting.RPCHook;
3233
import com.alibaba.rocketmq.remoting.exception.RemotingException;
3334

3435

@@ -39,12 +40,12 @@
3940
* @since 2013-7-24
4041
*/
4142
public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
42-
protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl =
43-
new DefaultMQPullConsumerImpl(this);
43+
protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
44+
4445
/**
4546
* 做同样事情的Consumer归为同一个Group,应用必须设置,并保证命名唯一
4647
*/
47-
private String consumerGroup = MixAll.DEFAULT_CONSUMER_GROUP;
48+
private String consumerGroup;
4849
/**
4950
* 长轮询模式,Consumer连接在Broker挂起最长时间,不建议修改
5051
*/
@@ -84,11 +85,23 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
8485

8586

8687
public DefaultMQPullConsumer() {
88+
this(MixAll.DEFAULT_CONSUMER_GROUP, null);
8789
}
8890

8991

9092
public DefaultMQPullConsumer(final String consumerGroup) {
93+
this(consumerGroup, null);
94+
}
95+
96+
97+
public DefaultMQPullConsumer(RPCHook rpcHook) {
98+
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
99+
}
100+
101+
102+
public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
91103
this.consumerGroup = consumerGroup;
104+
defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
92105
}
93106

94107

rocketmq-client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.alibaba.rocketmq.common.message.MessageExt;
3434
import com.alibaba.rocketmq.common.message.MessageQueue;
3535
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
36+
import com.alibaba.rocketmq.remoting.RPCHook;
3637
import com.alibaba.rocketmq.remoting.exception.RemotingException;
3738

3839

@@ -44,12 +45,11 @@
4445
* @since 2013-7-24
4546
*/
4647
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
47-
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl =
48-
new DefaultMQPushConsumerImpl(this);
48+
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
4949
/**
5050
* 做同样事情的Consumer归为同一个Group,应用必须设置,并保证命名唯一
5151
*/
52-
private String consumerGroup = MixAll.DEFAULT_CONSUMER_GROUP;
52+
private String consumerGroup;
5353
/**
5454
* 集群消费/广播消费
5555
*/
@@ -129,12 +129,23 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
129129

130130

131131
public DefaultMQPushConsumer() {
132+
this(MixAll.DEFAULT_CONSUMER_GROUP, null);
133+
}
134+
132135

136+
public DefaultMQPushConsumer(RPCHook rpcHook) {
137+
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
133138
}
134139

135140

136141
public DefaultMQPushConsumer(final String consumerGroup) {
142+
this(consumerGroup, null);
143+
}
144+
145+
146+
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
137147
this.consumerGroup = consumerGroup;
148+
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
138149
}
139150

140151

rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java

Lines changed: 18 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import com.alibaba.rocketmq.client.producer.SendStatus;
4242
import com.alibaba.rocketmq.common.MQVersion;
4343
import com.alibaba.rocketmq.common.MixAll;
44-
import com.alibaba.rocketmq.common.SessionCredentials;
4544
import com.alibaba.rocketmq.common.TopicConfig;
4645
import com.alibaba.rocketmq.common.UtilAll;
4746
import com.alibaba.rocketmq.common.admin.ConsumeStats;
@@ -145,6 +144,11 @@
145144
* @since 2013-7-24
146145
*/
147146
public class MQClientAPIImpl {
147+
148+
static {
149+
System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion));
150+
}
151+
148152
private final static Logger log = ClientLogger.getLog();
149153
private final RemotingClient remotingClient;
150154
private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR);
@@ -153,54 +157,13 @@ public class MQClientAPIImpl {
153157
// 虚拟运行环境相关的project group
154158
private String projectGroupPrefix;
155159

156-
// 客户端授权
157-
private volatile SessionCredentials sessionCredentials = new SessionCredentials();
158-
159-
static {
160-
System.setProperty(RemotingCommand.RemotingVersionKey, Integer.toString(MQVersion.CurrentVersion));
161-
}
162-
163-
164-
private void attachSessionCredentials(final RemotingCommand cmd) {
165-
SessionCredentials tmp = this.sessionCredentials;
166-
if (tmp != null) {
167-
if (tmp.getAccessKey() != null && tmp.getSecretKey() != null) {
168-
HashMap<String, String> extFields = new HashMap<String, String>();
169-
extFields.put(SessionCredentials.AccessKey, tmp.getAccessKey());
170-
if (null == cmd.getExtFields()) {
171-
cmd.setExtFields(extFields);
172-
}
173-
else {
174-
cmd.getExtFields().putAll(extFields);
175-
}
176-
}
177-
}
178-
}
179-
180-
181-
public String getProjectGroupPrefix() {
182-
return projectGroupPrefix;
183-
}
184-
185-
class RPCHookImpl implements RPCHook {
186-
@Override
187-
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
188-
MQClientAPIImpl.this.attachSessionCredentials(request);
189-
}
190-
191-
192-
@Override
193-
public void doAfterResponse(RemotingCommand request, RemotingCommand response) {
194-
}
195-
}
196-
197160

198161
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
199-
final ClientRemotingProcessor clientRemotingProcessor) {
162+
final ClientRemotingProcessor clientRemotingProcessor, RPCHook rpcHook) {
200163
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
201164
this.clientRemotingProcessor = clientRemotingProcessor;
202165

203-
this.remotingClient.registerRPCHook(new RPCHookImpl());
166+
this.remotingClient.registerRPCHook(rpcHook);
204167
/**
205168
* 注册客户端支持的RPC CODE
206169
*/
@@ -221,6 +184,12 @@ public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
221184
}
222185

223186

187+
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
188+
final ClientRemotingProcessor clientRemotingProcessor) {
189+
this(nettyClientConfig, clientRemotingProcessor, null);
190+
}
191+
192+
224193
public List<String> getNameServerAddressList() {
225194
return this.remotingClient.getNameServerAddressList();
226195
}
@@ -1984,16 +1953,6 @@ public Set<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String t
19841953
}
19851954

19861955

1987-
public SessionCredentials getSessionCredentials() {
1988-
return sessionCredentials;
1989-
}
1990-
1991-
1992-
public void setSessionCredentials(SessionCredentials sessionCredentials) {
1993-
this.sessionCredentials = sessionCredentials;
1994-
}
1995-
1996-
19971956
/**
19981957
* Name Server: 获取指定集群下的所有 topic
19991958
*/
@@ -2191,4 +2150,9 @@ public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String cons
21912150

21922151
throw new MQClientException(response.getCode(), response.getRemark());
21932152
}
2153+
2154+
2155+
public String getProjectGroupPrefix() {
2156+
return projectGroupPrefix;
2157+
}
21942158
}

0 commit comments

Comments
 (0)