四.RocketMQ极简入门-RocketMQ顺序消息发送

矫情吗;* 2022-08-28 13:53 394阅读 0赞

前言

在某些业务场景下是需要消息按照顺序进行消费,比如一个账户的加钱,减钱的动作必须按照时间先后去执行,否则就会发生金额不够导致操作失败。

顺序消息故名知意就是消息按照发送的顺序进行消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以Round Robin轮询方式把消息发送到不同的Queue分区队列;消费者从多个队列中消费消息,这种情况没法保证顺序。所以在RocketMQ中如何保证消息顺序呢?

在这里插入图片描述

全局有序消息

在RocketMQ中消息分为全局有序和局部有序消息,全局有序是一个topic下的所有消息都要保证顺序,如果要保证消息全局顺序消费,就需要保证使用一个队列存放消息,一个消费者从这一个队列消费消息就能保证顺序,即:单线程执行,可以通过 producer.setDefaultTopicQueueNums(1);来指定队列数量。

下面我们使用一个订单来模拟顺序消息,订单状态有:创建 ,支付,发货。需要按照顺序发送和消费消息

订单实体

  1. public class Order {
  2. private Long id;
  3. private String name;
  4. private String status;
  5. public Order() {
  6. }
  7. public Order(Long id, String name, String status) {
  8. this.id = id;
  9. this.name = name;
  10. this.status = status;
  11. }
  12. public Long getId() {
  13. return id;
  14. }
  15. public void setId(Long id) {
  16. this.id = id;
  17. }
  18. public String getName() {
  19. return name;
  20. }
  21. public void setName(String name) {
  22. this.name = name;
  23. }
  24. public String getStatus() {
  25. return status;
  26. }
  27. public void setStatus(String status) {
  28. this.status = status;
  29. }
  30. }

发送者

生产者通过 producer.setDefaultTopicQueueNums(1); 把队列数量设置成1,然后正常发送消息

  1. public class Producer {
  2. //演示消息同步发送
  3. public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  4. //生产者
  5. DefaultMQProducer producer = new DefaultMQProducer("order-producer");
  6. //设置name server地址
  7. producer.setNamesrvAddr("127.0.0.1:9876");
  8. //队列数量,1个
  9. producer.setDefaultTopicQueueNums(1);
  10. //启动
  11. producer.start();
  12. for (long i = 0 ; i < 4 ; i++){
  13. Order order = new Order(i,"订单"+i,"创建");
  14. //添加内容
  15. byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
  16. Message message = new Message("order-topic","order",bytes);
  17. message.setKeys("key-"+i);
  18. //执行发送第一个消息
  19. SendResult result = producer.send(message);
  20. System.out.println(result);
  21. //====================================================================
  22. order.setStatus("支付");
  23. //添加内容
  24. bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
  25. message = new Message("order-topic","order",bytes);
  26. message.setKeys("key-"+i);
  27. //执行发送
  28. result = producer.send(message);
  29. System.out.println(result);
  30. //====================================================================
  31. order.setStatus("发货");
  32. //添加内容
  33. bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
  34. message = new Message("order-topic","order",bytes);
  35. message.setKeys("key-"+i);
  36. //执行发送
  37. result = producer.send(message);
  38. //打印结果
  39. System.out.println(result);
  40. }
  41. producer.shutdown();
  42. }
  43. }

发送结果如下

  1. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC450000, offsetMsgId=AC1028C700002A9F0000000000638E1D, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=48]
  2. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC4E0001, offsetMsgId=AC1028C700002A9F0000000000638EF8, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=49]
  3. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC4F0002, offsetMsgId=AC1028C700002A9F0000000000638FD3, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=50]
  4. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC510003, offsetMsgId=AC1028C700002A9F00000000006390AE, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=51]
  5. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC550004, offsetMsgId=AC1028C700002A9F0000000000639189, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=52]
  6. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC560005, offsetMsgId=AC1028C700002A9F0000000000639264, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=53]
  7. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC580006, offsetMsgId=AC1028C700002A9F000000000063933F, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=54]
  8. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC590007, offsetMsgId=AC1028C700002A9F000000000063941A, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=55]
  9. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5B0008, offsetMsgId=AC1028C700002A9F00000000006394F5, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=56]
  10. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5D0009, offsetMsgId=AC1028C700002A9F00000000006395D0, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=57]
  11. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5F000A, offsetMsgId=AC1028C700002A9F00000000006396AB, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=58]
  12. SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC62000B, offsetMsgId=AC1028C700002A9F0000000000639786, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=59]

消费者

