顺序消息-RocketMQ

朴灿烈づ我的快乐病毒、 2024-04-17 05:41 176阅读 0赞

消息有序指的是可以按照消息的发送顺序来消费。
RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。

之所以出现你这个场景看起来不是顺序的,是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不通的queue(分区)。如图:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoZW45Nzg2MTY2NDk_size_16_color_FFFFFF_t_70

而消费端消费的时候,是会分配到多个queue的,多个queue是同时拉取提交消费。如图:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoZW45Nzg2MTY2NDk_size_16_color_FFFFFF_t_70 1

但是同一条queue里面,RocketMQ的确是能保证FIFO的。那么要做到顺序消息,应该怎么实现呢——把消息确保投递到同一条queue。

下面用订单进行示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

rocketmq消息生产端示例代码如下:

/**
* Producer,发送顺序消息
*/
public class Producer {

  1. public static void main(String\[\] args) throws IOException \{
  2. try \{
  3. DefaultMQProducer producer = new DefaultMQProducer("please\_rename\_unique\_group\_name");
  4. producer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");
  5. producer.start();
  6. String\[\] tags = new String\[\] \{ "TagA", "TagC", "TagD" \};
  7. // 订单列表
  8. List<OrderDemo> orderList = new Producer().buildOrders();
  9. Date date = new Date();
  10. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  11. String dateStr = sdf.format(date);
  12. for (int i = 0; i < 10; i++) \{
  13. // 加个时间后缀
  14. String body = dateStr + " Hello RocketMQ " + orderList.get(i);
  15. Message msg = new Message("TopicTestjjj", tags\[i % tags.length\], "KEY" + i, body.getBytes());
  16. SendResult sendResult = producer.send(msg, new MessageQueueSelector() \{
  17. @Override
  18. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) \{
  19. Long id = (Long) arg;
  20. long index = id % mqs.size();
  21. return mqs.get((int)index);
  22. \}
  23. \}, orderList.get(i).getOrderId());//订单id
  24. System.out.println(sendResult + ", body:" + body);
  25. \}
  26. producer.shutdown();
  27. \} catch (MQClientException e) \{
  28. e.printStackTrace();
  29. \} catch (RemotingException e) \{
  30. e.printStackTrace();
  31. \} catch (MQBrokerException e) \{
  32. e.printStackTrace();
  33. \} catch (InterruptedException e) \{
  34. e.printStackTrace();
  35. \}
  36. System.in.read();
  37. \}
  38. /\*\*
  39. \* 生成模拟订单数据
  40. \*/
  41. private List<OrderDemo> buildOrders() \{
  42. List<OrderDemo> orderList = new ArrayList<OrderDemo>();
  43. OrderDemo orderDemo = new OrderDemo();
  44. orderDemo.setOrderId(15103111039L);
  45. orderDemo.setDesc("创建");
  46. orderList.add(orderDemo);
  47. orderDemo = new OrderDemo();
  48. orderDemo.setOrderId(15103111065L);
  49. orderDemo.setDesc("创建");
  50. orderList.add(orderDemo);
  51. orderDemo = new OrderDemo();
  52. orderDemo.setOrderId(15103111039L);
  53. orderDemo.setDesc("付款");
  54. orderList.add(orderDemo);
  55. orderDemo = new OrderDemo();
  56. orderDemo.setOrderId(15103117235L);
  57. orderDemo.setDesc("创建");
  58. orderList.add(orderDemo);
  59. orderDemo = new OrderDemo();
  60. orderDemo.setOrderId(15103111065L);
  61. orderDemo.setDesc("付款");
  62. orderList.add(orderDemo);
  63. orderDemo = new OrderDemo();
  64. orderDemo.setOrderId(15103117235L);
  65. orderDemo.setDesc("付款");
  66. orderList.add(orderDemo);
  67. orderDemo = new OrderDemo();
  68. orderDemo.setOrderId(15103111065L);
  69. orderDemo.setDesc("完成");
  70. orderList.add(orderDemo);
  71. orderDemo = new OrderDemo();
  72. orderDemo.setOrderId(15103111039L);
  73. orderDemo.setDesc("推送");
  74. orderList.add(orderDemo);
  75. orderDemo = new OrderDemo();
  76. orderDemo.setOrderId(15103117235L);
  77. orderDemo.setDesc("完成");
  78. orderList.add(orderDemo);
  79. orderDemo = new OrderDemo();
  80. orderDemo.setOrderId(15103111039L);
  81. orderDemo.setDesc("完成");
  82. orderList.add(orderDemo);
  83. return orderList;
  84. \}

