图解 Kafka 生产者元数据拉取管理全流程

今天药忘吃喽~ 2024-04-08 10:09 121阅读 0赞

今天这篇我们就来聊聊生产者是会如何拉取和管理元数据的,带你梳理生产者元数据管理整体的源码分析脉络。

认真读完这篇文章,我相信你会对 Kafka 生产获取和管理元数据源码有更加深刻的理解。

这篇文章干货很多,希望你可以耐心读完。

5117da237f82de642f04445c3af65480.png

01 总的概述

消息想从 Producer 端发送到 Broker 端,必须要先知道 Topic 在 Broker 的分布情况,才能判断消息该发往哪些节点,比如:「Topic 对应的 Leader 分区有哪些」、「Leader分区分布在哪些 Broker 节点」、「Topic 分区动态变更」等等,所以及时获取元数据对生产者正常工作是非常有必要的。

元数据获取涉及的底层组件比较多,主要分为:「KafkaProducer 主线程加载元数据」、「Sender 子线程拉取元数据」。

接下来我们逐一分析元数据在生产者端是如何被加载和拉取、更新的。为了方便大家理解,所有的源码只保留骨干。

02 主线程如何加载元数据

首先我们来看下 KafkaProducer 主线程是如何加载元数据的

集群元数据的初始化是在 KafkaProducer 主线程的构造函数中来完成的,我们来看一下相关源码:

  1. // 初始化 Kafka 集群元数据,元数据会保存到客户端中,并与服务端元数据保持一致
  2. if (metadata != null) {
  3. this.metadata = metadata;
  4. } else {
  5. // 初始化集群元数据
  6. this.metadata = new ProducerMetadata(retryBackoffMs,
  7. // 元数据过期时间:默认5分钟
  8. config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
  9. // topic最大空闲时间,如果在规定时间没有被访问,将从缓存删除,下次访问时强制获取元数据
  10. config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
  11. logContext,
  12. clusterResourceListeners,
  13. Time.SYSTEM);
  14. // 启动metadata的引导程序
  15. this.metadata.bootstrap(addresses);
  16. }

从上述源代码我们可以看出在 KafkaProducer 的构造方法中,如果metadata为空就会初始化集群元数据类「ProducerMetadata」,然后通过调用 「this.metadata.bootstrap」这个方法来启动引导程序,这时 metaData 对象里并没有具体的元数据信息,因为客户端还没发送元数据更新的请求,后面会通过唤醒 Sender 线程进行发送请求获取元数据的

这里的 this.meta 其实就是 Kafka 内存中的一个对象,底层会做一层缓存,因此并不会一直请求 Kafka Broker 端进行获取。

我先给大家总结下元数据初始化及启动的调用关系图,口说无凭,我们来扒开源码瞅一瞅,这样更真实。

eb7f6196280748eca05468c47b04de26.png

通过上述的调用关系图,我们可以看出:

  1. ProducerMetadata 类是 MetaData 的子类。
  2. Metadata 类是元数据基类,封装了元数据的具体信息、版本控制、更新标识、响应解析等。
  3. 元数据的信息其实最终是保存在元数据缓存即 MetadataCache 中,而 它最核心的是 Cluster , 保存了元数据基础信息。

接下来会挨个类展开来进行讲解。

02.1 初探 ProducerMetadata

在主线程中初始化了 ProducerMetadata 类的对象,我们先来看看这个类都做了哪些事情。

github 源码地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java

  1. public class ProducerMetadata extends Metadata {
  2. // 主题元数据过期时间,如果在这个时间段内未被访问,它就会从缓存中删除。
  3. private final long metadataIdleMs;
  4. /* Topics with expiry time */
  5. // map集合,生产者的元数据主题集合,里面保存着
  6. // 主题和主题过期时间的对应关系,即 topic, nowMs + metadataIdleMs,
  7. // 过期了的主题会被踢出去。
  8. private final Map<String, Long> topics = new HashMap<>();
  9. // 新的主题集合, set集合,即第一次要发送的主题
  10. private final Set<String> newTopics = new HashSet<>();
  11. private final Logger log;
  12. private final Time time;
  13. public ProducerMetadata(long refreshBackoffMs,
  14. long metadataExpireMs,
  15. long metadataIdleMs,
  16. LogContext logContext,
  17. ClusterResourceListeners clusterResourceListeners,
  18. Time time) {
  19. //调用父类 Metadata 的构造函数
  20. super(refreshBackoffMs, metadataExpireMs, logContext, clusterResourceListeners);
  21. ....
  22. }
  23. }

我们可以看出这里只是调用了父类的构造函数进行类属性的初始化,接下来我们深度分析下 ProducerMetadata 类中的几个比较重要的方法。

02.1.1 add()

  1. public synchronized void add(String topic, long nowMs) {
  2. // 判断对象是否为空
  3. Objects.requireNonNull(topic, "topic cannot be null");
  4. if (topics.put(topic, nowMs + metadataIdleMs) == null) {
  5. // 添加主题到新主题集合中
  6. newTopics.add(topic);
  7. // 更新元数据标记 属于Metadata类方法,后面小节分析
  8. requestUpdateForNewTopics();
  9. }
  10. }

