3.【kafka运维】Topic的生产和消费运维脚本 朱雀 2022-09-02 08:08 219阅读 0赞 ### 文章目录 ### * * 1.Topic的发送kafka-console-producer.sh * 2. Topic的消费kafka-console-consumer.sh * 3. 持续批量推送消息kafka-verifiable-producer.sh * 4. 持续批量拉取消息kafka-verifiable-consumer * More > **日常运维** 、**问题排查** 怎么能够少了滴滴开源的 > [滴滴开源LogiKM一站式Kafka监控与管控平台][LogiKM_Kafka] ## 1.Topic的发送kafka-console-producer.sh ## **1.1 生产无key消息** ## 生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties **1.2 生产有key消息** 加上属性`--property parse.key=true` ## 生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true 默认消息key与消息value间使用“Tab键”进行分隔,所以消息key以及value中切勿使用转义字符(\\t) -------------------- 可选参数 <table> <thead> <tr> <th>参数</th> <th>值类型</th> <th>说明</th> <th>有效值</th> </tr> </thead> <tbody> <tr> <td>–bootstrap-server</td> <td>String</td> <td>要连接的服务器必需(除非指定–broker-list)</td> <td>如:host1:prot1,host2:prot2</td> </tr> <tr> <td>–topic</td> <td>String</td> <td>(必需)接收消息的主题名称</td> <td></td> </tr> <tr> <td>–batch-size</td> <td>Integer</td> <td>单个批处理中发送的消息数</td> <td>200(默认值)</td> </tr> <tr> <td>–compression-codec</td> <td>String</td> <td>压缩编解码器</td> <td>none、gzip(默认值)snappy、lz4、zstd</td> </tr> <tr> <td>–max-block-ms</td> <td>Long</td> <td>在发送请求期间,生产者将阻止的最长时间</td> <td>60000(默认值)</td> </tr> <tr> <td>–max-memory-bytes</td> <td>Long</td> <td>生产者用来缓冲等待发送到服务器的总内存</td> <td>33554432(默认值)</td> </tr> <tr> <td>–max-partition-memory-bytes</td> <td>Long</td> <td>为分区分配的缓冲区大小</td> <td>16384</td> </tr> <tr> <td>–message-send-max-retries</td> <td>Integer</td> <td>最大的重试发送次数</td> <td>3</td> </tr> <tr> <td>–metadata-expiry-ms</td> <td>Long</td> <td>强制更新元数据的时间阈值(ms)</td> <td>300000</td> </tr> <tr> <td>–producer-property</td> <td>String</td> <td>将自定义属性传递给生成器的机制</td> <td>如:key=value</td> </tr> <tr> <td>–producer.config</td> <td>String</td> <td>生产者配置属性文件[–producer-property]优先于此配置 配置文件完整路径</td> <td></td> </tr> <tr> <td>–property</td> <td>String</td> <td>自定义消息读取器</td> <td>parse.key=true/false key.separator=<key.separator>ignore.error=true/false</td> </tr> <tr> <td>–request-required-acks</td> <td>String</td> <td>生产者请求的确认方式</td> <td>0、1(默认值)、all</td> </tr> <tr> <td>–request-timeout-ms</td> <td>Integer</td> <td>生产者请求的确认超时时间</td> <td>1500(默认值)</td> </tr> <tr> <td>–retry-backoff-ms</td> <td>Integer</td> <td>生产者重试前,刷新元数据的等待时间阈值</td> <td>100(默认值)</td> </tr> <tr> <td>–socket-buffer-size</td> <td>Integer</td> <td>TCP接收缓冲大小</td> <td>102400(默认值)</td> </tr> <tr> <td>–timeout</td> <td>Integer</td> <td>消息排队异步等待处理的时间阈值</td> <td>1000(默认值)</td> </tr> <tr> <td>–sync</td> <td>同步发送消息</td> <td></td> <td></td> </tr> <tr> <td>–version</td> <td>显示 Kafka 版本</td> <td>不配合其他参数时,显示为本地Kafka版本</td> <td></td> </tr> <tr> <td>–help</td> <td>打印帮助信息</td> <td></td> <td></td> </tr> </tbody> </table> ## 2. Topic的消费kafka-console-consumer.sh ## **1. 新客户端从头消费`--from-beginning` (注意这里是新客户端,如果之前已经消费过了是不会从头消费的)** 下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费 > sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning **2. 正则表达式匹配topic进行消费`--whitelist`** **`消费所有的topic`** > sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.\*’ **`消费所有的topic,并且还从头消费`** > sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.\*’ --from-beginning **3.显示key进行消费`--property print.key=true`** > sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true **4. 指定分区消费`--partition` 指定起始偏移量消费`--offset`** > sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100 **5. 给客户端命名`--group`** 注意给客户端命名之后,如果之前有过消费,那么`--from-beginning`就不会再从头消费了 > sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group **6. 添加客户端属性`--consumer-property`** 这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性`--group test-group` 那肯定不行 > sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test `--consumer-property group.id=test-consumer-group` **7. 添加客户端属性`--consumer.config`** 跟`--consumer-property` 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, `--consumer-property` 的优先级大于 `--consumer.config` > sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties -------------------- <table> <thead> <tr> <th>参数</th> <th>描述</th> <th>例子</th> </tr> </thead> <tbody> <tr> <td><code>--group</code></td> <td>指定消费者所属组的ID</td> <td></td> </tr> <tr> <td><code>--topic</code></td> <td>被消费的topic</td> <td></td> </tr> <tr> <td><code>--partition</code></td> <td>指定分区 ;除非指定<code>–offset</code>,否则从分区结束(latest)开始消费</td> <td><code>--partition 0</code></td> </tr> <tr> <td><code>--offset</code></td> <td>执行消费的起始offset位置 ;默认值: latest; /latest /earliest /偏移量</td> <td><code>--offset</code> 10</td> </tr> <tr> <td><code>--whitelist</code></td> <td>正则表达式匹配topic;<code>--topic</code>就不用指定了; 匹配到的所有topic都会消费; 当然用了这个参数,<code>--partition</code> <code>--offset</code>等就不能使用了</td> <td></td> </tr> <tr> <td><code>--consumer-property</code></td> <td>将用户定义的属性以key=value的形式传递给使用者</td> <td><code>--consumer-property</code>group.id=test-consumer-group</td> </tr> <tr> <td><code>--consumer.config</code></td> <td>消费者配置属性文件请注意,[<code>consumer-property</code>]优先于此配置</td> <td><code>--consumer.config</code> config/consumer.properties</td> </tr> <tr> <td><code>--property</code></td> <td>初始化消息格式化程序的属性</td> <td>print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer></td> </tr> <tr> <td><code>--from-beginning</code></td> <td>从存在的最早消息开始,而不是从最新消息开始,注意如果配置了客户端名称并且之前消费过,那就不会从头消费了</td> <td></td> </tr> <tr> <td><code>--max-messages</code></td> <td>消费的最大数据量,若不指定,则持续消费下去</td> <td><code>--max-messages</code> 100</td> </tr> <tr> <td><code>--skip-message-on-error</code></td> <td>如果处理消息时出错,请跳过它而不是暂停</td> <td></td> </tr> <tr> <td><code>--isolation-level</code></td> <td>设置为read_committed以过滤掉未提交的事务性消息,设置为read_uncommitted以读取所有消息,默认值:read_uncommitted</td> <td></td> </tr> <tr> <td><code>--formatter</code></td> <td>kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter</td> <td></td> </tr> </tbody> </table> ## 3. 持续批量推送消息kafka-verifiable-producer.sh ## **单次发送100条消息`--max-messages 100`** 一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置 > sh bin/kafka-verifiable-producer.sh --topic test\_create\_topic4 --bootstrap-server localhost:9092 `--max-messages 100` **每秒发送最大吞吐量不超过消息 `--throughput 100`** 推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制 > sh bin/kafka-verifiable-producer.sh --topic test\_create\_topic4 --bootstrap-server localhost:9092 `--throughput 100` **发送的消息体带前缀`--value-prefix`** > sh bin/kafka-verifiable-producer.sh --topic test\_create\_topic4 --bootstrap-server localhost:9092 `--value-prefix 666` 注意`--value-prefix 666`必须是整数,发送的消息体的格式是加上一个 点号`.` 例如: `666.` 其他参数: `--producer.config CONFIG_FILE` 指定producer的配置文件 `--acks ACKS` 每次推送消息的ack值,默认是-1 ## 4. 持续批量拉取消息kafka-verifiable-consumer ## **持续消费** > sh bin/kafka-verifiable-consumer.sh --group-id test\_consumer --bootstrap-server localhost:9092 --topic test\_create\_topic4 **单次最大消费10条消息`--max-messages 10`** > sh bin/kafka-verifiable-consumer.sh --group-id test\_consumer --bootstrap-server localhost:9092 --topic test\_create\_topic4 `--max-messages 10` -------------------- 相关可选参数 <table> <thead> <tr> <th>参数</th> <th>描述</th> <th>例子</th> </tr> </thead> <tbody> <tr> <td><code>--bootstrap-server</code> 指定kafka服务</td> <td>指定连接到的kafka服务;</td> <td>–bootstrap-server localhost:9092</td> </tr> <tr> <td><code>--topic</code></td> <td>指定消费的topic</td> <td></td> </tr> <tr> <td><code>--group-id</code></td> <td>消费者id;不指定的话每次都是新的组id</td> <td></td> </tr> <tr> <td><code>group-instance-id</code></td> <td>消费组实例ID,唯一值</td> <td></td> </tr> <tr> <td><code>--max-messages</code></td> <td>单次最大消费的消息数量</td> <td></td> </tr> <tr> <td><code>--enable-autocommit</code></td> <td>是否开启offset自动提交;默认为false</td> <td></td> </tr> <tr> <td><code>--reset-policy</code></td> <td>当以前没有消费记录时,选择要拉取offset的策略,可以是<code>earliest</code>, <code>latest</code>,<code>none</code>。默认是earliest</td> <td></td> </tr> <tr> <td><code>--assignment-strategy</code></td> <td>consumer分配分区策略,默认是<code>org.apache.kafka.clients.consumer.RangeAssignor</code></td> <td></td> </tr> <tr> <td><code>--consumer.config</code></td> <td>指定consumer的配置文件</td> <td></td> </tr> </tbody> </table> ## More ## Kafka专栏持续更新中…(源码、原理、实战、运维、视频、面试视频) -------------------- [【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)\_石臻臻的杂货铺-CSDN博客][kafka_Kafka_-CSDN] [【kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,干货!!!非常干!!!建议收藏)][kafka] [【kafka异常】kafka 常见异常处理方案(持续更新! 建议收藏)][kafka_kafka _] [【kafka运维】分区从分配、数据迁移、副本扩缩容 (附教学视频)][kafka_] [【kafka源码】ReassignPartitionsCommand源码分析(副本扩缩、数据迁移、副本重分配、副本跨路径迁移][kafka_ReassignPartitionsCommand] [【kafka】点击更多…][kafka 1] [LogiKM_Kafka]: https://github.com/didi/LogiKM [kafka_Kafka_-CSDN]: https://blog.csdn.net/u010634066/article/details/118215928?spm=1001.2014.3001.5501 [kafka]: https://blog.csdn.net/u010634066/article/details/118631272?spm=1001.2014.3001.5501 [kafka_kafka _]: https://blog.csdn.net/u010634066/article/details/118105676?spm=1001.2014.3001.5501 [kafka_]: https://blog.csdn.net/u010634066/article/details/118028403?spm=1001.2014.3001.5501 [kafka_ReassignPartitionsCommand]: https://blog.csdn.net/u010634066/article/details/118051963 [kafka 1]: https://mp.weixin.qq.com/mp/appmsgalbum?__biz=Mzg4ODY1NTcxNg==&action=getalbum&album_id=1966026980307304450#wechat_redirect
还没有评论,来说两句吧...