RocketMQ批量消费、消息重试、消费模式、刷盘方式

港控/mmm° 2022-05-26 07:06 1292阅读 0赞

转载: https://blog.csdn.net/u010634288/article/details/56049305

一、Consumer 批量消费

可以通过

[java] view plain copy

  1. consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条

这里需要分为2种情况1、Consumer端先启动 2、Consumer端后启动. 正常情况下:应该是Consumer需要先启动

1、Consumer端先启动

Consumer代码如下

[java] view plain copy

  1. package quickstart;
  2. import java.util.List;
  3. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import com.alibaba.rocketmq.client.exception.MQClientException;
  8. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  9. import com.alibaba.rocketmq.common.message.MessageExt;
  10. /**
  11. * Consumer,订阅消息
  12. */
  13. public class Consumer {
  14. public static void main(String[] args) throws InterruptedException, MQClientException {
  15. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“please_rename_unique_group_name_4”);
  16. consumer.setNamesrvAddr(“192.168.100.145:9876;192.168.100.146:9876”);
  17. consumer.setConsumeMessageBatchMaxSize(10);
  18. /**
  19. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
  20. * 如果非第一次启动,那么按照上次消费的位置继续消费
  21. */
  22. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  23. consumer.subscribe(“TopicTest”, “*“);
  24. consumer.registerMessageListener(new MessageListenerConcurrently() {
  25. public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
  26. try {
  27. System.out.println(“msgs的长度” + msgs.size());
  28. System.out.println(Thread.currentThread().getName() + “ Receive New Messages: “ + msgs);
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  32. }
  33. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  34. }
  35. });
  36. consumer.start();
  37. System.out.println(“Consumer Started.”);
  38. }
  39. }

由于这里是Consumer先启动,所以他回去轮询MQ上是否有订阅队列的消息,由于每次producer插入一条,Consumer就拿一条所以测试结果如下(每次size都是1):

Center

2、Consumer端后启动,也就是Producer先启动

由于这里是Consumer后启动,所以MQ上也就堆积了一堆数据,Consumer的

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条

所以这段代码就生效了测试结果如下(每次size最多是10):

Center 1

二、消息重试机制:消息重试分为2种1、Producer端重试 2、Consumer端重试

1、Producer端重试

也就是Producer往MQ上发消息没有发送成功,我们可以设置发送失败重试的次数

[java] view plain copy

  1. package quickstart;
  2. import com.alibaba.rocketmq.client.exception.MQClientException;
  3. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
  4. import com.alibaba.rocketmq.client.producer.SendResult;
  5. import com.alibaba.rocketmq.common.message.Message;
  6. /**
  7. * Producer,发送消息
  8. *
  9. */
  10. public class Producer {
  11. public static void main(String[] args) throws MQClientException, InterruptedException {
  12. DefaultMQProducer producer = new DefaultMQProducer(“please_rename_unique_group_name”);
  13. producer.setNamesrvAddr(“192.168.100.145:9876;192.168.100.146:9876”);
  14. producer.setRetryTimesWhenSendFailed(10);//失败的 情况发送10次
  15. producer.start();
  16. for (int i = 0; i < 1000; i++) {
  17. try {
  18. Message msg = new Message(“TopicTest”,// topic
  19. “TagA”,// tag
  20. (“Hello RocketMQ “ + i).getBytes()// body
  21. );
  22. SendResult sendResult = producer.send(msg);
  23. System.out.println(sendResult);
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. Thread.sleep(1000);
  27. }
  28. }
  29. producer.shutdown();
  30. }
  31. }

Center 2

2、Consumer端重试

2.1、exception的情况,一般重复16次 10s、30s、1分钟、2分钟、3分钟等等

上面的代码中消费异常的情况返回

return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试

正常则返回:

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功

[java] view plain copy

  1. package quickstart;
  2. import java.util.List;
  3. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import com.alibaba.rocketmq.client.exception.MQClientException;
  8. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  9. import com.alibaba.rocketmq.common.message.MessageExt;
  10. /**
  11. * Consumer,订阅消息
  12. */
  13. public class Consumer {
  14. public static void main(String[] args) throws InterruptedException, MQClientException {
  15. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“please_rename_unique_group_name_4”);
  16. consumer.setNamesrvAddr(“192.168.100.145:9876;192.168.100.146:9876”);
  17. consumer.setConsumeMessageBatchMaxSize(10);
  18. /**
  19. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
  20. * 如果非第一次启动,那么按照上次消费的位置继续消费
  21. */
  22. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  23. consumer.subscribe(“TopicTest”, “*“);
  24. consumer.registerMessageListener(new MessageListenerConcurrently() {
  25. public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
  26. try {
  27. // System.out.println(“msgs的长度” + msgs.size());
  28. System.out.println(Thread.currentThread().getName() + “ Receive New Messages: “ + msgs);
  29. for (MessageExt msg : msgs) {
  30. String msgbody = new String(msg.getBody(), “utf-8”);
  31. if (msgbody.equals(“Hello RocketMQ 4”)) {
  32. System.out.println(“======错误=======”);
  33. int a = 1 / 0;
  34. }
  35. }
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. if(msgs.get(0).getReconsumeTimes()==3){
  39. //记录日志
  40. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
  41. }else{
  42. return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
  43. }
  44. }
  45. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
  46. }
  47. });
  48. consumer.start();
  49. System.out.println(“Consumer Started.”);
  50. }
  51. }

