3.2. 发布订阅模型(Publish/Subscribe)

简介: 发布订阅模型通过交换机实现消息一对多分发,生产者将消息发给交换机,由其广播至多个绑定队列,每个队列的消费者均可接收消息。Fanout交换机为广播模式,支持消息同时推送至所有绑定队列,适用于通知、日志等场景。交换机不存储消息,若无队列绑定则消息丢失。

3.2.1 介绍
工作队列模型一次只能将消息发给一个队列,绑定队列的多个消息者只能有一个消费者处理消息。如果一条消息要发给多个应用程序使用工作队列模型将无法实现。举例,下图中支付成功后支付服务将消息发给交易服务和通知服务,使用工作队列模型将无法实现。

使用发布订阅模型可以实现上图的需求,发布订阅模型可以实现一条消息发给多个队列,每个队列绑定到同一个交换机,最终实现了向多个消费者发送一条消息,这种模式称为“发布/订阅”模型。

发布订阅模型里,生产者只能将消息发送到交换机,由交换机将消息推送到队列,交换机可以将消息推送给绑定它的所有队列,也可以有针对性的将消息推送给某几个队列,这就相当于有一批消费者订阅了消息,交换机根据各自的订阅去推送消息,组成部分如下:
● Publisher:生产者,不再发送消息到队列中,而是发给交换机
● Exchange:交换机,一方面,接收生产者发送的消息。另一方面,将消息推送给队列,是将消息推送给某个特别队列、递交给所有队列、或是将消息丢弃,到底如何操作,取决于交换机的类型。
● Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
● Consumer:消费者,与以前一样,订阅队列,没有变化
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力[可以暂存,但不支持持久化],因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.2.2 交换机类型
交换机是如何实现将消息推送给所有队列,还是有针对性的将消息推送给某几个队列呢?
实现不同的需求要选用不同类型的交换机,可用的交换机类型:direct, topic, headers 和fanout.
● Fanout:广播类型,将消息交给所有绑定到交换机的队列。
● Direct:直接类型,基于RoutingKey(路由key)发送给订阅了消息的队列,交换机根据routingkey去判断消息应该转发到哪个队列
● Topic:通配符类型(主题类型),与Direct类似,只不过RoutingKey可以使用通配符
● Headers:头匹配类型,基于MQ的消息头匹配,用的较少。
课堂中,我们讲解前面的三种交换机模式。
3.3. fanout交换机
3.3.1 介绍
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。在广播模式下,消息发送流程是这样的:

● 1) 可以有多个队列
● 2) 每个队列都要绑定到Exchange(交换机)
● 3) 生产者发送的消息,只能发送到交换机
● 4) 交换机把消息发送给绑定过的所有队列
● 5) 订阅队列的消费者接收消息,每个队列订阅的消费者只有一个能拿到消息。
3.3.2 测试
3.3.2.1 创建队列
我们的计划是这样的:

● 创建一个名为 hmall.fanout的交换机,类型是Fanout
● 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机hmall.fanout
在控制台创建队列fanout.queue1:

在创建一个队列fanout.queue2:

3.3.2.2 创建交换机
然后再创建一个交换机hmall.fanout:

3.3.2.3 绑定队列到交换机
然后绑定两个队列到交换机:

3.3.2.4 发送消息
下边实现消息发送:
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
发送成功查看mq控制台消息转发到了绑定此交换机的两个队列

3.3.2.5 接收消息
下边实现消息接收:
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
将consumer服务启动起来后可以通过rabbitmq控制台查看消费者监听情况
进入队列界面,点击fanout.queue1:

进入队列界面,查看consumers,下图表示fanout.queue1队列有一个监听者。

同样的方法可以查看fanout.queue2队列的监听情况。
fanout.queue1和fanout.queue2每个队列都有一个监听者。
下边执行发送消息程序,观察控制台,下图说明每个消费者成功收到消息。
消费者1接收到Fanout消息:【hello, everyone!】
消费者2接收到Fanout消息:【hello, everyone!】
3.3.2.6 启动多个消费者实例
下边我们把consumer服务启动两个实例

