RocketMQ——顺序消费(代码)

柔情只为你懂 2021-09-27 01:25 422阅读 0赞

关于rocketmq顺序消费的理解和图示可以查看该博文:RocketMQ——顺序消费和重复消费

本博客主要是以代码示例来了解顺序消费的相关内容,建议在此之前先了解下顺序消费的原理。

注:RocketMQ可以严格的保证消息有序,但这个顺序,不是全局顺序,只是分区(queue)顺序,如果想要全局顺序,那么需要保证只有一个分区。

顺序消费简介

1.普通顺序消费

顺序消费的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker重启,由于队列总数法还是能变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。

2.严格顺序消息

顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式failover特性,即broker集群中要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。

目前已知的应用只有数据库binlog同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。

1.producer

  1. package com.gwd.rocketmq;
  2. import java.io.IOException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Date;
  6. import java.util.List;
  7. import com.alibaba.rocketmq.client.exception.MQBrokerException;
  8. import com.alibaba.rocketmq.client.exception.MQClientException;
  9. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
  10. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
  11. import com.alibaba.rocketmq.client.producer.SendResult;
  12. import com.alibaba.rocketmq.common.message.Message;
  13. import com.alibaba.rocketmq.common.message.MessageQueue;
  14. import com.alibaba.rocketmq.remoting.exception.RemotingException;
  15. /**
  16. * @FileName Producer.java
  17. * @Description:
  18. * @author gu.weidong
  19. * @version V1.0
  20. * @createtime 2018年7月3日 上午9:59:38
  21. * 修改历史:
  22. * 时间 作者 版本 描述
  23. *====================================================
  24. *
  25. */
  26. /**
  27. * Producer,发送顺序消息
  28. */
  29. public class Producer {
  30. public static void main(String[] args) throws IOException {
  31. try {
  32. DefaultMQProducer producer = new DefaultMQProducer("sequence_producer");
  33. producer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876");
  34. producer.start();
  35. String[] tags = new String[] { "TagA", "TagC", "TagD" };
  36. // 订单列表
  37. List<OrderDO> orderList = new Producer().buildOrders();
  38. Date date = new Date();
  39. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  40. String dateStr = sdf.format(date);
  41. for (int i = 0; i < 10; i++) {
  42. // 加个时间后缀
  43. String body = dateStr + " Hello RocketMQ " + orderList.get(i).getOrderId()+orderList.get(i).getDesc();
  44. Message msg = new Message("SequenceTopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
  45. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  46. @Override
  47. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  48. Long id = Long.valueOf((String)arg);
  49. long index = id % mqs.size();
  50. return mqs.get((int)index);
  51. }
  52. }, orderList.get(i).getOrderId());//通过订单id来获取对应的messagequeue
  53. System.out.println(sendResult + ", body:" + body);
  54. }
  55. producer.shutdown();
  56. } catch (MQClientException e) {
  57. e.printStackTrace();
  58. } catch (RemotingException e) {
  59. e.printStackTrace();
  60. } catch (MQBrokerException e) {
  61. e.printStackTrace();
  62. } catch (InterruptedException e) {
  63. e.printStackTrace();
  64. }
  65. System.in.read();
  66. }
  67. /**
  68. * 生成模拟订单数据
  69. */
  70. private List<OrderDO> buildOrders() {
  71. List<OrderDO> orderList = new ArrayList<OrderDO>();
  72. OrderDO OrderDO = new OrderDO();
  73. OrderDO.setOrderId("15103111039");
  74. OrderDO.setDesc("创建");
  75. orderList.add(OrderDO);
  76. OrderDO = new OrderDO();
  77. OrderDO.setOrderId("15103111065");
  78. OrderDO.setDesc("创建");
  79. orderList.add(OrderDO);
  80. OrderDO = new OrderDO();
  81. OrderDO.setOrderId("15103111039");
  82. OrderDO.setDesc("付款");
  83. orderList.add(OrderDO);
  84. OrderDO = new OrderDO();
  85. OrderDO.setOrderId("15103117235");
  86. OrderDO.setDesc("创建");
  87. orderList.add(OrderDO);
  88. OrderDO = new OrderDO();
  89. OrderDO.setOrderId("15103111065");
  90. OrderDO.setDesc("付款");
  91. orderList.add(OrderDO);
  92. OrderDO = new OrderDO();
  93. OrderDO.setOrderId("15103117235");
  94. OrderDO.setDesc("付款");
  95. orderList.add(OrderDO);
  96. OrderDO = new OrderDO();
  97. OrderDO.setOrderId("15103111065");
  98. OrderDO.setDesc("完成");
  99. orderList.add(OrderDO);
  100. OrderDO = new OrderDO();
  101. OrderDO.setOrderId("15103111039");
  102. OrderDO.setDesc("推送");
  103. orderList.add(OrderDO);
  104. OrderDO = new OrderDO();
  105. OrderDO.setOrderId("15103117235");
  106. OrderDO.setDesc("完成");
  107. orderList.add(OrderDO);
  108. OrderDO = new OrderDO();
  109. OrderDO.setOrderId("15103111039");
  110. OrderDO.setDesc("完成");
  111. orderList.add(OrderDO);
  112. return orderList;
  113. }
  114. }

