消息队列选型方案 深藏阁楼爱情的钟 2023-09-28 13:37 1阅读 0赞 ### 概述 ### 消息队列是在消息的传输过程中保存消息的容器,用于接收消息并以文件的方式存储。 一个消息队列可以被一个也可以被多个消费者消费,包含以下 3 元素: * \*\*Producer:\*\*消息生产者,负责产生和发送消息到 Broker。 * \*\*Broker:\*\*消息处理中心,负责消息存储、确认、重试等,一般其中会包含多个 Queue。 * \*\*Consumer:\*\*消息消费者,负责从 Broker 中获取消息,并进行相应处理。 ![在这里插入图片描述][b053d29ad6184a9f8fc9ea2c24e24c89.png] #### 消息队列模式 #### \*\*点对点模式:\*\*多个生产者可以向同一个消息队列发送消息,一个具体的消息只能由一个消费者消费。 ![在这里插入图片描述][9e583243110b414b9e816e0ca72954ab.png] \*\*发布/订阅模式:\*\*单个消息可以被多个订阅者并发的获取和处理。 ![在这里插入图片描述][bf1fcfa9117044e2ab2a02602298bc46.png] #### 消息队列应用场景 #### * \*\*应用解耦:\*\*消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节。 * \*\*异步处理:\*\*消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息。 * \*\*流量削锋:\*\*当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的”载体”,在下游有能力处理的时候,再进行分发与处理。 * \*\*日志处理:\*\*日志处理是指将消息队列用在日志处理中,比如 Kafka 的应用,解决大量日志传输的问题。 * \*\*消息通讯:\*\*消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯,比如实现点对点消息队列,或者聊天室等。 * \*\*消息广播:\*\*如果没有消息队列,每当一个新的业务方接入,我们都要接入一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。 常用的消息队列中间件 Kafka、RabbitMQ 和 RocketMQ,下面就一一讲解。 ### Kafka ### Apache Kafka 最初由 LinkedIn 公司基于独特的设计实现为一个分布式的提交日志系统,之后成为 Apache 项目的一部分,号称大数据的杀手锏,在数据采集、传输、存储的过程中发挥着举足轻重的作用。 它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统, #### 重要概念 #### * * \*\*主题(Topic):\*\*消息的种类称为主题,可以说一个主题代表了一类消息,相当于是对消息进行分类,主题就像是数据库中的表。 * \*\*分区(partition):\*\*主题可以被分为若干个分区,同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 Kafka 的伸缩性。 * \*\*批次:\*\*为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。 * \*\*消费者群组(Consumer Group):\*\*消费者群组指的就是由一个或多个消费者组成的群体。 * \*\*Broker:\*\*一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。 * \*\*Broker 集群:\*\*broker 集群由一个或多个 broker 组成。 * **偏移量(Offset)**:Kafka的存储文件都是按照offset.kafka来命名,用Offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。 ![在这里插入图片描述][a1c3b139f5a64f47aa9676a02d8dbbbc.png] #### **架构** #### 一个典型的 Kafka 集群中包含 Producer、broker、Consumer Group、Zookeeper 集群。 Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 Consumer Group 发生变化时进行 rebalance。 Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pull 模式从 broker 订阅并消费消息。 ![在这里插入图片描述][778c68068584470f9c619c0356828f27.png] #### **工作原理** #### 消息经过序列化后,通过不同的分区策略,找到对应的分区。 相同主题和分区的消息,会被存放在同一个批次里,然后由一个独立的线程负责把它们发到 Kafka Broker 上。 ![在这里插入图片描述][dfae8c6bf76d4438a39bd19915239975.png] 分区的策略包括顺序轮询、随机轮询和 key hash 这 3 种方式,那什么是分区呢? 分区是 Kafka 读写数据的最小粒度,比如主题 A 有 15 条消息,有 5 个分区,如果采用顺序轮询的方式,15 条消息会顺序分配给这 5 个分区,后续消费的时候,也是按照分区粒度消费。 ![在这里插入图片描述][e91856ed618e49ae99727d930ac16955.png] 由于分区可以部署在多个不同的机器上,所以可以通过分区实现 Kafka 的伸缩性,比如主题 A 的 5 个分区,分别部署在 5 台机器上,如果下线一台,分区就变为 4。 Kafka 消费是通过消费群组完成,同一个消费者群组,一个消费者可以消费多个分区,但是一个分区,只能被一个消费者消费。 ![在这里插入图片描述][b9f083ac88b545cfaa6ed6caed5efae7.png] 不同的消费群组互不干涉,比如下图的 2 个消费群组,可以分别消费这 4 个分区的消息,互不影响。 ![在这里插入图片描述][5def2fe70f6d402b951ca69052f1b044.png] ### RocketMQ ### RocketMQ 是阿里开源的消息中间件,它是纯 Java 开发,具有高性能、高可靠、高实时、适合大规模分布式系统应用的特点。 RocketMQ 思路起源于 Kafka,但并不是 Kafka 的一个 Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binlog 分发等场景。 #### 重要概念 #### * **Name 服务器(NameServer):充当注册中心,类似 Kafka 中的 Zookeeper,但NameServer集群之间是没有通信**的,相对ZK来说更加**轻量**。它主要负责对于源数据的管理,包括了对于**Topic**和路由信息的管理。每个Broker在启动的时候会到NameServer注册,Producer在发送消息前会根据Topic去NameServer**获取对应Broker的路由信息**,Consumer也会定时获取 Topic 的路由信息。 * **Broker:一个独立的 RocketMQ 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量。单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic**信息注册到NameServer,顺带一提底层的通信和连接都是**基于Netty实现**的。**Broker**负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。官网上有数据显示:具有**上亿级消息堆积能力**,同时可**严格保证消息的有序性**。 * \*\*主题(Topic):\*\*消息的第一级类型,一条消息必须有一个 Topic。 * \*\*子主题(Tag):\*\*消息的第二级类型,同一业务模块不同目的的消息就可以用相同 Topic 和不同的 Tag 来标识。 * \*\*分组(Group):\*\*一个组可以订阅多个 Topic,包括生产者组(Producer Group)和消费者组(Consumer Group)。 * \*\*队列(Queue):\*\*可以类比 Kafka 的分区 Partition。 * **生产者(Producer)**:支持三种方式发送消息:**同步、异步和单向**单向发送 :消息发出去后,可以继续发送下一条消息或执行业务代码,不等待服务器回应,且**没有回调函数**。异步发送 :消息发出去后,可以继续发送下一条消息或执行业务代码,不等待服务器回应,**有回调函数**。同步发送 :消息发出去后,等待服务器响应成功或失败,才能继续后面的操作。 * **消费者(Consumer)**:支持 PUSH 和 PULL 两种消费模式,支持**集群消费**和**广播消费**集群消费 :该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。广播消费 :会发给消费者组中的每一个消费者进行消费。相当于**RabbitMQ**的发布订阅模式。 * **消息偏移量(Offset)**:在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,Offset为Java Long类型,64位,理论上在 100年内不会溢出,所以认为是长度无限。也可以认为Queue是一个长度无限的数组,**Offset**就是下标。 #### 工作原理 #### RockerMQ 中的消息模型就是按照主题模型所实现的,包括 Producer Group、Topic、Consumer Group 三个角色。 为了提高并发能力,一个 Topic 包含多个 Queue,生产者组根据主题将消息放入对应的 Topic,下图是采用轮询的方式找到里面的 Queue。 RockerMQ 中的消费群组和 Queue,可以类比 Kafka 中的消费群组和 Partition:不同的消费者组互不干扰,一个 Queue 只能被一个消费者消费,一个消费者可以消费多个 Queue。 消费 Queue 的过程中,通过偏移量记录消费的位置。 ![在这里插入图片描述][edf73bf881f640c9ad4c0c91129139e8.png] #### 架构 #### RocketMQ 技术架构中有四大角色 NameServer、Broker、Producer 和 Consumer,下面主要介绍 Broker。 Broker 用于存放 Queue,一个 Broker 可以配置多个 Topic,一个 Topic 中存在多个 Queue。 如果某个 Topic 消息量很大,应该给它多配置几个 Queue,并且尽量多分布在不同 broker 上,以减轻某个 broker 的压力。 Topic 消息量都比较均匀的情况下,如果某个 broker 上的队列越多,则该 broker 压力越大。 简单提一下,Broker 通过集群部署,并且提供了 master/slave 的结构,slave 定时从 master 同步数据(同步刷盘或者异步刷盘),如果 master 宕机,则 slave 提供消费服务,但是不能写入消息。 看到这里,大家应该可以发现,RocketMQ 的设计和 Kafka 真的很像! #### 延时消息 #### 开源版的RocketMQ不支持任意时间精度,仅支持特定的level,例如定时5s,10s,1min等。其中,level=0级表示不延时,level=1表示1级延时,level=2表示2级延时,以此类推。延时等级如下: > messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h #### 事务支持 #### 通过两阶段提交的方式实现事务,具体的流程: 1. 发送方向MQ发送“待确认”消息。 2. MQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功了,此时第一阶段消息发送完成。 3. 发送方开始执行本地逻辑。 4. 发送方根据本地事件的执行结果向MQ发送第二次确认消息(commit或rollback)。若为commit,则将第一阶段消息标记为可投递,订阅方可收到消息;若为MQ,则删除第一阶段消息,订阅方收不到消息。 5. 若出现异常情况,步骤4提交的二次确最终为到达MQ,服务器会在固定时间段后对“待确认”消息发起回查请求。 6. 发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回commit或rollback状态。 7. MQ收到回查请求后,按步骤4的逻辑处理。 由于MQ依赖将数据顺序写到磁盘这个特征来提高性能,上面步骤4却需要更改第一阶段消息的状态,这样会造成磁盘catch的脏页过多,降低系统性能,所以官方在**4.x版本后去除了该部分功能**,但系统中上层class还存在,用户还可以更具实际需求去实现事务功能。 #### 死信队列 #### 当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。死信队列的名称是 %DLQ%+ConsumGroup 。 死信队列具有以下特性: 1. 一个死信队列对应一个Group ID, 而不是对应单个消费者实例。 2. 如果一个Group ID未产生死信消息,消息队列RocketMQ不会为其创建相应的死信队列。 3. 一个死信队列包含了对应Group ID产生的所有死信消息,不论该消息属于哪个Topic。 ### RabbitMQ ### RabbitMQ 2007 年发布,是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 协议来实现。 AMQP 的主要特征是面向消息、队列、路由、可靠性、安全。AMQP 协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。 #### 重要概念 #### * **实例(Broker)** :一个RabbitMQ实例就是一个Broker * **虚拟主机(Virtual Host)** :相当于MySQL的DataBase,一个Broker上可以存在多个vhost,vhost之间相互隔离。每个vhost都拥有自己的队列、交换机、绑定和权限机制。vhost必须在连接时指定,默认的vhost是/。 * **消息队列(Queue )**:用来保存消息直到发送给消费者。它是消息的容器。一个消息可投入一个或多个队列。 * \*\*信道(Channel):\*\*消息读写等操作在信道中进行,客户端可以建立多个信道,每个信道代表一个会话任务。 * \*\*交换器(Exchange):\*\*接收消息,按照路由规则将消息路由到一个或者多个队列;如果路由不到,或者返回给生产者,或者直接丢弃。 * \*\*路由键(RoutingKey):\*\*生产者将消息发送给交换器的时候,会发送一个 RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。 * \*\*绑定(Binding):\*\*交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个 RoutingKey。 * **消息(Message)** :它是由消息头和消息体组成。消息头则包括**Routing-Key**、**Priority**(优先级)等。 #### 工作原理 #### AMQP 协议模型由三部分组成:生产者、消费者和服务端。 执行流程如下: * 生产者是连接到 Server,建立一个连接,开启一个信道。 * 生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。 * 消费者也需要进行建立连接,开启信道等操作,便于接收消息。 * 生产者发送消息,发送到服务端中的虚拟主机。 * 虚拟主机中的交换器根据路由键选择路由规则,发送到不同的消息队列中。 * 订阅了消息队列的消费者就可以获取到消息,进行消费。 ![在这里插入图片描述][a6585dadac22461db7a8a439c44e398c.png] #### 常用交换器 #### RabbitMQ 常用的交换器类型有 direct、topic、fanout、headers 四种。headers匹配AMQP消息的header而不是路由键,此外headers交换器和direct交换器完全一致,但性能差很多,目前几乎用不到。 ##### direct类型 ##### 消息中的路由键routing key如果和Binding中的binding key一致,交换器就将消息发到对应的队列中去。路由键与队列名完全匹配,如果一个队列绑定到交换器要求路由键为“dog”,则只转发routing key标记为“dog”的消息,不会转发“dog.puppy”等等。**它是完全匹配、单传播的模式。** Driect exchange的路由算法非常简单:通过bindingkey的完全匹配,可以用下图来说明: ![在这里插入图片描述][3170250b1d5e4d75a4455e96816fac9e.png] ![在这里插入图片描述][5ed11730359c4513bba62bb42ad509aa.png] Exchange和两个队列绑定在一起,Q1的bindingkey是orange,Q2的binding key是black和green。 当Producer publish key是orange时,exchange会把它放到Q1上,如果是black或green就会到Q2上,其余的Message被丢弃。 ##### fanout类型 ##### 每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。类似于子网广播,每台子网内的主机都获得了一份复制的消息。**fanout类型转发消息是最快的。** 如下图所示: ![在这里插入图片描述][a010d58010c3472e94d791597ada7f48.png] ##### topic类型 ##### topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:\#和\*,\#匹配0个或多个单词,*只能匹配一个单词。 对于Message的routing\_key是有限制的,不能是任意的。格式是以点号“.”分割的字符表。比如:”stock.usd.nyse”,“nyse.vmw”, “quick.orange.rabbit”。你可以放任意的key在routing\_key中,当然最长不能超过255 bytes。对于routing\_key,有两个特殊字符\#和*,\#匹配0个或多个单词,\*只能匹配一个单词。如下图所示: ![在这里插入图片描述][cff40726f38043b1942c51ea8f510536.png] ![在这里插入图片描述][1c831a2fa66546caaba6c9783b76fdaf.png] Producer发送消息时需要设置routing\_key,routing\_key包含三个单词和两个点号,第一个key描述了celerity(灵巧),第二个是color(色彩),第三个是物种。 在这里我们创建了两个绑定: Q1 的binding key 是”.orange.“; Q2 是 “…rabbit” 和 “lazy.\#”:Q1感兴趣所有orange颜色的动物;Q2感兴趣所有rabbits和所有的lazy的。 例如:rounting\_key 为 “quick.orange.rabbit”将会发送到Q1和Q2中。rounting\_key 为”lazy.orange.rabbit.hujj.ddd”会被投递到Q2中,\#匹配0个或多个单词。 #### TTL #### 消息生存时间。RabbitMQ支持消息的过期时间,一共2种。 * **在消息发送时进行指定**。通过配置消息体的 Properties ,可以指定当前消息的过期时间。 * **在创建Exchange时指定**。从进入消息队列开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。 #### 消息确认机制 #### ##### 生产者消息确认机制 ##### 1、ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,**也就是只确认是否正确到达 Exchange 中。** !\[image.png\](https://img-blog.csdnimg.cn/img\_convert/44bd0d66b015ddb08fdd9a751d075253.png\#clientId=u5f061152-af28-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=167&id=ub4e7046e&margin=\[object Object\]&name=image.png&originHeight=334&originWidth=640&originalType=binary&ratio=1&rotation=0&showTitle=false&size=79543&status=done&style=none&taskId=u12b71a7e-005b-42f8-ae80-6ef00465802&title=&width=320) 2、ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调,该方法可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了。 ##### 消费者消息确认机制 ##### **消息接收确认**,消费者接收消息有三种不同的确认模式: 1、**AcknowledgeMode.NONE**:不确认,这是默认的模式,默认所有消息都被成功消费了,直接从队列删除消息。存在消息被消费过程中由于异常未被成功消费而掉丢失的风险。 2、**AcknowledgeMode.AUTO**:自动确认,根据消息被消费过程中是否发生异常来发送确认收到消息或拒绝消息的指令到 RabbitMQ 服务。这个确认时机开发人员是不可控的,同样存在消息丢失的风险。 3、**AcknowledgeMode.MANUAL**:手动确认,开发人员可以根据实际的业务,在合适的时机手动发送确认收到消息或拒绝消息指令到 RabbitMQ 服务,整个过程开发人是可控的。这种模式也是我们要重点介绍的。 #### 死信队列DLX #### 死信队列(DLX Dead-Letter-Exchange):当消息在一个队列中变成死信之后,它会被重新推送到另一个队列,这个队列就是死信队列。 DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。 当这个队列中有死信时,MQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。 ### MQ带来的问题以及解决方案 ### #### 保证顺序消费 #### * **RabbitMQ**:一个Queue对应一个Consumer即可解决。 * **RocketMQhash**(key)%队列数 * **Kafka**:**hash**(key)%分区数 #### 延时消费 #### * **RabbitMQ**:死信队列 + TTL引入RabbitMQ的延迟插件 * **RocketMQ**:天生支持延时消息。 * **Kafka**:专门为要延迟的消息创建一个Topic新建一个消费者去消费这个Topic消息持久化再开一个线程定时去拉取持久化的消息,放入实际要消费的Topic实际消费的消费者从实际要消费的Topic拉取消息。 ### 消息队列对比 ### #### **Kafka** #### 追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务,大型公司建议可以选用,如果有日志采集功能,肯定是首选 Kafka。 优点如下: * 高吞吐、低延迟:Kafka 最大的特点就是收发消息非常快,Kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。 * 高伸缩性:每个主题(topic)包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。 * 高稳定性:Kafka 是分布式的,一个数据多个副本,某个节点宕机,Kafka 集群能够正常工作。 * 持久性、可靠性、可回溯:Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,支持消息回溯。 * 消息有序:通过控制能够保证所有消息被消费且仅被消费一次。 * 有优秀的第三方 Kafka Web 管理界面 Kafka-Manager,在日志领域比较成熟,被多家公司和多个开源项目使用。 缺点如下: * Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长。 * 不支持消息路由,不支持延迟发送,不支持消息重试。 * 社区更新较慢。 #### **RocketMQ** #### 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。 RocketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。 优点如下: * 高吞吐:借鉴 Kafka 的设计,单一队列百万消息的堆积能力。 * 高伸缩性:灵活的分布式横向扩展部署架构,整体架构其实和 Kafka 很像。 * 高容错性:通过 ACK 机制,保证消息一定能正常消费。 * 持久化、可回溯:消息可以持久化到磁盘中,支持消息回溯。 * 消息有序:在一个队列中可靠的先进先出(FIFO)和严格的顺序传递。 * 支持发布/订阅和点对点消息模型,支持拉、推两种消息模式。 * 提供 docker 镜像用于隔离测试和云集群部署,提供配置、指标和监控等功能丰富的 Dashboard。 缺点如下: * 不支持消息路由,支持的客户端语言不多,目前是 java 及 c++,其中 c++ 不成熟。 * 部分支持消息有序:需要将同一类的消息 hash 到同一个队列 Queue 中,才能支持消息的顺序,如果同一类消息散落到不同的 Queue中,就不能支持消息的顺序。 * 社区活跃度一般。 #### **RabbitMQ** #### 结合 erlang 语言本身的并发优势,性能较好,社区活跃度也比较高,但是不利于做二次开发和维护,不过 RabbitMQ 的社区十分活跃,可以解决开发过程中遇到的 bug。如果你的数据量没有那么大,小公司优先选择功能比较完备的 RabbitMQ。 优点如下: * 支持几乎所有最受欢迎的编程语言:Java,C,C ++,C#,Ruby,Perl,Python,PHP 等等。 * 支持消息路由:RabbitMQ 可以通过不同的交换器支持不同种类的消息路由。 * 消息时序:通过延时队列,可以指定消息的延时时间,过期时间 TTL 等。 * 支持容错处理:通过交付重试和死信交换器(DLX)来处理消息处理故障。 * 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。 * 社区活跃度高。 缺点如下: * Erlang 开发,很难去看懂源码,不利于做二次开发和维护,基本只能依赖于开源社区的快速维护和修复 bug。 * RabbitMQ 吞吐量会低一些,这是因为他做的实现机制比较重。 * 不支持消息有序、持久化不好、不支持消息回溯、伸缩性一般。 #### 消息队列选型 #### 选型主要侧重于以下几点: * HA:自身的高可用性保障,避免消息队列的引入而影响整体服务的可用性 * 高吞吐:在面对海量数据写入能否保持一个相对稳定、高效的数据处理能力 * 功能丰富性:是否支持延迟消息、事务消息、死信队列、优先级队列等 * 消息广播:是否支持将消息广播给消费者组或者一组消费者 * 消息堆积能力:在数据量过大时,是否允许一定消息堆积到broker * 数据持久性:数据持久化策略的采用,也决定着数据在宕机恢复后是否会丢失数据 * 重复消费:是否支持ack机制,在消费者未正确处理消息时,支持重新消费 * 消息顺序性:针对顺序消费的场景保证数据按写入时间的顺序性 这几个可以作为企业级消息中间件的代表,而 RocketMQ 在设计之初就借鉴了很多 RabbitMQ、Kafka 的设计理念,例如:Routing、多副本、顺序写(IO),也广泛应用在淘宝双十一等场景。 \*\*HA:\*\*在 HA 方面他们都是通过副本的方式,区别是 RabbitMQ 是集群级别的副本,Kafka 是多 partiton 和 ISR、选举机制,而 RocketMQ 通过多(master/slave)副本同时保障 NameServer 和 Broker。 \*\*高吞吐:\*\*Kafka 和 RocketMQ 通过直接操作文件系统,相比于 RabbitMQ,顺序写能大幅度提升数据的处理速度。 Kafka 为了进一步提升消息的吞吐量,可以采用客户端缓冲队列的方式批量发送,但也会存在宕机丢失数据的可能性,可以通过设置 batch.size 与 linger.ms 来动态调整,相比于 RocketMQ 更加灵活。 Kafka 的 partition 机制的确会带来性能的提升,但是在 Topic 不断增多的情况下,众多的 partition 及副本也将顺序写逐步退化为随机写,并且扩容时,由于 hash 值的变化,也会涉及到大量 partiton 数据的迁移。 RocketMQ 采用 commitlog 的方式实现全局写,所以能支持更多的 Topic,扩容也不涉及大量数据的迁移。 \*\*功能丰富性:\*\*Kafka 只有基础的消息类型,RabbitMQ 支持优先级队列,通过 TTL 和死信队列可以实现消息的延迟和重试,但是需要提前创建好对应重试频率的队列。 例如:1s 重试队列,10s 重试队列,RocketMQ 则内置了 18 个重试频率“1s 5s 10s 30s 1m 2m……”,另外也具有独有的 2PL 事务消息,很好的保障业务逻辑与消息发送的一致性。 \*\*重复消费:\*\*他们三者都采用 ACK 机制保障了单条消息重复消费的能力,Kafka 通过 offset 和 partition 特殊的 ttl 机制(segment 过期,按文件名顺序清理),能支持通过重置 offset 来回溯历史数据。 \*\*消息顺序性:\*\*RabbitMQ 和 RocketMQ 可以保证写入同一 topic 的顺序性,但是在多个消费者同时消费的情况下还是会出现乱序的情况。 在数据量较大的时候,我们也可以通过单个消费者消费,再按照一定的分发策略分配给多个消费者执行,只不过会提升整体复杂度,同时会带来更多的 HA、维护成本考量。 Kafka 可以保障单个 partition 的顺序性,并且每个 partiton 只允许一个消费者来消费(N:1),这就从策略上避免了多消费者的情况,在数据量较大的情况下,可以通过划分更多的 partition 提升数据处理能力。 综合来讲,RabbitMQ、RocketMQ 使用 Queue 模型,丰富的消息队列功能,更多的应用在业务场景,Kafka 基于 Streaming 模型,结合批处理、流式处理,更多的应用在大数据分析场景。 [b053d29ad6184a9f8fc9ea2c24e24c89.png]: https://img-blog.csdnimg.cn/b053d29ad6184a9f8fc9ea2c24e24c89.png [9e583243110b414b9e816e0ca72954ab.png]: https://img-blog.csdnimg.cn/9e583243110b414b9e816e0ca72954ab.png [bf1fcfa9117044e2ab2a02602298bc46.png]: https://img-blog.csdnimg.cn/bf1fcfa9117044e2ab2a02602298bc46.png [a1c3b139f5a64f47aa9676a02d8dbbbc.png]: https://img-blog.csdnimg.cn/a1c3b139f5a64f47aa9676a02d8dbbbc.png [778c68068584470f9c619c0356828f27.png]: https://img-blog.csdnimg.cn/778c68068584470f9c619c0356828f27.png [dfae8c6bf76d4438a39bd19915239975.png]: https://img-blog.csdnimg.cn/dfae8c6bf76d4438a39bd19915239975.png [e91856ed618e49ae99727d930ac16955.png]: https://img-blog.csdnimg.cn/e91856ed618e49ae99727d930ac16955.png [b9f083ac88b545cfaa6ed6caed5efae7.png]: https://img-blog.csdnimg.cn/b9f083ac88b545cfaa6ed6caed5efae7.png [5def2fe70f6d402b951ca69052f1b044.png]: https://img-blog.csdnimg.cn/5def2fe70f6d402b951ca69052f1b044.png [edf73bf881f640c9ad4c0c91129139e8.png]: https://img-blog.csdnimg.cn/edf73bf881f640c9ad4c0c91129139e8.png [a6585dadac22461db7a8a439c44e398c.png]: https://img-blog.csdnimg.cn/a6585dadac22461db7a8a439c44e398c.png [3170250b1d5e4d75a4455e96816fac9e.png]: https://img-blog.csdnimg.cn/3170250b1d5e4d75a4455e96816fac9e.png [5ed11730359c4513bba62bb42ad509aa.png]: https://img-blog.csdnimg.cn/5ed11730359c4513bba62bb42ad509aa.png [a010d58010c3472e94d791597ada7f48.png]: https://img-blog.csdnimg.cn/a010d58010c3472e94d791597ada7f48.png [cff40726f38043b1942c51ea8f510536.png]: https://img-blog.csdnimg.cn/cff40726f38043b1942c51ea8f510536.png [1c831a2fa66546caaba6c9783b76fdaf.png]: https://img-blog.csdnimg.cn/1c831a2fa66546caaba6c9783b76fdaf.png
还没有评论,来说两句吧...