RabbitMQ高级特性

RabbitMQ消息队列高级特性

[TOC]

如何保障100%的投递成功

什么是生产端的可靠性投递

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)
  • 完善的消息进行补偿机制

生产端的可靠性投递

  • 消息落库,对消息状态进行打标
  • 消息的延迟投递,做二次确认,回调检查

消息信心落库,对消息状态进行打标

3JBjZF.png

  1. 两次写盘操作,分别写入订单库和消息库
  2. 发送消息到消息队列
  3. 异步监听消息是否发送成功
  4. 更新消息状态为1,表示发送成功
  5. 分布式定时任务,检测失败的消息
  6. 重新发送消息
  7. 重发次数过多的消息标记状态2,使用其他方法解决

缺点:数据库插入操作过多

消息的延迟投递,做二次确认,回调检查

3JslVS.png

  • 业务消息数据落库,再发送消息去消息队列
  • 第二次发送消息,延迟消息做二次确认()
  • 发送消息到下游服务
  • 下游服务发送确认信息
  • 确认消息发送到回调服务
  • 回调服务发送确认消息,并且将消息入消息库
  • 发送失败就重新发送消息

消息队列幂等性

幂等性是什么

借鉴数据库乐观锁机制,比如执行一条更新库存的SQL语句,保证执行一次和重复执行的结果一致。

在订单产生的业务高峰期,避免消息重复消费

消费端实现幂等,消息永远只会被消费一次

幂等性保障

  • 唯一id+指纹码,利用数据库主键去重
  • 利用Redis的原子性实现

唯一ID+指纹码机制

唯一ID+指纹码机制,利用数据库主键去重

好处:实现简单

缺点:高并发下有数据库写入的性能瓶颈

解决方案: 跟进ID进行分库分表进行算法路由

利用Redis原子特性实现

使用Redis进行幂等,需要考虑问题:

  1. 是否要进行数据落库,数据库和缓存如何做到原子性(同时成功或者同时失败)
  2. 如果不入库,都存储到缓存中,如何设置定时同步策略

Confirm消息确认

确认机制流程图

3JHpYn.png

  1. 在channel开启确认模式:channel.confirmSelect
  2. 在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或者记录日志等后续处理!

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 获取C onnection
Connection connection = connectionFactory.newConnection();

//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();


//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();

String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";

//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
//参数: exchangeName, routingKey,props,msg
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

//6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
}

@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});

消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitmqConfig.host);
connectionFactory.setPort(RabbitmqConfig.port);
connectionFactory.setVirtualHost(RabbitmqConfig.virtualHost);

//连接参数设置
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

//参数设置
String exchangeName = "test_confirm_exchange";
String exchangeType = "topic";
String queueName = "test_confirm_queue";
String routingKey = "confirm.#";

//参数:exchangeName, exchangeType, Durable, AutoDelete, internal, arguments
channel.exchangeDeclare(exchangeName,exchangeType,false,false,false,null);
//参数:queueName,Durable,AutoDelete,exclusive
channel.queueDeclare(queueName,false,false,false,null);
//交换机绑定队列
channel.queueBind(queueName,exchangeName,routingKey);

com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("get this message:" + s);
}
};

Return消息机制

  • Return Listener用于处理一些不可路由的消息
  • 我们的消息生产者,指定一个Exchange和Routing key,把消息送达到某一个队列中,然后消费者监听队列,进行消费处理操作
  • 某些情况下,发送的消息路由不到,这个时候需要监听不可达的消息,就要使用Return Listener
  • 在基础API中有一个关键配置项Mandatory,如果为true,监听器就会接收到路由不可达的消息进行后续处理,如果为false,那么broker端自动删除该消息。

消费端的限流

一个场景,RabbitMQ服务器有上万条数据,一个消费者客户端会出现下面的情况。

巨量消息推送,但是单个客户端无法处理这么多数据。需要限制消费速度。

关于消费端限流参数设置

默认的Qos对与客户端不限制缓冲区大小,这样可能会导致服务端接受大量消息到RAM中,导致内存溢出。

假设一条消息从生产端发送到客户端需要50ms,而客户端处理消息耗时4ms,而发送ACK到消息队列需要50ms,那么客户端处理效率为4ms/104ms=3.8%,我们需要让他100%的忙碌起来,

而在104ms的时间内,客户端可以处理104/4=26条消息,但是网络不一定是稳定的,随时可能网络船速效率减半,为了解决问题,我们通常把缓冲区大小设置为处理速度的两倍。

但是如果是在处理消息需要40ms,而把缓冲区设置为过大的话,会导致消息在队列中堆积,这样会导致处理的消息时延大大增加。

所以限流参数设置除了需要让消费者保持100%的忙碌,还需要尽可能的让客户端尽可能少地缓存消息。

消费端的手工ACK和NACK

消费端进行消费的时候,由于业务异常我们记录到日志,然后进行补偿。

由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功。

消费端的重回队列

消费端重回队列是为了对没有处理成功的消息,把消息重新发送回队列(实际上并不常用)。

TTL队列/消息

TTL

Time To Live的缩写,也就是生存时间

RabbitMQ支持消息的过期时间,在消息发送时指定

RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过队列的超时时间配置,那么消息会自动清除。

死信队列(DLX,Dead-Letter-Exchange)

利用DLX,在消息在一个队列中变成死信,它能被重新发布到另一个exchange。

消息变成死信有以下几种情况

  • 消息被拒绝,并且requeue=false
  • 消息TTL过期
  • 队列到达最大长度

死信队列

DLX也是一个正常的Exchange,和一般的Exchange没有区别,能在任何队列上被指定,实际上设置某个队列的属性。

当队列中有死信时,RabbitMQ自动的将这个消息发布到设置的Exchange上,进而被路由到另一个队列。

可以监听这个队列中消息做相应的处理,这个特性弥补RabbitMQ以前支持的immediate参数功能。

死信队列设置

死信队列的exchange和queue

  • Exchange: dlx.exchange
  • Queue: dlx.queue
  • RoutingKey:#

正常声明交换机,队列,绑定,只不过需要队列加上一个参数即可:arguments.put(“x-dead-letter-exchange”,”dlx.exchange”);

这样消息过期,requeue,队列在达到最大长度时,直接路由到死信队列!

本章小结

首先介绍了大厂在实际使用中如何保障100%的消息投递成功和幂等性,以及对RabbitMQ的确认消息,返回消息,ACK与重回队列,消息的限流,以及对超时时间,死信队列的使用。

参考mooc网RabbitMQ教程

关于限流参数设置