RabbitMQ 消息的可靠投递 曾经终败给现在 2022-12-13 04:33 206阅读 0赞 ### 目录 ### * * * 概述 * 方案一 消息状态打标 * 方案二 延时再次投递消息 * 说明 * 幂等性 * confirm 消息确认机制 * return消息机制 * 消费端限流(Qos机制) * 消费端的手动ACK、NACK、重回队列 * TTL 消息的有效期 * 死信队列 * * 方案一demo ### 概述 ### **可靠性投递** * 保障消息能够成功发出 * 保 障rabbitmq(broker)能够成功接收。接收指的是:broker接收到生产者发送的消息,放到exchange中,分发给对应的queue,交付给对应的消费者。 * 发送端要收到broker的确认应答,确认broker已收到|处理消息 * 完善的消息补偿机制。发送端没收到broker的确认应答,不知道消息是否成功投递成功,这时候就需要做一些补偿处理,比如重新投递。 rabbitmq的server又叫做broker,接收客户端的连接,实现AMQP实体服务,包含exchange、queue等多种组件。 说白了,broker就是rabbitmq服务器。 **实现消息可靠性投递的、常见的2种解决方案** * 消息状态入库,对消息状态进行打标 * 延时再次投递消息,做二次确认,回调检查 ### 方案一 消息状态打标 ### ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoeV8xODg4MzcwMTE2MQ_size_16_color_FFFFFF_t_70] 1、生产者将业务数据入库、消息数据(状态)入库。比如有张msg\_log表,消息的status初始状态为0,设置一个重试时间字段try\_time,记录下次应该重试的时间。 2、生产者发送消息到broker 3、生产者监听queue 4、生产者收到确认ack应答后将数据库中的消息状态修改为1 5、分布式定时任务拉取数据库中status为0的消息。定时任务要是分布式的,不然可能出现多个定时任务同时执行的问题。 6、重新发送这些消息 7、如果重试次数达到指定值后消息还未发送成功,将状态改为2 8、人工处理状态为2的消息 **方案一存在的问题** * 生产者要执行定时任务,有额外的开销, * 生产者要进行2方面的数据库IO操作(业务数据+消息数据),IO操作很花时间,在高并发的情况下,数据库性能很容易成为系统性能的瓶颈。 * 最开始的业务数据、消息数据入库涉及事务,会拉低性能 并发量大的情况下,第一种方案严重拉低生产者的性能。 相比之下,第二种方案用得更多、性能更好,但稍微复杂一点。 ### 方案二 延时再次投递消息 ### ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoeV8xODg4MzcwMTE2MQ_size_16_color_FFFFFF_t_70 1] 消息生产者 -> broker -> 消息消费者,生产者是upstream service(上游服务),消费者是downstream service(下游服务),callback service 回调服务。 1、生产者将业务数据入库 2、生产者将消息发送给broker,broker将消息放到对应的queue1中 3、消费者监听queue1,处理消息,处理一条消息后生成一条状态消息作为确认(绑定queue2),比如以Order的id作为新消息,总之要能唯一标识处理的消息。 3、消费者将生成的消息发送给broker,broker将消息放到queue2中 4、单独写一个callback service(回调服务)作为消费者,监听queue2,把queue2中的消息状态数据入库 5、生产者发送消息后,延时再次发送这条消息(绑定queue3),比如3min|5min后再次发送这条消息。 6、回调服务监听queue3,检查queue3中的消息是否已被消费(查询消息数据库)。如果消息未发送成功,回调服务rpc远程调用生产者(传递order id),生产者从数据库查询该条消息的数据(order对象),重新投递(queue1)。 第一种方案消费者使用数据包来确认应答(ack),第二种由消费者自己产生一条消息来确认应答。 **方案二的优缺点** 相较于第一种方案,第二种方案多写一个服务,每对生产者——消费者都使用一个额外的queue来确认,略微复杂些;也可以使用全局的唯一id,这样所有的生产者——消费者都可以使用一个queue来确认。 部署回调服务又要使用、维护额外的机器,成本变高了。 但生产者的数据库IO操作减少了,提升了性能。只要性能上去,稍微增加点成本完全可以接受。 ### 说明 ### (1)数据入库后才发送消息到broker,注意顺序 (2)分布式事务对性能的影响很大,并发量中小的可以加事务,如果并发量很大,事务会严重拉低性能,不要加事务,用其它方案做补偿 (3)不管是第一种、还是第二种,都很难做到100%的投递成功。优先考虑能够扛得住高并发(性能),在保证性能的前提下尽可能提高消息投递的可靠性。定时任务+人工检查,补偿失败的消息。 ### 幂等性 ### 消费者可能会收到重复的消息,消费者需要实现幂等性,保证消息不被重复消费。 实现幂等性常见的2种方式 * 唯一id+指纹码作为主键,利用数据库主键去重。比如用户对这个订单做了多个处理(支付、发起退款),不能以order\_id作为主键,得拼接一个时间戳之类的指纹码。可以直接拼接作为主键字段,也可以搞成2个字段联合主键的形式。缺点是:要操作数据库,数据库性能容易成为系统性能的瓶颈。 * 利用redis操作的原子性实现幂等。将一中的主键存储到redis上。要考虑的问题:1、redis存储的数据要不要落库?2、redis、数据库的数据一致性问题 ### confirm 消息确认机制 ### ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoeV8xODg4MzcwMTE2MQ_size_16_color_FFFFFF_t_70 2] 生产者投递消息到broker,异步监听broker的确认应答。 broker收到生产者投递的消息后,会给生产者一个应答,确认消息已投递到broker。 ### return消息机制 ### ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoeV8xODg4MzcwMTE2MQ_size_16_color_FFFFFF_t_70 3] 消息在投递到broker后,可能指定exchange、queue不存在,或routing key匹配不了任何队列,导致一些消息不能被路由到指定队列中,使用return消息机制后,broker会把不可路由的消息返回给生产者,便于生产者处理。 ### 消费端限流(Qos机制) ### 消费端一般要限制从broker拉取的消息数量,防止大量消息涌入消费者,冲垮消费者。 prefetch、batch-size? ### 消费端的手动ACK、NACK、重回队列 ### ACK是确认签收,可以自动签收——消费者收到消息就自动签收,不管后续消息是否消费成功,一般使用手动签收——消息消费成功才签收。 签收后broker才会从queue中删除对应的消息。 拒绝签收可以使用reject、unack,reject一次只能拒绝签收单条消息,unack可以批量拒收多条消息。 拒绝签收可以指定此消息是否重回队列,重新投递给消费者。重回队列可能仍会出现消息消费失败的问题,导致一直重回队列、反复投递给消费者,一般不重回,使用限制了重新发送次数的定时任务、人工处理做补偿。 ### TTL 消息的有效期 ### 有些消息的时效性很强,如果指定时间内没有投递给消费者,就没有必要投递了。 TTL,Time To Live,即消息的有效期,从消息进入queue开始计时,如果在TTL指定的时间内没有投递消费者,会从queue中清除这条消息。 TTL可以在创建队列时在Arguments(其它参数)中设置x-message-ttl,设置的是此队列中所有消息的TTL;也可以在生产者发送消息时在消息的Properties中设置expiration,设置的是当前消息的TTL。默认单位ms。 创建队列时可以设置很多其它参数,比如queue的最大长度。 ### 死信队列 ### 死信:dead message,消息成为死信的几种情况 * 消费者reject或unack拒收了消息,并且设置了不重回队列 * 消息ttl过期 * queue达到最大长度 死信队列:消息成为死信之后,会被broker自动投放到死信交换机中,从而被死信交换机投放到死信队列中。可以坚听这个死信队列,处理其中的死信。 使用流程 * 创建死信的交换机:dlx.exchange * 创建死信的队列:dlx.queue,创建时需要在其它参数中用x-dead-letter-exchange指定使用的死信交换机 * routing key: \# * 绑定死信交换机、死信队列 死信交换机、死信队列名称随意 ### ### ### 方案一demo ### **pom** 引入mybatis的依赖 **yml** 原来的+mybatis的配置 **表结构** ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoeV8xODg4MzcwMTE2MQ_size_16_color_FFFFFF_t_70 4] 记录表存储了消息内容,重新发送时取处内容反序列化得到Order对象,重新发送order对象。 记录了下次应该重试的时间,定时任务拉取时要检查状态=0 && 下次重试时间是在当前时间之前 才拉取 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoeV8xODg4MzcwMTE2MQ_size_16_color_FFFFFF_t_70]: /images/20221123/28cf022fa95e4b3c80d1bfd64ff85a3f.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoeV8xODg4MzcwMTE2MQ_size_16_color_FFFFFF_t_70 1]: /images/20221123/0007f1184e894c95a8f43ed3784a1758.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoeV8xODg4MzcwMTE2MQ_size_16_color_FFFFFF_t_70 2]: /images/20221123/cead9e4bc1024f4ca9fb9a518a80d3db.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoeV8xODg4MzcwMTE2MQ_size_16_color_FFFFFF_t_70 3]: /images/20221123/34ddd8df15e8472097efedba78e0c836.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoeV8xODg4MzcwMTE2MQ_size_16_color_FFFFFF_t_70 4]: /images/20221123/fb889ab307e54054a07df09776a7f6c3.png
还没有评论,来说两句吧...