Kafka 集群部署

ゝ一纸荒年。 2021-11-26 19:46 577阅读 0赞

  Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。Kafka是一种高吞吐量的分布式发布订阅消息系统:1469203-20190616171050988-1888073953.png
  (1)通过O的磁盘数据结构提供消息的持久化,够保持长时间的稳定性能。
  (2)高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
  (3)支持通过Kafka服务器和消费机集群来分区消息。
  (4)支持Hadoop并行数据加载。

  本博客主要以:单节点单Broker部署、单节点多Broker部署、集群部署(多节点多Broker)来讲解。在实际生产环境中常用的是第三种方式,以集群的方式来部署Kafka。Kafka比较依赖zookeeper集群,如果想要使用Kafka,就必须部署zookeeper集群,Kafka中的消费偏置信息、kafka集群、topic信息会被存储在ZK中。

在部署集群前,需要部署部署zookeeper集群,直接按照zookeeper3.5.5集群部署部署并启动。

一、Kafka 单节点部署

准备:

关闭防火墙:systemctl stop firewalld.service
禁止防火墙自动启动:systemctl disable firewalld.service

关闭selinux : setenforce 0
禁止selinux启动:vim /etc/selinux/config

  1. SELINUX=disabled

各节点的host解析:

  1. 10.0.0.11 node01
  2. 10.0.0.12 node02
  3. 10.0.0.13 node03

1.Kafka 单节点单Broker部署及使用

下载:http://kafka.apache.org或者

  1. wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz

安装

  1. tar xf kafka_2.12-2.2.1.tgz -C /usr/local/src
  2. ln -s /usr/local/src/kafka_2.12-2.2.1 /usr/local/kafka

配置kafka

参考官网:http://kafka.apache.org/quickstart

  1. mkdir -p /data/kafka/logs

添加环境变量

  1. echo -e 'export KAFKA_HOME=/usr/local/kafka\nexport PATH=$KAFKA_HOME/bin:$PATH' >>/etc/profile
  2. source /etc/profile

进入kafka的config目录下,在server.properties文件,添加如下配置

vim /usr/local/kafka/config/server.properties

  1. #broker id 全局唯一
  2. broker.id=0
  3. #监听
  4. port=9092
  5. #日志目录
  6. log.dirs=/data/kafka/logs
  7. #配置zookeeper的连接
  8. zookeeper.connect=node01:2181

启动kafka

  1. kafka-server-start.sh $KAFKA_HOME/config/server.properties

打印的日志信息没有报错,可以看到如下信息

  1. [2019-06-16 17:44:14,757] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

但是并不能保证Kafka已经启动成功,输入jps查看进程,如果可以看到Kafka进程,表示启动成功

  1. [root@node01 ~]# jps
  2. 8577 QuorumPeerMain
  3. 11000 Jps
  4. 1899 -- process information unavailable
  5. 10637 Kafka
  6. [root@node01 ~]# jps -m
  7. 8577 QuorumPeerMain /usr/local/zookeeper/bin/../conf/zoo.cfg
  8. 1899 -- process information unavailable
  9. 10637 Kafka config/server.properties
  10. 11022 Jps -m

创建topic

kafka-topics.sh —create —zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic test

参数说明:
  —zookeeper:指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
  —replication-factor:指定副本数量
  —partitions:指定分区数量
  —topic:主题名称

删除主题

  1. kafka-topics.sh --delete --zookeeper node01:9092 --topic test

查看所有的topic信息

  1. kafka-topics.sh --list --zookeeper node01:2181
  2. test

启动生产者

  1. kafka-console-producer.sh --broker-list node01:9092 --topic test

启动消费者

  1. kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic test

1469203-20190616181451222-1958878331.png

备注

  —from-beginning:如果有表示从最开始消费数据,旧的和新的数据都会被消费,而没有该参数表示只会消费新产生的数据。

2.Kafka 单节点多Broker部署及使用

配置Kafka

参考官网:http://kafka.apache.org/quickstart

拷贝server.properties三份

  1. cd /usr/local/kafka/config/
  2. cp server.properties server-1.properties
  3. cp server.properties server-2.properties
  4. cp server.properties server-3.properties

修改server-1.properties

