RabbitMQ消息重复消费场景及解决方案

桃扇骨 2022-11-06 11:59 372阅读 0赞

前言

上一篇文章介绍了springboot如何整合RabbitMQ:https://blog.csdn.net/chenping1993/article/details/114301341

这里介绍一下RabbitMQ重复消费的场景,以及如何解决消息重复消费的问题。

目录

消息重复消费:

MQ的一条消息被消费者消费了多次:

重复消费场景重现测试:

如何解决消息重复消费的问题:

编码:

解决消息重复消费测试:


消息重复消费:

什么是消息重复消费?首先我们来看一下消息的传输流程。消息生产者—》MQ—》消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。

所以消息重复也就出现在两个阶段1、生产者多发送了消息给MQ;2、MQ的一条消息被消费者消费了多次。第一种场景很好控制,只要保证消息生成者不重复发送消息给MQ即可。我们着重来看一下第二个场景。

MQ的一条消息被消费者消费了多次

在保证MQ消息不重复的情况下,消费者消费消息成功后,在给MQ发送消息确认的时候出现了网络异常(或者是服务中断),MQ没有接收到确认,此时MQ不会将发送的消息删除,
为了保证消息被消费,当消费者网络稳定后,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。

重复消费场景重现测试

1、消息发送者发送1万条消息给MQ:

  1. @GetMapping("/rabbitmq/sendToClient")
  2. public String sendToClient() {
  3. String message = "server message sendToClient";
  4. for (int i = 0; i < 10000; i++) {
  5. amqpTemplate.convertAndSend("queueName3",message+": "+i);
  6. }
  7. return message;
  8. }

启动消息发送服务,调用接口发送消息,mq成功收到1万条消息。

2、消费者监听消费消息:

  1. @RabbitListener(queues = "queueName3")//发送的队列名称 @RabbitListener注解到类和方法都可以
  2. @RabbitHandler
  3. public void receiveMessage(String message) {
  4. System.out.println("接收者2--接收到queueName3队列的消息为:"+message);
  5. }

启动消费者服务,然后中断消费服务,此时消费到了第7913个消息:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoZW5waW5nMTk5Mw_size_16_color_FFFFFF_t_70

此时查看MQ的消息,现在MQ队列中应该还有2087个消息,但还有2088个消息,说明最后一个消息被消费了没有被MQ服务确认。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoZW5waW5nMTk5Mw_size_16_color_FFFFFF_t_70 1

再次启动消费者服务,消息从第7913个消息开始消费,而不是第7914个消息

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoZW5waW5nMTk5Mw_size_16_color_FFFFFF_t_70 2

如何解决消息重复消费的问题:

为了保证消息不被重复消费,首先要保证每个消息是唯一的,所以可以给每一个消息携带一个全局唯一的id,流程如下

1、消费者监听到消息后获取id,先去查询这个id是否存中

2、如果不存在,则正常消费消息,并把消息的id存入 数据库或者redis中(下面的编码示例使用redis)

3、如果存在则丢弃此消息

编码:

消息生产者服务

  1. /**
  2. * @Description: 发送消息 模拟消息重复消费
  3. * 消息重复消费情景:消息生产者已把消息发送到mq,消息消费者在消息消费的过程中突然因为网络原因或者其他原因导致消息消费中断
  4. * 消费者消费成功后,在给MQ确认的时候出现了网络波动,MQ没有接收到确认,
  5. * 为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息
  6. * @param:
  7. * @return: java.lang.String
  8. * @Author: chenping
  9. * @Date: 2021/3/5 17:25
  10. */
  11. @GetMapping("/rabbitmq/sendMsgNoRepeat")
  12. public String sendMsgNoRepeat() {
  13. String message = "server message sendMsgNoRepeat";
  14. for (int i = 0; i <10000 ; i++) {
  15. Message msg = MessageBuilder.withBody((message+"--"+i).getBytes()).setMessageId(UUID.randomUUID()+"").build();
  16. amqpTemplate.convertAndSend("queueName4",msg);
  17. }
  18. return message;
  19. }

消息消费者服务

将id存入string中(单消费者场景):

  1. @RabbitListener(queues = "queueName4")//发送的队列名称 @RabbitListener注解到类和方法都可以
  2. @RabbitHandler
  3. public void receiveMessage(Message message) throws UnsupportedEncodingException {
  4. String messageId = message.getMessageProperties().getMessageId();
  5. String msg = new String(message.getBody(),"utf-8");
  6. String messageRedisValue = redisUtil.get("queueName4","");
  7. if (messageRedisValue.equals(messageId)) {
  8. return;
  9. }
  10. System.out.println("消息:"+msg+", id:"+messageId);
  11. redisUtil.set("queueName4",messageId);//以队列为key,id为value
  12. }

将id存入list中(多消费者场景):

  1. @RabbitListener(queues = "queueName4")//发送的队列名称 @RabbitListener注解到类和方法都可以
  2. @RabbitHandler
  3. public void receiveMessage1(Message message) throws UnsupportedEncodingException {
  4. String messageId = message.getMessageProperties().getMessageId();
  5. String msg = new String(message.getBody(),"utf-8");
  6. List<String> messageRedisValue = redisUtil.lrange("queueName4");
  7. if (messageRedisValue.contains(messageId)) {
  8. return;
  9. }
  10. System.out.println("消息:"+msg+", id:"+messageId);
  11. redisUtil.lpush("queueName4",messageId);//存入list
  12. }

#

解决消息重复消费测试:

首先,启动消息生成服务,发送一万条消息

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoZW5waW5nMTk5Mw_size_16_color_FFFFFF_t_70 3

启动消息消费服务,然后中断服务,消费了1934条消息

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoZW5waW5nMTk5Mw_size_16_color_FFFFFF_t_70 4

查看未被消费的消息条数为8067条,多了一条(10000-1934=8066 )

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoZW5waW5nMTk5Mw_size_16_color_FFFFFF_t_70 5

再次启动消费者服务,消费者舍弃了已被消费的第1934条消息

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoZW5waW5nMTk5Mw_size_16_color_FFFFFF_t_70 6

发表评论

表情:
评论列表 (有 0 条评论,372人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RabbitMQ消息重复消费问题

    业务背景 消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题。