Skip to content

Commit e7edf2f

Browse files
committed
指定Consumer消费某条消息。
1 parent 06a8438 commit e7edf2f

File tree

5 files changed

+214
-25
lines changed

5 files changed

+214
-25
lines changed

rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

Lines changed: 75 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@
1515
*/
1616
package com.alibaba.rocketmq.client.impl.consumer;
1717

18+
import java.util.ArrayList;
19+
import java.util.Collections;
20+
import java.util.List;
21+
import java.util.concurrent.BlockingQueue;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.LinkedBlockingQueue;
24+
import java.util.concurrent.ScheduledExecutorService;
25+
import java.util.concurrent.ThreadPoolExecutor;
26+
import java.util.concurrent.TimeUnit;
27+
28+
import org.slf4j.Logger;
29+
1830
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
1931
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
2032
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -27,13 +39,9 @@
2739
import com.alibaba.rocketmq.common.message.MessageConst;
2840
import com.alibaba.rocketmq.common.message.MessageExt;
2941
import com.alibaba.rocketmq.common.message.MessageQueue;
42+
import com.alibaba.rocketmq.common.protocol.body.CMResult;
43+
import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
3044
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
31-
import org.slf4j.Logger;
32-
33-
import java.util.ArrayList;
34-
import java.util.Collections;
35-
import java.util.List;
36-
import java.util.concurrent.*;
3745

3846

3947
/**
@@ -105,17 +113,6 @@ public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQ
105113
}
106114

107115

108-
private void resetRetryTopic(final List<MessageExt> msgs) {
109-
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
110-
for (MessageExt msg : msgs) {
111-
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
112-
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
113-
msg.setTopic(retryTopic);
114-
}
115-
}
116-
}
117-
118-
119116
@Override
120117
public void run() {
121118
if (this.processQueue.isDroped()) {
@@ -145,7 +142,7 @@ public void run() {
145142
long beginTimestamp = System.currentTimeMillis();
146143

147144
try {
148-
this.resetRetryTopic(msgs);
145+
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
149146
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
150147
}
151148
catch (Throwable e) {
@@ -168,8 +165,8 @@ public void run() {
168165

169166
// 执行Hook
170167
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
171-
consumeMessageContext.setStatus(status.toString());
172-
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
168+
consumeMessageContext.setStatus(status.toString());
169+
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
173170
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl
174171
.executeHookAfter(consumeMessageContext);
175172
}
@@ -390,4 +387,62 @@ public void decCorePoolSize() {
390387
public int getCorePoolSize() {
391388
return this.consumeExecutor.getCorePoolSize();
392389
}
390+
391+
392+
@Override
393+
public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
394+
ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
395+
result.setOrder(false);
396+
result.setAutoCommit(true);
397+
398+
List<MessageExt> msgs = new ArrayList<MessageExt>();
399+
MessageQueue mq = new MessageQueue();
400+
mq.setBrokerName(brokerName);
401+
mq.setTopic(msg.getTopic());
402+
mq.setQueueId(msg.getQueueId());
403+
404+
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(mq);
405+
406+
this.resetRetryTopic(msgs);
407+
408+
final long beginTime = System.currentTimeMillis();
409+
410+
try {
411+
ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);
412+
if (status != null) {
413+
switch (status) {
414+
case CONSUME_SUCCESS:
415+
result.setConsumeResult(CMResult.CR_SUCCESS);
416+
break;
417+
case RECONSUME_LATER:
418+
result.setConsumeResult(CMResult.CR_LATER);
419+
break;
420+
default:
421+
break;
422+
}
423+
}
424+
else {
425+
result.setConsumeResult(CMResult.CR_RETURN_NULL);
426+
}
427+
}
428+
catch (Throwable e) {
429+
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
430+
result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
431+
}
432+
433+
result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
434+
435+
return result;
436+
}
437+
438+
439+
public void resetRetryTopic(final List<MessageExt> msgs) {
440+
final String groupTopic = MixAll.getRetryTopic(consumerGroup);
441+
for (MessageExt msg : msgs) {
442+
String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
443+
if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
444+
msg.setTopic(retryTopic);
445+
}
446+
}
447+
}
393448
}

rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@
1515
*/
1616
package com.alibaba.rocketmq.client.impl.consumer;
1717

