Installing Kafka on CentOS 7 and Configuring a Cluster

痛定思痛 2023-05-29 13:02

Download

    wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.1/kafka_2.12-2.3.1.tgz
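After downloading, a minimal sketch of unpacking the archive and linking it to the path used later in this post (the /usr/kafka layout is an assumption made to match the /usr/kafka/latest/ prompts below):

    mkdir -p /usr/kafka
    tar -zxf kafka_2.12-2.3.1.tgz -C /usr/kafka
    # symlink so the later "cd /usr/kafka/latest/" works regardless of version
    ln -s /usr/kafka/kafka_2.12-2.3.1 /usr/kafka/latest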

server.properties

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # see kafka.server.KafkaConfig for additional details and defaults
    ############################# Server Basics #############################
    # The id of the broker. This must be set to a unique integer for each broker.
    # Broker id; must be unique across the cluster
    broker.id=0
    ############################# Socket Server Settings #############################
    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    # FORMAT:
    # listeners = listener_name://host_name:port
    # EXAMPLE:
    # listeners = PLAINTEXT://your.host.name:9092
    # Default service port: 9092
    listeners=PLAINTEXT://:9092
    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured. Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    # Address handed out to producers and consumers.
    # If not set, the value of "listeners" is used;
    # if "listeners" is not set either, the host name is used, and external clients may then fail to connect.
    advertised.listeners=PLAINTEXT://ip:9092
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    ############################# Log Basics #############################
    # A comma separated list of directories under which to store log files
    # Kafka data directories, comma separated (this is not where application logs go)
    log.dirs=/opt/nachuan/data/kafka
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    # Default partition count used when a topic is created without specifying one
    num.partitions=1
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    ############################# Internal Topic Settings #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    ############################# Log Flush Policy #############################
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    # 1. Durability: Unflushed data may be lost if you are not using replication.
    # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    ############################# Log Retention Policy #############################
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    ############################# Zookeeper #############################
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    # ZooKeeper connection string where Kafka's metadata is stored
    zookeeper.connect=localhost:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    ############################# Group Coordinator Settings #############################
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    # Initial consumer-group rebalance delay in ms; in production, restore the 3-second default or raise it as needed
    group.initial.rebalance.delay.ms=0

Startup

1. Kafka uses ZooKeeper to store its metadata. In production, be sure to deploy a dedicated ZooKeeper cluster (at least 3 nodes).

For learning purposes, the ZooKeeper bundled with Kafka is enough.
Start ZooKeeper first.

    [root@node4 ~]# cd /usr/kafka/latest/
    [root@node4 latest]# bin/zookeeper-server-start.sh config/zookeeper.properties &
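Before starting Kafka, it may be worth confirming that ZooKeeper is actually up (assuming the default client port 2181 from zookeeper.properties):

    # the bundled ZooKeeper runs as the QuorumPeerMain JVM
    jps | grep QuorumPeerMain
    # and should be listening on 2181
    ss -lntp | grep 2181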

Start Kafka (specifying the config file)

    [root@node4 latest]# bin/kafka-server-start.sh config/server.properties &
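Starting with a trailing & keeps the process tied to the current shell. Both scripts also accept a -daemon flag, which detaches them and is usually more convenient:

    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    bin/kafka-server-start.sh -daemon config/server.properties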

Cluster setup

Start multiple instances on multiple machines and point them at the same ZooKeeper; each instance just needs its own unique broker id and its own data directory (a sketch follows below).
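A minimal sketch of what typically differs between brokers (the ids, directories and host names here are illustrative; the port only needs to change when several brokers run on the same machine):

    # second broker, e.g. on another node (or as a second instance on the same host)
    cp config/server.properties config/server-1.properties
    # then edit config/server-1.properties:
    #   broker.id=1                          # unique per broker
    #   log.dirs=/opt/nachuan/data/kafka-1   # unique data directory
    #   listeners=PLAINTEXT://:9093          # only needed if brokers share a host
    #   zookeeper.connect=node4:2181         # the same ZooKeeper for all brokers
    bin/kafka-server-start.sh -daemon config/server-1.properties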

Adjusting Kafka's heap size (default 1 GB)
The setting lives in bin/kafka-server-start.sh (the original screenshots showing the file location are not reproduced here).
After adjustment (note that -Xms must not exceed -Xmx, or the JVM will refuse to start):

    export KAFKA_HEAP_OPTS="-Xmx512M -Xms256M"

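After changing KAFKA_HEAP_OPTS, restart the broker; whether the new heap setting took effect can be checked on the running process, for example:

    ps -ef | grep kafka.Kafka | grep -o 'Xm[sx][^ ]*'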

Creating a topic and viewing its details
    [root@node4 latest]# bin/kafka-topics.sh --create --bootstrap-server 192.168.100.12:9092 --replication-factor 3 --partitions 1 --topic my-13-topic
    [root@node4 latest]# bin/kafka-topics.sh --describe --bootstrap-server 192.168.100.12:9092 --topic my-13-topic
    Topic:my-13-topic    PartitionCount:1    ReplicationFactor:3    Configs:segment.bytes=1073741824
    Topic: my-13-topic    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
Explanation of the commands above:

--replication-factor 3: replication factor, i.e. 3 replicas per partition.
--partitions 1: number of partitions for this topic.
This creates the topic my-13-topic with one partition and 3 replicas per partition.
A partition is how a topic's data is distributed across the cluster: it physically splits the data, but the partition itself is a logical concept.
The physical storage unit of a partition is the replica; with 3 replicas, the partition's data is stored 3 times.

In the describe output "Topic: my-13-topic Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1":
Partition: 0 is the partition number.
Leader: 0 means the leader replica is on broker 0.
Replicas: 0,2,1 means the three replicas live on brokers 0, 2 and 1.
Isr: 0,2,1 lists the in-sync replicas, i.e. the brokers whose replicas are caught up.

Relationship between partitions and brokers: there is normally one broker per machine, and each replica of a partition is stored on one broker.
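Relatedly, all topics that already exist on the cluster can be listed with (same bootstrap address as above):

    bin/kafka-topics.sh --list --bootstrap-server 192.168.100.12:9092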

Create a topic with 3 partitions and 3 replicas each

    bin/kafka-topics.sh --create --bootstrap-server 39.96.13.43:9092 --replication-factor 3 --partitions 3 --topic my-43-topic
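Describing the new topic in the same way as before shows how the three partition leaders end up spread across different brokers (same address as in the create command; the exact output will vary):

    bin/kafka-topics.sh --describe --bootstrap-server 39.96.13.43:9092 --topic my-43-topic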

Consuming messages from a topic with the console consumer:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
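To have something to consume, messages can first be sent from the console producer (in this Kafka version the console producer takes --broker-list rather than --bootstrap-server):

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    > hello
    > world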

Monitoring and management tools:

Kafka itself does not ship with a graphical monitoring/management tool. There are many open-source tools available, but few of them are mature or reliable.
Here are two of the more dependable ones.

  1. Kafka Eagle
     https://www.jianshu.com/p/552ab3e23c96
  2. Kafka Offset Monitor
     https://github.com/quantifind/KafkaOffsetMonitor

KafkaOffsetMonitor can monitor in real time:

  • Kafka cluster status
  • Topic and consumer group lists
  • A graphical view of the relationships between topics and consumers
  • A graphical view of consumer offsets, lag, etc.

It is a single jar and is very easy to run:
    java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
         com.quantifind.kafka.offsetapp.OffsetGetterWeb \
         --offsetStorage kafka \
         --zk zk-server1,zk-server2 \
         --port 8080 \
         --refresh 10.seconds \
         --retain 2.days

Startup command for version 0.2.0

    java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
         com.quantifind.kafka.offsetapp.OffsetGetterWeb \
         --zk zk-server1,zk-server2 \
         --port 8088 \
         --refresh 10.seconds \
         --retain 2.days

The arguments are:

  • offsetStorage: valid options are 'zookeeper', 'kafka' or 'storm'; anything else falls back to 'zookeeper'. (Note: this argument only exists from version 0.2.1.)
  • zk: the ZooKeeper hosts
  • port: the port the app will be available on
  • refresh: how often the app should refresh and store a point in the DB
  • retain: how long points should be kept in the DB
  • dbName: where to store the history (default 'offsetapp')
  • kafkaOffsetForceFromStart: only applies to the 'kafka' format; forces KafkaOffsetMonitor to scan the commit messages from the start (see notes below)
  • stormZKOffsetBase: only applies to the 'storm' format; changes the offset storage base in ZooKeeper, default '/stormconsumers' (see notes below)
  • pluginsArgs: additional arguments used by extensions (see below)

Once started, it can be accessed in a browser at http://localhost:8088.

Using Kafka from Spring

  1. Add the dependency:

     <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
     </dependency>
  2. Configuration (shown in the original post only as a screenshot, not reproduced here):
     When sending, a type header is written for each message, and type aliases can be configured, e.g.:
     foo:com.study.kafka.sample_01_pub_sub.common.Foo1,bar:com.study.kafka.sample_02_multi_method_listener.common.Bar1

When receiving, the same alias-to-class mapping (foo to Foo1, bar to Bar1) tells the consumer which class to deserialize each message into; a sketch of the corresponding properties follows.
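A hedged sketch of how such aliases are usually declared in a Spring Boot application.properties, assuming the JSON (de)serializers from spring-kafka are used (the class names are the ones from the mapping above; adjust to your own packages):

    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.producer.properties.spring.json.type.mapping=foo:com.study.kafka.sample_01_pub_sub.common.Foo1,bar:com.study.kafka.sample_02_multi_method_listener.common.Bar1
    spring.kafka.consumer.properties.spring.json.type.mapping=foo:com.study.kafka.sample_01_pub_sub.common.Foo1,bar:com.study.kafka.sample_02_multi_method_listener.common.Bar1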

Invocation of the MessageConverter (shown in the original post only as a screenshot, which is not reproduced here).
