RocketMQ源码解析之消息消费者(发送心跳给broker)
原创不易,转载请注明出处
文章目录
- 前言
- 关于路由信息的拉取
- 发送心跳到broker并锁定
- 总结
前言
我们都知道RocketMQ的架构设计,broker 启动的时候,会向namesrv 注册一下,然后会将本地的一些信息告诉namesrv ,其中就有这个broker里面存储了哪些topic,然后哪些queue在这个broker上面,同时broker要隔一段时间向namesrv发送心跳,然后消息消费者,消息生产者 在发送消息,拉取消息的时候会向namesrv 拉取一下topic的路由信息,这个样子才能知道向哪个broker拉取消息,发送消息,而且有个定时任务,每个30s向namesrv 拉取一下,更新本地信息。我们在《RocketMQ源码解析之消息消费者(启动流程)》介绍RocketMQ消费者启动的时候,在最后有几行代码比较重要
// 更新订阅的topic信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 检查
this.mQClientFactory.checkClientInBroker();
// 发送心跳到所有broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// rebalance, 立即执行rebalance
this.mQClientFactory.rebalanceImmediately();
第一行是去namesrv上更新一下订阅topic的路由信息。
第二行这个就是判断订阅主题的过滤类型,如果不是tag的话,就要发送到broker上面校验
第三行从字面意思上就可以看出来 发送心跳到所有的broker,并锁定
第四行就是里面进行rebalance,这个我们后面说,因为我们后面拉取消息就是通过它展开的
本文主要是介绍下向namesrv 拉取topic路由信息,然后更新本地持有的信息,接着就是看下这个向所有broker 发送心跳并锁定传了写啥东西给了broker。
1. 关于路由信息的拉取
消息消费者向namesrv获取topic路由信息这个代码逻辑其实跟消息生产者的一样,它们都是在这个client子项目下的,所以这里不在赘述封装请求发送请求的这部分内容,可以直接看《RocketMQ源码解析之消息生产者(获取topic路由信息)》文章的分析,我们这里看下它对结果的处理,消息消费者是将namesrv返回的结果封装成了sub类型的实体,而消息生产者生成的是pub类型的实体,我们看看namesrv跟我们返回了哪些东西,注意它一个topic一次请求的,不是一下全拉回来的。
namesrv 就返回这么个实体, 有这个topic的queue信息集合,这个queue信息里面就是包含了这个queue 在哪个broker name上面,有这个topic的 broker 信息集合,这个broker 信息里面就是 哪个broker name 下面有哪些broker(broker id 对应着地址),属于哪个cluster
其实贴出来实体的定义就知道,都有啥信息了。
接下来我们就看下消息消费者怎样将namesrv返回的结果解析成自己要的结果,在看之前它还有几行代码很重要,就是更新broker地址表
这里其实就是遍历一下路由信息里面的brokerData 集合,然后更新下本地的broker地址信息表
在MQClientInstance中updateTopicRouteInfoFromNameServer方法中有下面一段
将TopicRouteData 转成了MessageQueue集合,然后更新每个consumerImpl中的订阅信息,下面看下是怎样转的,其实很简单
就是遍历了一遍QueueData集合,然后判断是不是可读的,然后根据根据queue的read数量创建对应数量的MessageQueue就可以了。
接着再看一下更新方法
其实就是更新了一下rebalanceImpl 中的主题订阅信息表
2. 发送心跳到broker并锁定
接下来我们看下给broker 发送心跳都带些什么东西过去
这其实这个方法就是干了四件事情,先是组装心跳信息,接着是遍历broker 地址表,找到master节点的broker 发送心跳,返回一个version信息,最后是更新broker 版本表中的对应的版本。
接下来我们先看下都准备了什么心跳数据
主要是看下这个consumer 这块,可以看到它把消费者组,消费类型(push还是 pull 方式),消息类型(集群还是广播),从哪开始消费,订阅的主题信息。
接着就是遍历broker 地址信息表,然后找出master broker (这个其实就是broker id 是0的就是master),然后交给api组件进行发送,这里发送我们不介绍深了,稍微看下就行
可以看到直接创建了code是HEART_BEAT的RemotingCommand(broker 收到请求后,就是根据code进行不同的处理流程),然后将心跳信息设置到body中,调用client组件进行同步调用。如果成功获取version返回。
这里broker 收到心跳消息之后,会根据你这个消费者组获取对应维护的消费者组信息,最最最重要的是跟broker的ConsumerManager组件注册,如果你是新加入的consumer获取订阅信息变了,就会通知这个消费者组里面的其他消费者说消费者有变化,然后被通知到的消费者,就会里面进行新的rebalance,重新负载均衡。
需要注意的是,其实这个发送心跳信息有个定时任务的,每30s发送一次,他这里启动的时候就发送,就是想让broker 通知其他消费者有新的消费者加入,让他们重新进行balance。
总结
本文主要是两部分,算是拉取消息的准备工作吧,第一部分是消息消费者向namesrv拉取 topic路由信息,这里只需要知道,namesrv都返回了些什么就可以了,其实就是返回了QueueData 集合跟borkerData集合,QueueData就是队列信息,读队列几个,写队列几个,然后在哪个broker name上存储着,brokerData 其实就是broker name 下面对应的 broker 地址信息表,broker id 对应着broker 的地址,第二部分就是向broker 发送心跳了,这个发送心跳不是单单发送心跳这么简单,尤其是消息消费者,它要把一堆信息带给broker,然后broker 判断这些信息是不是新的或者改动过了,这个时候就会通知这个消费者组下的所有消息消费者告诉他们消息消费者有变动,让他们里面进行rebalance。
还没有评论,来说两句吧...