kafka 常用运维命令介绍(一)

快来打我* 2022-04-06 07:42 1425阅读 0赞

文章目录

    • 一、连接zk
      • zkCli 命令
    • 二、topic 相关
      • 列出所有的topic & 获取命令帮助
      • 创建topic
      • 列出所有topic的详情
      • 删除topic
      • 修改topic相关信息
    • 三、分区副本重分配
      • 提交分区副本重分配任务:
      • 让系统自动帮我们生成重分配json文件:
      • 其他参数
      • 分区副本重分配过程
    • 四、删除某个partition的数据
    • 五、全局&topic配置修改
      • 配置更新原理
      • 修改broker配置
      • 修改topic的配置
      • 修改client的配置
    • 六、查看broker上磁盘的使用情况
    • 七、使用kafka-preferred-replica-election进行leader选举

一、连接zk

由于kafka的各种元数据都存储在zk,要连接kafka集群也要通过zk获取各个broker的ip端口然后连接broker。因此,大多数kafka自带的运维命令都要指定zk的地址,比如用kafka-topics列出所有topics:

  1. kafka-topics --zookeeper localhost:2181/kafka --list

--zookeeper参数是必须指定的。另外,如果kafka集群启动的时候在配置文件中指定了namespace,记得要在zk的地址后面也要加上kafka所属的namespace。否则kafka就找不到kafka集群的相关元数据了。

由于kafka的元数据都存储在zk,因此掌握好如何查看zk的数据也是运维kafka集群的一个关键。

zkCli 命令

zookeeper安装包一般都会提供zkCli命令来让用户连接zookeeper集群。

  1. ./zkCli.sh -timeout 5000 -r -server ip:port

之后进入zk的交互界面,就可以输出相关命令查看zk的数据了。

  1. # 查看kafka集群下所有的brokers id列表
  2. ls /brokers/ids
  3. # 查看 1003 broker的信息
  4. get /brokers/ids/1003

其他zk交互界面的命令这里不多做介绍,zkCli的帮助文档已经写的很清楚了。

二、topic 相关

kafka-topics可以进行和topics相关的一些操作。下面介绍一下如何运用该命令来操作kafka topics。

该命令最终是调用kafka源码中的TopicCommand类来实现的。

列出所有的topic & 获取命令帮助

  1. # 列出帮助文档,英文好的同学基本看帮助文档就可以指定大概怎么使用该命令了
  2. kafka-topics --help
  3. # 列出kafka集群下的所有topics,这里需要指定kafka机器元数据存储所在的zk机器地址,记得如果有namespace,要也加上,否则将连不上kafka集群
  4. kafka-topics --zookeeper localhost:2181/kafka --list

创建topic

  1. # 创建一个topic为test的topic,并指定分区数为5,副本数为1。这里的副本数不能超过broker的数量,否则会报错
  2. kafka-topics --topic test --zookeeper localhost:2181/kafka --create --replication-factor 1 --partitions 5
  3. # 创建时指定副本在哪个broker上,多个partition之间用逗号分隔,副本之间用":"分割,第一个副本默认是leader
  4. kafka-topics.sh --zookeeper 172.19.0.5:2181 --topic lyt2 --create --replica-assignment 1001:1002,1001:1002,1001:1002

列出所有topic的详情

通过 --describe 参数可以列出我们指定的topics详情,包括 partitions、leader、replicas、isr等。

  1. kafka-topics --zookeeper localhost:2181/kafka --describe test test_yangjb
  2. # 输出
  3. Topic:test PartitionCount:5 ReplicationFactor:3 Configs:
  4. Topic: test Partition: 0 Leader: 1001 Replicas: 1001,1002,1003 Isr: 1002,1001,1003
  5. Topic: test Partition: 1 Leader: 1002 Replicas: 1002,1003,1001 Isr: 1002,1003,1001
  6. Topic: test Partition: 2 Leader: 1003 Replicas: 1003,1001,1002 Isr: 1002,1001,1003
  7. Topic: test Partition: 3 Leader: 1001 Replicas: 1001,1003,1002 Isr: 1002,1001,1003
  8. Topic: test Partition: 4 Leader: 1002 Replicas: 1002,1001,1003 Isr: 1002,1001,1003

