kafka 常用运维命令介绍(二)

忘是亡心i 2022-04-06 15:01 704阅读 0赞

文章目录

    • 一、producer 相关命令
        1. kafka-console-producer 生产消息
        1. 使用 kafka-producer-perf-test 进行producer的基准测试
        1. 使用 kafka-verifiable-producer 批量推送消息
        1. 使用kafka-replay-log-producer进行topic之间的消息复制
    • 二、consumer相关命令
        1. kafka-console-consumer 消费消息
        1. 使用 kafka-consumer-perf-test 进行consumer的基准测试
        1. 使用kafka-verifiable-consumer批量拉取消息
        1. 使用kafka-consumer-groups命令管理ConsumerGroup
        • ➀、列出所有的ConsumerGroup(新旧版本api区别)
        • ➁、删除ConsumerGroup
        • ➂、列出ConsumerGroup详情
          • 查看group的offset消费记录
          • 查看group的member信息
          • 查看group的状态
        • ➃、重置group的消费记录
          • dry-run 模式
          • —execute 模式
          • 重置offset的几种策略
    • 三、replica数据一致性校验

一、producer 相关命令

1. kafka-console-producer 生产消息

使用kafka-console-producer我们可以快速往某个topic推送消息。kafka-console-producer使用的也是KafkaProducer类进行消息的推送,因此KafkaProducer支持的参数kafka-console-producer都可以配置。

有关KafkaProducer的相关原理可以看我的这篇博客:

https://blog.csdn.net/u013332124/article/details/81321942

  1. # 执行下面这条命令后会进入producer的交互界面,输入字符串就会将消息推送到kafka集群
  2. kafka-console-producer --broker-list 127.0.0.1:9092 --topic test
  3. # 推送10条消息 分别是1、2、3、...、10
  4. seq 10 | kafka-console-producer --broker-list 127.0.0.1:9092 --topic yangjb
  5. # 推送hello world 到kafka集群
  6. echo "nihao world" | kafka-console-producer --broker-list 127.0.0.1:9092 --topic yangjb

2. 使用 kafka-producer-perf-test 进行producer的基准测试

我们要修改某个配置时,经常想知道这个配置的修改对kafka的性能会有哪些影响,这时候就可以来个基准测试来衡量配置修改对producer性能的影响。kafka官方就提供了这样一个工具,让我们很方面的对produer的性能进行测试。

下面是kafka-producer-perf-test支持的一些参数

  1. --topic TOPIC 指定topic
  2. --num-records NUM-RECORDS 要推送多少条数据
  3. --payload-delimiter PAYLOAD-DELIMITER 当使用payload文件生成数据时,指定每条消息的之间的分割符,默认是换行符
  4. --throughput THROUGHPUT 推送消息时的吞吐量,单位是 messages/sec。必须指定
  5. --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...] 指定producer的一些配置
  6. --producer.config CONFIG-FILE 直接指定配置文件
  7. --print-metrics 是否要在最后输出度量指标,默认是false
  8. # 生成数据的方式有两种,一种是我们指定一个消息大小,该工具会随机生成一个指定大小的字符串推送,一个是我们指定一个文件,工具会从该文件中随机选取一条消息推送
  9. # 下面两种方式只能选择一种
  10. --record-size RECORD-SIZE 指定每条消息的大小,大小是bytes
  11. --payload-file PAYLOAD-FILE 指定文件存放目录

下面我们测试使用producer一次推送100条数据

  1. # 通过 --producer-props指定要连接的broker地址
  2. # --num-records 指定一共要推送100条
  3. # --throughput 表示吞吐量,限制每秒20
  4. # --record-size 表示每条消息的大小是20B
  5. kafka-producer-perf-test --producer-props bootstrap.servers=127.0.0.1:9092 client.id=perftest --num-records 100 --throughput 10 --topic test --record-size 20

最后输出报告:

  1. 52 records sent, 10.4 records/sec (0.00 MB/sec), 5.2 ms avg latency, 137.0 max latency.
  2. 100 records sent, 9.993005 records/sec (0.00 MB/sec), 3.78 ms avg latency, 137.00 ms max latency, 2 ms 50th, 4 ms 95th, 137 ms 99th, 137 ms 99.9th.

我们可以编辑一个payload.txt,输入

  1. hello
  2. world
  3. producer
  4. perf
  5. test

