kafka consumer rebalance & ConsumerRebalanceListener

柔光的暖阳◎ 2022-11-12 08:20 729阅读 0赞

在使用kafka的时候,就算前期不知道这个rebalance,后期肯定得遇到的,这个坑你跑不了,你后期肯定会想监测一下kafka的rebalance状态,也许这个时候,你会搜索到一个接口ConsumerRebalanceListener,在消费topic的时候,带上这个接口,说是在kafka进行rebalance的时候,会有一些操作的,但是,他讲真管用吗?

1,Kafka相关的理论知识。

Consumer 与Consumer Group

Consumer Group与Consumer的关系是动态维护的:

当一个Consumer 进程挂掉 或者是卡住时,该consumer所订阅的partition会被重新分配到该group内的其它正常的consumer上。当一个consumer加入到一个consumer group中时,同样会从其它的consumer中分配出一个或者多个partition 到这个新加入的consumer。当启动一个Consumer时,会指定它要加入的group,使用的是配置项:group.id。为了维持Consumer 与 Consumer Group的关系,需要Consumer周期性的发送heartbeat到coordinator(协调者,在早期版本,以zookeeper作为协调者。后期版本则以某个broker作为协调者)。当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。而这个过程,被称为rebalance。那么现在有这样一个问题:如果一个consumer 进程一直在周期性的发送heartbeat,但是它就是不消费消息,这种状态称为livelock状态。那么在这种状态下,它所订阅的partition不消息是否就一直不能被消费呢?

Consumer Fetch Message

在Kafka partition中,每个消息有一个唯一标识,即partition内的offset。每个consumer group中的订阅到某个partition的consumer在从partition中读取数据时,是依次读取的。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI3MDkzNDY1_size_16_color_FFFFFF_t_70

上图中,Consumer A、B分属于不同的Consumer Group。Consumer B读取到offset =11,Consumer A读取到offset=9 。这个值表示Consumer Group中的某个Consumer 在下次读取该partition时会从哪个offset的 message开始读取,即 Consumer Group A 中的Consumer下次会从offset = 9 的message 读取, Consumer Group B 中的Consumer下次会从offset = 11 的message 读取。这里并没有说是Consumer A 下次会从offset = 9 的message读取,原因是Consumer A可能会退出Group ,然后Group A 进行rebalance,即重新分配分区。

Consumer读取partition中的数据是通过调用发起一个fetch请求来执行的。而从KafkaConsumer来看,它有一个poll方法。但是这个poll方法只是可能会发起fetch请求。原因是:Consumer每次发起fetch请求时,读取到的数据是有限制的,通过配置项max.partition.fetch.bytes来限制的。而在执行poll方法时,会根据配置项max.poll.records来限制一次最多poll多少个record。那么就可能出现这样的情况: 在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。在consumer中,还有另外一个配置项:max.poll.interval.ms ,它表示最大的poll数据间隔,如果超过这个间隔没有发起pool请求,但heartbeat仍旧在发,就认为该consumer处于 livelock状态。就会将该consumer退出consumer group。所以为了不使Consumer 自己被退出,Consumer 应该不停的发起poll(timeout)操作。而这个动作 KafkaConsumer Client是不会帮我们做的,这就需要自己在程序中不停的调用poll方法了。

当一个consumer因某种原因退出Group时,进行重新分配partition后,同一group中的另一个consumer在读取该partition时,怎么能够知道上一个consumer该从哪个offset的message读取呢?也就是如何保证同一个group内的consumer不重复消费消息呢?上面说了一次走网络的fetch请求会poll到一定量的数据,但是这些数据还没有被消息完毕,Consumer就挂掉了,下一次进行数据fetch时,是否会从上次读到的数据开始读取,而导致Consumer消费的数据丢失呢?为了做到这一点,当使用完poll从本地缓存拉取到数据之后,需要client调用commitSync方法(或者commitAsync方法)去commit 下一次该去读取 哪一个offset的message。而这个commit方法会通过走网络的commit请求将offset在coordinator中保留,这样就能够保证下一次读取(不论进行了rebalance)时,既不会重复消费消息,也不会遗漏消息。对于offset的commit,Kafka Consumer Java Client支持两种模式:由KafkaConsumer自动提交,或者是用户通过调用commitSync、commitAsync方法的方式完成offset的提交。

Rebalance作用

Rebalance 本质上是一种协议,主要作用是为了保证消费者组(Consumer Group)下的所有消费者(Consumer)消费的topic分区达成均衡。比如:我们有10个分区,当我们有一个消费者时,该消费者消费10个分区,当我们增加一个消费者,理论上每个消费者消费5个分区,这个分配的过程我们成为Rebalance(重平衡),在rebalance的过程中,所有实例共同参与,在协调组件的帮助下完成,整个过程中,所有实例都不能消费任何分区。因此,rebalance对consumer的tps影响很大。

触发条件

常见的有三种情况会触发Rebalance:

1,组成员数发生变更:比如网络中断、心跳中断、有新的consumer实例加入组或者离开组、或者有实例崩溃,被踢出组。都会引起group成员数变化。
2,订阅主题数发生变更 :比如消费topic传入的是正则表达式,则有可能匹配到新增的topic。
3,订阅主题的分区数发生变更 :比如动态的修改topic的分区数。

注意:(对1的补充)对于不同topic,使用相同consumer group,如果有一个消费者程序停止或新增,所有相同consumer group都会Rebalance,这个估计很多人目前还不认为是这样,我上面有实际测试,真的是这样。我感觉有必要变个颜色。

缺点

Rebalance时所有消费者无法消费数据
Rebalance速度慢
Rebalance 效率不高

Coordinator(协调者)介绍

Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。

如何避免 Rebalance

最简单粗暴的就是 : 减少组成员数量发生变化

每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。Consumer 端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。可以这么说,session.timout.ms 决定了 Consumer 存活性的时间间隔。

