RocketMQ源码解析五(Consumer启动流程)

骑猪看日落 2022-09-12 07:46 76阅读 0赞

RocketMQ版本4.6.0,记录自己看源码的过程

Consumer示例:

  1. public class Consumer {
  2. public static void main(String[] args) throws InterruptedException, MQClientException {
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup3");
  4. consumer.setNamesrvAddr("localhost:9876");
  5. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  6. consumer.subscribe("TestTopic", "*");
  7. consumer.registerMessageListener(new MessageListenerConcurrently() {
  8. @Override
  9. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  10. ConsumeConcurrentlyContext context) {
  11. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  12. if (msgs.get(0).getReconsumeTimes() >= 5) {
  13. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  14. }
  15. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  16. }
  17. });
  18. consumer.start();
  19. System.out.printf("Consumer Started.%n");
  20. }
  21. }

先创建消费者DefaultMQPushConsumer

  1. public DefaultMQPushConsumer(final String consumerGroup) {
  2. // 这里可以看出负载均衡策略使用平均算法
  3. this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
  4. }
  5. public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
  6. AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
  7. this.consumerGroup = consumerGroup;
  8. this.namespace = namespace;
  9. this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
  10. defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
  11. }

创建过程没什么操作,指定了负载均衡策略以及也跟生产者一样实例化了一个内部消费者实例DefaultMQPushConsumerImpl,将消费者的功能委托给这个类。
接着设置Namesrv地址,从哪开始消费,订阅主题(同时会将订阅信息复制到重平衡组件)以及注册一个消息监听器,用来消费消息。最后启动。
DefaultMQPushConsumer

  1. @Override
  2. public void start() throws MQClientException {
  3. setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
  4. this.defaultMQPushConsumerImpl.start();
  5. if (null != traceDispatcher) {
  6. try {
  7. traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
  8. } catch (MQClientException e) {
  9. log.warn("trace dispatcher start failed ", e);
  10. }
  11. }
  12. }

可以看到也是通过DefaultMQPushConsumerImpl来启动

  1. public synchronized void start() throws MQClientException {
  2. switch (this.serviceState) {
  3. case CREATE_JUST:
  4. log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
  5. this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
  6. this.serviceState = ServiceState.START_FAILED;
  7. // 主要是对消费者的配置进行检查
  8. this.checkConfig();
  9. // 将订阅数据复制到重平衡组件中,并且如果是集群模式,也会订阅retryTopic,
  10. // 然后等consumer向broker发送心跳时在broker中创建该重试主题
  11. this.copySubscription();
  12. if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
  13. this.defaultMQPushConsumer.changeInstanceNameToPID();
  14. }
  15. // 从缓存获取或创建一个新的MQClientInstance实例
  16. this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
  17. // 为重平衡组件设置必要的属性
  18. this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
  19. this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
  20. this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
  21. this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
  22. // 从这里也可以看出push底层也是pull
  23. this.pullAPIWrapper = new PullAPIWrapper(
  24. mQClientFactory,
  25. this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
  26. this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
  27. if (this.defaultMQPushConsumer.getOffsetStore() != null) {
  28. this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
  29. } else {
  30. switch (this.defaultMQPushConsumer.getMessageModel()) {
  31. // 广播模式,消费组里的每个消费者都会收到全部消息,offset消费端自己管理
  32. case BROADCASTING:
  33. this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
  34. break;
  35. // 集群模式,消息平均分配给消费组中的每个消费者,offset由broker管理
  36. case CLUSTERING:
  37. this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
  38. break;
  39. default:
  40. break;
  41. }
  42. this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
  43. }
  44. // 集群模式是个空实现,只有广播模式有用,实现从本地文件加载进度
  45. this.offsetStore.load();
  46. // 根据并发消费还是顺序消费创建不同的 消费消息服务
  47. if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
  48. this.consumeOrderly = true;
  49. this.consumeMessageService =
  50. new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
  51. } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
  52. this.consumeOrderly = false;
  53. this.consumeMessageService =
  54. new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
  55. }
  56. //
  57. this.consumeMessageService.start();
  58. // 注册消费者缓存
  59. boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
  60. if (!registerOK) {
  61. this.serviceState = ServiceState.CREATE_JUST;
  62. this.consumeMessageService.shutdown();
  63. throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
  64. + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
  65. null);
  66. }
  67. // 启动一些功能组件和后台任务,比如netty客户端启动,拉取消息服务,重平衡服务
  68. mQClientFactory.start();
  69. log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
  70. this.serviceState = ServiceState.RUNNING;
  71. break;
  72. case RUNNING:
  73. case START_FAILED:
  74. case SHUTDOWN_ALREADY:
  75. throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
  76. + this.serviceState
  77. + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
  78. null);
  79. default:
  80. break;
  81. }
  82. this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  83. this.mQClientFactory.checkClientInBroker();
  84. // 发送心跳到每个broker
  85. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  86. // 启动消费者需立即发起一次重平衡,通过CountdownLatch -1将重平衡线程立刻唤醒,如果线程已经在执行状态,则这里不会再重平衡
  87. this.mQClientFactory.rebalanceImmediately();
  88. }