接着使用该payload.txt进行测试

  1. # --payload-file 指定文件地址
  2. kafka-producer-perf-test --producer-props bootstrap.servers=127.0.0.1:9092 client.id=perftest --payload-file payload.txt --num-records 100 --throughput 100 --topic test

该工具在执行时,会读取payload.txt的内容,然后根据--payload-delimiter将文本分成一条条消息,接着测试的时候会随机发送这些消息。

3. 使用 kafka-verifiable-producer 批量推送消息

kafka提供了kafka-verifiable-producer工具用于快速的推送一批消息到producer,并且可以打印出各条推送消息的元信息。推送的消息是从0开始不断往上递增。

支持参数

  1. --topic TOPIC 指定topic
  2. --broker-list HOST1:PORT1[,HOST2:PORT2[...]] 指定kafka broker地址
  3. --max-messages MAX-MESSAGES 一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置
  4. --throughput THROUGHPUT 推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制
  5. --acks ACKS 每次推送消息的ack值,默认是-1
  6. --producer.config CONFIG_FILE 指定producer的配置文件
  7. --value-prefix VALUE-PREFIX 推送的消息默认是递增的数字,我们可以在这些消息前面加上指定的前缀。这个前缀好像也必须是数字

demo:

  1. # --max-messages 10 总共推送10条
  2. # 每秒推送2条
  3. kafka-verifiable-producer --broker-list 127.0.0.1:9092 --topic test --max-messages 10 --throughput 2

输出:

  1. { "timestamp":1544327879247,"name":"startup_complete"}
  2. { "timestamp":1544327879413,"name":"producer_send_success","key":null,"value":"0","offset":91029,"partition":0,"topic":"test"}
  3. { "timestamp":1544327879415,"name":"producer_send_success","key":null,"value":"1","offset":91030,"partition":0,"topic":"test"}
  4. { "timestamp":1544327879904,"name":"producer_send_success","key":null,"value":"2","offset":91031,"partition":0,"topic":"test"}
  5. { "timestamp":1544327880406,"name":"producer_send_success","key":null,"value":"3","offset":91032,"partition":0,"topic":"test"}
  6. { "timestamp":1544327880913,"name":"producer_send_success","key":null,"value":"4","offset":91033,"partition":0,"topic":"test"}
  7. { "timestamp":1544327881414,"name":"producer_send_success","key":null,"value":"5","offset":91034,"partition":0,"topic":"test"}
  8. { "timestamp":1544327881918,"name":"producer_send_success","key":null,"value":"6","offset":91035,"partition":0,"topic":"test"}
  9. { "timestamp":1544327882422,"name":"producer_send_success","key":null,"value":"7","offset":91036,"partition":0,"topic":"test"}
  10. { "timestamp":1544327882924,"name":"producer_send_success","key":null,"value":"8","offset":91037,"partition":0,"topic":"test"}
  11. { "timestamp":1544327883430,"name":"producer_send_success","key":null,"value":"9","offset":91038,"partition":0,"topic":"test"}
  12. { "timestamp":1544327883942,"name":"shutdown_complete"}
  13. { "timestamp":1544327883943,"name":"tool_data","sent":10,"acked":10,"target_throughput":2,"avg_throughput":2.1294718909710393}

4. 使用kafka-replay-log-producer进行topic之间的消息复制

使用kafka-replay-log-producer可以将一个topic的消息复制到另外一个topic上。它的流程是先从topic拉取消息,然后推送到另一个topic。

支持的参数:

  1. --broker-list <String: hostname:port> 指定broker的地址
  2. --inputtopic <String: input-topic> 要读取的topic名称
  3. --messages <Integer: count> 要复制的消息数量,默认是-1,也就是全部
  4. --outputtopic <String: output-topic> 要复制到哪个topic
  5. --property <String: producer properties> 可以指定producer的一些参数
  6. --reporting-interval <Integer: size> 汇报进度的频率,默认是5000汇报一次
  7. --sync 是否开启同步模式
  8. --threads <Integer: threads> 复制消息的线程数
  9. --zookeeper <String: zookeeper url> zk地址

demo:

  1. # 把topic-test的消息复制给topic-aaaa
  2. # --messages 表示只复制前50条
  3. kafka-replay-log-producer --broker-list 127.0.0.1:9092 --zookeeper 127.0.0.1:2181/kafka --inputtopic test --outputtopic aaaa --messages 50

二、consumer相关命令

1. kafka-console-consumer 消费消息

