RocketMQ(04)——发送顺序消息

傷城~ 2023-07-01 08:32 122阅读 0赞

发送顺序消息

如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候,调用的是需要传递MessageQueueSelector的send(),该方法还可以传递一个额外的参数,其对应MessageQueueSelector的select()的最后一个参数。下面代码中我们一共发送了10条消息,从1开始算顺序为奇数的都放到第一个队列中,顺序为偶数的都放第二个队列中。所以最终第一个队列放了顺序号为1/3/5/7/9的消息,第二个队列中放了顺序号为2/4/6/8/10的消息。

  1. @Test
  2. public void testOrderSend() throws Exception {
  3. DefaultMQProducer producer = new DefaultMQProducer("group1");
  4. producer.setNamesrvAddr(this.nameServer);
  5. producer.start();
  6. for (int i=0; i<10; i++) {
  7. Message message = new Message("topic1", "tag3", (System.currentTimeMillis() + "---" + System.nanoTime() + "hello ordered message " + i).getBytes());
  8. SendResult sendResult = producer.send(message, new MessageQueueSelector() {
  9. @Override
  10. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  11. int index = (int) arg;
  12. //奇数放一个队列,偶数放一个队列
  13. return mqs.get(index % mqs.size() % 2);
  14. }
  15. }, i);
  16. Assert.assertTrue(sendResult.getSendStatus() == SendStatus.SEND_OK);
  17. }
  18. producer.shutdown();
  19. }

在消费的时候如果需要确保队列中的消息是按照顺序消费的,注册消息监听器时不能再选择并发消费的MessageListenerConcurrently,而需要选择按顺序消费的MessageListenerOrderly。按顺序消费时每个线程会锁定当前队列,只有一条消息消费完了才会释放锁,这样确保同一队列同时只能有一个线程消费一条消息。而并发消费时是不会一直锁队列的。有序消费时同一个队列里面的消息会按照顺序进行消费,但是它们可能被不同的线程消费。如消息的顺序是1/2/3/4/5/6,则按照顺序消费可以保证消息的消费顺序一定是1/2/3/4/5/6,但是消费它们的线程有可能是线程6/5/4/3/2/1。如果要保证有序的消费是在同一个线程完成的,则消费者线程只能有一个,可以通过setConsumeThreadMax()定义消费线程的最大数,可以通过setConsumeThreadMin设置消费者线程的最小数。下面的代码中就定义了按照顺序进行消息的消费。

  1. @Test
  2. public void testOrderConsume() throws Exception {
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
  4. consumer.setNamesrvAddr(this.nameServer);
  5. consumer.subscribe("topic1", "tag3");
  6. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//默认值,订阅以前的消息将被忽略
  7. consumer.registerMessageListener(new MessageListenerOrderly() {
  8. @Override
  9. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  10. System.out.println(Thread.currentThread().getName() + "消费消息:" + new String(msgs.get(0).getBody()));
  11. return ConsumeOrderlyStatus.SUCCESS;
  12. }
  13. });
  14. consumer.start();
  15. TimeUnit.SECONDS.sleep(120);
  16. consumer.shutdown();
  17. }

顺序消息消费时返回的ConsumeOrderlyStatus只能是SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。SUCCESS表示消费成功,可以继续消费下一条,而SUSPEND_CURRENT_QUEUE_A_MOMENT表示消费失败,需要等待一下再继续消费。它不能像并发消费那样跳过消费失败的消息,因为那样就破坏了消息消费的顺序性。
(注:本文是基于RocketMQ4.5.0所写)

发表评论

表情:
评论列表 (有 0 条评论,122人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMq发送延迟消息

    什么是延迟消息? 对于消息中间件来说,producer将消息发送到mq的服务器,但并不期望这条消息马上被消费,而是推迟到当前时间点之后的某个时间点后再投递到queue中让

    相关 RocketMQ04)——发送顺序消息

    发送顺序消息 如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代

    相关 RocketMQ顺序消息

          消息有序指的是可以按照消息的发送顺序来消费。rockermq可以严格的保证消息有序,可以分为分区有序或者全部有序顺序消费的原理解析,在默认的情况下消息发送会采取ro

    相关 RocketMQ消息发送

    RocketMQ支持3种消息发送方式:同步(sync)、异步(async)、单向(oneway)。 同步:发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回

    相关 RocketMQ——顺序消息

    消息有序指的是可以按照消息的发送顺序来消费。 RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。 之所