除了这个参数,Consumer 还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。

除了以上两个参数,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

max.poll.records 表示每次消费的时候,获取多少条消息。获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms 设置的时间内能消费完,否则会发生 rebalance。

所以在我们日常开发中,不相干的业务也要避免Consumer Group 设置成不相同的,最好一个topic一个group,最安全了。

下图正好是kafka-client源码的几个配置项的解释

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI3MDkzNDY1_size_16_color_FFFFFF_t_70 1

上面图是几个涉及到kafka rebalance的几个关键配置key和对应的英文描述,下面再贴上这个kafka-client源码里面对这几个关键配置key的默认值的截图。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI3MDkzNDY1_size_16_color_FFFFFF_t_70 2

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI3MDkzNDY1_size_16_color_FFFFFF_t_70 3

消费者心跳超时

我们知道消费者是通过心跳和协调者保持通讯的,如果协调者收不到心跳,那么协调者会认为这个消费者死亡了,从而发起 rebalance。而 kafka 的消费者参数设置中,跟心跳相关的两个参数为:

  1. session.timeout.ms 设置了超时时间
  2. heartbeat.interval.ms 心跳时间间隔

这时候需要调整 session.timeout.ms 和 heartbeat.interval.ms 参数,使得消费者与协调者能保持心跳。一般来说,超时时间应该是心跳间隔的 3 倍时间。即 session.timeout.ms 如果设置为 180 秒,那么 heartbeat.interval.ms 最多设置为 60 秒。为什么要这么设置超时时间应该是心跳间隔的 3 倍时间?因为这样的话,在一个超时周期内就可以有多次心跳,避免网络问题导致偶发失败。

消费者处理时间过长

如果消费者处理时间过长,那么同样会导致协调者认为该 consumer 死亡了,从而发起重平衡。而 kafka 的消费者参数设置中,跟消费处理的两个参数为:

  1. max.poll.interval.ms 每次消费的处理时间
  2. max.poll.records 每次消费的消息数

对于这种情况,一般来说就是增加消费者处理的时间(即提高 max.poll.interval.ms 的值),减少每次处理的消息数(即减少 max.poll.records 的值)。

除此之外,超时时间参数(session.timeout.ms)与 消费者每次处理的时间(max.poll.interval.ms)也是有关联的。max.poll.interval.ms 时间不能超过 session.timeout.ms 时间。 因为在 kafka 消费者的实现中,其是单线程去消费消息和执行心跳的,如果线程卡在处理消息,那么这时候即使到时间要心跳了,还是没有线程可以去执行心跳操作。很多同学在处理问题的时候,明明设置了很长的 session.timeout.ms 时间,但最终还是心跳超时了,就是因为没有处理好这两个参数的关联。

对于 rebalance 类问题,简单总结就是:处理好心跳超时问题和消费处理超时问题

对于心跳超时问题。一般是调高心跳超时时间(session.timeout.ms),调整超时时间(session.timeout.ms)和心跳间隔时间(heartbeat.interval.ms)的比例。阿里云官方文档建议超时时间(session.timeout.ms)设置成 25s,最长不超过 30s。那么心跳间隔时间(heartbeat.interval.ms)就不超过 10s。
对于消费处理超时问题。一般是增加消费者处理的时间(max.poll.interval.ms),减少每次处理的消息数(max.poll.records)。阿里云官方文档建议 max.poll.records 参数要远小于当前消费组的消费能力(records < 单个线程每秒消费的条数 x 消费线程的个数 x session.timeout的秒数)。

2,实际测试

先是简单的实现一个监听器,且不说网上的,在这个listener里面手动的提交和获取偏移量offset。

  1. package com.lxk.storm.kafka.listener;
  2. import com.google.common.collect.Sets;
  3. import com.lxk.tool.JsonUtils;
  4. import com.lxk.tool.TimeUtils;
  5. import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
  6. import org.apache.kafka.common.TopicPartition;
  7. import java.util.Collection;
  8. import java.util.TreeSet;
  9. /**
  10. * kafka的rebalance(姑且一看啥情况下会触发kafka的rebalance操作)
  11. * (1)消费者组中新添加消费者读取到原本是其他消费者读取的消息
  12. * (2)消费者关闭或崩溃之后离开群组,原本由他读取的partition将由群组里其他消费者读取
  13. * (3)当向一个Topic添加新的partition,会发生partition在消费者中的重新分配
  14. *
  15. * @author LiXuekai on 2021/3/19
  16. */
  17. public class MyRebalanceListener implements ConsumerRebalanceListener {
  18. /**
  19. * rebalance开启新一轮的重平衡前会调用,一般用于手动提交位移,及审计功能
  20. */
  21. @Override
  22. public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  23. TreeSet<String> treeSet = Sets.newTreeSet();
  24. partitions.forEach(partition -> treeSet.add(partition.toString()));
  25. System.out.println("kafka rebalance start. onPartitionsRevoked " + TimeUtils.now() + JsonUtils.parseObjToFormatJson(treeSet));
  26. }
  27. /**
  28. * rebalance在重平衡结束后会调用,一般用于消费逻辑处理
  29. */
  30. @Override
  31. public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  32. TreeSet<String> treeSet = Sets.newTreeSet();
  33. partitions.forEach(partition -> treeSet.add(partition.toString()));
  34. System.out.println("kafka rebalance start. onPartitionsAssigned " + TimeUtils.now() + JsonUtils.parseObjToFormatJson(treeSet));
  35. }
  36. }