消费者设置 MessageListenerOrderly 进行顺序消费

  1. public class Consumer {
  2. public static void main(String[] args) throws MQClientException {
  3. //创建消费者
  4. DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("order-consumer");
  5. //设置name server 地址
  6. defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
  7. //从开始位置消费
  8. defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  9. //广播模式
  10. //最大线程1个
  11. //defaultMQPushConsumer.setConsumeThreadMax(1);
  12. //defaultMQPushConsumer.setConsumeThreadMin(1);
  13. //同时只拉取一个消息
  14. //defaultMQPushConsumer.setPullBatchSize(1);
  15. //订阅
  16. defaultMQPushConsumer.subscribe("order-topic","order");
  17. defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
  18. @Override
  19. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  20. msgs.forEach(message->{
  21. System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
  22. });
  23. return ConsumeOrderlyStatus.SUCCESS;
  24. }
  25. });
  26. defaultMQPushConsumer.start();
  27. }
  28. }

消费结果如下

  1. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=48, sysFlag=0, bornTimestamp=1632010570822, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570826, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638E1D, commitLogOffset=6524445, bodyCRC=543694636, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=49, KEYS=key-0, CONSUME_START_TIME=1632010570828, UNIQ_KEY=7F000001244418B4AAC25E78BC450000, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"创建"}
  2. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=49, sysFlag=0, bornTimestamp=1632010570830, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570830, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638EF8, commitLogOffset=6524664, bodyCRC=400232688, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=50, KEYS=key-0, CONSUME_START_TIME=1632010570832, UNIQ_KEY=7F000001244418B4AAC25E78BC4E0001, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"支付"}
  3. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=50, sysFlag=0, bornTimestamp=1632010570831, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570832, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638FD3, commitLogOffset=6524883, bodyCRC=1884939776, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=51, KEYS=key-0, CONSUME_START_TIME=1632010570835, UNIQ_KEY=7F000001244418B4AAC25E78BC4F0002, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"发货"}
  4. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=51, sysFlag=0, bornTimestamp=1632010570833, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570836, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006390AE, commitLogOffset=6525102, bodyCRC=1061325741, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=53, KEYS=key-1, CONSUME_START_TIME=1632010570839, UNIQ_KEY=7F000001244418B4AAC25E78BC510003, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"创建"}
  5. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=52, sysFlag=0, bornTimestamp=1632010570837, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570837, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639189, commitLogOffset=6525321, bodyCRC=150045809, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=53, KEYS=key-1, CONSUME_START_TIME=1632010570841, UNIQ_KEY=7F000001244418B4AAC25E78BC550004, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"支付"}
  6. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=53, sysFlag=0, bornTimestamp=1632010570838, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570839, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639264, commitLogOffset=6525540, bodyCRC=1869836929, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=55, KEYS=key-1, CONSUME_START_TIME=1632010570844, UNIQ_KEY=7F000001244418B4AAC25E78BC560005, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"发货"}
  7. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=54, sysFlag=0, bornTimestamp=1632010570840, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570840, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F000000000063933F, commitLogOffset=6525759, bodyCRC=507328046, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=56, KEYS=key-2, CONSUME_START_TIME=1632010570845, UNIQ_KEY=7F000001244418B4AAC25E78BC580006, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"创建"}
  8. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=55, sysFlag=0, bornTimestamp=1632010570841, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570842, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F000000000063941A, commitLogOffset=6525978, bodyCRC=697186802, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=57, KEYS=key-2, CONSUME_START_TIME=1632010570847, UNIQ_KEY=7F000001244418B4AAC25E78BC590007, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"支付"}
  9. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=56, sysFlag=0, bornTimestamp=1632010570843, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570844, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006394F5, commitLogOffset=6526197, bodyCRC=1309462274, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=58, KEYS=key-2, CONSUME_START_TIME=1632010570850, UNIQ_KEY=7F000001244418B4AAC25E78BC5B0008, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"发货"}
  10. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=57, sysFlag=0, bornTimestamp=1632010570845, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570846, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006395D0, commitLogOffset=6526416, bodyCRC=18326191, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=59, KEYS=key-3, CONSUME_START_TIME=1632010570851, UNIQ_KEY=7F000001244418B4AAC25E78BC5D0009, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"创建"}
  11. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=58, sysFlag=0, bornTimestamp=1632010570847, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570848, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006396AB, commitLogOffset=6526635, bodyCRC=916761971, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, KEYS=key-3, CONSUME_START_TIME=1632010570853, UNIQ_KEY=7F000001244418B4AAC25E78BC5F000A, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"支付"}
  12. MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=59, sysFlag=0, bornTimestamp=1632010570850, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570850, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639786, commitLogOffset=6526854, bodyCRC=1361468291, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, KEYS=key-3, CONSUME_START_TIME=1632010570855, UNIQ_KEY=7F000001244418B4AAC25E78BC62000B, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"发货"}