vim /usr/local/kafka/config/server-1.properties

  1. #broker id 全局唯一
  2. broker.id=1
  3. #监听
  4. port=9093
  5. #日志目录
  6. log.dirs=/data/kafka/logs-1
  7. #配置zookeeper的连接
  8. zookeeper.connect=node01:2181

修改server-2.properties

vim /usr/local/kafka/config/server-2.properties

  1. #broker id 全局唯一
  2. broker.id=2
  3. #监听
  4. port=9094
  5. #日志目录
  6. log.dirs=/data/kafka/logs-2
  7. #配置zookeeper的连接
  8. zookeeper.connect=node01:2181

修改server-3.properties

vim /usr/local/kafka/config/server-3.properties

  1. #broker id 全局唯一
  2. broker.id=3
  3. #监听
  4. port=9095
  5. #日志目录
  6. log.dirs=/data/kafka/logs-3
  7. #配置zookeeper的连接
  8. zookeeper.connect=node01:2181

创建日志文件夹

  1. mkdir -p /data/kafka/{logs-1,logs-2,logs-3}

启动Kafka(分别启动server1、2、3)

  1. kafka-server-start.sh $KAFKA_HOME/config/server-1.properties
  2. kafka-server-start.sh $KAFKA_HOME/config/server-2.properties
  3. kafka-server-start.sh $KAFKA_HOME/config/server-3.properties

查看进程

  1. [root@node01 ~]# jps
  2. 8577 QuorumPeerMain
  3. 18353 Jps
  4. 17606 Kafka
  5. 1899 -- process information unavailable
  6. 17243 Kafka
  7. 17964 Kafka
  8. [root@node01 ~]# jps -m
  9. 8577 QuorumPeerMain /usr/local/zookeeper/bin/../conf/zoo.cfg
  10. 17606 Kafka /usr/local/kafka/config/server-2.properties
  11. 1899 -- process information unavailable
  12. 17243 Kafka /usr/local/kafka/config/server-1.properties
  13. 18363 Jps -m
  14. 17964 Kafka /usr/local/kafka/config/server-3.properties

创建topic(指定副本数量为3)

  1. kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 1 --topic wzxmt

查看所有的topic信息

  1. kafka-topics.sh --list --zookeeper node01:2181

1469203-20190616202901179-804816849.png

查看某个topic的详细信息

  1. kafka-topics.sh --describe --zookeeper node01:2181 --topic wzxmt

1469203-20190616190325147-1326644860.png

启动生产者

  1. kafka-console-producer.sh --broker-list node01:9093,node01:9094,node01:9095 --topic wzxmt

启动消费者

  1. kafka-console-consumer.sh --bootstrap-server node01:2181 --from-beginning --topic wzxmt

1469203-20190616204510248-468414464.png

单节点多borker容错性测试

Kafka是支持容错的,上面我们已经完成了Kafka单节点多Blocker的部署,下面我们来对Kafka的容错性进行测试,测试步骤如下

(1).查看topic的详细信息,观察那个blocker的角色是leader,那些blocker的角色是follower

  1. kafka-topics.sh --describe --zookeeper node01:2181 --topic wzxmt

1469203-20190616204937595-1733655115.png

(2).手工kill掉任意一个状态是follower的borker,测试生成和消费信息是否正确

步骤1中可以看到 1为leader,2 和 3为 follower,将follower为2的进程kill掉

1469203-20190616205419751-1871269790.png

启动生产和消费者测试信息是否正确

1469203-20190616210046420-993480434.png

结论:kill掉任意一个状态是follower的broker,生成和消费信息正确,不受任何影响.

(3).手工kill掉状态是leader的borker,测试生产和消费的信息是否正确

borker1的角色为leader,将它kill掉,borker 3变成了leader .

1469203-20190616211307794-840332877.png

启动生产和消费者测试信息是否正确.

1469203-20190616211540767-1879282482.png

结论:kill掉状态是leader的borker,生产和消费的信息正确

总结:不管当前状态的borker是leader还是follower,当我们kill掉后,只要有一个borker能够正常使用,则消息仍然能够正常的生产和发送。即Kafka的容错性是有保证的!

二、Kafka 多节点部署(多节点多borker)

在多个节点进行安装部署,部署方式与前面单节点一样。

修改配置文件server.properties