简单实现这个接口,里面需要覆盖的2个方法,revoked是撤销的意思,意思是kafka在对topic的分区进行重分配的时候即rebalance的时候,要先把分出去的topic分区给回收回来,然后再统一进行重新分配。所以,刚刚开始启动的时候,这个地方打印的参数是个空数组,因为上次还没分配呢。assigned,就分配的意思,ed过去式,就已经分配完了。假设现在启动消费者的时候,就是一次性的启动5个consumer,这个时候,这个listener的日志会如何输出呢?看下面的测试方法。

  1. package com.lxk.storm.kafka.listener;
  2. import com.google.common.collect.Lists;
  3. import com.google.common.collect.Sets;
  4. import com.lxk.storm.kafka.config.KafkaConfig;
  5. import com.lxk.storm.kafka.constants.LxkConstants;
  6. import com.lxk.tool.JsonUtils;
  7. import com.lxk.tool.TimeUtils;
  8. import org.apache.kafka.clients.consumer.ConsumerRecords;
  9. import org.apache.kafka.clients.consumer.KafkaConsumer;
  10. import org.apache.kafka.clients.producer.KafkaProducer;
  11. import org.apache.kafka.clients.producer.ProducerRecord;
  12. import org.junit.Test;
  13. import java.util.Set;
  14. import java.util.concurrent.TimeUnit;
  15. /**
  16. * @author LiXuekai on 2021/3/22
  17. */
  18. public class KafkaListenerTest {
  19. private static final int MAX = 5;
  20. private static final int TOTAL_TOPIC_PARTITION = 15;
  21. /**
  22. * 我测试的topic是15个分区,我启动15个producer,分别往特定的分区造数。
  23. * 看看n个消费者,在rebalance的时候,是怎么消费的。
  24. * 单单根据consumer的分区信息,怎么感觉和理论上不一致呢,那就再换个方式测试验证一下。
  25. */
  26. @Test
  27. public void producer() throws InterruptedException {
  28. for (int i = 0; i < TOTAL_TOPIC_PARTITION; i++) {
  29. int finalIndex = i;
  30. new Thread(() -> {
  31. System.out.println("producer " + finalIndex + " this run................");
  32. String name = "producer()-[" + finalIndex + "]";
  33. produce(name, finalIndex);
  34. }).start();
  35. }
  36. TimeUnit.MINUTES.sleep(100);
  37. }
  38. /**
  39. * 造数程序,都往指定的分区造数。
  40. *
  41. * @param name name
  42. * @param partition partition
  43. */
  44. private void produce(String name, int partition) {
  45. KafkaProducer<String, String> producer = new KafkaProducer<>(KafkaConfig.producerConf());
  46. while (true) {
  47. String now = TimeUtils.now();
  48. // 直接转json,竟然是{},因为这个对象没得getter and setter方法,醉了,直接toString呢,打印的又太多了。醉了又。。。。
  49. ProducerRecord<String, String> record = new ProducerRecord<>(LxkConstants.TEST_TOPIC_NAME, partition, now, name);
  50. String partitionInfo = record.partition() + " ";
  51. System.out.println(now + " 生产者【" + name + "】 record partition info is " + partitionInfo);
  52. producer.send(record);
  53. try {
  54. TimeUnit.SECONDS.sleep(1);
  55. } catch (InterruptedException e) {
  56. System.out.println("sleep error....");
  57. }
  58. }
  59. }
  60. @Test
  61. public void rebalance1() throws InterruptedException {
  62. for (int i = 0; i < MAX; i++) {
  63. int finalIndex = i;
  64. new Thread(() -> {
  65. System.out.println("rebalance1 " + finalIndex + " this run................");
  66. consumer("rebalance1()-[" + finalIndex + "]");
  67. }).start();
  68. }
  69. TimeUnit.MINUTES.sleep(100);
  70. }
  71. @Test
  72. public void rebalance2() throws InterruptedException {
  73. for (int i = 0; i < MAX; i++) {
  74. int finalIndex = i;
  75. new Thread(() -> {
  76. System.out.println("rebalance2 " + finalIndex + " this run................");
  77. consumer("rebalance2()-[" + finalIndex + "]");
  78. }).start();
  79. }
  80. TimeUnit.MINUTES.sleep(100);
  81. }
  82. @Test
  83. public void rebalance3() throws InterruptedException {
  84. for (int i = 0; i < MAX; i++) {
  85. int finalIndex = i;
  86. new Thread(() -> {
  87. System.out.println("rebalance3 " + finalIndex + " this run................");
  88. consumer("rebalance3()-[" + finalIndex + "]");
  89. }).start();
  90. }
  91. TimeUnit.MINUTES.sleep(100);
  92. }
  93. /**
  94. * 启动消费者消费数据
  95. */
  96. private void consumer(String index) {
  97. String name = Thread.currentThread().getName();
  98. System.out.println(name + " this run................");
  99. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConfig.consumerConf());
  100. consumer.subscribe(Lists.newArrayList(LxkConstants.TEST_TOPIC_NAME), new MyRebalanceListener());
  101. //Runnable task = () -> {
  102. // Set<TopicPartition> assignment = consumer.assignment();
  103. // Set<Integer> set = assignment.stream().map(TopicPartition::partition).collect(Collectors.toSet());
  104. // System.out.println(index + " 分区情况:" + name + " 当前consumer所占有分区信息:" + JsonUtils.parseObjToJson(set) + " 消费的recordInfoSet:" + JsonUtils.parseObjToJson(recordInfoSet));
  105. //};
  106. //
  107. //MonitorService.getMonitorExecutor().scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
  108. while (true) {
  109. ConsumerRecords<String, String> records = consumer.poll(10);
  110. Set<String> recordInfoSet = Sets.newHashSet();
  111. records.forEach(record -> {
  112. String s = record.partition() + "";
  113. recordInfoSet.add(s);
  114. });
  115. System.out.println(TimeUtils.now() + " " + index + " 当前消费者消费的记录的所有分区信息 " + JsonUtils.parseObjToJson(recordInfoSet));
  116. try {
  117. TimeUnit.SECONDS.sleep(1);
  118. } catch (InterruptedException e) {
  119. e.printStackTrace();
  120. }
  121. recordInfoSet.clear();
  122. }
  123. }
  124. }

