RabbitMQ消息队列高级特性
[TOC]
如何保障100%的投递成功
什么是生产端的可靠性投递
- 保障消息的成功发出
- 保障MQ节点的成功接收
- 发送端收到MQ节点(Broker)
- 完善的消息进行补偿机制
生产端的可靠性投递
- 消息落库,对消息状态进行打标
- 消息的延迟投递,做二次确认,回调检查
消息信心落库,对消息状态进行打标
- 两次写盘操作,分别写入订单库和消息库
- 发送消息到消息队列
- 异步监听消息是否发送成功
- 更新消息状态为1,表示发送成功
- 分布式定时任务,检测失败的消息
- 重新发送消息
- 重发次数过多的消息标记状态2,使用其他方法解决
缺点:数据库插入操作过多
消息的延迟投递,做二次确认,回调检查
- 业务消息数据落库,再发送消息去消息队列
- 第二次发送消息,延迟消息做二次确认()
- 发送消息到下游服务
- 下游服务发送确认信息
- 确认消息发送到回调服务
- 回调服务发送确认消息,并且将消息入消息库
- 发送失败就重新发送消息
消息队列幂等性
幂等性是什么
借鉴数据库乐观锁机制,比如执行一条更新库存的SQL语句,保证执行一次和重复执行的结果一致。
在订单产生的业务高峰期,避免消息重复消费
消费端实现幂等,消息永远只会被消费一次
幂等性保障
- 唯一id+指纹码,利用数据库主键去重
- 利用Redis的原子性实现
唯一ID+指纹码机制
唯一ID+指纹码机制,利用数据库主键去重
好处:实现简单
缺点:高并发下有数据库写入的性能瓶颈
解决方案: 跟进ID进行分库分表进行算法路由
利用Redis原子特性实现
使用Redis进行幂等,需要考虑问题:
- 是否要进行数据落库,数据库和缓存如何做到原子性(同时成功或者同时失败)
- 如果不入库,都存储到缓存中,如何设置定时同步策略
Confirm消息确认
确认机制流程图
- 在channel开启确认模式:
channel.confirmSelect
- 在channel上添加监听:
addConfirmListener
,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或者记录日志等后续处理!
生产者代码
1 | //1 创建ConnectionFactory |
消费者代码
1 | ConnectionFactory connectionFactory = new ConnectionFactory(); |
Return消息机制
- Return Listener用于处理一些不可路由的消息
- 我们的消息生产者,指定一个Exchange和Routing key,把消息送达到某一个队列中,然后消费者监听队列,进行消费处理操作
- 某些情况下,发送的消息路由不到,这个时候需要监听不可达的消息,就要使用Return Listener
- 在基础API中有一个关键配置项
Mandatory
,如果为true,监听器就会接收到路由不可达的消息进行后续处理,如果为false,那么broker端自动删除该消息。
消费端的限流
一个场景,RabbitMQ服务器有上万条数据,一个消费者客户端会出现下面的情况。
巨量消息推送,但是单个客户端无法处理这么多数据。需要限制消费速度。
关于消费端限流参数设置
默认的Qos对与客户端不限制缓冲区大小,这样可能会导致服务端接受大量消息到RAM中,导致内存溢出。
假设一条消息从生产端发送到客户端需要50ms,而客户端处理消息耗时4ms,而发送ACK到消息队列需要50ms,那么客户端处理效率为4ms/104ms=3.8%,我们需要让他100%的忙碌起来,
而在104ms的时间内,客户端可以处理104/4=26条消息,但是网络不一定是稳定的,随时可能网络船速效率减半,为了解决问题,我们通常把缓冲区大小设置为处理速度的两倍。
但是如果是在处理消息需要40ms,而把缓冲区设置为过大的话,会导致消息在队列中堆积,这样会导致处理的消息时延大大增加。
所以限流参数设置除了需要让消费者保持100%的忙碌,还需要尽可能的让客户端尽可能少地缓存消息。
消费端的手工ACK和NACK
消费端进行消费的时候,由于业务异常我们记录到日志,然后进行补偿。
由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功。
消费端的重回队列
消费端重回队列是为了对没有处理成功的消息,把消息重新发送回队列(实际上并不常用)。
TTL队列/消息
TTL
Time To Live的缩写,也就是生存时间
RabbitMQ支持消息的过期时间,在消息发送时指定
RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过队列的超时时间配置,那么消息会自动清除。
死信队列(DLX,Dead-Letter-Exchange)
利用DLX,在消息在一个队列中变成死信,它能被重新发布到另一个exchange。
消息变成死信有以下几种情况
- 消息被拒绝,并且requeue=false
- 消息TTL过期
- 队列到达最大长度
死信队列
DLX也是一个正常的Exchange,和一般的Exchange没有区别,能在任何队列上被指定,实际上设置某个队列的属性。
当队列中有死信时,RabbitMQ自动的将这个消息发布到设置的Exchange上,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理,这个特性弥补RabbitMQ以前支持的immediate参数功能。
死信队列设置
死信队列的exchange和queue
- Exchange: dlx.exchange
- Queue: dlx.queue
- RoutingKey:#
正常声明交换机,队列,绑定,只不过需要队列加上一个参数即可:arguments.put(“x-dead-letter-exchange”,”dlx.exchange”);
这样消息过期,requeue,队列在达到最大长度时,直接路由到死信队列!
本章小结
首先介绍了大厂在实际使用中如何保障100%的消息投递成功和幂等性,以及对RabbitMQ的确认消息,返回消息,ACK与重回队列,消息的限流,以及对超时时间,死信队列的使用。
参考mooc网RabbitMQ教程