前言

很久没有更新自己的博客了,主要是从3月份开始工作一直都比较忙碌,自己同时也在A股和币圈里面摸爬滚打,有了更多的认知,但也交了不少学费。由于开发时间很紧张,精力一直集中于需求的实现,最近稍微有了点时间,能够对一些接触到的技术做一个review,那我们废话也不多说,直接进入主题。

再看本篇文章之前,希望大家能先思考一下下面三个问题:

  1. 你的项目为什么使用消息队列?
  2. 你们技术选型为什么是 Kafka/RocketMQ/RabbitMQ?
  3. 如果叫你设计消息队列,你会如何设计?

简介

提到MQ想必大家都不会陌生,在项目中也是进场使用的,这里借助百度百科的介绍(MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构,指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。

作用

再谈一下它的三个作用:

  • 解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务或者主系统处理完成后将消息发送到MQ队列中,由有需求的子系统或其他业务订阅消费。
  • 异步:用户在和业务系统交互后,其他从属的业务通过消费MQ异步执行,缩短了业务的相应时间,提高用户体验。
  • 削峰:添加缓冲区,防止服务上线后因可能遇到的业务请求峰值而导致服务瘫痪。

这样讲显然是比较晦涩的,所以我们这里通过例子直接展开说说,让大家能够有比较深刻的认识

解耦

解耦,目的是为了防止业务系统之间太过于依赖,高耦合对程序后续的维护与扩展是极为不利的,在没有引用MQ之前系统之间通信通过调用各个业务系统对外开出的接口,不管是业务扩展或者需求的修改,都伴随着发送方或者消费者代码的增加或者修改,长此以往下去系统便会变得相当繁琐,这自然是开发者不想看到的。

在引入了MQ之后,主业务系统在处理完业务后,可以直接将消息丢入MQ,其他子系统通过订阅消费,在业务扩展时,增加的子系统也只需要继续订阅信息,同时,子系统宕机后也不会影响到主业务系统,减少了系统之间的耦合。

异步

用户和界面进行的一些交互,产生的数据自然都需要后台来处理,如果用户的操作在主系统和从属业务子系统中都需要进行入库的操作,主系统入库需要30ms,子系统1从消费数据到数据入库完成需要200ms,子系统2需要200ms,子系统3需要300ms,那加在一起的后反馈时间就是30 + 200 + 200 + 300 = 730ms,用户直接操作的页面,这种延迟是无法容忍的。在引入了MQ后,子系统在入库后生成数据丢入MQ队列,便可以直接对操作进行反馈,总的反馈时间便是30ms。

削峰

服务在生产环境上线后,每天不同的时段可能面对不同的峰谷流量,这里假设服务消费流量的能力在2000条/秒,在白天中午高峰时段用户产生的请求可能是5000条/秒,服务器依然以2000条每秒的能力处理,产生的请求远远大于服务器处理的能力,多余的消息便可以暂时存储在队列中,晚上客户睡觉之后流量会减少到20条/秒,这时服务器便能够处理掉白天积压在MQ中的请求,不会因为顺时或者短时间的流量高峰而导致服务瘫痪。

常见MQ技术

ActiveMQ

ActiveMQ 是Apache出品,能力强劲的开源消息总线,是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

这里JMS叫做Java消息服务,是Java平台上有关面向MOM的技术规范,旨在通过提供标准的产生、发送、接收和处理消息的API简化企业应用的开发,类似于JDBC的关系型数据库通信方式的抽象。

单机吞吐量:万级,吞吐量较低,没经过大规模吞吐量场景的验证

时效性:ms级

可用性:高,基于主从架构实现高可用性

消息可靠性:有较低的概率丢失数据

无分片功能:这是一个功能缺失,JMS没有规定消息中间件的集群,分片机制。如果一台服务器不能承受更多的消息,则需要横向拆分,官方不提供分片机制,需要自己实现

早起的项目使用的比较多,比如我们一个零几年的项目,但现在使用的不是很多了,社区也不是很活跃,所以目前已经不太推荐使用

RabbitMQ

RabbitMQ是一个有Erlang语言发开的AMQP得开源实现,轻巧且易于部署在云端。它支持多种消息传递协议,可以部署在分布式和联合配置中,以满足高规模高可用性需求。

AMQP叫做高级信息队列协议,他是应用层协议的一个开放标准,为面向消息的中间件设计,基于协议的客户端与消息中间件可以传递消息,并且不收产品,开发语言等条件的限制。

单机吞吐量:万级

时效性:微秒级,延时低算是RabbitMQ一大特点

可用性:高,基于主从架构或者镜像实现高可用性

消息可靠性:基本不会丢失数据

erlang开发:基于erlang语言开发,并发能力很强,性能及其好,延时很低,社区也非常活跃,

在国内一些互联网公司近几年用RabbitMQ也比较多一些 但是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。 基于erlang语言也导致了很多开发者没有办法深入研究,导致其很难定制或者掌控

Kafka

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

单机吞吐量:10万级别,这是kafka最大的优点,就是吞吐量高。一般配合大数据类的系统来进行实时数据计算、日志采集等场景,但如果Topic从几十个到几百个,吞吐量会大幅度下降,因此需要控制Topic数量

时效性:ms级

可用性:非常高,分布式架构

消息可靠性:非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用

功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准

kafka的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量而且kafka唯一的一点劣势是有可能消息重复消

RocketMQ

RocketMQ是阿里开源的消息中间件,目前在Apache孵化,使用纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是简单的复制,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景,支撑了阿里多次双十一活动。

单机吞吐量:十万级,特点吞吐量非常高,topic可以达到几百个,几千个的级别,吞吐量会有较小幅度的下降,可支持大量topic是一大优势。

时效性:ms级

可用性:非常高,分布式架构

消息可靠性:经过参数优化配置,消息可以做到0丢失

MQ功能较为完善,还是分布式的,扩展性好

接口简单易用,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景,而且一个很大的优势在于,源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控 ,社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码

消息队列的高可用性

我们在项目中引入MQ后,能够解决流量瞬时峰值,高耦合,高延时的问题,但是同时也提高了系统的复杂性,并且各个业务模块之间通过MQ进行消息通信,如果MQ挂了,那岂不是整个服务都会瘫痪掉,所以,保证MQ的高可用性也是相当重要的。

ActiveMQ

ActiveMQ使用master-slave模式实现高可用性,提供两种实现主从模式的配置:shared nothing、shared storage(a relational database and a shared file system)

RabbitMQ

镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能够自动切换到镜像中的另一个节点上以保证服务的可用性。

性能开销大,消息同步所有机器导致网络带宽压力和消耗很重;无扩展性可言,若某queue负载很重,你再加机器,新增的机器也包含了这个queue的所有数据,并没有办法线性拓展你的queue(非分布式的)

Kafka

Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的,如果这上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了,这个和我们之前讲的Paxos有一点类似,都是解决分布式数据一致性。

RocketMQ

RocketMQ天生对集群的支持非常友好,部署方式有多个Master模式、多Master多Slave——异步复制模式、多Master多Slave——同步双写模式,可用性也是从低到高,效率会稍微降低一点,但对于整个系统解耦和可用性来说是值得的

消息队列消费幂等性

幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。

这里总结了很多资料,很多可行的方法,大多都是在业务逻辑上做相应的策略来避免这种情况的出现:

  • 例如数据库插入数据的时候根据主键查询一下数据是否存在,避免二次插入导致脏数据
  • 在消费数据不急着写入数据库,通过唯一键写入redis,redis的set为天然的幂等性
  • 因为不同的Message ID对应的消息内容可能相同,有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置。

具体的实现还是会根据不同的业务系统从而有不同的实现,总的大概思路可以总结为用唯一键去约束消费信息,就可以保证避免重复消费。

消息队列消费顺序

和幂等性的例子基本一致,如果用户产生的请求的顺序和消费的顺序不一致,也会导致很多问题,所以我们需要保证消息队列消费顺序,在前面我们提到过,消息队列,是基础数据结构中“先进先出”的一种数据结构,消息进入队列是什么顺序,那么消费便是什么顺序。但这里也有前提条件,就是消息进入的必须是同一个队列,且如果整个消费环境存在多个消费者环境也会导致消费顺序出现问题,这里我们需要保证的是,一个有顺序的消息,必然是进入同一个内存队列,由同一个消费者消费,如果想要提升整个系统的消费能力,则需要创建多个内存队列,且每个队列只对应一个消费者。

消费队列的可靠性传输

我们保证了消费的幂等性与顺序,那我们怎么去保证消息不丢失,或者消息丢失了能够有补偿机制,让消费端能够确实消费到信息,再这可以数据回滚,始终保持数据一致。

关于丢失数据,可以再和创建消费队列的时候设置消息持久化,将队列中的数据持久到磁盘,MQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

消费端和发送端怎么确保消息确实发送到了,我觉得可以参考TCP可靠性传输的解决方案,在消息队列中也可以实现,RabbitMQ 提供的 ack 机制,简单来说,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。