上面是一个测试类,里面kafka消费者使用的配置也就是极其简单的配置,差不多都是使用的默认配置。先是弄15个线程,每个线程里面new一个生产者,往kafka里面造数据,因为kafka的这个topic已经确定他有15个分区,我这就安排15个生产者,依次对应的往kafka的这个topic造数,然后,指定分区的造,然后,消费者根据消费到的record,每个record里面都有这个记录的分区信息,用consumer消费到的记录的这个信息,来判断当前consumer占有哪些个分区。里面有个需要注意的地方,consumer的poll方法需要一直的不停的调用才行,之前的测试时候,就没注意到这一点,只是在线程里面调用了一次,然后这个consumer会在启动5分之后,与kafka的消费者管理端失联,然后,这个失联也会触发rebalance动作。还有就是我上面代码里面有段注释掉的代码,本意是启动15个定时任务线程,10秒一运行,打印一下各自consumer的被分到的分区的信息,但是因为线程太多了,下面还是while true循环,这个定时线程几乎抢不到CPU执行时间片,那就不用这个定时任务的打印了,直接在消费完kafka之后,统计消费到的记录的分区信息,因为造数的时候,是均匀的往topic的每个分区造数的,每个分区,理论上是只能有一个消费线程在操作的,所以,应该可以判断的。下面是kafka的consumer的和producer的配置。

  1. package com.lxk.storm.kafka.config;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import java.util.Properties;
  5. import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
  6. import static org.apache.kafka.clients.producer.ProducerConfig.*;
  7. /**
  8. * kafka测试相关的配置
  9. *
  10. * @author LiXuekai on 2021/3/22
  11. */
  12. public class KafkaConfig {
  13. /**
  14. * 消费kafka时候的配置
  15. */
  16. public static Properties consumerConf() {
  17. Properties conf = new Properties();
  18. conf.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.191:9092");
  19. conf.setProperty(GROUP_ID_CONFIG, "lxk-test");
  20. conf.setProperty(ENABLE_AUTO_COMMIT_CONFIG, "true");
  21. conf.setProperty(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  22. conf.setProperty(AUTO_OFFSET_RESET_CONFIG, "latest");
  23. conf.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  24. conf.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  25. //默认值是30秒,影响kafka的rebalance
  26. conf.setProperty(SESSION_TIMEOUT_MS_CONFIG, "30000");
  27. //默认值是5分钟,影响kafka的rebalance
  28. conf.setProperty(MAX_POLL_INTERVAL_MS_CONFIG, "300000");
  29. //默认值是3秒,影响kafka的rebalance
  30. conf.setProperty(HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
  31. return conf;
  32. }
  33. /**
  34. * 生产者的配置
  35. */
  36. public static Properties producerConf() {
  37. Properties conf = new Properties();
  38. conf.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.191:9092");
  39. conf.put(ACKS_CONFIG, "all");
  40. conf.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  41. conf.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  42. return conf;
  43. }
  44. }

先看一下producer的造数输出信息情况,符合预期,第index个生产者往第index个分区造数。

  1. 2021-03-30 09:35:45 生产者【producer()-[13]】 record partition info is 13
  2. 2021-03-30 09:35:45 生产者【producer()-[5]】 record partition info is 5
  3. 2021-03-30 09:35:45 生产者【producer()-[0]】 record partition info is 0
  4. 2021-03-30 09:35:45 生产者【producer()-[1]】 record partition info is 1
  5. 2021-03-30 09:35:45 生产者【producer()-[8]】 record partition info is 8
  6. 2021-03-30 09:35:45 生产者【producer()-[9]】 record partition info is 9
  7. 2021-03-30 09:35:45 生产者【producer()-[7]】 record partition info is 7
  8. 2021-03-30 09:35:45 生产者【producer()-[14]】 record partition info is 14
  9. 2021-03-30 09:35:45 生产者【producer()-[3]】 record partition info is 3
  10. 2021-03-30 09:35:45 生产者【producer()-[10]】 record partition info is 10
  11. 2021-03-30 09:35:45 生产者【producer()-[12]】 record partition info is 12
  12. 2021-03-30 09:35:45 生产者【producer()-[4]】 record partition info is 4
  13. 2021-03-30 09:35:45 生产者【producer()-[6]】 record partition info is 6
  14. 2021-03-30 09:35:45 生产者【producer()-[2]】 record partition info is 2
  15. 2021-03-30 09:35:45 生产者【producer()-[11]】 record partition info is 11

现在启动消费者,去消费kafka上的topic数据,在kafka上是15个分区。在只启动测试方法1即rebalance1()的时候,看看实际日志是什么情况,如下。

  1. 1092 [Thread-1] INFO o.a.k.c.u.AppInfoParser - Kafka version : 1.0.0
  2. 1092 [Thread-1] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
  3. 1100 [Thread-3] INFO o.a.k.c.u.AppInfoParser - Kafka version : 1.0.0
  4. 1101 [Thread-3] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
  5. 1102 [Thread-2] INFO o.a.k.c.u.AppInfoParser - Kafka version : 1.0.0
  6. 1102 [Thread-2] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
  7. 1103 [Thread-0] INFO o.a.k.c.u.AppInfoParser - Kafka version : 1.0.0
  8. 1103 [Thread-0] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
  9. 1107 [Thread-4] INFO o.a.k.c.u.AppInfoParser - Kafka version : 1.0.0
  10. 1111 [Thread-4] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
  11. 1360 [Thread-3] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] Discovered coordinator 192.168.1.191:9092 (id: 2147483646 rack: null)
  12. 1361 [Thread-2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] Discovered coordinator 192.168.1.191:9092 (id: 2147483646 rack: null)
  13. 1360 [Thread-0] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] Discovered coordinator 192.168.1.191:9092 (id: 2147483646 rack: null)
  14. 1360 [Thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] Discovered coordinator 192.168.1.191:9092 (id: 2147483646 rack: null)
  15. 1361 [Thread-4] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] Discovered coordinator 192.168.1.191:9092 (id: 2147483646 rack: null)
  16. 1365 [Thread-4] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] Revoking previously assigned partitions []
  17. 1365 [Thread-0] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] Revoking previously assigned partitions []
  18. 1365 [Thread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] Revoking previously assigned partitions []
  19. 1365 [Thread-2] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] Revoking previously assigned partitions []
  20. 1365 [Thread-3] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] Revoking previously assigned partitions []
  21. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:34:27[]
  22. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:34:27[]
  23. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:34:27[]
  24. 1588 [Thread-2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] (Re-)joining group
  25. 1588 [Thread-3] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] (Re-)joining group
  26. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:34:27[]
  27. 1589 [Thread-4] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] (Re-)joining group
  28. 1589 [Thread-0] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] (Re-)joining group
  29. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:34:27[]
  30. 1590 [Thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] (Re-)joining group
  31. 1702 [Thread-3] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] (Re-)joining group
  32. 1787 [Thread-3] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] Successfully joined group with generation 39
  33. 1788 [Thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] Successfully joined group with generation 39
  34. 1790 [Thread-3] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-13, citic_test_lxk-14, citic_test_lxk-12]
  35. 1791 [Thread-2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] Successfully joined group with generation 39
  36. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:34:28["citic_test_lxk-12","citic_test_lxk-13","citic_test_lxk-14"]
  37. 1791 [Thread-4] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] Successfully joined group with generation 39
  38. 1791 [Thread-2] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-9, citic_test_lxk-10, citic_test_lxk-11]
  39. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:34:28["citic_test_lxk-10","citic_test_lxk-11","citic_test_lxk-9"]
  40. 1792 [Thread-4] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-5, citic_test_lxk-3, citic_test_lxk-4]
  41. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:34:28["citic_test_lxk-3","citic_test_lxk-4","citic_test_lxk-5"]
  42. 1793 [Thread-0] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] Successfully joined group with generation 39
  43. 1793 [Thread-0] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-6, citic_test_lxk-7, citic_test_lxk-8]
  44. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:34:28["citic_test_lxk-6","citic_test_lxk-7","citic_test_lxk-8"]
  45. 1790 [Thread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-1, citic_test_lxk-2, citic_test_lxk-0]
  46. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:34:28["citic_test_lxk-0","citic_test_lxk-1","citic_test_lxk-2"]
  47. 2021-03-30 10:34:28 rebalance1()-[4] 当前消费者消费的记录的所有分区信息 []
  48. 2021-03-30 10:34:28 rebalance1()-[2] 当前消费者消费的记录的所有分区信息 []
  49. 2021-03-30 10:34:28 rebalance1()-[3] 当前消费者消费的记录的所有分区信息 []
  50. 2021-03-30 10:34:28 rebalance1()-[1] 当前消费者消费的记录的所有分区信息 []
  51. 2021-03-30 10:34:28 rebalance1()-[0] 当前消费者消费的记录的所有分区信息 []
  52. 2021-03-30 10:34:30 rebalance1()-[4] 当前消费者消费的记录的所有分区信息 ["3","4","5"]
  53. 2021-03-30 10:34:30 rebalance1()-[2] 当前消费者消费的记录的所有分区信息 ["11","9","10"]
  54. 2021-03-30 10:34:30 rebalance1()-[1] 当前消费者消费的记录的所有分区信息 ["0","1","2"]
  55. 2021-03-30 10:34:30 rebalance1()-[0] 当前消费者消费的记录的所有分区信息 ["6","7","8"]
  56. 2021-03-30 10:34:30 rebalance1()-[3] 当前消费者消费的记录的所有分区信息 ["12","13","14"]
  57. 2021-03-30 10:34:32 rebalance1()-[4] 当前消费者消费的记录的所有分区信息 ["3","4","5"]
  58. 2021-03-30 10:34:32 rebalance1()-[2] 当前消费者消费的记录的所有分区信息 ["11","9","10"]
  59. 2021-03-30 10:34:32 rebalance1()-[3] 当前消费者消费的记录的所有分区信息 ["12","13","14"]
  60. 2021-03-30 10:34:32 rebalance1()-[0] 当前消费者消费的记录的所有分区信息 ["6","7","8"]
  61. 2021-03-30 10:34:32 rebalance1()-[1] 当前消费者消费的记录的所有分区信息 ["0","1","2"]

