3.2.1 介绍
工作队列模型一次只能将消息发给一个队列,绑定队列的多个消息者只能有一个消费者处理消息。如果一条消息要发给多个应用程序使用工作队列模型将无法实现。举例,下图中支付成功后支付服务将消息发给交易服务和通知服务,使用工作队列模型将无法实现。
使用发布订阅模型可以实现上图的需求,发布订阅模型可以实现一条消息发给多个队列,每个队列绑定到同一个交换机,最终实现了向多个消费者发送一条消息,这种模式称为“发布/订阅”模型。
发布订阅模型里,生产者只能将消息发送到交换机,由交换机将消息推送到队列,交换机可以将消息推送给绑定它的所有队列,也可以有针对性的将消息推送给某几个队列,这就相当于有一批消费者订阅了消息,交换机根据各自的订阅去推送消息,组成部分如下:
● Publisher:生产者,不再发送消息到队列中,而是发给交换机
● Exchange:交换机,一方面,接收生产者发送的消息。另一方面,将消息推送给队列,是将消息推送给某个特别队列、递交给所有队列、或是将消息丢弃,到底如何操作,取决于交换机的类型。
● Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
● Consumer:消费者,与以前一样,订阅队列,没有变化
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力[可以暂存,但不支持持久化],因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.2.2 交换机类型
交换机是如何实现将消息推送给所有队列,还是有针对性的将消息推送给某几个队列呢?
实现不同的需求要选用不同类型的交换机,可用的交换机类型:direct, topic, headers 和fanout.
● Fanout:广播类型,将消息交给所有绑定到交换机的队列。
● Direct:直接类型,基于RoutingKey(路由key)发送给订阅了消息的队列,交换机根据routingkey去判断消息应该转发到哪个队列
● Topic:通配符类型(主题类型),与Direct类似,只不过RoutingKey可以使用通配符
● Headers:头匹配类型,基于MQ的消息头匹配,用的较少。
课堂中,我们讲解前面的三种交换机模式。
3.3. fanout交换机
3.3.1 介绍
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。在广播模式下,消息发送流程是这样的:
● 1) 可以有多个队列
● 2) 每个队列都要绑定到Exchange(交换机)
● 3) 生产者发送的消息,只能发送到交换机
● 4) 交换机把消息发送给绑定过的所有队列
● 5) 订阅队列的消费者接收消息,每个队列订阅的消费者只有一个能拿到消息。
3.3.2 测试
3.3.2.1 创建队列
我们的计划是这样的:
● 创建一个名为 hmall.fanout的交换机,类型是Fanout
● 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机hmall.fanout
在控制台创建队列fanout.queue1:
在创建一个队列fanout.queue2:
3.3.2.2 创建交换机
然后再创建一个交换机hmall.fanout:
3.3.2.3 绑定队列到交换机
然后绑定两个队列到交换机:
3.3.2.4 发送消息
下边实现消息发送:
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
发送成功查看mq控制台消息转发到了绑定此交换机的两个队列
3.3.2.5 接收消息
下边实现消息接收:
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
将consumer服务启动起来后可以通过rabbitmq控制台查看消费者监听情况
进入队列界面,点击fanout.queue1:
进入队列界面,查看consumers,下图表示fanout.queue1队列有一个监听者。
同样的方法可以查看fanout.queue2队列的监听情况。
fanout.queue1和fanout.queue2每个队列都有一个监听者。
下边执行发送消息程序,观察控制台,下图说明每个消费者成功收到消息。
消费者1接收到Fanout消息:【hello, everyone!】
消费者2接收到Fanout消息:【hello, everyone!】
3.3.2.6 启动多个消费者实例
下边我们把consumer服务启动两个实例
此时再观察fanout.queue1和fanout.queue2的监听者,发现每个队列有两个监听者
此时的结果相当于下图:
此时执行发送消息程序后四个消息者都可以收到消息吗?
通过测试我们发现:
在每个队列的消费者中,发送一条消息只会有一个消费者接收到消息。
每个队列默认采用轮询的方式向消费者推送消息。
3.3.3 小结
交换机的作用是什么?
● 接收publisher发送的消息
● 将消息按照规则路由到与之绑定的队列
● 不能缓存消息,路由失败,消息丢失
● FanoutExchange的会将消息路由到每个绑定的队列