局部有序消息

还有一种就是分区有序或者部分有序,部分顺序消息只要保证某一组消息被顺序消费,即:只需要保证一个队列中的消息有序消费即可。

比如:保证同一个订单ID的生成、付款、发货消息按照顺序消费即可实现原理:

  • 把同一个订单ID的消息放入同一个MessageQueue
  • 保证这个MessageQueue只有一个消费者不被并发处理 ,这个需要使用到 MessageQueueSelector 来保证同一个订单的消息在同一个队列
    在这里插入图片描述

发送者

使用 MessageQueueSelector 消息队列选择器来把消息路由到不同的队列,下面案例就是把同一个订单的消息:创建,支付,发货 路由到同一个队列,达到局部消费的目的。

  1. //演示消息同步发送
  2. public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  3. //生产者
  4. DefaultMQProducer producer = new DefaultMQProducer("order-producer2");
  5. //设置name server地址
  6. producer.setNamesrvAddr("127.0.0.1:9876");
  7. //发送消息超时时间
  8. producer.setSendMsgTimeout(1000);
  9. //启动
  10. producer.start();
  11. for (long i = 0 ; i < 4 ; i++){
  12. Order order = new Order(i,"订单"+i,"创建");
  13. //添加内容
  14. byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
  15. Message message = new Message("order-topic","order2",bytes);
  16. message.setKeys("key-"+i);
  17. //执行发送
  18. SendResult result = producer.send(message, new MessageQueueSelector() {
  19. @Override
  20. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  21. Long id = (Long) arg;
  22. //使用取模算法确定id存放到哪个队列
  23. int index =(int) (id % mqs.size());
  24. //index就是要存放的队列的索引
  25. return mqs.get(index);
  26. }
  27. //把订单ID作为参数,作为选择器的基础数据
  28. },order.getId());
  29. //打印结果
  30. System.out.println(result);
  31. //====================================================================
  32. order.setStatus("支付");
  33. //添加内容
  34. bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
  35. message = new Message("order-topic","order2",bytes);
  36. message.setKeys("key-"+i);
  37. //执行发送
  38. result = producer.send(message,new MessageQueueSelector() {
  39. @Override
  40. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  41. Long id = (Long) arg;
  42. //使用取模算法确定id存放到哪个队列
  43. int index =(int) (id % mqs.size());
  44. //index就是要存放的队列的索引
  45. return mqs.get(index);
  46. }
  47. },order.getId());
  48. //打印结果
  49. System.out.println(result);
  50. //====================================================================
  51. order.setStatus("发货");
  52. //添加内容
  53. bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
  54. message = new Message("order-topic","order2",bytes);
  55. message.setKeys("key-"+i);
  56. //执行发送
  57. result = producer.send(message,new MessageQueueSelector() {
  58. @Override
  59. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  60. Long id = (Long) arg;
  61. //使用取模算法确定id存放到哪个队列
  62. int index =(int) (id % mqs.size());
  63. //index就是要存放的队列的索引
  64. return mqs.get(index);
  65. }
  66. },order.getId());
  67. //打印结果
  68. System.out.println(result);
  69. }
  70. producer.shutdown();
  71. }

消费者

消费者一样可通过 MessageListenerOrderly 进行顺序消费

  1. public class Consumer {
  2. public static void main(String[] args) throws MQClientException {
  3. //创建消费者
  4. DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("order-consumer");
  5. //设置name server 地址
  6. defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
  7. //从开始位置消费
  8. defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  9. //广播模式
  10. //最大线程1个
  11. //defaultMQPushConsumer.setConsumeThreadMax(1);
  12. //defaultMQPushConsumer.setConsumeThreadMin(1);
  13. //同时只拉取一个消息
  14. //defaultMQPushConsumer.setPullBatchSize(1);
  15. //订阅
  16. defaultMQPushConsumer.subscribe("order-topic","order");
  17. defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
  18. @Override
  19. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  20. msgs.forEach(message->{
  21. System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
  22. });
  23. return ConsumeOrderlyStatus.SUCCESS;
  24. }
  25. });
  26. defaultMQPushConsumer.start();
  27. }
  28. }

文章到这就结束了,点赞还是要求一下的,万一屏幕面前的大帅哥,或者大漂亮一不小心就一键三连了啦,那我就是熬夜到头发掉光,也出下章。

发表评论

表情:
评论列表 (有 0 条评论,394人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ(04)——发送顺序消息

    发送顺序消息 如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代