Skip to content

Commit 9ddae51

Browse files
author
yanpenglei
committed
ack
1 parent 84146c1 commit 9ddae51

File tree

3 files changed

+52
-18
lines changed

3 files changed

+52
-18
lines changed
Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,70 @@
11
package com.souyunku.example.springboot.rabbitmq.ack.producer;
22

3+
import com.souyunku.example.springboot.rabbitmq.ack.receiver.HelloReceiver;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
36
import org.springframework.amqp.core.Message;
47
import org.springframework.amqp.rabbit.core.RabbitTemplate;
8+
import org.springframework.amqp.rabbit.support.CorrelationData;
9+
import org.springframework.beans.factory.InitializingBean;
510
import org.springframework.beans.factory.annotation.Autowired;
611
import org.springframework.stereotype.Component;
712

813
import java.util.Date;
914

1015
@Component
11-
public class HelloSender implements RabbitTemplate.ReturnCallback {
16+
public class HelloSender implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback {
17+
18+
private Logger logger = LoggerFactory.getLogger(HelloSender.class);
1219

1320
@Autowired
1421
private RabbitTemplate rabbitTemplate;
1522

1623
public void sendMessage(String context) {
17-
System.out.println("HelloReceiver发送内容: " + context + ",发送时间:" + new Date());
18-
1924

2025
// 消息发送失败返回到队列中, application.properties 配置 spring.rabbitmq.publisher-returns=true
2126
rabbitTemplate.setMandatory(true);
2227

2328
this.rabbitTemplate.setReturnCallback(this);
29+
this.rabbitTemplate.setConfirmCallback(this);
2430

2531
this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
2632
if (!ack) {
27-
System.out.println("sendMessage 发送失败" + cause + correlationData.toString());
33+
logger.info("HelloSender 发送失败" + cause + correlationData.toString());
2834
} else {
29-
System.out.println("sendMessage 发送成功 ");
35+
logger.info("HelloSender 发送成功");
3036
}
3137
});
38+
39+
logger.info("HelloSender 发送的消息内容:{}", context);
40+
3241
this.rabbitTemplate.convertAndSend("hello", context);
42+
3343
}
3444

3545

3646
/**
3747
* 失败后返回消息回调
38-
*
39-
* @param message 消息返回的消息
40-
* @param replyCode 回复代码
41-
* @param replyText 回复文本
42-
* @param exchange 交换交换
43-
* @param routingKey 路由密钥
48+
* <p>
49+
* 当消息发送出去找不到对应路由队列时,将会把消息退回
50+
* 如果有任何一个路由队列接收投递消息成功,则不会退回消息
4451
*/
4552
@Override
4653
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
47-
System.out.println("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
54+
logger.info("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
4855
}
4956

57+
/**
58+
* 实现ConfirmCallback
59+
* <p>
60+
* ACK=true仅仅标示消息已被Broker接收到,并不表示已成功投放至消息队列中
61+
* ACK=false标示消息由于Broker处理错误,消息并未处理成功
62+
*/
63+
@Override
64+
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
65+
logger.info("消息id: " + correlationData + "确认" + (ack ? "成功:" : "失败"));
66+
}
67+
68+
5069

5170
}

spring-boot-rabbitmq-ack/src/main/java/com/souyunku/example/springboot/rabbitmq/ack/receiver/HelloReceiver.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33

44
import com.rabbitmq.client.Channel;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
57
import org.springframework.amqp.core.Message;
68
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
79
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@@ -15,20 +17,31 @@
1517
@RabbitListener(queues = "hello")
1618
public class HelloReceiver {
1719

20+
private Logger logger = LoggerFactory.getLogger(HelloReceiver.class);
21+
1822
@RabbitHandler
19-
public void process(String context, Message message, Channel channel) throws Exception {
20-
System.out.println("HelloReceiver收到内容: " + context + ",收到时间:" + new Date());
23+
public void process(String context, Message message, Channel channel) {
24+
logger.info("HelloReceiver 监听到消息内容:{}", context);
2125
try {
26+
2227
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
2328
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
2429
//消息确认 因为我在属性配置文件里面开启了ACK确认 所以如果代码没有执行ACK确认 你在RabbitMQ的后台会看到消息会一直留在队列里面未消费掉 只要程序一启动开始接受该队列消息的时候 又会收到
2530

26-
System.out.println("HelloReceiver消息接收成功");
31+
logger.info("HelloReceiver 消息接收成功");
2732
} catch (Exception e) {
2833
e.printStackTrace();
29-
///ack返回false,并重新回到队列,api里面解释得很清楚
30-
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
31-
System.out.println("消息接收失败");
34+
35+
logger.info("HelloReceiver 消息接收失败");
36+
// ack返回false,并重新放回队列
37+
try {
38+
logger.info("HelloReceiver ack返回false,并重新放回队列");
39+
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
40+
} catch (IOException e1) {
41+
e1.printStackTrace();
42+
}
43+
44+
3245
}
3346

3447
}

spring-boot-rabbitmq-ack/src/main/resources/application.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ spring.rabbitmq.host=10.4.98.15
66
spring.rabbitmq.port=5672
77
spring.rabbitmq.username=admin
88
spring.rabbitmq.password=admin
9+
10+
911
# 开启发送确认
1012
spring.rabbitmq.publisher-confirms=true
1113
# 开启发送失败退回

0 commit comments

Comments
 (0)