RocketMQ:顺序消息

£神魔★判官ぃ 2023-06-26 10:59 110阅读 0赞
  1. 消息有序指的是可以按照消息的发送顺序来消费。rockermq可以严格的保证消息有序,可以分为分区有序或者全部有序顺序消费的原理解析,在默认的情况下消息发送会采取round robin轮询方式把消息发送到不同的queue(分区队列); 而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只一次发送 到同一个queue中,消费的时候只从这个queue上一次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序; 如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的

下面用订单进行分区有序的示例,一个订单的顺序流程:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中, 消费时,同一个orderid获取到的肯定是同一个队列

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.4.0</version>
  5. </dependency>

构建消息:

  1. public class OrderStep {
  2. private long orderId;
  3. private String desc;
  4. public String getDesc() {
  5. return desc;
  6. }
  7. public void setDesc(String desc) {
  8. this.desc = desc;
  9. }
  10. public long getOrderId() {
  11. return orderId;
  12. }
  13. public void setOrderId(long orderId) {
  14. this.orderId = orderId;
  15. }
  16. @Override
  17. public String toString() {
  18. return "OrderStep{" +
  19. "orderId=" + orderId +
  20. ", desc='" + desc + '\'' +
  21. '}';
  22. }
  23. public static List<OrderStep> buildOrders() {
  24. // 1039L 创建 付款 推送 完成
  25. // 1065L 创建 付款
  26. // 7235L 创建 付款
  27. List<OrderStep> orderList = new ArrayList<OrderStep>();
  28. OrderStep demo = new OrderStep();
  29. demo = new OrderStep();
  30. demo.setOrderId(6L);
  31. demo.setDesc("创建");
  32. orderList.add(demo);
  33. demo = new OrderStep();
  34. demo.setOrderId(6L);
  35. demo.setDesc("付款");
  36. orderList.add(demo);
  37. demo.setOrderId(6L);
  38. demo.setDesc("推送");
  39. orderList.add(demo);
  40. demo = new OrderStep();
  41. demo.setOrderId(6L);
  42. demo.setDesc("完成");
  43. orderList.add(demo);
  44. demo = new OrderStep();
  45. demo.setOrderId(7L);
  46. demo.setDesc("推送");
  47. orderList.add(demo);
  48. demo = new OrderStep();
  49. demo.setOrderId(7L);
  50. demo.setDesc("完成");
  51. orderList.add(demo);
  52. demo = new OrderStep();
  53. demo.setOrderId(9L);
  54. demo.setDesc("创建");
  55. orderList.add(demo);
  56. demo = new OrderStep();
  57. demo.setOrderId(9L);
  58. demo.setDesc("付款");
  59. orderList.add(demo);
  60. demo = new OrderStep();
  61. demo.setOrderId(9L);
  62. demo.setDesc("推送");
  63. orderList.add(demo);
  64. demo = new OrderStep();
  65. demo.setOrderId(9L);
  66. demo.setDesc("完成");
  67. orderList.add(demo);
  68. return orderList;
  69. }
  70. }

生产者

  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消息生产者producer,并制定生产者组名
  4. DefaultMQProducer producer = new DefaultMQProducer("group1");
  5. // 2.指定NameServer地址
  6. producer.setNamesrvAddr("localhost:9876");
  7. // 3.启动producer
  8. producer.start();
  9. // 构建消息集合
  10. List<OrderStep> orderStepList = OrderStep.buildOrders();
  11. // 发送消息
  12. for (int i = 0; i < orderStepList.size(); i++) {
  13. String body = orderStepList.get(i)+"";
  14. Message message = new Message("OrderTopic","Order","i"+i,body.getBytes());
  15. /**
  16. * 参数1:消息对象
  17. * 参数2:消息队列的选择器
  18. * 参数3:选择队列的业务标识(订单id)
  19. */
  20. SendResult send = producer.send(message, new MessageQueueSelector() {
  21. /**
  22. *
  23. * @param list 队列集合
  24. * @param message 消息对象
  25. * @param o 业务标识的参数
  26. * @return
  27. */
  28. public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
  29. long orderId = (Long) o;
  30. long index = orderId % list.size();
  31. return list.get((int) index);
  32. }
  33. }, orderStepList.get(i).getOrderId());
  34. System.out.println("发送结果:"+send);
  35. }
  36. producer.shutdown();
  37. }
  38. }

消费者

  1. public class Consumer {
  2. public static void main(String[] args) throws MQClientException {
  3. // 1.创建消费者Consumer,制定消费者组名
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
  5. // 2.指定Nameserver地址
  6. consumer.setNamesrvAddr("localhost:9876");
  7. // 3.订阅主题Topic和Tag
  8. consumer.subscribe("OrderTopic","*");
  9. // 4.注册消息监听器
  10. consumer.registerMessageListener(new MessageListenerOrderly() {
  11. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
  12. for (MessageExt messageExt : list) {
  13. System.out.println("线程名称:["+Thread.currentThread().getName()+"]消费消息:"+new String(messageExt.getBody()));
  14. }
  15. return ConsumeOrderlyStatus.SUCCESS;
  16. }
  17. });
  18. // 5.启动消费者
  19. consumer.start();
  20. System.out.println("消费者启动");
  21. }
  22. }

发表评论

表情:
评论列表 (有 0 条评论,110人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ(04)——发送顺序消息

    发送顺序消息 如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代

    相关 RocketMQ顺序消息

          消息有序指的是可以按照消息的发送顺序来消费。rockermq可以严格的保证消息有序,可以分为分区有序或者全部有序顺序消费的原理解析,在默认的情况下消息发送会采取ro

    相关 RocketMQ——顺序消息

    消息有序指的是可以按照消息的发送顺序来消费。 RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。 之所