Kafka——》消费组管理kafka-consumer-groups

痛定思痛。 2024-03-31 15:32 150阅读 0赞

推荐链接:
总结——》【Java】
总结——》【Mysql】
总结——》【Redis】
总结——》【Kafka】
总结——》【Spring】
总结——》【SpringBoot】
总结——》【MyBatis、MyBatis-Plus】

Kafka——》消费组管理kafka-consumer-groups

  • list:查看消费者组列表
  • describe:查看消费者组详情
  • state:查看消费者组状态
  • members:查看消费者组成员
  • delete:删除消费者组
  • reset-offsets:重置消费组的偏移量
  • delete-offsets:删除消费组的偏移量

KAFKA_HOME=/opt/app/install/kafka


































































参数 描述
—bootstrap-server kafka服务地址
—list 查看消费者组列表
—group 指定消费者组
—all-groups 所有消费者组
—describe 查看消费者组详情
—state 查看消费者组状态
—members 查看消费者组成员
—delete 删除消费者组
—reset-offsets 重置消费组的偏移量
—delete-offsets 删除消费组的偏移量
—dry-run 预先执行重置偏移量
—excute 真正执行重置偏移量
—to-earliest 将offset重置到最早
—to-latest 将offset重置到最近

list:查看消费者组列表

  1. # 查看消费者组列表
  2. bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.x:9092 --list

describe:查看消费者组详情

  1. # 查看所有消费者组详情
  2. bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups
  3. # 查看指定消费者组详情
  4. bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1





























结果列 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
描述 消费组 主题 分区编号 当前offset 最新offset 消息滞后(未消费)数量 消费者ID 主机 客户端ID

state:查看消费者组状态

  1. # 查看所有消费者组状态
  2. bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups --state
  3. # 查看指定消费者组状态
  4. bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1 --state





















结果列 GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
描述 消费组 协调者ID 分配策略 状态
- Stable:有消费者成员
- Empty:没有消费者成员
- Dead
- PreparingRebalance
- CompletingRebalance
成员数量

members:查看消费者组成员

  1. # 查看所有消费者组成员
  2. bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --all-groups --members
  3. # 查看指定消费者组成员
  4. bin/kafka-consumer-groups.sh --bootstrap-server 10.116.192.27:9092 --describe --group listenerForSyncEsfCommunity1 --members





















结果列 GROUP CONSUMER-ID HOST CLIENT-ID #PARTITION
描述 消费组 消费者ID 主机 客户端ID 分区

delete:删除消费者组

  1. # 删除所有消费者组
  2. bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete --all-groups
  3. # 删除指定消费者组
  4. bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete --group listenerForSyncEsfCommunity1
  5. # 只有这个消费组的所有客户端都停止消费/不在线才能够成功删除,否则会报下面异常
  6. Error: Deletion of some consumer groups failed:
  7. * Group 'listenerForSyncEsfCommunity1' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

reset-offsets:重置消费组的偏移量














































重置Offset的模式 描述
—to-earliest 重置到最早offset
—to-latest 重置到最近offset
—to-current 重置到当前offset
—to-datetime 重置到指定时间offset
格式:YYYY-MM-DDTHH:mm:SS.sss
示例:2021-6-26T00:00:00.000
—to-offset 重置到指定offset(多个分区都重置,一般不用这个)
—shift-by 按照偏移量增加或者减少offset
- 正数:往前增加
- 负数:往后减少
—from-file 根据CVS文档来重置
—dry-run 预先执行重置偏移量
—excute 真正执行重置偏移量
  1. # 重置指定消费组的所有Topic的偏移量
  2. bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --group listenerForSyncEsfCommunity1 --all-topic --dry-run
  3. # 重置指定消费组的指定Topic的偏移量
  4. bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --group listenerForSyncEsfCommunity1 --topic test_topic --dry-run
  5. # 重置所有消费组的所有Topic的偏移量
  6. bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --all-group --all-topic --dry-run
  7. # 重置所有消费组的指定Topic的偏移量
  8. bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --reset-offsets --to-earliest --all-group --topic test_topic --dry-run
  9. # 只有这个消费组不可用状态才能重置成功,否则会报下面异常:
  10. Error: Assignments can only be reset if the group 'listenerForSyncEsfCommunity1' is inactive, but the current state is Stable.
  11. TOPIC PARTITION NEW-OFFSET

delete-offsets:删除消费组的偏移量

  1. # 删除指定消费组的指定Topic的偏移量(下一次从头消费)
  2. bin/kafka-consumer-groups.sh --bootstrap-server 10.122.139.40:9092 --delete-offsets --group listenerForSyncEsfCommunity1 --topic test_topic
  3. # 只有这个消费组不可用状态才能删除成功,否则会报异常

发表评论

表情:
评论列表 (有 0 条评论,150人围观)

还没有评论,来说两句吧...

相关阅读

    相关 kafka系列-进阶篇之消费

    前言 消费组是kafka一个非常有意思的设计。在高并发方面,可以使用多个台服务器放在同一个消费组中,就可以保证所有的消费者拉取的消费不会重复并且完整,这样就可以提高消费者