【kafka原理】 消费者偏移量__consumer_offsets_相关解析 分手后的思念是犯贱 2021-06-10 20:40 347阅读 0赞 我们在kafka的log文件中发现了还有很多以 `__consumer_offsets_`的文件夹;总共50个; 由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即`__consumer_offsets` topic,并且默认提供了`kafka_consumer_groups.sh`脚本供用户查看consumer信息。 `__consumer_offsets` 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。 `__consumer_offsets` 的每条消息格式大致如图所示 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTA2MzQwNjY_size_16_color_FFFFFF_t_70_pic_center] 可以想象成一个 KV 格式的消息,key 就是一个三元组:`group.id+topic+分区号`,而 value 就是 offset 的值。 考虑到一个 kafka 生成环境中可能有很多`consumer` 和 `consumer group`,如果这些 consumer 同时提交位移,则必将加重 \_\_consumer\_offsets 的写入负载,因此 kafka 默认为该 topic 创建了50个分区,并且对每个 `group.id`做哈希求模运算`Math.abs(groupID.hashCode()) % numPartitions`,从而将负载分散到不同的 \_\_consumer\_offsets 分区上。 一般情况下,当集群中第一次有消费者消费消息时会自动创建`__consumer_offsets`,它的副本因子受 `offsets.topic.replication.factor` 参数的约束,默认值为3(注意:该参数的使用限制在0.11.0.0版本发生变化),分区数可以通过 `offsets.topic.num.partitions` 参数设置,默认值为50。 ### 1. 消费Topic消息 ### 打开一个session a,执行下面的消费者命令 ;指定了`消费组:szz1-group`; `topic:szz1-test-topic` bin/kafka-console-consumer.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group --topic szz1-test-topic **2.产生消息** 打开一个新的session b,执行生产消息命令 bin/kafka-console-producer.sh --broker-list xxx1:9092,xxx2:9092,xxx3:9092 --topic szz1-test-topic 发送几条消息 ![在这里插入图片描述][202010271124076.png_pic_center] 然后可以看到刚刚打开的 session a 消费了消息; ![在这里插入图片描述][20201027112455861.png_pic_center] ### 3. 查看指定消费组的消费位置offset ### bin/kafka-consumer-groups.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --describe --group szz1-group ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTA2MzQwNjY_size_16_color_FFFFFF_t_70_pic_center 1] 可以看到图中 展示了每个`partition` 对应的消费者id; 因为只开了一个消费者; 所以是这个消费者同时消费3个`partition`; `CURRENT-OFFSET: 当前消费组消费到的偏移量` `LOG-END-OFFSET: 日志最后的偏移量` `CURRENT-OFFSET` = `LOG-END-OFFSET` 说明当前消费组已经全部消费了; 那么我把 session a 关掉;现在没有消费者之后; 我再发送几条消息看看; ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTA2MzQwNjY_size_16_color_FFFFFF_t_70_pic_center 2] 我发送了2条消息之后, `partition-0` `partition-1` 的`LOG-END-OFFSET: 日志最后的偏移量`分别增加了1; 但是`CURRENT-OFFSET: 当前消费组消费到的偏移量` 保持不变;因为没有被消费; **重新打开一个消费组 继续消费**\* 重新打开session之后, 会发现控制台输出了刚刚发送的2条消息; 并且偏移量也更新了 ![在这里插入图片描述][20201027113806535.png_pic_center] ### 4. 从头开始消费 --from-beginning ### 如果我们用新的消费组去消费一个Topic,那么默认这个消费组的offset会是最新的; 也就是说历史的不会消费 例如下面我们新开一个session c ;消费组设置为`szz1-group3` bin/kafka-console-consumer.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --group szz1-group3 --topic szz1-test-topic 查看消费情况 bin/kafka-consumer-groups.sh --bootstrap-server xxx1:9092,xxx2:9092,xxx3:9092 --describe --group szz1-group3 ![在这里插入图片描述][20201027123506994.png_pic_center] 可以看到`CURRENT-OFFSET` = `LOG-END-OFFSET` ; 如何让新的消费组/者 从头开始消费呢? 加上参数 `--from-beginning` ### 5.如何确认 consume\_group 在哪个\_\_consumer\_offsets-? 中 ### `Math.abs(groupID.hashCode()) % numPartitions` ### 6. 查找\_\_consumer\_offsets 分区数中的消费组偏移量offset ### 上面的 **3. 查看指定消费组的消费位置offset** 中,我们知道如何查看指定的topic消费组的偏移量; 那还有一种方式也可以查询 先通过 `consume_group` 确定分区数; 例如 `"szz1-group".hashCode()%50=32`; 那我们就知道 `szz-group`消费组的偏移量信息存放在 `__consumer_offsets_32`中; 通过命令 bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 32 --broker-list xxx1:9092,xxx2:9092,xxx3:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" ![在这里插入图片描述][20201027135823471.png_pic_center] **前面的 是key 后面的是value;key由 消费组+Topic+分区数 确定; 后面的value就包含了 消费组的偏移量信息等等** 然后接着我们发送几个消息,并且进行消费; 上面的控制台会自动更新为新的offset; ### 7 查询topic的分布情况 ### bin/kafka-topics.sh --describe --zookeeper xxx:2181 --topic TOPIC名称 ### 8 清理 \_\_consumer\_offsets ### [https://www.cnblogs.com/ding2016/p/9294544.html][https_www.cnblogs.com_ding2016_p_9294544.html] [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTA2MzQwNjY_size_16_color_FFFFFF_t_70_pic_center]: /images/20210527/cdafcc8048774e388572b5197830753f.png [202010271124076.png_pic_center]: /images/20210527/00a644cac2ef4c118feb7eb4a5b12c3b.png [20201027112455861.png_pic_center]: /images/20210527/7eece8fa931c4848929a547b3da5be31.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTA2MzQwNjY_size_16_color_FFFFFF_t_70_pic_center 1]: /images/20210527/c0ff78c2e9204b7eb1ee7693c131eec1.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTA2MzQwNjY_size_16_color_FFFFFF_t_70_pic_center 2]: /images/20210527/eea85c37fd084f89b6dce849b6772b62.png [20201027113806535.png_pic_center]: /images/20210527/fcaef735fce5459a80375ab07b0435ba.png [20201027123506994.png_pic_center]: /images/20210527/5f1c138a068f4bc5963f4b1ed2efce83.png [20201027135823471.png_pic_center]: /images/20210527/b34585990abb48b4b05cec4857858fbd.png [https_www.cnblogs.com_ding2016_p_9294544.html]: https://www.cnblogs.com/ding2016/p/9294544.html
还没有评论,来说两句吧...