RabbitMQ的消息确认ACK机制

冷不防 2023-10-05 18:08 104阅读 0赞

1、什么是消息确认ACK。

  答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。

2、ACK的消息确认机制。

  答:ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。

    如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。
    如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。
    消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
    消息的ACK确认机制默认是打开的。

3、ACK机制的开发注意事项。

  答:如果忘记了ACK,那么后果很严重。当Consumer退出时候,Message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RabbitMQ会长时间运行,因此这个”内存泄漏”是致命的。

4、结合项目实例进行,理解一下ACK机制。之前写过RabbitMQ的交换器Exchange之direct(发布与订阅 完全匹配),这里借助这个进行消息持久化测试。生产者的代码不发生改变。控制层的触发生产者生产消息,这里只生产一条消息。方便观察现象。

复制代码

  1. 1 package com.example.bie.controller;
  2. 2
  3. 3 import org.springframework.beans.factory.annotation.Autowired;
  4. 4 import org.springframework.stereotype.Controller;
  5. 5 import org.springframework.web.bind.annotation.RequestMapping;
  6. 6 import org.springframework.web.bind.annotation.ResponseBody;
  7. 7
  8. 8 import com.example.bie.provider.RabbitMqLogErrorProduce;
  9. 9 import com.example.bie.provider.RabbitMqLogInfoProduce;
  10. 10
  11. 11 /**
  12. 12 *
  13. 13 * @author biehl
  14. 14 *
  15. 15 */
  16. 16 @Controller
  17. 17 public class RabbitmqController {
  18. 18
  19. 19 @Autowired
  20. 20 private RabbitMqLogInfoProduce rabbitMqLogInfoProduce;
  21. 21
  22. 22 @Autowired
  23. 23 private RabbitMqLogErrorProduce rabbitMqLogErrorProduce;
  24. 24
  25. 25 @RequestMapping(value = "/logInfo")
  26. 26 @ResponseBody
  27. 27 public String rabbitmqSendLogInfoMessage() {
  28. 28 String msg = "生产者===>生者的LogInfo消息message: ";
  29. 29 for (int i = 0; i < 1; i++) {
  30. 30 rabbitMqLogInfoProduce.producer(msg + i);
  31. 31 }
  32. 32 return "生产===> LogInfo消息message ===> success!!!";
  33. 33 }
  34. 34
  35. 35 @RequestMapping(value = "/logError")
  36. 36 @ResponseBody
  37. 37 public String rabbitmqSendLogErrorMessage() {
  38. 38 String msg = "生产者===>生者的LogError消息message: ";
  39. 39 for (int i = 0; i < 1; i++) {
  40. 40 rabbitMqLogErrorProduce.producer(msg + i);
  41. 41 }
  42. 42 return "生产===> LogError消息message ===> success!!!";
  43. 43 }
  44. 44
  45. 45 }

复制代码

消费者消费消息,打印输出后面手动抛出运行时异常,观察现象。

复制代码

  1. 1 package com.example.bie.consumer;
  2. 2
  3. 3 import org.springframework.amqp.core.ExchangeTypes;
  4. 4 import org.springframework.amqp.rabbit.annotation.Exchange;
  5. 5 import org.springframework.amqp.rabbit.annotation.Queue;
  6. 6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
  7. 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  8. 8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
  9. 9 import org.springframework.stereotype.Component;
  10. 10
  11. 11 /**
  12. 12 *
  13. 13 * @author biehl
  14. 14 *
  15. 15 * 消息接收者
  16. 16 *
  17. 17 * 1、@RabbitListener bindings:绑定队列
  18. 18 *
  19. 19 * 2、@QueueBinding
  20. 20 * value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
  21. 21 *
  22. 22 * 3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
  23. 23 *
  24. 24 * 4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
  25. 25 *
  26. 26 *
  27. 27 */
  28. 28 @Component
  29. 29 @RabbitListener(bindings = @QueueBinding(
  30. 30
  31. 31 value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),
  32. 32
  33. 33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),
  34. 34
  35. 35 key = "${rabbitmq.config.queue.error.routing.key}"))
  36. 36 public class LogErrorConsumer {
  37. 37
  38. 38 /**
  39. 39 * 接收消息的方法,采用消息队列监听机制.
  40. 40 *
  41. 41 * @RabbitHandler意思是将注解@RabbitListener配置到类上面
  42. 42 *
  43. 43 * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
  44. 44 *
  45. 45 * @param msg
  46. 46 */
  47. 47 @RabbitHandler
  48. 48 public void consumer(String msg) {
  49. 49 // 打印消息
  50. 50 System.out.println("ERROR消费者===>消费<===消息message: " + msg);
  51. 51 throw new RuntimeException();
  52. 52 }
  53. 53
  54. 54 }

