RocketMQ顺序消息

一时失言乱红尘 2023-10-01 17:03 107阅读 0赞

顺序类型

无序消息

无序消息也指普通的消息,Producer 只管发送消息,Consumer 只管接收消息,至于消息和消息之间的顺序并没有保证。

  • Producer 依次发送 orderId 为 1、2、3 的消息
  • Consumer 接到的消息顺序有可能是 1、2、3,也有可能是 2、1、3 等情况,这就是普通消息。

全局顺序

对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
比如 Producer 发送orderId 1,3,2 的消息, 那么 Consumer 也必须要按照 1,3,2 的顺序进行消费。
在这里插入图片描述

局部顺序

在实际开发有些场景中,我并不需要消息完全按照完全按的先进先出,而是某些消息保证先进先出就可以了。

就好比一个订单涉及 订单生成,订单支付、订单完成。我不用管其它的订单,只保证同样订单ID能保证这个顺序就可以了。
在这里插入图片描述

Rocket顺序消息

RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要实现全局顺序只能有一个分区。
因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不通的queue(分区)。

实现原理

我们知道 生产的message最终会存放在Queue中,如果一个Topic关联了4个Queue,如果我们不指定消息往哪个队列里放,那么默认是平均分配消息到4个queue。

好比有10条消息,那么这10条消息会平均分配在这4个Queue上,那么每个Queue大概放2个左右。这里有一点很重的是:同一个queue,存储在里面的message 是按照先进先出的原则
在这里插入图片描述
这个时候思路就来了,我们让不同的地区用不同的queue。只要保证同一个地区的订单把他们放到同一个Queue那就保证消费者先进先出了。

在这里插入图片描述
这就保证局部顺序了,即同一订单按照先后顺序放到同一Queue,那么取消息的时候就可以保证先进先取出。

如何保证集群有序

这里还有很关键的一点,在一个消费者集群的情况下,消费者1先去Queue拿消息,它拿到了 北京订单1,它拿完后,消费者2去queue拿到的是 北京订单2。

拿的顺序是没毛病了,但关键是先拿到不代表先消费完它。会存在虽然你消费者1先拿到北京订单1,但由于网络等原因,消费者2比你真正的先消费消息。这是不是很尴尬了。

这里解决就用到了分布式锁:
Rocker采用的是分段锁,它不是锁整个Broker而是锁里面的单个Queue,因为只要锁单个Queue就可以保证局部顺序消费了。
所以最终的消费者这边的逻辑就是这样:

  • 消费者1去Queue拿 订单生成,它就锁住了整个Queue,只有它消费完成并返回成功后,这个锁才会释放。
  • 然后下一个消费者去拿到 订单支付 同样锁住当前Queue,这样的一个过程来真正保证对同一个Queue能够真正意义上的顺序消费,而不仅仅是顺序取出。

消息类型对比

在这里插入图片描述
发送方式对比
在这里插入图片描述

注意事项

  1. 顺序消息暂不支持广播模式。
  2. 顺序消息不支持异步发送方式,否则将无法严格保证顺序。
  3. 建议同一个 Group ID 只对应一种类型的 Topic,即不同时用于顺序消息和无序消息的收发。
  4. 对于全局顺序消息,建议创建broker个数 >=2。

代码示例

主要实现两点:

  1. 生产端 同一orderID的订单放到同一个queue。
  2. 消费端 同一个queue取出消息的时候锁住整个queue,直到消费后再解锁。

实体类

  1. public class ProductOrder {
  2. private String orderId;
  3. private String orderName;
  4. public ProductOrder(String orderId, String orderName) {
  5. this.orderId = orderId;
  6. this.orderName = orderName;
  7. }
  8. public String getOrderId() {
  9. return orderId;
  10. }
  11. public void setOrderId(String orderId) {
  12. this.orderId = orderId;
  13. }
  14. public String getOrderName() {
  15. return orderName;
  16. }
  17. public void setOrderName(String orderName) {
  18. this.orderName = orderName;
  19. }
  20. }

