RocketMQ源码15- consumer启动流程

落日映苍穹つ 2023-10-08 23:50 121阅读 0赞

本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

org.apache.rocketmq.example.quickstart.Consumer:

  1. public class Consumer {
  2. public static void main(String[] args) throws InterruptedException, MQClientException {
  3. // todo
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
  5. consumer.setNamesrvAddr("127.0.0.1:9876");
  6. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  7. consumer.subscribe("TopicTest", "*");
  8. // 注册监听器,监听消息
  9. consumer.registerMessageListener(new MessageListenerConcurrently() {
  10. @Override
  11. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  12. ConsumeConcurrentlyContext context) {
  13. // 这里获得了消息
  14. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  16. }
  17. });
  18. // todo 启动
  19. consumer.start();
  20. System.out.printf("Consumer Started.%n");
  21. }
  22. }
  23. 复制代码

consumer使用起来还是挺简单的,先是创建了一个DefaultMQPushConsumer对象,然后配置了一些属性,比较关键的就是注册消息监听器(在这个监听器里会获取消息),之后就调用start()方法启动consumer.

接下来我们就来分析这块的消费过程。

1. 构造方法:DefaultMQPushConsumer

consumer的处理类为DefaultMQPushConsumer,我们先来看看DefaultMQPushConsumer的属性值和构造方法:

  1. public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
  2. private final InternalLogger log = ClientLogger.getLog();
  3. protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
  4. // 消费者所属组
  5. private String consumerGroup;
  6. // 消息消费模式,分为集群模式、广播模式,默认为集群模式
  7. private MessageModel messageModel = MessageModel.CLUSTERING;
  8. // 第一次消费时指定消费策略
  9. private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
  10. private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
  11. // 集群模式下消息队列的负载策略
  12. private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
  13. /**
  14. * Subscription relationship 订阅信息
  15. */
  16. private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
  17. /**
  18. * Message listener 消息业务监听器
  19. */
  20. private MessageListener messageListener;
  21. /**
  22. * Offset Storage 消息消费进度存储器
  23. */
  24. private OffsetStore offsetStore;
  25. /**
  26. * Minimum consumer thread number 消费者最小线程数
  27. */
  28. private int consumeThreadMin = 20;
  29. /**
  30. * Max consumer thread number 消费者最大线程数,因为消费
  31. * 者线程池使用无界队列,所以此参数不生效
  32. */
  33. private int consumeThreadMax = 20;
  34. /**
  35. * Threshold for dynamic adjustment of the number of thread pool
  36. */
  37. private long adjustThreadPoolNumsThreshold = 100000;
  38. /**
  39. * Concurrently max span offset.it has no effect on sequential consumption
  40. * 并发消息消费时处理队列最大跨度,默认2000,表示如果消息处理队列中偏移量最大的消息
  41. * 与偏移量最小的消息的跨度超过2000,则延迟50ms后再拉取消息
  42. */
  43. private int consumeConcurrentlyMaxSpan = 2000;
  44. /**
  45. * Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
  46. * Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
  47. * 默认1000,表示每1000
  48. * 次流控后打印流控日志
  49. */
  50. private int pullThresholdForQueue = 1000;
  51. private int pullThresholdSizeForTopic = -1;
  52. /**
  53. * Message pull Interval
  54. * 推模式下拉取任务的间隔时间,默
  55. * 认一次拉取任务完成后继续拉取
  56. */
  57. private long pullInterval = 0;
  58. /**
  59. * Batch consumption size 消息并发消费时一次
  60. * 消费消息的条数,通俗点说,就是每次传入
  61. * MessageListener#consumeMessage中的消息条数
  62. */
  63. private int consumeMessageBatchMaxSize = 1;
  64. /**
  65. * Batch pull size 每次消息拉取的条数,默认32条
  66. */
  67. private int pullBatchSize = 32;
  68. /**
  69. * Whether update subscription relationship when every pull
  70. * 是否每次拉取消息都更
  71. * 新订阅信息,默认为false
  72. */
  73. private boolean postSubscriptionWhenPull = false;
  74. private boolean unitMode = false;
  75. /**
  76. *
  77. * 最大消费重试次数。如果消息消费次数超过maxReconsumeTimes还未成功,则将该消息转移到一个失败
  78. * 队列,等待被删除
  79. */
  80. private int maxReconsumeTimes = -1;
  81. /**
  82. * 延迟将该队列的消
  83. * 息提交到消费者线程的等待时间,默认延迟1s
  84. */
  85. private long suspendCurrentQueueTimeMillis = 1000;
  86. /**
  87. * Maximum amount of time in minutes a message may block the consuming thread.
  88. * 消息消费超时时间,默认为15,
  89. * 单位为分钟
  90. */
  91. private long consumeTimeout = 15;
  92. ...
  93. public DefaultMQPushConsumer(final String consumerGroup) {
  94. // 指定了队列分配策略 AllocateMessageQueueAveragely
  95. this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
  96. }
  97. ...
  98. public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
  99. AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
  100. this.consumerGroup = consumerGroup;
  101. this.namespace = namespace;
  102. this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
  103. defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
  104. }
  105. }
  106. 复制代码