此处需要注意,producer.send(msg, new MessageQueueSelector()),如果需要全局有序,只需要使new MessageQueueSelector().select(List mqs, Message msg, Object arg)方法返回值唯一且不变,例如:

  1. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  2. @Override
  3. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  4. Long id = Long.valueOf((String)arg);
  5. long index = id % mqs.size();
  6. return mqs.get((int)index);
  7. }
  8. }, orderList.get(0).getOrderId());//通过订单id来获取对应的messagequeue

这边获取到的queue永远都是唯一的且确定的(此处只是举个简单的例子,orderList.get(i).getOrderId()改为0亦可)

2.错误的Consumer

  1. package com.gwd.rocketmq;
  2. import java.util.List;
  3. import java.util.Random;
  4. import java.util.concurrent.TimeUnit;
  5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  9. import com.alibaba.rocketmq.client.exception.MQClientException;
  10. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  11. import com.alibaba.rocketmq.common.message.MessageExt;
  12. /**
  13. * @FileName WrongConsumer.java
  14. * @Description:
  15. * @author gu.weidong
  16. * @version V1.0
  17. * @createtime 2018年7月3日 下午3:13:16
  18. * 修改历史:
  19. * 时间 作者 版本 描述
  20. *====================================================
  21. *
  22. */
  23. public class WrongConsumer {
  24. public static void main(String[] args) throws MQClientException {
  25. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
  26. consumer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876");
  27. /**
  28. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
  29. * 如果非第一次启动,那么按照上次消费的位置继续消费
  30. */
  31. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  32. consumer.subscribe("SequenceTopicTest", "TagA || TagC || TagD");
  33. consumer.registerMessageListener(new MessageListenerConcurrently() {
  34. Random random = new Random();
  35. @Override
  36. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  37. System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
  38. for (MessageExt msg: msgs) {
  39. System.out.println(msg + ", content:" + new String(msg.getBody()));
  40. }
  41. try {
  42. //模拟业务逻辑处理中...
  43. TimeUnit.SECONDS.sleep(random.nextInt(10));
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. }
  47. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  48. }
  49. });
  50. consumer.start();
  51. System.out.println("Consumer Started.");
  52. }
  53. }

注意:要想要有顺序,那么这边吃监听器就不能是MessageListenerConcurrently了,其显示效果如下:

70

3.正确的Consumer

  1. package com.gwd.rocketmq;
  2. import java.util.List;
  3. import java.util.Random;
  4. import java.util.concurrent.TimeUnit;
  5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
  9. import com.alibaba.rocketmq.client.exception.MQClientException;
  10. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  11. import com.alibaba.rocketmq.common.message.MessageExt;
  12. /**
  13. * @FileName Consumer.java
  14. * @Description:
  15. * @author gu.weidong
  16. * @version V1.0
  17. * @createtime 2018年7月3日 上午10:05:26
  18. * 修改历史:
  19. * 时间 作者 版本 描述
  20. *====================================================
  21. *
  22. */
  23. /**
  24. * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
  25. */
  26. public class Consumer {
  27. public static void main(String[] args) throws MQClientException {
  28. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
  29. consumer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876");
  30. /**
  31. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
  32. * 如果非第一次启动,那么按照上次消费的位置继续消费
  33. */
  34. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  35. consumer.subscribe("SequenceTopicTest", "TagA || TagC || TagD");
  36. consumer.registerMessageListener(new MessageListenerOrderly() {
  37. Random random = new Random();
  38. @Override
  39. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  40. context.setAutoCommit(true);
  41. System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
  42. for (MessageExt msg: msgs) {
  43. System.out.println(msg + ", content:" + new String(msg.getBody()));
  44. }
  45. try {
  46. //模拟业务逻辑处理中...
  47. TimeUnit.SECONDS.sleep(random.nextInt(10));
  48. } catch (Exception e) {
  49. e.printStackTrace();
  50. }
  51. return ConsumeOrderlyStatus.SUCCESS;
  52. }
  53. });
  54. consumer.start();
  55. System.out.println("Consumer Started.");
  56. }
  57. }

这边的Consumer和上面的最明显的区别在于对应的监听器是MessageListenerOrderly,MessageListenerOrderly是能够保证顺序消费的。

显示结果:

70 1

4.多个消费者

那如果有多个消费者呢?因为消息发送时被分配到多个队列,这些队列又会被分别发送给消费者唯一消费,现在启动两个消费者,其消费情况如下图:

70 2

70 3

结论:多个消费者时,各个消费者的消息依旧是顺序消费,且不会重复消费

原文转自:https://blog.csdn.net/earthhour/article/details/78323026 ,在此基础上部分代码略作修改

发表评论

表情:
评论列表 (有 0 条评论,422人围观)

还没有评论,来说两句吧...

相关阅读

    相关 使用RocketMQ实现消息顺序消费

    消息的顺序消费在很多交易型的业务场景中都会被要求实现,而且,消息队列的顺序消费解决方案在很多互联网公司的面试中经常会被问到。 索尔老师在使用了多个消息队列后发现,虽然每个消息

    相关 rocketMq 顺序消费

    什么是顺序消费? 消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才

    相关 RocketMQ 顺序消费机制

    顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。 顺序消息分为分区顺序消息和全局

    相关 springboot整合rocketmq实现顺序消费

    消息队列已然成为当下非常火热的中间件,而rocketmq作为阿里开源的中间件产品,历经数次超大并发的考验,已然成为中间件产品的首选。而有时候我们在使用消息队列的时候,往往需要能