测试方法1启动之后,瞬间启动5个线程,每个线程里面都会new一个kafka的consumer,这5个consumer使用的消费配置都是一样的。都是一个group id值,然后可以在日志里面看到consumer rebalance listener打印的日志,在listener日志打印之前,可以看到previously这个单词,就是在获取这个consumer之前是否被分配了topic的分区信息,获取的是空的,回收完成之后,就开始打印revoked日志,后面的数组也就是空的,然后5个线程中的5个consumer就开始对15个分区进行分组,给每个consumer 他们的client id是不一样的,在分组的时候呢,有个分组器,使用的是RangeAssignor,然后,就均分,分完之后,listener也打印分区rebalance的日志了,最后,线程打印出每个线程里面占有的topic的分区号0-14,看样子是平均分的。这个rebalance listener也正常起作用了。同一时期kafka的日志,也留一下。

  1. [2021-03-30 10:34:27,990] INFO [GroupCoordinator 1]: Preparing to rebalance group lxk-test with old generation 37 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  2. [2021-03-30 10:34:28,080] INFO [GroupCoordinator 1]: Stabilized group lxk-test generation 38 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  3. [2021-03-30 10:34:28,081] INFO [GroupCoordinator 1]: Preparing to rebalance group lxk-test with old generation 38 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  4. [2021-03-30 10:34:28,168] INFO [GroupCoordinator 1]: Stabilized group lxk-test generation 39 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  5. [2021-03-30 10:34:28,172] INFO [GroupCoordinator 1]: Assignment received from leader for group lxk-test for generation 39 (kafka.coordinator.group.GroupCoordinator)