Product(生产者)

  1. public class OrderProducer {
  2. private static final List<ProductOrder> orderList = new ArrayList<>();
  3. static {
  4. orderList.add(new ProductOrder("XXX001", "订单创建"));
  5. orderList.add(new ProductOrder("XXX001", "订单付款"));
  6. orderList.add(new ProductOrder("XXX001", "订单完成"));
  7. orderList.add(new ProductOrder("XXX002", "订单创建"));
  8. orderList.add(new ProductOrder("XXX002", "订单付款"));
  9. orderList.add(new ProductOrder("XXX002", "订单完成"));
  10. orderList.add(new ProductOrder("XXX003", "订单创建"));
  11. orderList.add(new ProductOrder("XXX003", "订单付款"));
  12. orderList.add(new ProductOrder("XXX003", "订单完成"));
  13. }
  14. public static void main(String[] args) throws Exception {
  15. //创建一个消息生产者,并设置一个消息生产者组
  16. DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");
  17. //指定 NameServer 地址
  18. producer.setNamesrvAddr("127.0.0.1:9876");
  19. //初始化 Producer,整个应用生命周期内只需要初始化一次
  20. producer.start();
  21. for (int i = 0; i < orderList.size(); i++) {
  22. //获取当前order
  23. ProductOrder order = orderList.get(i);
  24. //创建一条消息对象,指定其主题、标签和消息内容
  25. Message message = new Message(
  26. /* 消息主题名 */
  27. "topicTest",
  28. /* 消息标签 */
  29. order.getOrderId(),
  30. /* 消息内容 */
  31. (order.toString()).getBytes(RemotingHelper.DEFAULT_CHARSET)
  32. );
  33. //发送消息并返回结果
  34. SendResult sendResult = producer.send(message, new MessageQueueSelector() {
  35. @Override
  36. /**
  37. * hash队列选择,简化来说就是用index%一个 固定的值,使得同一tag的消息能发送到同一个queue
  38. */
  39. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object args) {
  40. //arg的值其实就是下面传入 orderId
  41. String orderid = (String) args;
  42. //因为订单是String类型,所以通过hashCode转成int类型
  43. int hashCode = orderid.hashCode();
  44. //因为hashCode可能为负数 所以取绝对值
  45. hashCode = Math.abs(hashCode);
  46. //保证同一个订单号 一定分配在同一个queue上
  47. long index = hashCode % mqs.size();
  48. //根据索引选择不同的队列
  49. return mqs.get((int) index);
  50. }
  51. }, order.getOrderId());
  52. System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ",存储queue:" + sendResult.getMessageQueue().getQueueId() + ",orderID:" + order.getOrderId() + ",type:" + order.getOrderName());
  53. }
  54. // 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
  55. producer.shutdown();
  56. }
  57. }

Consumer(消费者)

上面说过,消费者真正要达到消费顺序,需要分布式锁,所以这里需要用MessageListenerOrderly替换之前的MessageListenerConcurrently,因为它里面实现了分布式锁。

  1. public class OrderConsumer {
  2. private static final Random random = new Random();
  3. public static void main(String[] args) throws Exception {
  4. //创建一个消息消费者,并设置一个消息消费者组
  5. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
  6. //指定 NameServer 地址
  7. consumer.setNamesrvAddr("127.0.0.1:9876");
  8. //设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
  9. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  10. //订阅指定 Topic 下的所有消息
  11. consumer.subscribe("topicTest", "*");
  12. //注册消费的监听 这里注意顺序消费为MessageListenerOrderly 之前并发为ConsumeConcurrentlyContext
  13. consumer.registerMessageListener(new MessageListenerOrderly() {
  14. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
  15. //默认 list 里只有一条消息,可以通过设置参数来批量接收消息
  16. if (list != null) {
  17. for (MessageExt ext : list) {
  18. try {
  19. try {
  20. //模拟业务逻辑处理中...
  21. TimeUnit.SECONDS.sleep(random.nextInt(10));
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. }
  25. //获取接收到的消息
  26. String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
  27. //获取队列ID
  28. int queueId = context.getMessageQueue().getQueueId();
  29. //打印消息
  30. System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],接收queueId:[" + queueId + "],接收时间:[" + new Date().getTime() + "],消息=[" + message + "]");
  31. } catch (UnsupportedEncodingException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }
  36. //消费成功提交偏移量
  37. return ConsumeOrderlyStatus.SUCCESS;
  38. }
  39. });
  40. // 消费者对象在使用之前必须要调用 start 初始化
  41. consumer.start();
  42. System.out.println("消息消费者已启动");
  43. }
  44. }

测试

生产者发送消息

看看生产者有没有把相同订单指定到同一个queue
在这里插入图片描述
通过测试结果可以看出:相同订单已经存到同一queue中了。

消费者消费消息

单消费者
看看消费结果是不是我们需要的结果
在这里插入图片描述
MessageListenerOrderly能够保证顺序消费,从图中我们也看到了期望的结果。

消费异常
如果出现消费异常返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT后面的消息将无法消费。

多消费者
如果启动2个消费者: 那么其中一个消费者对应消费2个队列,另一个消费者对应消费剩下的1个队列。

因为

如果启动3个消费者: 那么每个消费者都对应消费1个队列,订单号就区分开了。输出变为这样:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,107人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ(04)——发送顺序消息

    发送顺序消息 如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代

    相关 RocketMQ顺序消息

          消息有序指的是可以按照消息的发送顺序来消费。rockermq可以严格的保证消息有序,可以分为分区有序或者全部有序顺序消费的原理解析,在默认的情况下消息发送会采取ro

    相关 RocketMQ——顺序消息

    消息有序指的是可以按照消息的发送顺序来消费。 RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。 之所