vim /usr/local/kafka/config/server.properties

  1. #broker的全局唯一编号,不能重复,只能是数字
  2. broker.id=1
  3. #用来监听链接的端口,producer或consumer将在此端口建立连接
  4. port=9092
  5. #处理网络请求的线程数量
  6. num.network.threads=3
  7. #用来处理磁盘IO的线程数量
  8. num.io.threads=8
  9. #发送套接字的缓冲区大小
  10. socket.send.buffer.bytes=102400
  11. #接受套接字的缓冲区大小
  12. socket.receive.buffer.bytes=102400
  13. #请求套接字的缓冲区大小
  14. socket.request.max.bytes=104857600
  15. #kafka消息存放的路径(持久化到磁盘)
  16. log.dirs=/data/kafka/logs
  17. #topic在当前broker上的分片个数
  18. num.partitions=2
  19. #用来恢复和清理data下数据的线程数量
  20. num.recovery.threads.per.data.dir=1
  21. #segment文件保留的最长时间,超时将被删除
  22. log.retention.hours=168
  23. #滚动生成新的segment文件的最大时间
  24. log.roll.hours=168
  25. #日志文件中每个segment的大小,默认为1G
  26. log.segment.bytes=1073741824
  27. #周期性检查文件大小的时间
  28. log.retention.check.interval.ms=300000
  29. #日志清理是否打开
  30. log.cleaner.enable=true
  31. #broker需要使用zookeeper保存meta数据
  32. zookeeper.connect=node01:2181,node02:2181,node03:2181
  33. #zookeeper链接超时时间
  34. zookeeper.connection.timeout.ms=6000
  35. #partion buffer中,消息的条数达到阈值,将触发flush到磁盘
  36. log.flush.interval.messages=10000
  37. #消息buffer的时间,达到阈值,将触发flush到磁盘
  38. log.flush.interval.ms=3000
  39. #删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
  40. delete.topic.enable=true
  41. #此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producerconnection to node01:9092 unsuccessful 错误!
  42. host.name=node01
  43. #延迟初始使用者重新平衡的时间(生产用3)
  44. group.initial.rebalance.delay.ms=0

不同节点之间只需要修改server.properties 的 broker.id,broker.id不能相同。

保证zookeeper集群运行正常情况下,启动服务

  1. kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

  #备注”&“或”-daemon”在后台运行,不占用当前窗口

查看Kafka是否启动成功,输入jps查看进程,如果可以看到Kafka进程,表示启动成功

1469203-20190616235445841-728261945.png

创建topic

kafka-topics.sh —create —zookeeper node01:2181 —replication-factor 3 —partitions 2 —topic www

查看topic详细信息

  1. kafka-topics.sh --describe --zookeeper node01:2181 --topic www

1469203-20190617002054399-105091202.png

启动生产者

  1. [root@node01 ~]# kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic www

启动2个消费者

  1. [root@node02 bin]# kafka-console-consumer.sh --bootstrap-server node02:9092 --from-beginning --topic www
  2. [root@node03 bin]# kafka-console-consumer.sh --bootstrap-server node03:9092 --from-beginning --topic www

1469203-20190618144637079-248538399.png

  Kafka 集群与单节点多Broker是测试相同,结果相同;从Kafka 单节点多Broker与多节点多Broker的容错性测试,得出的结论是:

  不管当前状态的borker是leader还是follower,当我们kill掉后,只要有一个borker能够正常使用,则消息仍然能够正常的生产和发送。即Kafka的容错性是有保证!

转载于:https://www.cnblogs.com/wzxmt/p/11032133.html

发表评论

表情:
评论列表 (有 0 条评论,577人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Kafka部署

    *1集群部署的基本流程** 下载安装包、解压安装包、修改配置文件、分发安装包、启动集群 **2集群部署的基础环境准备** 安装前的准备工作(zk集群已经部署完毕...

    相关 部署Kafka

    前言 关于kafka的工作机制,已经在上篇博文:[Kafka原理及单机部署][Kafka]中详细写出来,这里只是将kafka的一个群集部署写了出来。 > 博文大纲:

    相关 kafka部署

    在[上篇文章][Link 1]中讲解了kafka的单点部署,本篇讲解kafka的集群部署。 1.拷贝kafka配置文件并启动 进入kafka的解压文件,输入以下两行命令

    相关 Kafka 部署

      Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动