下面是一些 使用—describe时可以使用的其他参数

  1. # 只列出修改了默认配置的那些topic。并可以查看修改了哪些topic配置
  2. --topics-with-overrides
  3. # 列出那些目前没有leader的topic
  4. --under-replicated-partitions
  5. # 列出那些正在同步的topic或者同步出现异常的topic
  6. --under-replicated-partitions

删除topic

注意,kafka删除topic是异步的,因此并不是命令返回了topic就已经被成功删除。而是等待后台的删除任务执行成功才真正删除该topic。

  1. kafka-topics --zookeeper localhost:2181/kafka --delete --topic yangjb_test

修改topic相关信息

通过 --alter 参数可以修改topic的信息,能修改的信息包括 partition数量、replica分配情况、topic配置。如果要修改 partition数量时,修改的后的数量一定要比当前的数量大,否则会报错。

  1. # 将partition数量修改成7个
  2. kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --partitions 7
  3. # 通过 --replica-assignment 参数指定新增partition的副本分布情况
  4. # 如果原先的partition数量是3,那么新增的一个分区的副本分布应该在1002和1003
  5. kafka-topics --zookeeper localhost:2181/kafka --topic test -alter --partitions 4 --replica-assignment 1001:1002,1001:1002,1001:1002,1002:1003
  6. # 修改topic test的配置 flush.ms =30000 。
  7. kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --config flush.ms=30000
  8. # 删除topic test的 flush.ms 配置
  9. kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --delete-config flush.ms

注意,在后续的kafka版本中,关于topic的配置的修改删除可能会被移到kafka-configs.sh中。官方建议使用kafka-configs来修改topic的配置。

三、分区副本重分配

在数据量大的情况下,各个broker上的数据量经常会不一致,有的broker上数据非常大,有的则很小,为了让数据更均匀的分布在各个broker,我们就要学会对topic的partion进行分区副本重分配。

首先建立一个json文件,用来描述如何分配分区副本。

assign.json

  1. {
  2. "partitions": [
  3. {
  4. "topic": "test",
  5. "partition": 1,
  6. "replicas": [
  7. 1002,
  8. 1003
  9. ]
  10. },
  11. {
  12. "topic": "test",
  13. "partition": 2,
  14. "replicas": [
  15. 1003,
  16. 1002
  17. ]
  18. }
  19. ],
  20. "version": 1
  21. }

文件中只要指定要重新分配副本的分区号就可以,不需要列出所有分区。

提交分区副本重分配任务:

  1. # --execute 参数表示执行
  2. kafka-reassign-partitions --zookeeper localhost:2181/kafka --reassignment-json-file assign.json --execute
  3. # --verify 参数表示查看分区副本重分配任务的执行状态
  4. kafka-reassign-partitions --zookeeper localhost:2181/kafka --reassignment-json-file assign.json --verify

让系统自动帮我们生成重分配json文件:

执行命令之前需要建立一个json文件,告诉系统要重分配哪些分区:

gen.json:

  1. {
  2. "topics": [
  3. {
  4. "topic": "foo"
  5. }
  6. ],
  7. "version": 1
  8. }

接着执行命令

  1. # --generate 表示生成重分配的json文件
  2. # --topics-to-move-json-file 指定要重分配哪些topic
  3. # --broker-list 表示要分配到哪些broker上去
  4. kafka-reassign-partitions --zookeeper localhost:2181/kafka --generate --topics-to-move-json-file gen.json --broker-list 1001,1002,1003

其他参数

