消息队列以及RabbitMQ使用

消息队列

消息队列思维导图

消息队列的概念

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能、高可用、可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

目前在生产环境,使用较多的消息队列有Kafka、ActiveMQ、RabbitMQ、RocketMQ。

为什么使用消息队列

主要有三个业务场景:解耦,异步,削峰

解耦

如果有多个系统需要数据,通过一个发布/订阅模型来和其他系统解耦。

l1LEa6.png

异步

场景: A系统接口一个请求,需要在本地写入数据库,还需要在BCD三个系统写入。用户等待时间为ABCD四个系统写入时间总和。

使用消息队列后,A系统只要把写入消息放到队列中即可返回,写入留给队列慢慢处理。

削峰

A系统在某段时间内,比如高峰期并发请求超出系统处理能力的上限。

这个时候可以使用MQ,将消息先存到队列中,等待系统慢慢处理,将高峰期的请求分摊到低峰期。

比如一个IOT系统传感器每秒传入几千条数据,但是数据库写入速度有限,此时

消息队列的优缺点

  • 系统可用性降低:依赖于MQ,MQ挂了消息队列将不可用。
  • 系统复杂度提高,考虑:消息有没有被重复消费,处理消息丢失,保证消息传递的正确性。
  • 一致性问题:A系统处理完返回后,万一BC系统有写入不成功的情况,数据就产生不一致。

广播

基本功能之一,进行广播。没有消息队列的话,每次新增一个消息接入方都需要联调一次新接口。使用消息队列的话,就只需要关心消息是否送达到消息队列中去。

几种主流消息队列的优缺点

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

消息队列的高可用

RabbitMQ的高可用性

RabbitMQ分为三种模式,单机模式,普通集群模式,镜像集群模式。

单机模式

单机:Demo级别

普通集群模式

在多台机器上启动多个实例,每个机器启动一个。你创建的queue只会放在一个RabbitMQ实例上,每个实例都要同步queue元数据。每个实例同步queue的元数据(元数据可以认为是queue的一些配置信息,通过元数据,可以找到queue所在实例)。

l3S36O.md.png

普通集群模式,导致消费者每次随机连接一个数据实例然后拉取数据,要么固定连接queue所在实例消费数据。 前者有数据拉取的开销,后者导致单实例性能瓶颈

此方案主要提高了吞吐量,但是没有高可用性。

镜像集群模式

和普通集群不一样,在镜像集群模式下,无论是元数据还是queue里的消息都会存在于多个实例上,每个RabbitMQ节点都有queue的一个完整镜像,包含queue的全部数据的意思。

l3inWn.png

好处在于高可用性,一个节点挂了不影响系统的使用,但是缺点在系统的性能开销太大了。消息需要同步到所有机器上。第二,需要在分布式架构下部署,不然每个节点都包含所有数据,数据量太大。

确保消息队列的幂等性(避免消息重复消费)

在某些生产条件下,信息被消费多次,怎么保证幂等性。比如

  • 订单接口:不能创建多个订单
  • 支付接口:一次订单只能扣一次钱
  • 普通表单提交接口,多次点击提交只成功一次

常见的解决方案有:

  1. 唯一索引:防止新增脏数据
  2. token机制:防止页面重复提交
  3. 悲观锁:获取数据时加锁
  4. 乐观锁:基于版本号实现,在更新的那一刻校验数据
  5. 分布式锁:基于redis

下面简单说一下基于redis和token实现接口的幂等性校验

为了保证幂等性,每一次请求创建一个唯一标识token,先获取token并且存入redis,请求接口时,后端先判断redis里有无此token

  • 如果存在就正常处理业务逻辑并且从redis里删除token。
  • 不存在提示非法请求或者重复请求

确保消息队列的可靠性传输

以RabbitMQ为例分析消息丢失的情况,主要分为三种情况。

生产者弄丢数据

一般有两种方式确保。

一是RabbitMQ事务机制,即生产者发送数据之前开启RabbitMQ事务,如果没有被Rabbit接受到,生产者会收到异常报错,此时可以回滚事务,重新发送消息。缺点是吞吐量下降,因为事务机制是同步的。

二是RabbitMQconfirm机制,confirm机制是异步的。一般在生产者这块避免数据丢失。

RabbitMQ弄丢数据

开启RabbitMQ持久化,这样RabbitMQ挂掉之后可以恢复数据。

消费段弄丢数据

采用RabbitMQ的ack机制,必须关闭RabbitMQ的自动ack,通过一个api来调用,确保消费者处理完后再发送ack。这样如果没有ack的话RabbitMQ会把消息分配给其他消费者处理。