该方法主要用来向元数据主题集合 topics 中添加主题,主要用在「KafkaProducer 主线程」以及「Sender 子线程」中,我们来看下是如何添加的,具体逻辑如下:

  1. 往元数据主题集合 topics 中添加主题和对应的过期时间(当时时间+过期时间段「默认值:5分钟」)。
  2. 如果元数据主题集合中不存在该主题时,说明是第一次就把该主题添加到新主题集合中。
  3. 标记要更新新主题元数据的属性字段「lastRefreshMs」 、「needPartialUpdate」 、「requestVersion」,以便后续唤醒 Sender 线程去服务端拉取新主题的元数据。

此时主题被添加到元数据主题主题集合中,但是如果集合里面数据过期了该怎么办?接下来我们看另外一个方法是如何判断的。

02.1.2 retainTopic()

  1. public synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
  2. // 获取该主题的过期时间
  3. Long expireMs = topics.get(topic);
  4. // 如果为空表示该主题不在元数据主题集合中
  5. if (expireMs == null) {
  6. return false;
  7. // 判断该主题是否在新集合中
  8. } else if (newTopics.contains(topic)) {
  9. return true;
  10. // 判断是否超过了过期时间
  11. } else if (expireMs <= nowMs) {
  12. log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", topic, expireMs, nowMs);
  13. // 超过后直接从元数据主题集合中删除该主题
  14. topics.remove(topic);
  15. return false;
  16. } else {
  17. return true;
  18. }
  19. }

该方法用来判断元数据中是否该保留该主题,会在 handleMetadataResponse 即处理元数据响应结果的时候进行调用,我们来看下它是如何判断的。

  1. 先判断元数据主题集合中是否存在该主题,如果不存在直接返回false。
  2. 然后判断该主题是否在新主题集合中,如果存在直接返回true。
  3. 再判断该主题是否超过了过期时间,如果超过了,就从元数据主题集合中删除该主题,再请求元数据的时候就不用带上该主题,可以有效的减少网络传输数据大小。

02.1.3 requestUpdateForTopic()

  1. public synchronized int requestUpdateForTopic(String topic) {
  2. // 如果新主题集合中存在该主题
  3. if (newTopics.contains(topic)) {
  4. // 针对新主题集合标记部分更新,并返回版本
  5. return requestUpdateForNewTopics();
  6. } else {
  7. // 全量更新,并返回版本
  8. return requestUpdate();
  9. }
  10. }

该方法用来判断是全量更新元数据还是部分更新元数据,逻辑相对比较简单,主要用在 KafkaProducer 主线程元数据同步等待时调用,后续小节会详细分析。

02.1.4 update()

  1. public synchronized void update(int requestVersion, MetadataResponse response, boolean isPartialUpdate, long nowMs) {
  2. // 调用父类的update方法
  3. super.update(requestVersion, response, isPartialUpdate, nowMs);
  4. // 如果新主题集合不为空,则遍历响应元数据找出已经获取元数据的主题,并从新主题集合中删除
  5. if (!newTopics.isEmpty()) {
  6. for (MetadataResponse.TopicMetadata metadata : response.topicMetadata()) {
  7. newTopics.remove(metadata.topic());
  8. }
  9. }
  10. // 唤醒等待元数据更新完成的线程
  11. notifyAll();
  12. }

该方法用来更新生产端元数据的,具体逻辑如下:

  1. 先调用父类的 update() 方法。
  2. 然后判断新主题集合是否不为空,如果不为空则遍历响应元数据找出已经获取元数据的主题,并从新主题集合中删除。
  3. 最后调用 notifyAll() 来唤醒等待元数据更新完成的线程。

从上述 update()方法 中可以看出最后调用 notifyAll() 来唤醒阻塞的线程, 那么它是如何唤醒的呢,这就是接下来要分析的方法。

02.1.5 awaitUpdate()

  1. public synchronized void awaitUpdate(final int lastVersion, final long timeoutMs) throws InterruptedException {
  2. long currentTimeMs = time.milliseconds();
  3. long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
  4. // 通过调用 time.waitObject 来实现线程阻塞
  5. time.waitObject(this, () -> {
  6. // Throw fatal exceptions, if there are any. Recoverable topic errors will be handled by the caller.
  7. maybeThrowFatalException();
  8. return updateVersion() > lastVersion || isClosed();
  9. }, deadlineMs);
  10. if (isClosed())
  11. throw new KafkaException("Requested metadata update after close");
  12. }

该方法用来实现线程阻塞的功能,用在主线程中如果发现主题对应的元数据不存在时,阻塞并等待 Sender 线程将元数据更新完成。

重点是调用了 time.waitObject() 方法来实现阻塞功能,它的实现还是有一些技巧的,它的底层通过调用 SystemTime 包里面的 waitObject() 实现的,源码如下:

  1. public void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs)

发表评论

表情:
评论列表 (有 0 条评论,121人围观)

还没有评论,来说两句吧...

相关阅读