以下参数低版本的kafka并不支持,要使用之前请先确定你使用的版本支持这些特性

  1. # 指定重分配时,在一个broker上,各个日志目录之间复制数据的阈值,最低要求 1 KB/s
  2. # 如果重分配任务正在进行,第二次执行会修改原来设置的阈值
  3. --replica-alter-log-dirs
  4. # 指定重分配时,在不同broker之间传输数据的阈值,最低要求 1 KB/s
  5. # 如果重分配任务正在进行,第二次执行会修改原来设置的阈值
  6. --throttle
  7. # 等待重分配任务开始的超时时间
  8. --timeout

分区副本重分配过程

详情可以看kafka源码的KafkaController#onPartitionReassignment()的方法注解。

RAR = Reassigned replicas,目标要分配的副本情况
OAR = Original list of replicas for partition,原先的副本分配情况
AR = current assigned replicas,当前的副本分配情况

  1. 更新zk处的partition副本配置:AR=RAR+OAR
  2. 向所有RAR+OAR的副本发送元数据更新请求
  3. 将新增的那部分的副本状态设置为NewReplica。也就是 RAR-OAR 那部分副本
  4. 等待所有的副本和leader保持同步。也就是抱着RAR+OAR的副本都在isr中了
  5. 将所有在RAR中的副本状态都设置为OnlineReplica
  6. 在内存中先将AR=RAR
  7. 如果leader不在RAR中,就需要重新竞选leader。采用ReassignedPartitionLeaderSelector选举
  8. 将所有准备移除的副本状态设置为OfflineReplica。也就是OAR-RAR的那部分副本。这时partition的isr会收缩
  9. 将所有准备移除的副本状态设置为NonExistentReplica。这时所在的分区副本数据会被删除。
  10. 将内存中的AR更新到zk
  11. 更新zk的/admin/reassign_partitions路径,移除这个partition
  12. 发送新的元数据到各个broker上

假设当前有OAR = {1, 2, 3}, RAR = {4,5,6},在进行partition reaassigned的过程中会发生如下变化









































AR leader/isr 步骤
{1,2,3} 1/{1,2,3} 初始状态
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} 步骤2
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} 步骤4
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} 步骤7
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} 步骤8
{4,5,6} 4/{4,5,6} 步骤10

四、删除某个partition的数据

使用kafka-delete-records命令可以删除指定topic-partition在指定offset之前的所有数据。

该命令是kafka在0.11版本之后才支持的。

首先需要编写删除offset描述json文件:

delete.json

  1. {
  2. "partitions": [
  3. {
  4. "topic": "test",
  5. "partition": 0,
  6. "offset": 24
  7. }
  8. ],
  9. "version": 1
  10. }

上面的json文件表示删除topic是test的0号parition的24之前的所有offset,也就是1-23这些offset的数据都会被删除掉。

  1. kafka-delete-records --bootstrap-server 127.0.0.1:9092 --offset-json-file delete.json

五、全局&topic配置修改

通过kafka-configs命令,我们可以修改broker的配置,以及topic的配置、client和user的配置。

配置更新原理

kafka-configs命令修改配置后会被写到对应的zookeeper的节点上持久化,之后kafka集群重启后还会加载这些配置,并覆盖配置文件的那些配置。也就是说,如果在此处设置了某个配置项,之后在配置文件中对这个配置项的改动都不会起作用,因为被覆盖了。

用该命令修改了配置后,可以在zk的节点下看到对应的配置内容。

节点目录一般是 /config/entityType/entityName,entityType可以是brokers、topics、users、clients。entityName表示具体的名称,比如broker的id,topic的名称等。

比如要看0号broker修改过的配置项,可以在zk交互界面中输入

  1. # /kafka 是命名空间
  2. get /kafka/config/brokers/0

