Skip to content

Commit 97f60ff

Browse files
committed
Merge branch 'shijia2' into develop
Conflicts: rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
2 parents a557b7b + 2609ee3 commit 97f60ff

File tree

14 files changed

+205
-125
lines changed

14 files changed

+205
-125
lines changed

.travis.yml

Whitespace-only changes.

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
### RocketMQ是什么?
1+
### RocketMQ是什么?[![Build Status](https://travis-ci.org/alibaba/rocketmq.svg?branch=develop)](https://travis-ci.org/alibaba/rocketmq)
22
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:
33

44
* 支持严格的消息顺序

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@
331331
<dependency>
332332
<groupId>io.netty</groupId>
333333
<artifactId>netty-all</artifactId>
334-
<version>4.0.19.Final</version>
334+
<version>4.0.21.Final</version>
335335
</dependency>
336336
<dependency>
337337
<groupId>com.alibaba</groupId>

rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
5757
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
5858
import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
59-
import com.alibaba.rocketmq.remoting.RPCHook;
6059
import com.alibaba.rocketmq.remoting.exception.RemotingException;
6160

6261

@@ -77,18 +76,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
7776
// Rebalance实现
7877
private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
7978

80-
// 通信层hook
81-
private final RPCHook rpcHook;
82-
83-
84-
public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, RPCHook rpcHook) {
85-
this.defaultMQPullConsumer = defaultMQPullConsumer;
86-
this.rpcHook = rpcHook;
87-
}
79+
// Consumer启动时间
80+
private final long consumerStartTimestamp = System.currentTimeMillis();
8881

8982

9083
public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer) {
91-
this(defaultMQPullConsumer, null);
84+
this.defaultMQPullConsumer = defaultMQPullConsumer;
9285
}
9386

9487

@@ -504,8 +497,7 @@ public void start() throws MQClientException {
504497
}
505498

506499
this.mQClientFactory =
507-
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer,
508-
rpcHook);
500+
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer);
509501

510502
// 初始化Rebalance变量
511503
this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
@@ -689,10 +681,16 @@ public ConsumerRunningInfo consumerRunningInfo() {
689681

690682
// 各种配置及运行数据
691683
Properties prop = MixAll.object2Properties(this.defaultMQPullConsumer);
684+
prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, this.consumerStartTimestamp);
692685
info.setProperties(prop);
693686

694687
// 订阅关系
695688
info.getSubscriptionSet().addAll(this.subscriptions());
696689
return info;
697690
}
691+
692+
693+
public long getConsumerStartTimestamp() {
694+
return consumerStartTimestamp;
695+
}
698696
}

rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
6969
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
7070
import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
71-
import com.alibaba.rocketmq.remoting.RPCHook;
7271
import com.alibaba.rocketmq.remoting.exception.RemotingException;
7372

7473

@@ -110,27 +109,23 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
110109
// 消息过滤 hook
111110
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
112111

113-
// 消费每条消息会回调
114-
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
115-
116-
// 通信层hook
117-
private final RPCHook rpcHook;
112+
// Consumer启动时间
113+
private final long consumerStartTimestamp = System.currentTimeMillis();
118114

119115

120-
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
121-
this.defaultMQPushConsumer = defaultMQPushConsumer;
122-
this.rpcHook = rpcHook;
116+
public void registerFilterMessageHook(final FilterMessageHook hook) {
117+
this.filterMessageHookList.add(hook);
118+
log.info("register FilterMessageHook Hook, {}", hook.hookName());
123119
}
124120

125-
126-
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer) {
127-
this(defaultMQPushConsumer, null);
128-
}
121+
/**
122+
* 消费每条消息会回调
123+
*/
124+
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
129125

130126

131-
public void registerFilterMessageHook(final FilterMessageHook hook) {
132-
this.filterMessageHookList.add(hook);
133-
log.info("register FilterMessageHook Hook, {}", hook.hookName());
127+
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer) {
128+
this.defaultMQPushConsumer = defaultMQPushConsumer;
134129
}
135130

136131

@@ -663,8 +658,7 @@ public void start() throws MQClientException {
663658
}
664659

665660
this.mQClientFactory =
666-
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,
667-
rpcHook);
661+
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer);
668662

669663
// 初始化Rebalance变量
670664
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
@@ -1089,6 +1083,7 @@ public ConsumerRunningInfo consumerRunningInfo() {
10891083

10901084
prop.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, this.consumeOrderly);
10911085
prop.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE, this.consumeMessageService.getCorePoolSize());
1086+
prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, this.consumerStartTimestamp);
10921087

10931088
info.setProperties(prop);
10941089

rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class ConsumerRunningInfo extends RemotingSerializable {
2121
public static final String PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY";
2222
public static final String PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE";
2323
public static final String PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION";
24+
public static final String PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP";
2425

2526
// 各种配置及运行数据
2627
private Properties properties = new Properties();
@@ -191,9 +192,22 @@ public String formatString() {
191192
public static boolean analyzeSubscription(
192193
final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
193194
ConsumerRunningInfo prev = criTable.firstEntry().getValue();
194-
String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
195+
196+
boolean push = false;
197+
{
198+
String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
199+
push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
200+
}
201+
202+
boolean startForAWhile = false;
203+
{
204+
String property =
205+
prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP);
206+
startForAWhile = (System.currentTimeMillis() - Long.parseLong(property)) > (1000 * 60 * 2);
207+
}
208+
195209
// 只检测PUSH
196-
if (ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY) {
210+
if (push && startForAWhile) {
197211
// 分析订阅关系是否相同
198212
{
199213
Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator();
@@ -222,4 +236,70 @@ public static boolean analyzeSubscription(
222236

223237
return true;
224238
}
239+
240+
241+
public static boolean analyzeRebalance(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
242+
return true;
243+
}
244+
245+
246+
public static String analyzeProcessQueue(final String clientId, ConsumerRunningInfo info) {
247+
StringBuilder sb = new StringBuilder();
248+
boolean push = false;
249+
{
250+
String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
251+
push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
252+
}
253+
254+
boolean orderMsg = false;
255+
{
256+
String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_ORDERLY);
257+
orderMsg = Boolean.parseBoolean(property);
258+
}
259+
260+
if (push) {
261+
Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = info.getMqTable().entrySet().iterator();
262+
while (it.hasNext()) {
263+
Entry<MessageQueue, ProcessQueueInfo> next = it.next();
264+
MessageQueue mq = next.getKey();
265+
ProcessQueueInfo pq = next.getValue();
266+
267+
// 顺序消息
268+
if (orderMsg) {
269+
// 没锁住
270+
if (!pq.isLocked()) {
271+
sb.append(String.format("%s %s can't lock for a while, %dms\n", //
272+
clientId,//
273+
mq,//
274+
System.currentTimeMillis() - pq.getLastLockTimestamp()));
275+
}
276+
// 锁住
277+
else {
278+
// Rebalance已经丢弃此队列,但是没有正常释放Lock
279+
if (pq.isDroped() && (pq.getTryUnlockTimes() > 0)) {
280+
sb.append(String.format("%s %s unlock %d times, still failed\n",//
281+
clientId,//
282+
mq,//
283+
pq.getTryUnlockTimes()));
284+
}
285+
}
286+
287+
// 事务消息未提交
288+
}
289+
// 乱序消息
290+
else {
291+
long diff = System.currentTimeMillis() - pq.getLastConsumeTimestamp();
292+
// 在有消息的情况下,超过1分钟没再消费消息了
293+
if (diff > (1000 * 60) && pq.getCachedMsgCount() > 0) {
294+
sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms\n",//
295+
clientId,//
296+
mq, //
297+
diff));
298+
}
299+
}
300+
}
301+
}
302+
303+
return sb.toString();
304+
}
225305
}

rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package com.alibaba.rocketmq.common.protocol.body;
22

3+
import com.alibaba.rocketmq.common.UtilAll;
4+
5+
36
public class ProcessQueueInfo {
47
/**
58
* 消费到哪里,提交的offset
@@ -169,7 +172,9 @@ public String toString() {
169172
+ cachedMsgCount + ", transactionMsgMinOffset=" + transactionMsgMinOffset
170173
+ ", transactionMsgMaxOffset=" + transactionMsgMaxOffset + ", transactionMsgCount="
171174
+ transactionMsgCount + ", locked=" + locked + ", tryUnlockTimes=" + tryUnlockTimes
172-
+ ", lastLockTimestamp=" + lastLockTimestamp + ", droped=" + droped + ", lastPullTimestamp="
173-
+ lastPullTimestamp + ", lastConsumeTimestamp=" + lastConsumeTimestamp + "]";
175+
+ ", lastLockTimestamp=" + UtilAll.timeMillisToHumanString(lastLockTimestamp) + ", droped="
176+
+ droped + ", lastPullTimestamp=" + UtilAll.timeMillisToHumanString(lastPullTimestamp)
177+
+ ", lastConsumeTimestamp=" + UtilAll.timeMillisToHumanString(lastConsumeTimestamp) + "]";
178+
174179
}
175180
}

rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java

Lines changed: 48 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import java.lang.reflect.Modifier;
2121
import java.nio.ByteBuffer;
2222
import java.util.HashMap;
23-
import java.util.Iterator;
24-
import java.util.Map.Entry;
2523
import java.util.concurrent.atomic.AtomicInteger;
2624

2725
import com.alibaba.fastjson.annotation.JSONField;
@@ -191,74 +189,62 @@ public CommandCustomHeader decodeCommandCustomHeader(Class<? extends CommandCust
191189
return null;
192190
}
193191

194-
Iterator<Entry<String, String>> it = this.extFields.entrySet().iterator();
195-
while (it.hasNext()) {
196-
Entry<String, String> entry = it.next();
197-
String name = entry.getKey();
198-
String value = entry.getValue();
199-
200-
try {
201-
Field field = objectHeader.getClass().getDeclaredField(name);
202-
field.setAccessible(true);
203-
String type = field.getType().getSimpleName();
204-
Object valueParsed = null;
205-
206-
if (type.equals("String")) {
207-
valueParsed = value;
208-
}
209-
else if (type.equals("Integer")) {
210-
valueParsed = Integer.parseInt(value);
211-
}
212-
else if (type.equals("Long")) {
213-
valueParsed = Long.parseLong(value);
214-
}
215-
else if (type.equals("Boolean")) {
216-
valueParsed = Boolean.parseBoolean(value);
217-
}
218-
else if (type.equals("Double")) {
219-
valueParsed = Double.parseDouble(value);
220-
}
221-
else if (type.equals("int")) {
222-
valueParsed = Integer.parseInt(value);
223-
}
224-
else if (type.equals("long")) {
225-
valueParsed = Long.parseLong(value);
226-
}
227-
else if (type.equals("boolean")) {
228-
valueParsed = Boolean.parseBoolean(value);
229-
}
230-
else if (type.equals("double")) {
231-
valueParsed = Double.parseDouble(value);
232-
}
233-
234-
field.set(objectHeader, valueParsed);
235-
}
236-
catch (Throwable e) {
237-
}
238-
}
239-
240192
// 检查返回对象是否有效
241193
Field[] fields = objectHeader.getClass().getDeclaredFields();
242194
for (Field field : fields) {
243195
if (!Modifier.isStatic(field.getModifiers())) {
244-
String name = field.getName();
245-
if (!name.startsWith("this")) {
246-
Object value = null;
196+
String fieldName = field.getName();
197+
if (!fieldName.startsWith("this")) {
247198
try {
199+
String value = this.extFields.get(fieldName);
200+
if (null == value) {
201+
Annotation annotation = field.getAnnotation(CFNotNull.class);
202+
if (annotation != null) {
203+
throw new RemotingCommandException("the custom field <" + fieldName
204+
+ "> is null");
205+
}
206+
}
207+
248208
field.setAccessible(true);
249-
value = field.get(objectHeader);
250-
}
251-
catch (IllegalArgumentException e) {
252-
}
253-
catch (IllegalAccessException e) {
254-
}
209+
String type = field.getType().getSimpleName();
210+
Object valueParsed = null;
255211

256-
// 空值检查
257-
if (null == value) {
258-
Annotation annotation = field.getAnnotation(CFNotNull.class);
259-
if (annotation != null) {
260-
throw new RemotingCommandException("the custom field <" + name + "> is null");
212+
if (type.equals("String")) {
213+
valueParsed = value;
214+
}
215+
else if (type.equals("Integer")) {
216+
valueParsed = Integer.parseInt(value);
217+
}
218+
else if (type.equals("Long")) {
219+
valueParsed = Long.parseLong(value);
220+
}
221+
else if (type.equals("Boolean")) {
222+
valueParsed = Boolean.parseBoolean(value);
261223
}
224+
else if (type.equals("Double")) {
225+
valueParsed = Double.parseDouble(value);
226+
}
227+
else if (type.equals("int")) {
228+
valueParsed = Integer.parseInt(value);
229+
}
230+
else if (type.equals("long")) {
231+
valueParsed = Long.parseLong(value);
232+
}
233+
else if (type.equals("boolean")) {
234+
valueParsed = Boolean.parseBoolean(value);
235+
}
236+
else if (type.equals("double")) {
237+
valueParsed = Double.parseDouble(value);
238+
}
239+
else {
240+
throw new RemotingCommandException("the custom field <" + fieldName
241+
+ "> type is not supported");
242+
}
243+
244+
field.set(objectHeader, valueParsed);
245+
246+
}
247+
catch (Throwable e) {
262248
}
263249
}
264250
}

0 commit comments

Comments
 (0)