使用kafka-console-consumer可以消费指定topic的消息。底层也是使用KafkaConsumer进行消费的。相关消费原理可以看我的这两篇博客:

Consumer 加入&离开 Group详解(九)

Consumer 拉取日志流程详解(十)

  1. # 指定消费topic-test的消息
  2. # --from-beginning 表示如果之前没有过消费记录,就从第一条开始消费
  3. kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --group hahaeh --topic test --from-beginning

kafka-console-consumer 还支持配置一些其他的参数,用户可以自行通过 —help 参数查看。

2. 使用 kafka-consumer-perf-test 进行consumer的基准测试

和producer一样,kafka也会consumer提供了一个命令来进行基准测试。

  1. # --fetch-size 表示一次请求拉取多少条数据
  2. # --messages 表示总共要拉取多少条数据
  3. kafka-consumer-perf-test --broker-list 127.0.0.1:9092 --fetch-size 200 --group oka --topic test --messages 200

输出:

  1. start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
  2. 2018-12-09 13:37:19:118, 2018-12-09 13:37:20:393, 0.0003, 0.0002, 162, 127.0588, 21, 1254, 0.0002, 129.1866

kafka-consumer-perf-test还支持其他参数:

  1. --consumer.config <String: config file> 指定Consumer使用的配置文件
  2. --date-format 定义输出的日志格式
  3. --from-latest 如果之前没有消费记录,是否从之前消费过的地方开始消费
  4. --num-fetch-threads 拉取消息的线程数
  5. --threads 处理消息的线程数
  6. --reporting-interval 多久输出一次执行过程信息

3. 使用kafka-verifiable-consumer批量拉取消息

kafka-verifiable-consumer可以批量的拉取消息,其实和kafka-console-consumer命令差不多。不过使用kafka-verifiable-consumer消费消息输出的内容更丰富,还包括offset等信息,并且可以设置只读取几条消息等。kafka-console-consumer是有多少读多少。

  1. # --max-messages 5 表示只拉取5条
  2. # --verbose 表示输出每一条消息的内容
  3. kafka-verifiable-consumer --broker-list 127.0.0.1:9092 --max-messages 5 --group-id hello --topic test --verbose

输出:

  1. { "timestamp":1544335112709,"name":"startup_complete"}
  2. { "timestamp":1544335112862,"name":"partitions_revoked","partitions":[]}
  3. { "timestamp":1544335112883,"name":"partitions_assigned","partitions":[{ "topic":"test","partition":0}]}
  4. { "timestamp":1544335112919,"name":"record_data","key":null,"value":"90218","topic":"test","partition":0,"offset":90877}
  5. { "timestamp":1544335112920,"name":"record_data","key":null,"value":"90219","topic":"test","partition":0,"offset":90878}
  6. { "timestamp":1544335112920,"name":"record_data","key":null,"value":"0","topic":"test","partition":0,"offset":90879}
  7. { "timestamp":1544335112921,"name":"record_data","key":null,"value":"1","topic":"test","partition":0,"offset":90880}
  8. { "timestamp":1544335112921,"name":"record_data","key":null,"value":"2","topic":"test","partition":0,"offset":90881}
  9. { "timestamp":1544335112921,"name":"records_consumed","count":162,"partitions":[{ "topic":"test","partition":0,"count":5,"minOffset":90877,"maxOffset":90881}]}
  10. { "timestamp":1544335112928,"name":"offsets_committed","offsets":[{ "topic":"test","partition":0,"offset":90882}],"success":true}
  11. { "timestamp":1544335112943,"name":"shutdown_complete"}

kafka-verifiable-consumer命令还支持以下参数:

  1. --session-timeout consumer的超时时间
  2. --enable-autocommit 是否开启自动offset提交,默认是false
  3. --reset-policy 当以前没有消费记录时,选择要拉取offset的策略,可以是'earliest', 'latest','none'。默认是earliest
  4. --assignment-strategy consumer分配分区策略,默认是RoundRobinAssignor
  5. --consumer.config 指定consumer的配置

4. 使用kafka-consumer-groups命令管理ConsumerGroup

➀、列出所有的ConsumerGroup(新旧版本api区别)

由于kafka consumer api有新版本和旧版本的区别,因此使用kafka-consumer-groups进行group的管理时,内部使用的机制也不一样。如果我们使用—zookeeper来连接集群,则使用的是旧版本的consumer group管理规则,也就是ConsumerGroup的一些元数据是存储在zk上的。如果使用--bootstrap-server来连接,则是面向新版本的consumer group规则。