输出:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoZW45Nzg2MTY2NDk_size_16_color_FFFFFF_t_70 2
从图中红色框可以看出,orderId等于15103111039的订单被顺序放入queueId等于7的队列。queueOffset同时在顺序增长。

发送时有序,接收(消费)时也要有序,才能保证顺序消费。如下这段代码演示了普通消费(非有序消费)的实现方式。

/**
* 普通消息消费
*/
public class Consumer {

  1. public static void main(String\[\] args) throws MQClientException \{
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please\_rename\_unique\_group\_name\_3");
  3. consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");
  4. /\*\*
  5. \* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
  6. \* 如果非第一次启动,那么按照上次消费的位置继续消费
  7. \*/
  8. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME\_FROM\_FIRST\_OFFSET);
  9. consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
  10. consumer.registerMessageListener(new MessageListenerConcurrently() \{
  11. Random random = new Random();
  12. @Override
  13. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) \{
  14. System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
  15. for (MessageExt msg: msgs) \{
  16. System.out.println(msg + ", content:" + new String(msg.getBody()));
  17. \}
  18. try \{
  19. //模拟业务逻辑处理中...
  20. TimeUnit.SECONDS.sleep(random.nextInt(10));
  21. \} catch (Exception e) \{
  22. e.printStackTrace();
  23. \}
  24. return ConsumeConcurrentlyStatus.CONSUME\_SUCCESS;
  25. \}
  26. \});
  27. consumer.start();
  28. System.out.println("Consumer Started.");
  29. \}

}

输出:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NoZW45Nzg2MTY2NDk_size_16_color_FFFFFF_t_70 3
可见,订单号为15103111039的订单被消费时顺序完成乱了。所以用MessageListenerConcurrently这种消费者是无法做到顺序消费的,采用下面这种方式就做到了顺序消费:

/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class ConsumerInOrder {

  1. public static void main(String\[\] args) throws MQClientException \{
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please\_rename\_unique\_group\_name\_3");
  3. consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");
  4. /\*\*
  5. \* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
  6. \* 如果非第一次启动,那么按照上次消费的位置继续消费
  7. \*/
  8. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME\_FROM\_FIRST\_OFFSET);
  9. consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
  10. consumer.registerMessageListener(new MessageListenerOrderly() \{
  11. Random random = new Random();
  12. @Override
  13. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) \{
  14. context.setAutoCommit(true);
  15. System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
  16. for (MessageExt msg: msgs) \{
  17. System.out.println(msg + ", content:" + new String(msg.getBody()));
  18. \}
  19. try \{
  20. //模拟业务逻辑处理中...
  21. TimeUnit.SECONDS.sleep(random.nextInt(10));
  22. \} catch (Exception e) \{
  23. e.printStackTrace();
  24. \}
  25. return ConsumeOrderlyStatus.SUCCESS;
  26. \}
  27. \});
  28. consumer.start();
  29. System.out.println("Consumer Started.");
  30. \}

}
输出:

20190823120242817.png

MessageListenerOrderly能够保证顺序消费,从图中我们也看到了期望的结果。图中的输出是只启动了一个消费者时的输出,看起来订单号还是混在一起,但是每组订单号之间是有序的。因为消息发送时被分配到了三个队列(参见前面生产者输出日志),那么这三个队列的消息被这唯一消费者消费。

如果启动2个消费者呢?那么其中一个消费者对应消费2个队列,另一个消费者对应消费剩下的1个队列。

如果启动3个消费者呢?那么每个消费者都对应消费1个队列,订单号就区分开了。输出变为这样:

消费者1输出:

消费者2输出:

消费者3输出:

很完美,有木有?!

按照这个示例,把订单号取了做了一个取模运算再丢到selector中,selector保证同一个模的都会投递到同一条queue。即: 相同订单号的—->有相同的模—->有相同的queue。最后就会类似这样:

总结:

rocketmq的顺序消息需要满足2点:

1.Producer端保证发送消息有序,且发送到同一个队列。
2.consumer端保证消费同一个队列。

发表评论

表情:
评论列表 (有 0 条评论,176人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ(04)——发送顺序消息

    发送顺序消息 如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代

    相关 RocketMQ顺序消息

          消息有序指的是可以按照消息的发送顺序来消费。rockermq可以严格的保证消息有序,可以分为分区有序或者全部有序顺序消费的原理解析,在默认的情况下消息发送会采取ro

    相关 RocketMQ——顺序消息

    消息有序指的是可以按照消息的发送顺序来消费。 RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。 之所