修改broker配置

  1. # 将0号broker的配置 log.cleaner.backoff.ms修改成1000,flush.ms 也修改成1000
  2. # --alter 表示要修改配置项
  3. # --add-config 后面跟着要修改的配置项
  4. kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --add-config log.cleaner.backoff.ms=1000,flush.ms=1000 --alter
  5. # 删除0号broker 对 log.cleaner.backoff.ms的配置
  6. kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --delete-config log.cleaner.backoff.ms --alter
  7. # 列出0号broker修改过的配置项
  8. kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --describe

修改topic的配置

  1. # 将test这个topic的 delete.retention.ms修改成1000,flush.ms 也修改成1000
  2. kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --add-config delete.retention.ms=1000,flush.ms=1000 --alter
  3. # 删除test这个topic的 delete.retention.ms和flush.ms配置项
  4. kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --delete-config delete.retention.ms,flush.ms --alter
  5. # 列出 test这个topic修改过的配置项
  6. kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --describe

修改client的配置

这里的client是指客户端,也就是produer或者consumer。客户端支持修改的配置有

  1. # 请求限制
  2. request_percentage
  3. # 推送消息时的流量控制
  4. producer_byte_rate
  5. # 消费时的流量控制
  6. consumer_byte_rate

通过指定clientId我们可以控制指定客户端的配置,从而控制他们的流量不会超过我们设定的值

  1. # 设置 客户端id 为test的 producer_byte_rate和consumer_byte_rate为1024
  2. kafka-configs --zookeeper 127.0.0.1:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name test

六、查看broker上磁盘的使用情况

在0.11版本中,新增一个命令kafka-log-dirs可以查看broker的磁盘使用情况。

该命令可以从两个维度观察磁盘的使用情况,一个是指定broker id,查看该broker的数据目录的各个topic parition的占用大小。还可以直接指定topic,查看这些topic的partition在各个broker上的使用情况。甚至可以两个过滤条件一起用,同时指定brokerId和topic。

  1. # 查看0、1号broker上各个topic partition的磁盘使用情况
  2. kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --broker-list 0,1 --describe
  3. # 查看topic:test 在各个broker上的磁盘使用情况
  4. kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --topic-list test --describe
  5. # 查看topic test 在0号broker上的磁盘使用情况
  6. kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --topic-list test --broker-list 0 --describe

输出示例:

  1. {
  2. "version": 1,
  3. "brokers": [
  4. {
  5. "broker": 1001,
  6. "logDirs": [
  7. {
  8. "logDir": "/kafka/kafka-logs-7da01186c90a",
  9. "error": null,
  10. "partitions": [
  11. {
  12. "partition": "test-4",
  13. "size": 0,
  14. "offsetLag": 0,
  15. "isFuture": false
  16. },
  17. {
  18. "partition": "test-0",
  19. "size": 0,
  20. "offsetLag": 0,
  21. "isFuture": false
  22. }
  23. ]
  24. }
  25. ]
  26. }
  27. ]
  28. }

七、使用kafka-preferred-replica-election进行leader选举

当我们查看某个topic partition时,会输出该partiton replica的列表,其中replica列表的第一个replica被kafka称为preferred replica。

  1. Topic: test Partition: 0 Leader: 1002 Replicas: 1001,1002,1003 Isr: 1002,1001,1003

上面的test partition-0中,1001就是那个preferred replica。在大多情况下,preferred replica一般就是leader,但是有些情况可能不是。因此,kafka提供了kafka-preferred-replica-election来将preferred replica选举成leader。

首先我们需要编辑prefered.json 文件:

  1. {
  2. "partitions": [
  3. {
  4. "topic": "test",
  5. "partition": 0
  6. }
  7. ]
  8. }

该文件告诉工具我们要对topic test的partition-0进行preferred选举:

  1. kafka-preferred-replica-election --zookeeper 127.0.0.1:2181 --path-to-json-file prefered.json

发表评论

表情:
评论列表 (有 0 条评论,1425人围观)

还没有评论,来说两句吧...

相关阅读