打印结果:

Center 3

Center 4

假如超过了多少次之后我们可以让他不再重试记录 日志。

if(msgs.get(0).getReconsumeTimes()==3){
//记录日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}

2.2超时的情况,这种情况MQ会无限制的发送给消费端。

就是由于网络的情况,MQ发送数据之后,Consumer端并没有收到导致超时。也就是消费端没有给我返回return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;这样的就认为没有到达Consumer端。

这里模拟Producer只发送一条数据。consumer端暂停1分钟并且不发送接收状态给MQ

[java] view plain copy

  1. package model;
  2. import java.util.List;
  3. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import com.alibaba.rocketmq.client.exception.MQClientException;
  8. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  9. import com.alibaba.rocketmq.common.message.MessageExt;
  10. /**
  11. * Consumer,订阅消息
  12. */
  13. public class Consumer {
  14. public static void main(String[] args) throws InterruptedException, MQClientException {
  15. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“message_consumer”);
  16. consumer.setNamesrvAddr(“192.168.100.145:9876;192.168.100.146:9876”);
  17. consumer.setConsumeMessageBatchMaxSize(10);
  18. /**
  19. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
  20. * 如果非第一次启动,那么按照上次消费的位置继续消费
  21. */
  22. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  23. consumer.subscribe(“TopicTest”, “*“);
  24. consumer.registerMessageListener(new MessageListenerConcurrently() {
  25. public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
  26. try {
  27. // 表示业务处理时间
  28. System.out.println(“=========开始暂停===============”);
  29. Thread.sleep(60000);
  30. for (MessageExt msg : msgs) {
  31. System.out.println(“ Receive New Messages: “ + msg);
  32. }
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
  36. }
  37. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
  38. }
  39. });
  40. consumer.start();
  41. System.out.println(“Consumer Started.”);
  42. }
  43. }

Center 5

Center 6

三、消费模式

广播消费:rocketMQ默认是集群消费,我们可以通过在Consumer来支持广播消费

consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费

[java] view plain copy

  1. package model;
  2. import java.util.List;
  3. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import com.alibaba.rocketmq.client.exception.MQClientException;
  8. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  9. import com.alibaba.rocketmq.common.message.MessageExt;
  10. import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
  11. /**
  12. * Consumer,订阅消息
  13. */
  14. public class Consumer2 {
  15. public static void main(String[] args) throws InterruptedException, MQClientException {
  16. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“message_consumer”);
  17. consumer.setNamesrvAddr(“192.168.100.145:9876;192.168.100.146:9876”);
  18. consumer.setConsumeMessageBatchMaxSize(10);
  19. consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费
  20. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  21. consumer.subscribe(“TopicTest”, “*“);
  22. consumer.registerMessageListener(new MessageListenerConcurrently() {
  23. public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
  24. try {
  25. for (MessageExt msg : msgs) {
  26. System.out.println(“ Receive New Messages: “ + msg);
  27. }
  28. } catch (Exception e) {
  29. e.printStackTrace();
  30. return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
  31. }
  32. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
  33. }
  34. });
  35. consumer.start();
  36. System.out.println(“Consumer Started.”);
  37. }
  38. }

如果我们有2台节点,Producerw往MQ上写入20条数据 其中Consumer1中拉取了12条 。Consumer2中拉取了8 条,这种情况下,加入Consumer1宕机,那么我们消费数据的时候,只能消费到Consumer2中的8条,Consumer1中的12条已经持久化到中。需要Consumer1回复之后这12条数据才能继续被消费。其实这种先启动producer往MQ上写数据,然后再启动Consumer的情况本来就是违规操作,正确的情况应该是先启动Consumer后再启动producer。

Center 7

异步复制和同步双写主要是主和从的关系。消息需要实时消费的,就需要采用主从模式部署

异步复制:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就算从producer端发送成功了,然后通过异步复制的方法将数据复制到从节点

同步双写:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就并不算从producer端发送成功了,需要通过同步双写的方法将数据同步到从节点后, 才算数据发送成功。

四、刷盘方式

同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。

Center 8

异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。

Center 9

commitlog:

commitlog就是来存储所有的元信息,包含消息体,类似于Mysql、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使数据丢失,仍然可以恢复出来。

consumequeue:记录数据的位置,以便Consume快速通过consumequeue找到commitlog中的数据

发表评论

表情:
评论列表 (有 0 条评论,1292人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ消息消费(三)

    顺序消息 RocketMQ支持局部消息顺序消费,可以确保同一个消息消费队列中的消息被顺序消费,如果需要做到全局顺序消费则可以将主题配置成一个队列,例如数据库BinLog等

    相关 RocketMQ消息消费(一)

    消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。集群模式,主题下的同一条消息只允许被其中一个消费