l3V2s1.png

消息队列确保消息的顺序性

RabbitMQ解决方案:拆分成多个queue,一个queue对应一个consumer,在消费者内部用内存队列做排队在分发给不同的worker来处理。

lJz2Of.png

设计一个消息队列的基本功能

RPC通信协议

消息队列,通信过程可以简化为两次RPC加一次转储(需要进行消费确认的情况是三次RPC)。

RPC:远程过程调用,两台服务器A,B。一个应用部署在A上,想要调用B的方法,因为不在一个内存空间,需要网络来表达调用的语义和传达调用的数据。

高可用

依赖于RPC的高可用以及存储的高可用。

服务端承载消息堆积的能力

为了满足削峰/流控/最终可达的一系列需求,把消息存储下来,选择时机投递,这个时候服务端需要有一定的消息承载能力。载体可以有很多种,在内存中,在分布式KV系统中,在磁盘中,存储在数据库。形式上主要分为两种,持久化或非持久化。但是具体怎么选择还是需要看业务的实现场景。

存储子系统的选择

从速度上来看,文件系统>分布式KV>分布式文件系统>数据库,可靠性则相反。

DB受限于存储硬件的限制,但是可靠性高。分布式KV(Mongo,Hbase, redis)在可靠性要求不是那么高的场景时,比如日志系统,也是个好选择。

消费关系解析

当消息队列初步具备了转储消息的能力,下面就要开始解析发送接受的关系,进行正确的消息投递。本质上市面 上消息队列的通信关系可以归结为单播和广播。大部分互联网的应用来说,组件广播和组内单播是最常见的情形。维护广播关系需要做的事情基本是一致的:

  • 发送关系的维护
  • 发送关系变更时的通知

可靠投递(最终一致性)

消息完全不丢,是否可能?答案是可行的,但是保证消息完全不丢,消息可能会重复,会有延迟。消息重复和消息丢失势必是需要面对一个的。

消息确认

broker把消息投递给消费者,但是消费者不一定就能处理这个消息。把消息的送达和消息的处理分开,这样才真正实现了消息队列的本质-解耦。所以为了实现复杂业务逻辑,一定是需要支持消费者主动进行ACK。

重复消息和顺序消息

顺序消息想要满足:

  • 允许消息丢失
  • 从发送方到到服务方到接收者都是单点单线程。

一般主流的消息队列的设计范式中,应该是在不丢消息的前提下,尽量的减少重复消息。

重复消息主要关注的是:

  • 鉴别重复消息,并且幂等的处理重复消息
  • 一个消息队列尽量减少重复消息的投递

对于重复消息的鉴定,每一个消息都应该有一个唯一身份,可以是业务方自己定义,也可以是根据IP/PID/时间戳生成的message ID。实现的方式可以有数据库的唯一键/bloom filter/分布式KV中的key。

幂等的处理消息,两种通用的解决方案:1版本号 2状态机

版本号

举个简单的例子,一个产品的状态有上线/下线状态,消息1是下线,消息2是上线,消息1判重失败,被投递了两次,第二次投递发生在消息2之后。此时如果每个消息都带一个版本号,并且每次只接受比当前版本号大的消息。

状态机

使用版本号也有缺点:

  1. 对发送方必须要求消息带业务版本号
  2. 下游必须存储消息的版本号,对于要严格保证顺序的。

要把乱序到来的消息都存储起来,必须对此做出处理,这样成本太高,需要保存到

RabbitMQ基本概念

AMQP协议消息通信模式

la1z8g.png

RabbitMQ使用生产者消费者概念。

生产者可以创建消息,对消息设置标签(Route key),发送消息到RabbitMQ中,消费者也可以连接到RabbitMQ中,进行订阅消息,从而实现生产者和消费者的异步通信。

Rabbit通信模型中的基本概念

信道vs连接

无论是发布还是消费消息,首先必须要连接到RabbitMQ中,此时,你的应用程序和Rabbit服务器之间创建了一条TCP连接。信道则是建立在TCP连接内的虚拟连接。

RabbitMQ基本通信模型

Simple简单队列

简单的队列模型,一个队列只有一个消费者去消费。应用场景较少,一般一个队列都会有多个消费者去消费。

Fair dispatch公平分发

这里的公平是指消费者消费消息的能力完全取决于消费者的处理能力。能者多劳,不是被mq安排消息。

消费端消费数据时,会有一个确认消费完成的动作,MQ受到消费完成的通知后,才会继续向消费者发送消息。消费者处理的慢,mq向它发送的消息就少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Connection connection = ConnectionUtil.getConnection();