在构造方法中,就只是做了一些成员变量的赋值操作,比较关键的是分配消息队列的策略:allocateMessageQueueStrategy,如果指定,默认就使用AllocateMessageQueueAveragely,即从各队列平均获取消息。

2.客户端start()方法都干了什么?

首先,暴露给开发者的 DefaultMQPushConsumer是一个外观类,真正工作的是其内部的DefaultMQPushConsumerImpl,所以我们看下DefaultMQPushConsumerImpl#start()的逻辑

  1. public synchronized void start() throws MQClientException {
  2. switch (this.serviceState) {
  3. case CREATE_JUST:
  4. log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
  5. this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
  6. this.serviceState = ServiceState.START_FAILED;
  7. //TODO:检查配置,比如消费者组是否为空,消费模式是否为空,订阅信息是否为空等等
  8. this.checkConfig();
  9. // TODO 构建主题订阅信息SubscriptionData并加入RebalanceImpl的订阅消息中
  10. this.copySubscription();
  11. if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
  12. this.defaultMQPushConsumer.changeInstanceNameToPID();
  13. }
  14. // todo 初始化MQClientInstance、RebalanceImpl(消息重新负载实现类)等
  15. this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
  16. // 设置负载均衡相关属性
  17. this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
  18. this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
  19. this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
  20. this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
  21. //TODO: 创建拉取对象的核心类,后面去broker拉取消息时会看到
  22. this.pullAPIWrapper = new PullAPIWrapper(
  23. mQClientFactory,
  24. this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
  25. this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
  26. // todo 初始化消息进度。如果消息消费采用集群模式,那么消
  27. //息进度存储在Broker上,如果采用广播模式,那么消息消费进度存储
  28. //在消费端
  29. if (this.defaultMQPushConsumer.getOffsetStore() != null) {
  30. this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
  31. } else {
  32. switch (this.defaultMQPushConsumer.getMessageModel()) {
  33. //TODO: 广播模式创建LocalFileOffsetStore,保存offset到本地文件中
  34. case BROADCASTING:
  35. this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
  36. break;
  37. //TODO:集群模式创建 RemoteBrokerOffsetStore,保存offset到broker文件中
  38. case CLUSTERING:
  39. this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
  40. break;
  41. default:
  42. break;
  43. }
  44. this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
  45. }
  46. // 加载消费信息的偏移量
  47. this.offsetStore.load();
  48. // todo 如果是顺序消费,创建消费端消费线程服务。
  49. //ConsumeMessageService主要负责消息消费,在内部维护一个线程池
  50. if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
  51. this.consumeOrderly = true;
  52. //TODO:创建顺序消费消息服务类
  53. this.consumeMessageService =
  54. new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
  55. } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
  56. this.consumeOrderly = false;
  57. //TODO:创建其他消费消息服务类,其内部维护了一个线程池,后面会用到
  58. this.consumeMessageService =
  59. new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
  60. }
  61. this.consumeMessageService.start();
  62. // todo 向MQClientInstance注册消费者并启动MQClientInstance,JVM中的所有消费者、生产者持有同一个MQClientInstance,MQClientInstance只会启动一次
  63. boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
  64. if (!registerOK) {
  65. this.serviceState = ServiceState.CREATE_JUST;
  66. this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
  67. throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
  68. + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
  69. null);
  70. }
  71. // todo 启动客户端实例,这个方法非常重要,内部做了很多事情
  72. mQClientFactory.start();
  73. log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
  74. this.serviceState = ServiceState.RUNNING;
  75. break;
  76. case RUNNING:
  77. case START_FAILED:
  78. case SHUTDOWN_ALREADY:
  79. throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
  80. + this.serviceState
  81. + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
  82. null);
  83. default:
  84. break;
  85. }
  86. // 更新 topic 的信息,从nameServer获取数据
  87. this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  88. this.mQClientFactory.checkClientInBroker();
  89. // todo 发送心跳,发送到所有的broker
  90. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  91. // todo 负载均衡 立即重平衡
  92. this.mQClientFactory.rebalanceImmediately();
  93. }
  94. 复制代码