列出使用旧版本的所有consumer group

  1. kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka --list

列出新版本的所有consumer group

  1. kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list

➁、删除ConsumerGroup

删除指定的group

  1. # 删除 helo和hahah 这两个group
  2. kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --delete --group helo --group hahah

删除指定group的指定topic的消费记录(topic级别的删除仅在旧版本api中支持)

  1. # 旧版本api 必须指定zk地址
  2. kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka --delete --group helo --topic test

删除指定topic在所有group中的消费记录(topic级别的删除仅在旧版本api中支持)

  1. # 旧版本api 必须指定zk地址
  2. kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka --delete --topic test

➂、列出ConsumerGroup详情

通过—describe可以从不同维度观察group的信息。

查看group的offset消费记录
  1. # --offset是--describe的默认选项,可以不传
  2. kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --offset

输出

  1. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  2. test 0 796670 796676 6 consumer-1-00ab2315-e3f3-4261-8392-f9fae4668f87 /172.20.16.13 consumer-1
查看group的member信息
  1. # --members 表示输出member信息
  2. kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --members

输出

  1. CONSUMER-ID HOST CLIENT-ID #PARTITIONS
  2. consumer-1-00ab2315-e3f3-4261-8392-f9fae4668f87 /172.20.16.13 consumer-1 1
查看group的状态
  1. kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --state

输出:

  1. COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
  2. 172.20.16.13:9092 (0) range Stable 1

➃、重置group的消费记录

当选择重置消费记录操作时,目标Group的状态一定不能是活跃的。也就是该group中不能有consumer在消费。

通过 --reset-offsets 可以重置指定group的消费记录。和--reset-offsets搭配的有两个选项,--dry-run--execute,默认是--dry-run

dry-run 模式

当运行在--dry-run模式下,重置操作不会真正的执行,只会预演重置offset的结果。该模式也是为了让用户谨慎的操作,否则直接重置消费记录会造成各个consumer消息读取的异常。

  1. # --shift-by -1 表示将消费的offset重置成当前消费的offset-1
  2. kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --shift-by -1 --topic test --group mytest --dry-run

输出

  1. TOPIC PARTITION NEW-OFFSET
  2. test 0 797054

此时如果去查询该group的消费offset,会发现该group的消费offset其实还是797055,并没有发生改变。

—execute 模式

通过--execute参数可以直接执行重置操作。

  1. kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --shift-by -1 --topic test --group mytest --execute
重置offset的几种策略

该命令提供了多种offset重置策略给我们选择如何重置offset

  1. --to-current 直接重置offset到当前的offset,也就是LOE
  2. --to-datetime <String: datetime> 重置offset到指定时间的offset
  3. --to-earliest 重置offset到最开始的那条offset
  4. --to-offset <Long: offset> 重置offset到目标的offset
  5. --shift-by <Long:n> 根据当前的offset进行重置,n可以是正负数
  6. --from-file <String: path to CSV file> 通过外部的csv文件描述来进行重置

Demo:

  1. # 将group mytest的test 这个topic的消费offset重置到666
  2. # 注意如果topic分区中的最小offset比666还大的话,就会直接使用该最小offset作为消费offset
  3. kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --topic test --group mytest --execute --to-offset 666
  4. # 重置到最早的那条offset
  5. kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --topic test --group mytest --execute --to-earliest

我们再看下如何使用--from-file来重置offset。首先先编辑一个文件 reset.csv

  1. test,0,796000

3列分别是topicName,partition,offset。最后输入重置命令

  1. kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --group mytest --execute --from-file reset.csv

三、replica数据一致性校验

通过kafka-replica-verification命令可以检查指定topic的各个partition的replic的数据是否一致。

  1. kafka-replica-verification --broker-list 127.0.0.1:9092

默认是检查全部topic,可以通过指定topic-white-list来指定只检查一些topic。

replica一致性检查主要是根据partition的HW来检查的,大概原理是在所有的broker都开启一个fetcher,然后拉取数据做检查各个replica的数据是否一致。因此,进行该检查时要保证所有的broker都在线,否则该工具会一直阻塞直到broker全部启动。

发表评论

表情:
评论列表 (有 0 条评论,704人围观)

还没有评论,来说两句吧...

相关阅读