从这个kafka的日志里面,也可以很清晰的看到,在启动的时候,准备去rebalance,然后建立分组,然后创建,然后assignment,这个时候就是topic分区分完了的时候,之后的日志,是停掉测试方法时候打印的日志,线程关掉了,就又要少了消费者,又得再次rebalance了。那个generation后面跟了不少数字,因为只启动一个consumer,那他就分得所有的分区,但是,实际启动的时候,一下子启动了5个consumer实例,虽然是一下子,但是对kafka来说,还是有先来后到一说的,那在第一个到的时候,正准备去rebalance呢,哎,又来一个,然后,哎又来一个,然后,好像暂时没再来的了,那就暂时都到了。就在这个generation 2 上rebalance完活了。

好奇代码为啥还有rebalance2(),rebalance3()吗?是的,准备先启动方法1,稍事休息,然后再启动方法2、3,都消费这个topic,都使用相同的group id,看看kafk会rebalance吗?以及如何rebalance的。先说一下,理论上的结果是:启动方法1的时候,一个线程分3个分区,正好平均分就跟上面的结果差不多,之后再启动方法2、3,再次触发kafka的rebalance操作,触发rebalance的时候,理论上,那个listener里面的打印日志,方法1应该会被触发,等rebalance完之后,现在是一共启动了15个线程,好嘛,15个分区,平均分一下,应该是一个线程分得一个分区,很好的实现kafka支持集群的设计初衷。实际结果是这样吗?好的,上才艺。

在启动2和3的时候,方法1的打印受到了2次波及,rebalance listener 打印了日志,2次打印具体如下。

  1. 021-03-30 10:39:18 rebalance1()-[3] 当前消费者消费的记录的所有分区信息 ["12","13","14"]
  2. 2021-03-30 10:39:18 rebalance1()-[4] 当前消费者消费的记录的所有分区信息 ["3","4","5"]
  3. 2021-03-30 10:39:18 rebalance1()-[1] 当前消费者消费的记录的所有分区信息 ["0","1","2"]
  4. 2021-03-30 10:39:18 rebalance1()-[2] 当前消费者消费的记录的所有分区信息 ["11","9","10"]
  5. 294473 [Thread-0] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] Revoking previously assigned partitions [citic_test_lxk-6, citic_test_lxk-7, citic_test_lxk-8]
  6. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:39:20["citic_test_lxk-6","citic_test_lxk-7","citic_test_lxk-8"]
  7. 294474 [Thread-0] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] (Re-)joining group
  8. 294474 [Thread-3] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] Revoking previously assigned partitions [citic_test_lxk-13, citic_test_lxk-14, citic_test_lxk-12]
  9. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:39:20["citic_test_lxk-12","citic_test_lxk-13","citic_test_lxk-14"]
  10. 294474 [Thread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] Revoking previously assigned partitions [citic_test_lxk-1, citic_test_lxk-2, citic_test_lxk-0]
  11. 294474 [Thread-2] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] Revoking previously assigned partitions [citic_test_lxk-9, citic_test_lxk-10, citic_test_lxk-11]
  12. 294475 [Thread-3] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] (Re-)joining group
  13. 294474 [Thread-4] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] Revoking previously assigned partitions [citic_test_lxk-5, citic_test_lxk-3, citic_test_lxk-4]
  14. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:39:20["citic_test_lxk-10","citic_test_lxk-11","citic_test_lxk-9"]
  15. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:39:20["citic_test_lxk-3","citic_test_lxk-4","citic_test_lxk-5"]
  16. 294475 [Thread-2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] (Re-)joining group
  17. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:39:20["citic_test_lxk-0","citic_test_lxk-1","citic_test_lxk-2"]
  18. 294475 [Thread-4] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] (Re-)joining group
  19. 294475 [Thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] (Re-)joining group
  20. 294555 [Thread-4] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] Successfully joined group with generation 40
  21. 294555 [Thread-2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] Successfully joined group with generation 40
  22. 294556 [Thread-0] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] Successfully joined group with generation 40
  23. 294557 [Thread-2] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-12]
  24. 294557 [Thread-3] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] Successfully joined group with generation 40
  25. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:39:20["citic_test_lxk-12"]
  26. 294557 [Thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] Successfully joined group with generation 40
  27. 294557 [Thread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-1, citic_test_lxk-0]
  28. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:39:20["citic_test_lxk-0","citic_test_lxk-1"]
  29. 294557 [Thread-4] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-6, citic_test_lxk-7]
  30. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:39:20["citic_test_lxk-6","citic_test_lxk-7"]
  31. 294558 [Thread-3] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-13]
  32. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:39:20["citic_test_lxk-13"]
  33. 294559 [Thread-0] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-10]
  34. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:39:20["citic_test_lxk-10"]
  35. 2021-03-30 10:39:28 rebalance1()-[0] 当前消费者消费的记录的所有分区信息 ["10"]
  36. 2021-03-30 10:39:28 rebalance1()-[3] 当前消费者消费的记录的所有分区信息 ["13"]
  37. 2021-03-30 10:39:28 rebalance1()-[4] 当前消费者消费的记录的所有分区信息 ["6","7"]
  38. 2021-03-30 10:39:28 rebalance1()-[1] 当前消费者消费的记录的所有分区信息 ["0","1"]
  39. 2021-03-30 10:39:28 rebalance1()-[2] 当前消费者消费的记录的所有分区信息 ["12"]
  40. 304585 [Thread-0] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] Revoking previously assigned partitions [citic_test_lxk-10]
  41. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:39:30["citic_test_lxk-10"]
  42. 304585 [Thread-0] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] (Re-)joining group
  43. 304588 [Thread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] Revoking previously assigned partitions [citic_test_lxk-1, citic_test_lxk-0]
  44. 304588 [Thread-3] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] Revoking previously assigned partitions [citic_test_lxk-13]
  45. 304588 [Thread-4] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] Revoking previously assigned partitions [citic_test_lxk-6, citic_test_lxk-7]
  46. 304588 [Thread-2] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] Revoking previously assigned partitions [citic_test_lxk-12]
  47. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:39:30["citic_test_lxk-6","citic_test_lxk-7"]
  48. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:39:30["citic_test_lxk-13"]
  49. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:39:30["citic_test_lxk-12"]
  50. 304588 [Thread-4] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] (Re-)joining group
  51. 304588 [Thread-3] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] (Re-)joining group
  52. 304588 [Thread-2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] (Re-)joining group
  53. kafka rebalance start. onPartitionsRevoked 2021-03-30 10:39:30["citic_test_lxk-0","citic_test_lxk-1"]
  54. 304588 [Thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] (Re-)joining group
  55. 304766 [Thread-2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] Successfully joined group with generation 41
  56. 304768 [Thread-2] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-4, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-11]
  57. 304768 [Thread-3] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] Successfully joined group with generation 41
  58. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:39:31["citic_test_lxk-11"]
  59. 304769 [Thread-3] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-5, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-12]
  60. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:39:31["citic_test_lxk-12"]
  61. 304769 [Thread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] Successfully joined group with generation 41
  62. 304770 [Thread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-0]
  63. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:39:31["citic_test_lxk-0"]
  64. 304768 [Thread-4] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] Successfully joined group with generation 41
  65. 304772 [Thread-4] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-2, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-5]
  66. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:39:31["citic_test_lxk-5"]
  67. 304773 [Thread-0] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] Successfully joined group with generation 41
  68. 304773 [Thread-0] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-3, groupId=lxk-test] Setting newly assigned partitions [citic_test_lxk-8]
  69. kafka rebalance start. onPartitionsAssigned 2021-03-30 10:39:31["citic_test_lxk-8"]
  70. 2021-03-30 10:39:31 rebalance1()-[1] 当前消费者消费的记录的所有分区信息 ["0"]
  71. 2021-03-30 10:39:31 rebalance1()-[0] 当前消费者消费的记录的所有分区信息 ["8"]
  72. 2021-03-30 10:39:31 rebalance1()-[2] 当前消费者消费的记录的所有分区信息 ["11"]
  73. 2021-03-30 10:39:31 rebalance1()-[3] 当前消费者消费的记录的所有分区信息 ["12"]
  74. 2021-03-30 10:39:31 rebalance1()-[4] 当前消费者消费的记录的所有分区信息 ["5"]