2.1 检查配置

  1. 校验GroupName是否为空;校验GroupName是否等于DEFAULT_CONSUMER(等于的话直接抛出异常)
  2. 校验消费模式:集群/广播
  3. 校验ConsumeFromWhere
  4. 校验开始消费的指定时间
  5. 校验AllocateMessageQueueStrategy
  6. 校验订阅关系
  7. 校验是否注册消息监听
  8. 校验消费线程数,consumeThreadMinconsumeThreadMax 默认值都是20,取值区间都是 [1, 1000]
  9. 校验本地队列缓存消息的最大数,默认是1000,取值范围是[1, 1024], 主要是做流控用的
  10. 校验拉取消息的时间间隔,pullInterval参数,默认是不存在间隔,取值范围是[0, 65535]。当消费速度比生产速度快,可以设置这个参数,避免花费大概率从broker拉取空消息
  11. 校验单次拉取的最大消息数,consumeMessageBatchMaxSize 参数,默认是1,取值范围是[1, 1024]
  12. 校验单次消费的最大消息数, pullBatchSize 参数,默认是32,取值范围是[1, 1024]。

2.2 拷贝订阅关系

将订阅关系设置到重平衡服务类RebalanceImpl中,订阅重试主题消息:

  1. //TODO:key=topic, value=订阅数据(就是tag信息)
  2. protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
  3. new ConcurrentHashMap<String, SubscriptionData>();
  4. 复制代码

2.3 创建客户端实例 MQClientInstance

  1. // todo 初始化MQClientInstance、RebalanceImpl(消息重新负载实现类)等
  2. this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
  3. 复制代码

这个对象在前面分析Producer启动时也看到了,因为Prodcuer/Consumer 都是客户端,所以都会根据这个实例来创建对象。 我们看下这个类的结构:

  1. public class MQClientInstance {
  2. //TODO:生产者表,producer启动时创建一个新的MQClientInstance实例对象,将生产者信息注册到这里。生产者实例对象中消费者信息是空
  3. private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
  4. //TODO:消费者表,consumer启动时创建一个新的MQClientInstance实例对象,将消费者信息注册到这里。消费者实例对象中生产者信息是空
  5. private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
  6. //TODO:topic路由信息,producer和consumer都会使用
  7. private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
  8. //TODO:broker信息,producer和consumer都会用到
  9. private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
  10. new ConcurrentHashMap<String, HashMap<Long, String>>();
  11. //TODO......
  12. /**
  13. * TODO:构造器
  14. */
  15. public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
  16. //TODO:客户端处理器,比如在集群消费模式下,有新的消费者加入,则通知消费者客户端重平衡,是给消费者用的(分析生产者时,我们直接忽略了它)
  17. this.clientRemotingProcessor = new ClientRemotingProcessor(this);
  18. //TODO:它的内部会创建netty客户端对象(NettyRemotingClient),用于和broker通信
  19. this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
  20. //TODO.......
  21. //TODO:拉取消息的服务,和消费者相关
  22. this.pullMessageService = new PullMessageService(this);
  23. //TODO:重平衡服务,和消费者相关
  24. this.rebalanceService = new RebalanceService(this);
  25. //TODO:other......
  26. }
  27. }
  28. 复制代码

所谓的”客户端”,实际上是在MQClientAPIImpl对象的内部的NettyRemotingClient

  1. public class MQClientAPIImpl {
  2. //TODO:.....
  3. private final RemotingClient remotingClient;
  4. private final TopAddressing topAddressing;
  5. private final ClientRemotingProcessor clientRemotingProcessor;
  6. private String nameSrvAddr = null;
  7. private ClientConfig clientConfig;
  8. public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
  9. final ClientRemotingProcessor clientRemotingProcessor,
  10. RPCHook rpcHook, final ClientConfig clientConfig) {
  11. this.clientConfig = clientConfig;
  12. topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
  13. //TODO:所谓的客户端实际上就是这个,netty客户端
  14. this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
  15. this.clientRemotingProcessor = clientRemotingProcessor;
  16. this.remotingClient.registerRPCHook(rpcHook);
  17. //TODO:注册处理器
  18. this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
  19. this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
  20. //TODO:....注册其他处理器......
  21. }
  22. 复制代码

这里我们关注下 NettyRemotingClient,其实前面分析Broker,Producer时,我们经常看到这个,因为RocketMQ是使用netty作为通信的。

NettyRemotingClient:netty的客户端对象
NettyRemotingServer:netty的服务端对象