Step1:检查消费者配置。
Step2:将订阅数据复制到重平衡组件中,并且如果是集群模式,也会订阅retryTopic,然后等consumer向broker发送心跳时在broker中创建该重试主题。
Step3:从缓存获取或创建一个新的MQClientInstance实例,这里面创建了包括拉取消息服务和重平衡服务等。
MQClientManager

  1. /** * 从缓存获取或创建一个新的MQClientInstance实例 */
  2. public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
  3. String clientId = clientConfig.buildMQClientId();
  4. MQClientInstance instance = this.factoryTable.get(clientId);
  5. // 根据clientId从缓存中获取,没获取到则创建一个放到缓存中
  6. if (null == instance) {
  7. instance =
  8. new MQClientInstance(clientConfig.cloneClientConfig(),
  9. this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
  10. MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
  11. if (prev != null) {
  12. instance = prev;
  13. log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
  14. } else {
  15. log.info("Created new MQClientInstance for clientId:[{}]", clientId);
  16. }
  17. }
  18. return instance;
  19. }

整个JVM实例中只存在一个MQClientInstance实例,并维护一个MQClientInstance缓存factoryTable,一个clientId只会创建一个MQClientInstance。clientId = 客户端IP + “@” + instanceName,同一个JVM内,生产者和消费者都是用一个MQClientInstance实例。
Step4:为重平衡组件设置必要的属性。
Step5:创建Consumer端的消费进度管理组件。
Step6:从消费进度组件加载消费进度,只有广播模式有用,实现从本地文件加载进度。
Step7:根据并发消费还是顺序消费创建不同的消费消息服务。
Step8:向MQClientInstance注册消费者实例,如果相同消费组已经存在一个实例了,则会报错。
Step9:启动MQClientInstance实例,启动一些功能组件和后台任务,比如netty客户端启动,拉取消息服务,重平衡服务。

  1. /** * 一个实例只会启动一次 */
  2. public void start() throws MQClientException {
  3. synchronized (this) {
  4. switch (this.serviceState) {
  5. case CREATE_JUST:
  6. this.serviceState = ServiceState.START_FAILED;
  7. // If not specified,looking address from name server
  8. if (null == this.clientConfig.getNamesrvAddr()) {
  9. this.mQClientAPIImpl.fetchNameServerAddr();
  10. }
  11. // Start request-response channel 启动netty客户端
  12. this.mQClientAPIImpl.start();
  13. // 启动一些定时任务,比如定时从NameServer拉取路由信息,定时向broker发送心跳等
  14. this.startScheduledTask();
  15. // 启动拉取消息服务
  16. this.pullMessageService.start();
  17. // 启动重平衡服务
  18. this.rebalanceService.start();
  19. // 用来消费者往broker发送重试消息??
  20. this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
  21. log.info("the client factory [{}] start OK", this.clientId);
  22. this.serviceState = ServiceState.RUNNING;
  23. break;
  24. case START_FAILED:
  25. throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
  26. default:
  27. break;
  28. }
  29. }
  30. }

到这消费者就算启动完成了,接着就交给几个定时任务了。
在这里插入图片描述

参考资料
《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》

发表评论

表情:
评论列表 (有 0 条评论,76人围观)

还没有评论,来说两句吧...

相关阅读