//创建通道
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//每次只发送一个消息给消费者
channel.basicQos(1);

/**
*接受消息回调
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
String message = new String(body, "utf-8");
System.out.println("Consumer1->" + message);

try{
Thread.sleep(200);
}catch(InterruptedException e){
e.printStackTrace();
}
//手动通知MQ消息处理完毕
channel.basicAck(envelope.getDeliveryTag(), false);
}

// @param : fasle 关闭自动通知
channel.basicConsume(Queue_NAME, false, consumer);
}

Round-robin轮询分发

轮询分发,是公平分发的退化版,打开自动通知,去掉手动通知,去掉消费端消费条数限制。

轮询分发利用了自动通知参数,开启了自动通知,mq根据一个简单的规则,先确定好那些消息发送给那些消费者,无论消费者的处理能力如何,这些消息都得让你处理。这种模型不能很好的利用消费端的处理能力的差异,做不到真正意义上的负载均衡。而且还有可能造成大量消息堆积在消费者容器中,可能造成消息丢失,甚至压垮消费者。

publish_subscribe发布订阅模式

RabbitMQ有一个交换机Exchanges的概念,发布订阅就是通过交换机来实现的。

交换机的概念十分的简单,就是一个转发器,有了交换机之后,生产端先把消息发送给 交换机,然后交换机再把消息发送到与其绑定的消息队列中,解决了生成端如何把一条消息批量发送到多个队列的问题。

实现发布订阅的关键在于:

  • 生产端直接发送消息到交换机,而不是具体队列
  • 多个消费端将自己的消息队列绑定到同一个交换机上。

这样就实现了发布订阅模式。

routing路由模式

路由模式仅仅基于发布订阅模式做了一点点改动,在发布订阅模式中,交换机无脑向所有与之绑定的消息队列发送消息,而路由器模式对交换机做了一些限制,指定了一个route key,生产端向交换机发送消息时,指定消息的route key,消费端将消息队列绑定到交换机时,也指定了该消费队列的route key,这样一来交换机就可根据消息的routekey,将该消息转发到绑定消费该route key的消息队列。

生产端关键点

1
channel.basicPublish("交换机名称","route key", null, message.getBytes());

消费端关键点

1
channel.queueBind("交换机名称","消息队列名称","route key");

Topic主题模式

topic主题模式其实就是路由模式的一个加强: route key支持通配符。

这样的好处是消费端的routekey不用写死,增加了一个模糊匹配的功能。

RabbitMQ的可靠性

消费端的可靠性:手动通知告知消费者。

生产端的提交可靠性,可以通过mq的回调机制实现,生产端发送消息时维护一份已发送消息的集合,mq收到某条消息之后,会向生产端发送一个接受成功确认。

如果某些消息未成功到达mq,那么就不会有对应消息的确认,最终集合会有剩余元素,就是发送失败的消息,需要重新发送。

生产端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//在通道上开启消息确认模式
channel.confirmSelect();

/**
*发送消息确认回调处理
*/
channel.addConfirmListener(new ConfirmListener(){
public void handleAck(long deliveryTag, boolean multiple) throws IOException{
/**
*发送成功回调
*delveryTag就是发送消息之前getNextPublishSeqNo方法获取消息序号,通过该序号管理集合的消息
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException{
/**
* 发送失败回调
* deliveryTag就是发送消息之前getNextPublishSeqNo方法获取的消息序号,通过该序号管理集合中的消息
*/

}
}
)};
/**
* 发送消息之前先获取消息序号
* 该序号仅仅在维护channel级别,并不是全局序号,多个通道之间互相隔离
*/
long no = channel.getNextPublishSeqNo();

确认机制可以保证消息的可靠性,但是必然带来性能损失,因此到底需不需要开启生产端或者消费端的确认机制,需要根据业务场景具体分析。

RabbitMQ的注意事项

RabbitMQ中的Connection是昂贵的,但是channel是廉价的,所以在多线程的环境下,尽量创建少量的Connection,然后在每个Connection中会创建多个Channel,利用Channel实现Connection的服用,提高系统性能。

生产端发送消息的时候,同一个Channel的basic Publish方法并不是线程安全的,更加体现多Channel的重要性。如果生产端需要使用多线程发送消息,必须创建多个Channel,每一个线程单独使用一个Channel。线程数量过多的时候需要通过Channel Pool的思路去控制并发。

对于同一个Channel而言,发送消息和接受消息是互不影响,可以进行并发操作。

参考文献

什么是消息队列

消息队列的使用场景

如何保证消息队列的高可用

Rabbit消息模型

RabbitMQ概念与模型

消息队列设计精要