2.4 创建拉取消息的核心类 PullAPIWrapper

  1. this.pullAPIWrapper = new PullAPIWrapper(
  2. mQClientFactory,
  3. this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
  4. 复制代码

2.5 创建 offset存储服务 OffsetStore

  • 如果是广播模式(BROADCASTING), 则创建 LocalFileOffsetStore 对象,将消费者的offset存储到本地的,默认文件路径为当前用户主目录下的 .rocketmq_offsets/clientId/{clientId}/clientId/{group}/Offsets.json。其中clientId为当前消费者id,默认为ip@default,{clientId}为当前消费者id,默认为ip@default, clientId为当前消费者id,默认为ip@default,{group}为消费者组名称
  • 如果是集群模式(CLUSTERING), 则创建 RemoteBrokerOffsetStore对象,将消费者的offset存储到broker中,文件路径为当前用户主目录下的store/config/consumerOffset.json

2.6 创建消费消息的服务类 ConsumeMessageService

  • 如果是顺序消费,则创建 ConsumeMessageOrderlyService 对象
  • 如果是其他消费,则创建 ConsumeMessageConcurrentlyService 对象,同时内部也会创建一个ThreadPoolExecutor线程池,这个线程池非常的重要,拉取到消息后会将消息提交到这个线程池中给消费者消费

2.7 将consumer注册到本地

  1. boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
  2. 复制代码

将消费者组信息添加到本地客户端实例MQClientInstanceconsumerTable Map中,key=groupName; value=DefaultMQPushConsumerImpl,就是消费者对象

2.8 启动客户端实例(重要)

  1. mQClientFactory.start();
  2. 复制代码

其内部做了很多事情,主要如下: org.apache.rocketmq.client.impl.factory.MQClientInstance#start:

  1. public void start() throws MQClientException {
  2. synchronized (this) {
  3. switch (this.serviceState) {
  4. case CREATE_JUST:
  5. // 先设置 失败
  6. this.serviceState = ServiceState.START_FAILED;
  7. // If not specified,looking address from name server
  8. // 判断namesrv 是否为null
  9. if (null == this.clientConfig.getNamesrvAddr()) {
  10. this.mQClientAPIImpl.fetchNameServerAddr();
  11. }
  12. // todo Start request-response channel
  13. // 启动远程服务,这个方法只是装配了netty客户端相关配置
  14. // todo 注意:1. 这里是netty客户端,2. 这里并没有创建连接
  15. this.mQClientAPIImpl.start();
  16. // Start various schedule tasks
  17. // todo 开启任务调度
  18. this.startScheduledTask();
  19. // Start pull service
  20. // todo 开启 拉取服务 仅对consumer启作用
  21. this.pullMessageService.start();
  22. // Start rebalance service
  23. // todo 开启平衡服务 仅对consumer启作用
  24. this.rebalanceService.start();
  25. // Start push service
  26. // todo 启用内部的 producer
  27. this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
  28. log.info("the client factory [{}] start OK", this.clientId);
  29. // 设置状态为running
  30. this.serviceState = ServiceState.RUNNING;
  31. break;
  32. case START_FAILED:
  33. throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
  34. default:
  35. break;
  36. }
  37. }
  38. }
  39. 复制代码

producer的启动过程中,也会调用这个方法,前面我们已经分析过了一波了,这次我们在consumer的角度再来分析这个方法。

该方法所做的工作如下:

  1. 获取 nameServer 的地址
  2. 启动客户端的远程服务,这个方法会配置netty客户端
  3. 启动定时任务
  4. 启动拉取消息服务
  5. 启动负载均衡服务

上面的12producer的流程并无区别,就不再分析了,我们来看看接下来的任务

2.8.1 启动远程netty客户端

  1. // Start request-response channel
  2. this.mQClientAPIImpl.start();
  3. 复制代码
  4. 复制代码

就是启动其内部的 NettyRemotingClient,用于和broker通信。

2.8.2 启动各种定时任务

  1. // Start various schedule tasks
  2. this.startScheduledTask();
  3. 复制代码

那么有哪些定时任务呢?继续往里看,罗列几个特别关注的

2.8.2.1 发送心跳到Broker

  1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  2. @Override
  3. public void run() {
  4. try {
  5. // todo 清除下线broker
  6. MQClientInstance.this.cleanOfflineBroker();
  7. // todo 发送心跳到所有broker上面
  8. MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
  9. } catch (Exception e) {
  10. log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
  11. }
  12. }
  13. }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
  14. 复制代码

延迟1s执行,每隔30s发送一次心跳包