此时再观察fanout.queue1和fanout.queue2的监听者,发现每个队列有两个监听者

此时的结果相当于下图:

此时执行发送消息程序后四个消息者都可以收到消息吗?
通过测试我们发现:
在每个队列的消费者中,发送一条消息只会有一个消费者接收到消息。
每个队列默认采用轮询的方式向消费者推送消息。
3.3.3 小结
交换机的作用是什么?
● 接收publisher发送的消息
● 将消息按照规则路由到与之绑定的队列
● 不能缓存消息,路由失败,消息丢失
● FanoutExchange的会将消息路由到每个绑定的队列

相关文章
|
存储 Kubernetes 监控
K8S核心组件介绍
K8S核心组件介绍
|
24天前
|
监控 NoSQL Unix
我们来说一说 Redis IO 多路复用模型
我是小假 期待与你的下一次相遇 ~
119 4
|
4月前
|
人工智能 算法 C++
浅谈 KMP
KMP算法是一种高效的字符串匹配算法,由Knuth、Morris和Pratt提出。它通过预处理模式串构建next数组,利用匹配失败时的信息减少重复比较,从而提升匹配效率。其时间复杂度为O(m+n),适用于大规模文本匹配场景。
432 0
|
9月前
|
存储 大数据 BI
场景题:有40亿个QQ号如何去重?仅1GB内存
在处理大数据去重问题时,如40亿QQ号的去重(仅1GB内存),可采用Bitmap和布隆过滤器两种方法。Bitmap利用位图存储,每个QQ号占1位,总需512MB内存,适用于整型数据;布隆过滤器通过多个哈希函数计算下标,适合字符串或对象去重,但存在误判率。在线人员统计等场景也可使用类似思路,将ID作为偏移值标记在线状态或视频存在性。
347 4
|
存储 监控 关系型数据库
MySQL自增ID耗尽解决方案:应对策略与实践技巧
在MySQL数据库中,自增ID(AUTO_INCREMENT)是一种特殊的属性,用于自动为新插入的行生成唯一的标识符。然而,当自增ID达到其最大值时,会发生什么?又该如何解决?本文将探讨MySQL自增ID耗尽的问题,并提供一些实用的解决方案。
502 1
|
人工智能 算法 C语言
详解树状数组(C/C++)
详解树状数组(C/C++)
Manacher(马拉车)算法详解
该文章详细解释了Manacher算法,这是一种高效找出给定字符串最长回文子串的算法,通过在字符串中插入特殊字符构建新的字符串,并利用中心扩展策略来找出最长回文序列,时间复杂度为O(N),空间复杂度为O(N)。
|
消息中间件 监控 UED
【揭秘消息队列背后的秘密!】如何解决消息队列的延时及过期失效问题?深入剖析与实战指南!
【8月更文挑战第24天】本文以随笔形式探讨了消息队列在实际应用中面临的消息延时及过期失效问题。针对消息延时,文章提出了包括优化消息队列配置、提高消费者效率和利用优先级队列在内的解决方案;并通过示例代码展示了如何优化RabbitMQ中的消费者处理流程。对于消息过期失效问题,则建议设置消息TTL、采用死信队列并实施监控报警机制;同样提供了基于RabbitMQ设置消息TTL的具体实现。这些策略有助于提升消息队列的性能和系统的整体稳定性。
375 2
|
Kubernetes Cloud Native 容器
开放下载!《深入浅出Kubernetes》
一次搞懂6个核心原理吃透基础理论,一次学会6个典型问题的华丽操作
开放下载!《深入浅出Kubernetes》
|
存储 消息中间件 移动开发
还在用crontab? 分布式定时任务了解一下
还在用crontab? 分布式定时任务了解一下

热门文章

最新文章