说明这个rebalance listener确实正常工作,可以监听到kafka的rebalance动作。
方法2启动之后,再去启动方法3,方法2也打印了2次listener的日志。我这就不占地方了,就不贴了。
当时kafka的server.log日志也发生了rebalance操作,启动方法2、3的时候触发了2次重平衡操作,服务端也打印了2次日志。

  1. [2021-03-30 10:38:59,930] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
  2. [2021-03-30 10:39:17,721] INFO [GroupCoordinator 1]: Preparing to rebalance group lxk-test with old generation 39 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  3. [2021-03-30 10:39:20,838] INFO [GroupCoordinator 1]: Stabilized group lxk-test generation 40 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  4. [2021-03-30 10:39:20,845] INFO [GroupCoordinator 1]: Assignment received from leader for group lxk-test for generation 40 (kafka.coordinator.group.GroupCoordinator)
  5. [2021-03-30 10:39:28,841] INFO [GroupCoordinator 1]: Preparing to rebalance group lxk-test with old generation 40 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  6. [2021-03-30 10:39:31,011] INFO [GroupCoordinator 1]: Stabilized group lxk-test generation 41 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  7. [2021-03-30 10:39:31,030] INFO [GroupCoordinator 1]: Assignment received from leader for group lxk-test for generation 41 (kafka.coordinator.group.GroupCoordinator)

最后当方法3开始消费打印分区信息时候,看看3个方法的打印是不是均衡的占有分区,这一分钟的消费信息,可以看出来,确实已经按照kafka的分区负载均衡理论平衡了。39分31秒的时候,平衡完成,下面就看到3个方法的15个消费者是均衡的。

  1. 2021-03-30 10:39:33 rebalance3()-[0] 当前消费者消费的记录的所有分区信息 ["6"]
  2. 2021-03-30 10:39:33 rebalance3()-[4] 当前消费者消费的记录的所有分区信息 ["4"]
  3. 2021-03-30 10:39:33 rebalance3()-[1] 当前消费者消费的记录的所有分区信息 ["2"]
  4. 2021-03-30 10:39:33 rebalance3()-[3] 当前消费者消费的记录的所有分区信息 ["9"]
  5. 2021-03-30 10:39:33 rebalance3()-[2] 当前消费者消费的记录的所有分区信息 ["13"]
  6. 2021-03-30 10:39:33 rebalance2()-[2] 当前消费者消费的记录的所有分区信息 ["14"]
  7. 2021-03-30 10:39:33 rebalance2()-[4] 当前消费者消费的记录的所有分区信息 ["7"]
  8. 2021-03-30 10:39:33 rebalance2()-[1] 当前消费者消费的记录的所有分区信息 ["3"]
  9. 2021-03-30 10:39:33 rebalance2()-[0] 当前消费者消费的记录的所有分区信息 ["10"]
  10. 2021-03-30 10:39:33 rebalance2()-[3] 当前消费者消费的记录的所有分区信息 ["1"]
  11. 2021-03-30 10:39:33 rebalance1()-[1] 当前消费者消费的记录的所有分区信息 ["0"]
  12. 2021-03-30 10:39:33 rebalance1()-[0] 当前消费者消费的记录的所有分区信息 ["8"]
  13. 2021-03-30 10:39:33 rebalance1()-[2] 当前消费者消费的记录的所有分区信息 ["11"]
  14. 2021-03-30 10:39:33 rebalance1()-[4] 当前消费者消费的记录的所有分区信息 ["5"]
  15. 2021-03-30 10:39:33 rebalance1()-[3] 当前消费者消费的记录的所有分区信息 ["12"]

