RocketMQ源码解析之消息消费者(启动流程)
原创不易,转载请注明出处
文章目录
- 前言
- 1.写一个消息消费者
- 2.源码分析
- 2.1 创建DefaultMQPushConsumer 的流程
- 2.2 启动流程分析
- 总结
前言
RocketMQ支持pull与push两种模式,从字面上是拉模式与推模式,可能会误认为拉模式是消息消费者主动管broker 要消息,然后推模式是broker主动推消息给消息消费者,其实在RocketMQ中不是这个样子的,在RocketMQ中push模式本质上还是消息消费者管broker 要消息,本文主要先写一个push模式的消息消费者来感受一下,介绍各个步骤都是干什么的,接着就是对push消费者的启动流程进行源码剖析
1.写一个消息消费者
这里直接拿RocketMQ example子项目的一个简单消息消费者例子过来
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅的主题
consumer.subscribe("TestPushMsgOne", "*");
/* * Register callback to execute on arrival of messages fetched from brokers. * 消息到达的时候执行 */
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 执行状态的返回
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
首先是创建一个push模式的consumer对象,需要传入消费者组信息,接下来就是设置namesrv地址,设置从哪里开始消费,下面是这个参数的所有选项
当建立一个新的消费者组时,需要决定是否需要消费已经存在于 Broker 中的历史消息
CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。
CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。
你也可以使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息
接下来就是订阅的主题了,第二个参数关于过滤tag的表达式
最后就是具体处理消息了,就是写业务逻辑的地方,也就是消息消费者从broker把消息拉过来,然后具体处理还是要看你具体业务。
MessageListenerConcurrently 这个有两个实现类,一个是MessageListenerConcurrently,也就是咱们上个例子上面那个,从名字就可以看出来是个并发消费的,消息到了的时候,不管消息消费顺序,然后并发消费,如果你业务场景要求顺序消息的话,就不能选择这个了,需要使用它另一个实现类MessageListenerOrderly ,这个能够保证消息顺序消费,它其实就是一个线程来一个消息一个消息消费的。
2.源码分析
2.1 创建DefaultMQPushConsumer 的流程
我们下来看下DefaultMQPushConsumer 创建流程
这里多了一个这么个东西,这个其实是消费者rebalance的时候一个策略,可以从类名上看出来是平均的,这里我们先不用管,到后面消费流程就成知道了。
这里创建了defaultMQPushConsumerImpl 对象,下面我们就看一下这个DefaultMQPushConsumerImpl类的创建过程了。
可以看到,没了,从类名上,也能看出来它是默认push消费的实现类。接下来我们看下这个启动流程了
2.2 启动流程分析
DefaultMQPushConsumer.start();
这里直接是调用了defaultMQPushConsumerImpl 的start方法,下面的traceDispatcher 不管管他,它主要是做trace用的。我们来看下DefaultMQPushConsumerImpl类的start 方法都干啥了啥。这个方法比较长,我们一部分一部分看看
我们刚开始的时候,这个启动状态就是CREATE_JUST ,这是这个对象创建的时候默认的 private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
,所以就走CREATE_JUST 这支
先是检查了一堆配置,我这里列一下
- 消费者组名字不是null ,消费者组不能是默认的消费组名字
- 消息模式不是null,这个消息模式有两个,一个是集群,一个是广播,广播的话都知道,谁订阅了给谁消息,集群的话一个消息只能被一个消费者组里面的消费者消费,这里模式就是集群的
- 从哪开始消费,这个就是咱们上面说的那个参数
- messageQueue的分配策略,就是rebalance的策略,这个模式是有的,就是平均的方式,这个东西可以这么理解,我一个topic有4个queue,然后我这个消费者组有2个消费者,那么一个消费者分配2个queue来消费,如果有4个消费者的话一个消费者1个queue,如果5个消费者的话,有个是闲着的。
- 订阅topic不能是空
- 消息监听器,就是自己处理消息的那个实现不能是空
- 要么是并发消费,要么是顺序消费
- 最大消费线程在1到1000之间,默认的最大消费线程是64
- 最小消费线程不能大于最大消费线程,默认最小消费线程是20
- 并发消费的跨度不能大于65535
关于拉取消息大小,多少的限制,拉取消息间隔,批量拉取一批拉多少个,批量消费规模的检查
下面就是拷贝订阅信息了 this.copySubscription(),这里主要是将defaultMQPushConsumerImpl中的订阅信息拷贝到RebalancePushImpl中去,有个比较重要的地方是,如果是集群消息模式的话这里它还添加了一个关于重试topic的订阅,一个消费者组一个重试topic
接着往下看就是设置instance name,再往下就是创建MQClientInstance 对象了,这与消息生产者创建是一样的《RocketMQ源码解析之消息生产者(启动流程)》,我们这里就不赘述了。// 设置group组
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 设置message model
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 设置获取messagequeue的策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 设置 MQClientInstance
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
这几行就是往rebalanceImpl中设置一些配置,需要主要的是rebalanceImpl这个是在消息消费者中很重要的一个组件
// 创建 pullApi wrapper
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 注册过滤消息的钩子
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
关于这个PullAPIWrapper 没啥好说的,它就是拉取消息的api,然后拉取请求找它,它再找client发起请求,它其实就是封装了一套拉取消息的api。
这个就是创建offsetStore,如果你是广播的话,就使用本地文件来存储消费offset,如果你是集群消息模式的话,就使用远程broker offset存储的,说白了就是我消费完一个消息之后,找这个组件,告诉它我这个offset 消费完了,这个组件就会隔一段时间告诉下broker消费进度。
这段就是判断你是并发消费还是顺序消费,然后创建不同消费消息服务,接着就是启动了,看下并发消费的start方法实现
可以看到就是创建了一个定时任务,然后15分钟执行一次,用来清理过期消息
再来看下顺序消费的消费消息服务 启动,也即是start方法
它也是一个定时任务,然后它的定时任务20s执行一次,比较重要的一个东西,就是默认每20s锁定一下queue,这个锁定queue是向broker 申请锁定,这里现在先不看,后面会有文章专门介绍。
接着回到这个大的start方法中
这里就是注册consumer了,然后启动MQClientInstance,这里需要看下MQClientInstance 的启动流程
这个fetchNameServer我们就不用关心了,mQClientAPIImpl启动主要就是远程客户端的启动,接着就是启动一堆定时任务,这里说下定时任务都有哪些
- 每2分钟获取一下namesrv地址
- 每30s 向namesrv获取一下订阅路由信息
- 每30s 清理一下下线的broker,发送心跳给broker
- 5s将 消费offset 向broker 报告一次或者是本地持久化一下,这个是要看你是消息集群模式还是广播模式
- 1分钟调整一下线程池
接着就是启动拉消息服务,这个服务主要是处理拉取消息请求的,这里的启动就是启动线程
再往下启动rebalance服务,这里也是启动一个线程,它主要是做queue的负载的
再往下就是启动内置消息发送者,它主要是发送一些系统消息的
好了,这里我们MQClientInstance 启动流程就介绍完成了。
接着DefaultMQPushConsumerImpl的启动
先是向namesrv更新本地订阅信息,检查client,发送心跳给broker,这里这个不光光是发送心跳这么简单,里面带了一堆东西,最后就是里面执行rebalance了,这个是最重要了,我们后面会有文章专门介绍这个。
好了,到这我们启动流程就完事了,细数一下这次启动了哪些组件
- MQClientInstance 这个就是个客户端,然后不管你是消息消费者对象,还是消息生产者对象,都是这个client的小弟,都要向他注册。比如说你一个进程可以有多个 消息消费者或者是消息生产者,但是只有一个MQClientInstance对象。
- rebalanceImpl 这是非常重要的,就是做queue分配的,具体干活的
- OffsetStore 存储消费offset ,重要的是那个远程broker offset store ,它向broker 汇报消费进度
- ConsumeMessageService 消息消费服务,管着 顺序消费与并发消费
- pullMessageService 拉取消息的服务,管着处理拉取消息请求
- RebalanceService 这个管着调度的,然后具体干活的就是rebalanceImpl 了。
- defaultMQProducer 内置的消息生产者,主要负责系统消息的发送
- mQClientAPIImpl 这个就是封装了一堆调用实现,比如发送消息,拉取消息等等,然后它会找远程客户端来具体的通信
总结
好了,到这我们push模式消息消费者启动流程就介绍完了,我们先是看了看官方消息消费者例子,介绍了一下每一步都是干啥的。接着我们基于这个例子,从源码的角度看了一下DefaultMQPushConsumer 的创建流程与启动流程,其实这个启动流程主要就是了解它启动了哪些组件,分别有什么作用,我们后期解析消费流程其实就是介绍这些组件的工作。
还没有评论,来说两句吧...