18+
import java.util.ArrayList;
19+
import java.util.Collections;
20+
import java.util.List;
21+
import java.util.concurrent.BlockingQueue;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.LinkedBlockingQueue;
24+
import java.util.concurrent.ScheduledExecutorService;
25+
import java.util.concurrent.ThreadPoolExecutor;
26+
import java.util.concurrent.TimeUnit;
27+
28+
import org.slf4j.Logger;
29+
1830
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
1931
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
2032
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
@@ -25,13 +37,10 @@
2537
import com.alibaba.rocketmq.common.ThreadFactoryImpl;
2638
import com.alibaba.rocketmq.common.message.MessageExt;
2739
import com.alibaba.rocketmq.common.message.MessageQueue;
40+
import com.alibaba.rocketmq.common.protocol.body.CMResult;
41+
import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
2842
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
2943
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
30-
import org.slf4j.Logger;
31-
32-
import java.util.Collections;
33-
import java.util.List;
34-
import java.util.concurrent.*;
3544

3645

3746
/**
@@ -476,4 +485,56 @@ public void decCorePoolSize() {
476485
public int getCorePoolSize() {
477486
return this.consumeExecutor.getCorePoolSize();
478487
}
488+
489+
490+
@Override
491+
public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) {
492+
ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
493+
result.setOrder(true);
494+
495+
List<MessageExt> msgs = new ArrayList<MessageExt>();
496+
MessageQueue mq = new MessageQueue();
497+
mq.setBrokerName(brokerName);
498+
mq.setTopic(msg.getTopic());
499+
mq.setQueueId(msg.getQueueId());
500+
501+
ConsumeOrderlyContext context = new ConsumeOrderlyContext(mq);
502+
503+
final long beginTime = System.currentTimeMillis();
504+
505+
try {
506+
ConsumeOrderlyStatus status = this.messageListener.consumeMessage(msgs, context);
507+
if (status != null) {
508+
switch (status) {
509+
case COMMIT:
510+
result.setConsumeResult(CMResult.CR_COMMIT);
511+
break;
512+
case ROLLBACK:
513+
result.setConsumeResult(CMResult.CR_ROLLBACK);
514+
break;
515+
case SUCCESS:
516+
result.setConsumeResult(CMResult.CR_SUCCESS);
517+
break;
518+
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
519+
result.setConsumeResult(CMResult.CR_LATER);
520+
break;
521+
default:
522+
break;
523+
}
524+
}
525+
else {
526+
result.setConsumeResult(CMResult.CR_RETURN_NULL);
527+
}
528+
}
529+
catch (Throwable e) {
530+
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
531+
result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
532+
}
533+
534+
result.setAutoCommit(context.isAutoCommit());
535+
result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
536+
537+
return result;
538+
}
539+
479540
}

rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.alibaba.rocketmq.common.message.MessageExt;
2121
import com.alibaba.rocketmq.common.message.MessageQueue;
22+
import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
2223

2324

2425
/**
@@ -46,6 +47,9 @@ public interface ConsumeMessageService {
4647
public int getCorePoolSize();
4748

4849

50+
public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
51+
52+
4953
public void submitConsumeRequest(//
5054
final List<MessageExt> msgs, //
5155
final ProcessQueue processQueue, //
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.alibaba.rocketmq.common.protocol.body;
2+
3+
public enum CMResult {
4+
CR_SUCCESS,
5+
CR_LATER,
6+
CR_ROLLBACK,
7+
CR_COMMIT,
8+
CR_THROW_EXCEPTION,
9+
CR_RETURN_NULL,
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.alibaba.rocketmq.common.protocol.body;
2+
3+
public class ConsumeMessageDirectlyResult {
4+
private boolean order = false;
5+
private boolean autoCommit = true;
6+
private CMResult consumeResult;
7+
private String remark;
8+
private long spentTimeMills;
9+
10+
11+
public boolean isOrder() {
12+
return order;
13+
}
14+
15+
16+
public void setOrder(boolean order) {
17+
this.order = order;
18+
}
19+
20+
21+
public boolean isAutoCommit() {
22+
return autoCommit;
23+
}
24+
25+
26+
public void setAutoCommit(boolean autoCommit) {
27+
this.autoCommit = autoCommit;
28+
}
29+
30+
31+
public String getRemark() {
32+
return remark;
33+
}
34+
35+
36+
public void setRemark(String remark) {
37+
this.remark = remark;
38+
}
39+
40+
41+
public CMResult getConsumeResult() {
42+
return consumeResult;
43+
}
44+
45+
46+
public void setConsumeResult(CMResult consumeResult) {
47+
this.consumeResult = consumeResult;
48+
}
49+
50+
51+
public long getSpentTimeMills() {
52+
return spentTimeMills;
53+
}
54+
55+
56+
public void setSpentTimeMills(long spentTimeMills) {
57+
this.spentTimeMills = spentTimeMills;
58+
}
59+
}

0 commit comments

Comments
 (0)