最后,在kafka manager里面看看这个topic的消费信息,下面是3个方法都run的时候的截图,看consumer 的id是15个不一样的。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI3MDkzNDY1_size_16_color_FFFFFF_t_70 4

再走一下关停的测试,下面是关掉方法3的这个topic的信息截图,还是消费者的id,发现前5个消费者各自占2个分区,后面5个各自占1个分区,一个15个分区,被10个消费者均衡分配。哦,关停方法3的时候,其他2个地方也打印了listener的日志了。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI3MDkzNDY1_size_16_color_FFFFFF_t_70 5

再关停方法2,再看看这个topic的截图信息。现在消费者剩下5个了,每个消费者占3个分区。那个listener的日志也打印了。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI3MDkzNDY1_size_16_color_FFFFFF_t_70 6

在这个关停的操作的时候,kafka server.log也有如下日志:消失的消费者就被那个coordinator给remove了

  1. [2021-03-30 11:01:29,203] INFO [GroupCoordinator 1]: Member consumer-2-68573f53-4858-47d1-b043-6747f28f317d in group lxk-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
  2. [2021-03-30 11:01:29,204] INFO [GroupCoordinator 1]: Preparing to rebalance group lxk-test with old generation 41 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  3. [2021-03-30 11:01:29,820] INFO [GroupCoordinator 1]: Member consumer-4-4f644847-9565-41f0-9a2b-4fa40554fc01 in group lxk-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
  4. [2021-03-30 11:01:29,953] INFO [GroupCoordinator 1]: Member consumer-1-8d0c9158-30e0-4159-98a0-d65346b883e3 in group lxk-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
  5. [2021-03-30 11:01:30,056] INFO [GroupCoordinator 1]: Member consumer-5-6c3efa5c-ea6a-457d-90ac-c8fbc19958c6 in group lxk-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
  6. [2021-03-30 11:01:30,154] INFO [GroupCoordinator 1]: Member consumer-3-b97e9a63-bbf5-4e36-b963-ad0f0c0b2ba6 in group lxk-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
  7. [2021-03-30 11:01:31,487] INFO [GroupCoordinator 1]: Stabilized group lxk-test generation 42 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  8. [2021-03-30 11:01:31,606] INFO [GroupCoordinator 1]: Assignment received from leader for group lxk-test for generation 42 (kafka.coordinator.group.GroupCoordinator)
  9. [2021-03-30 11:07:16,264] INFO [GroupCoordinator 1]: Member consumer-3-c09ff92f-e945-4afd-9b8c-d604c7cbc352 in group lxk-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
  10. [2021-03-30 11:07:16,264] INFO [GroupCoordinator 1]: Preparing to rebalance group lxk-test with old generation 42 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  11. [2021-03-30 11:07:16,264] INFO [GroupCoordinator 1]: Member consumer-4-f7a5f47b-4d92-4d8b-993d-3a1cef73c091 in group lxk-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
  12. [2021-03-30 11:07:16,264] INFO [GroupCoordinator 1]: Member consumer-5-e6dcbac5-6249-40df-acce-2f9b583ae129 in group lxk-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
  13. [2021-03-30 11:07:16,264] INFO [GroupCoordinator 1]: Member consumer-1-52ea9c90-a02a-4ed6-ac86-0380b2b4a8ef in group lxk-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
  14. [2021-03-30 11:07:16,266] INFO [GroupCoordinator 1]: Member consumer-2-38e2baa1-0e3a-47d9-bcd2-c82a50701ed4 in group lxk-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
  15. [2021-03-30 11:07:20,264] INFO [GroupCoordinator 1]: Stabilized group lxk-test generation 43 (__consumer_offsets-26) (kafka.coordinator.group.GroupCoordinator)
  16. [2021-03-30 11:07:20,268] INFO [GroupCoordinator 1]: Assignment received from leader for group lxk-test for generation 43 (kafka.coordinator.group.GroupCoordinator)
  17. [2021-03-30 11:08:59,951] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 21 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

最后关完了消费之是什么情况呢?生产者是一直在造呢。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI3MDkzNDY1_size_16_color_FFFFFF_t_70 7

这个看到15给分区数据增长很均衡,但是consumer的offset就停止了,因为都不消费啦,然后那个lag值一直在上涨,这个时候,就算是kafka产生了积压,lag值的意义:这个lag是0或者是负数,则说明kafka的消费者运行状况良好,数据来了就给消费走了。如果是正数,则说明kafka数据积压了,要么消费者挂了,要么消费能力处理不行,导致积压了。

还有就是顺带做了另外的一个测试:

当使用相同的group.id去消费kafka的一个topic 假设是A,然后再启动一个consumer,使用相同的group.id,去消费不同的另一个topic 假设是B,这个时候,也是会触发kafka的rebalance操作的。就是说:A的n个consumer也会进行重平衡。这个测试,简单证实一下kafka的rebalance的触发条件:同一个group.id下的不同topic的consumer变化,是会影响的,引发kafka的rebalance操作的,之前一直以为A、B两个topic在一个分组里面,是不会互相影响的,但是却被实际测试打脸了。他们是有关系的。

我的测试代码,摆在github上了,想看的可以看看。

https://github.com/cmshome/JavaNote/tree/master/storm/src/test/java/com/lxk/storm/kafka/listener

发表评论

表情:
评论列表 (有 0 条评论,729人围观)

还没有评论,来说两句吧...

相关阅读