2.8.2.2 持久化消费者的 offset

  1. // // 持久化消费者的消费偏移量,每5秒一次
  2. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  3. @Override
  4. public void run() {
  5. try {
  6. // todo 持久化consumer offset 可以放在本地文件,也可以推送到 broker
  7. MQClientInstance.this.persistAllConsumerOffset();
  8. } catch (Exception e) {
  9. log.error("ScheduledTask persistAllConsumerOffset exception", e);
  10. }
  11. }
  12. }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  13. 复制代码

延迟10s执行,每隔5s持久化一次offset
这里的持久化是将本地Map中offset发送到broker中,然后broker中的定时任务写到文件中,完成真正的持久化,后面会看到。

2.8.3 启动拉取消息的服务PullMessageService(重要)

  1. // Start pull service
  2. this.pullMessageService.start();
  3. 复制代码

他是一个异步线程,其核心逻辑是

  1. public void run() {
  2. log.info(this.getServiceName() + " service started");
  3. // Stopped 声明为volatile,每执行一次业务逻辑,检测一下其运行状态,可以
  4. //通过其他线程将Stopped设置为true,从而停止该线程
  5. while (!this.isStopped()) {
  6. try {
  7. // 从pullRequestQueue中获取一个PullRequest消息拉取任务,
  8. //如果pullRequestQueue为空,则线程将阻塞,直到有拉取任务被放入
  9. PullRequest pullRequest = this.pullRequestQueue.take();
  10. // todo 调用pullMessage方法进行消息拉取
  11. this.pullMessage(pullRequest);
  12. } catch (InterruptedException ignored) {
  13. } catch (Exception e) {
  14. log.error("Pull Message Service Run Method exception", e);
  15. }
  16. }
  17. log.info(this.getServiceName() + " service end");
  18. }
  19. 复制代码

他会监听阻塞队列pullRequestQueue,当队列是空的时候,他会一直阻塞,如果不为空,则获取PullRequest对象,去拉取消息,这个逻辑后面再说。

刚开始肯定是阻塞的,我们要看什么时候往队列中放入值,以及放入值之后做什么

2.8.4 启动重平衡服务 RebalanceService

  1. // Start rebalance service
  2. this.rebalanceService.start();
  3. 复制代码
  4. 复制代码

它也是一个异步线程,其核心逻辑是

  1. public void run() {
  2. log.info(this.getServiceName() + " service started");
  3. while (!this.isStopped()) {
  4. // todo 等待20s执行一次 内部使用了juc的CountDownLatch, 使得这里启动后仍然是阻塞的
  5. this.waitForRunning(waitInterval);
  6. // todo
  7. this.mqClientFactory.doRebalance();
  8. }
  9. log.info(this.getServiceName() + " service end");
  10. }
  11. 复制代码

他是重平衡的核心逻辑,但是在启动时,由于使用了JUC的 CountDownLatch锁,使其不会立即重平衡,而是阻塞,什么时候出发重平衡呢?我们还是继续往后看

2.9 发送心跳到broker

  1. this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  2. this.mQClientFactory.checkClientInBroker();
  3. //TODO: 发送心跳到broker
  4. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  5. this.mQClientFactory.rebalanceImmediately();
  6. 复制代码

是不是有些疑惑?前面的定时任务中不是已经启动了心跳服务吗,为什么这里还要启动呢?我也不清楚,猜测是因为担心网络问题导致没有及时发送给broker吧

2.10 立即启动重平衡服务RebalanceService

  1. //TODO:立即启动重平衡服务
  2. this.mQClientFactory.rebalanceImmediately();
  3. 复制代码

前面在 2.8.4的时候,启动了重平衡服务,但是因为 CountDownLatch 导致阻塞了,这里就是唤醒,可以执行重平衡的逻辑。

这里先不关注它的内部逻辑,后面在分析。

其实,到这里Consumer客户端启动流程就结束了,至于如何拉取消息,也是这些服务类的相互配合工作的,后面我们在分析中在仔细分析它是如何一步一步拉取消息并消费的。

3. 总结

消费者启动过程相比生产者启动过程要复杂一些,会启动很多对象(实际上生产者也启动了,因为他们都是基于客户端实例MQClientInstance去创建的对象,只不过生产者并不会使用某些服务类)

简单总结下:

  1. 创建客户端实例MQClientInstance(其内部要创建netty客户端对象NettyRemotingClient
  2. 启动客户端实例,其内部要启动netty客户端,消息拉取服务,以及重平衡服务,还有各种定时任务
  3. 发送心跳到broker

好了,Consumer启动过程就分析到这里,接下来就分析下消费过程

发表评论

表情:
评论列表 (有 0 条评论,121人围观)

还没有评论,来说两句吧...

相关阅读