复制代码

观察现象,如下所示:

在RabbitMQ的浏览器界面,可以看到一条消息未被进行ACK的消息确认机制,这条消息被锁定Unacked,所以一直在控制台进行报错。

58c029ebeebd9be0f0389e0e71c21795.png

控制台效果如下所示,一直进行消息的发送,因为消费方一直没有返回ACK确认,RabbitMQ认为消息未进行正常的消费,会将消息再次放入到队列中,再次让你消费,但是还是没有返回ACK确认,依次循环,形成了死循环。

038e4dbea58994dad2a6181740761fad.png

如何解决问题呢,如果消息发送的时候,程序出现异常,后果很严重的,会导致内存泄漏的,所以在程序处理中可以进行异常捕获,保证消费者的程序正常执行,这里不进行介绍了。第二种方式可以使用RabbitMQ的ack确认机制。开启重试,然后重试次数,默认为3次。这里设置为5次。

复制代码

  1. 1 # 给当前项目起名称.
  2. 2 spring.application.name=rabbitmq-ack-direct-consumer
  3. 3
  4. 4 # 配置端口号
  5. 5 server.port=8080
  6. 6
  7. 7 # 配置rabbitmq的参数.
  8. 8 # rabbitmq服务器的ip地址.
  9. 9 spring.rabbitmq.host=192.168.110.133
  10. 10 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.
  11. 11 spring.rabbitmq.port=5672
  12. 12 # rabbitmq的账号.
  13. 13 spring.rabbitmq.username=guest
  14. 14 # rabbitmq的密码.
  15. 15 spring.rabbitmq.password=guest
  16. 16
  17. 17 # 设置交换器的名称,方便修改.
  18. 18 # 路由键是将交换器和队列进行绑定的,队列通过路由键绑定到交换器.
  19. 19 rabbitmq.config.exchange=log.exchange.direct
  20. 20
  21. 21 # info级别的队列名称.
  22. 22 rabbitmq.config.queue.info=log.info.queue
  23. 23 # info的路由键.
  24. 24 rabbitmq.config.queue.info.routing.key=log.info.routing.key
  25. 25
  26. 26 # error级别的队列名称.
  27. 27 rabbitmq.config.queue.error=log.error.queue
  28. 28 # error的路由键.
  29. 29 rabbitmq.config.queue.error.routing.key=log.error.routing.key
  30. 30
  31. 31 # 开启重试
  32. 32 spring.rabbitmq.listener.simple.retry.enabled=true
  33. 33 # 重试次数,默认为3次
  34. 34 spring.rabbitmq.listener.simple.retry.max-attempts=5

复制代码

效果如下所示:

可以看到控制台尝试了5次以后就不再进行重试了。

b13b84e5efeec8f30f9dd96407805c93.png

RabbitMQ的界面可以看到,开始的效果和上面的一致,但是5次尝试以后,就变成了0条。RabbitMQ将这条消息丢弃了。

f18881a404770d1911282ec785e7f2c9.png

来源:https://www.cnblogs.com/biehongli/p/11789098.html

发表评论

表情:
评论列表 (有 0 条评论,104人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RabbitMQ消息确认ACK机制

    1、什么是消息确认ACK。   答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不