Setting Up Kafka and Zookeeper Clusters on Kubernetes (Complete Tutorial)


Original article: https://blog.csdn.net/xzk9381/article/details/109535377. Please credit the source when reposting. If you spot any problems in the article, feel free to leave a comment.

This article walks through setting up Kafka and Zookeeper clusters on K8S. By customizing the images, the broker ID and the Zookeeper member list are extended automatically whenever Pods are scaled out, without any manual intervention.

I. Service Versions

  • Kafka: v2.13-2.6.0
  • Zookeeper: v3.6.2
  • Kubernetes: v1.18.4

II. Building the Zookeeper Image

Zookeeper uses the official image published on Docker Hub, which can be pulled directly with:

    docker pull zookeeper:3.6.2

The startup script in the official image does not suit our internal requirements, so the docker-entrypoint.sh script and the Dockerfile were modified.

1. Modify the docker-entrypoint.sh script

The modified docker-entrypoint.sh is shown below (the original script can be found at https://github.com/31z4/zookeeper-docker/tree/2373492c6f8e74d3c1167726b19babe8ac7055dd/3.6.2):

    #!/bin/bash

    set -e

    HOST=$(hostname -s)
    DOMAIN=$(hostname -d)
    CLIENT_PORT=2181
    SERVER_PORT=2888
    ELECTION_PORT=3888

    function createConfig(){
        if [[ ! -f "$ZOO_CONF_DIR/${HOST}/zoo.cfg" ]]; then
            mkdir -p $ZOO_CONF_DIR/${HOST}
            mkdir -p $ZOO_DATA_DIR/${HOST}
            mkdir -p $ZOO_DATA_LOG_DIR/${HOST}

            CONFIG="$ZOO_CONF_DIR/${HOST}/zoo.cfg"
            {
                echo "dataDir=$ZOO_DATA_DIR/${HOST}"
                echo "dataLogDir=$ZOO_DATA_LOG_DIR/${HOST}"
                echo "tickTime=$ZOO_TICK_TIME"
                echo "initLimit=$ZOO_INIT_LIMIT"
                echo "syncLimit=$ZOO_SYNC_LIMIT"
                echo "autopurge.snapRetainCount=$ZOO_AUTOPURGE_SNAPRETAINCOUNT"
                echo "autopurge.purgeInterval=$ZOO_AUTOPURGE_PURGEINTERVAL"
                echo "maxClientCnxns=$ZOO_MAX_CLIENT_CNXNS"
                echo "standaloneEnabled=$ZOO_STANDALONE_ENABLED"
                echo "admin.enableServer=$ZOO_ADMINSERVER_ENABLED"
            } >> ${CONFIG}

            if [[ -n $ZOO_4LW_COMMANDS_WHITELIST ]]; then
                echo "4lw.commands.whitelist=$ZOO_4LW_COMMANDS_WHITELIST" >> ${CONFIG}
            fi

            for cfg_extra_entry in $ZOO_CFG_EXTRA; do
                echo "$cfg_extra_entry" >> ${CONFIG}
            done
        fi
    }

    function getHostNum(){
        if [[ $HOST =~ (.*)-([0-9]+)$ ]]; then
            NAME=${BASH_REMATCH[1]}
            ORD=${BASH_REMATCH[2]}
        else
            echo "Failed to parse name and ordinal of Pod"
            exit 1
        fi
    }

    function createID(){
        ID_FILE="$ZOO_DATA_DIR/${HOST}/myid"
        MY_ID=$((ORD+1))
        echo $MY_ID > $ID_FILE
    }

    function addServer(){
        for (( i=1; i<=$SERVERS; i++ ))
        do
            s="server.$i=$NAME-$((i-1)).$DOMAIN:$SERVER_PORT:$ELECTION_PORT;$CLIENT_PORT"
            [[ $(grep "$s" $ZOO_CONF_DIR/${HOST}/zoo.cfg) ]] || echo $s >> $ZOO_CONF_DIR/${HOST}/zoo.cfg
        done
    }

    function userPerm(){
        if [[ "$1" = 'zkServer.sh' && "$(id -u)" = '0' ]]; then
            chown -R zookeeper "$ZOO_DATA_DIR" "$ZOO_DATA_LOG_DIR" "$ZOO_LOG_DIR" "$ZOO_CONF_DIR"
            exec gosu zookeeper "$0" "$@"
        fi
    }

    function startZK(){
        /apache-zookeeper-3.6.2-bin/bin/zkServer.sh --config "$ZOO_CONF_DIR/$(hostname -s)" start-foreground
    }

    createConfig
    getHostNum
    createID
    addServer
    userPerm
    startZK
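
To make the dynamic membership logic above easier to follow, here is a minimal, self-contained simulation of how the script derives the myid value and the server list for one pod. It assumes the pod name zk-0, SERVERS=3, and the headless-service domain used later in this article; it is only an illustration, not part of the image:

    # Simulate the ordinal parsing and server-list generation for pod zk-0
    HOST=zk-0
    DOMAIN=zk-inner-service.ns-kafka.svc.cluster.local
    SERVERS=3
    [[ $HOST =~ (.*)-([0-9]+)$ ]] && NAME=${BASH_REMATCH[1]} && ORD=${BASH_REMATCH[2]}
    echo "myid=$((ORD + 1))"        # -> myid=1
    for ((i = 1; i <= SERVERS; i++)); do
        echo "server.$i=$NAME-$((i-1)).$DOMAIN:2888:3888;2181"
    done
    # -> server.1=zk-0.zk-inner-service.ns-kafka.svc.cluster.local:2888:3888;2181
    #    server.2=zk-1.zk-inner-service.ns-kafka.svc.cluster.local:2888:3888;2181
    #    server.3=zk-2.zk-inner-service.ns-kafka.svc.cluster.local:2888:3888;2181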

2. Modify the Dockerfile

My changes to the Dockerfile are minimal: the original ENTRYPOINT directive is commented out, and CMD is changed so the container starts via docker-entrypoint.sh:

    FROM openjdk:11-jre-slim

    ENV ZOO_CONF_DIR=/conf \
        ZOO_DATA_DIR=/data \
        ZOO_DATA_LOG_DIR=/datalog \
        ZOO_LOG_DIR=/logs \
        ZOO_TICK_TIME=2000 \
        ZOO_INIT_LIMIT=5 \
        ZOO_SYNC_LIMIT=2 \
        ZOO_AUTOPURGE_PURGEINTERVAL=0 \
        ZOO_AUTOPURGE_SNAPRETAINCOUNT=3 \
        ZOO_MAX_CLIENT_CNXNS=60 \
        ZOO_STANDALONE_ENABLED=true \
        ZOO_ADMINSERVER_ENABLED=true

    # Add a user with an explicit UID/GID and create necessary directories
    RUN set -eux; \
        groupadd -r zookeeper --gid=1000; \
        useradd -r -g zookeeper --uid=1000 zookeeper; \
        mkdir -p "$ZOO_DATA_LOG_DIR" "$ZOO_DATA_DIR" "$ZOO_CONF_DIR" "$ZOO_LOG_DIR"; \
        chown zookeeper:zookeeper "$ZOO_DATA_LOG_DIR" "$ZOO_DATA_DIR" "$ZOO_CONF_DIR" "$ZOO_LOG_DIR"

    # Install required packages
    RUN set -eux; \
        apt-get update; \
        DEBIAN_FRONTEND=noninteractive \
        apt-get install -y --no-install-recommends \
            ca-certificates \
            dirmngr \
            gosu \
            gnupg \
            netcat \
            wget; \
        rm -rf /var/lib/apt/lists/*; \
    # Verify that gosu binary works
        gosu nobody true

    ARG GPG_KEY=BBE7232D7991050B54C8EA0ADC08637CA615D22C
    ARG SHORT_DISTRO_NAME=zookeeper-3.6.2
    ARG DISTRO_NAME=apache-zookeeper-3.6.2-bin

    # Download Apache Zookeeper, verify its PGP signature, untar and clean up
    RUN set -eux; \
        ddist() { \
            local f="$1"; shift; \
            local distFile="$1"; shift; \
            local success=; \
            local distUrl=; \
            for distUrl in \
                'https://www.apache.org/dyn/closer.cgi?action=download&filename=' \
                https://www-us.apache.org/dist/ \
                https://www.apache.org/dist/ \
                https://archive.apache.org/dist/ \
            ; do \
                if wget -q -O "$f" "$distUrl$distFile" && [ -s "$f" ]; then \
                    success=1; \
                    break; \
                fi; \
            done; \
            [ -n "$success" ]; \
        }; \
        ddist "$DISTRO_NAME.tar.gz" "zookeeper/$SHORT_DISTRO_NAME/$DISTRO_NAME.tar.gz"; \
        ddist "$DISTRO_NAME.tar.gz.asc" "zookeeper/$SHORT_DISTRO_NAME/$DISTRO_NAME.tar.gz.asc"; \
        export GNUPGHOME="$(mktemp -d)"; \
        gpg --keyserver ha.pool.sks-keyservers.net --recv-key "$GPG_KEY" || \
        gpg --keyserver pgp.mit.edu --recv-keys "$GPG_KEY" || \
        gpg --keyserver keyserver.pgp.com --recv-keys "$GPG_KEY"; \
        gpg --batch --verify "$DISTRO_NAME.tar.gz.asc" "$DISTRO_NAME.tar.gz"; \
        tar -zxf "$DISTRO_NAME.tar.gz"; \
        mv "$DISTRO_NAME/conf/"* "$ZOO_CONF_DIR"; \
        rm -rf "$GNUPGHOME" "$DISTRO_NAME.tar.gz" "$DISTRO_NAME.tar.gz.asc"; \
        chown -R zookeeper:zookeeper "/$DISTRO_NAME"

    WORKDIR $DISTRO_NAME
    VOLUME ["$ZOO_DATA_DIR", "$ZOO_DATA_LOG_DIR", "$ZOO_LOG_DIR"]
    EXPOSE 2181 2888 3888 8080

    ENV PATH=$PATH:/$DISTRO_NAME/bin \
        ZOOCFGDIR=$ZOO_CONF_DIR

    COPY docker-entrypoint.sh /
    # Comment out the original ENTRYPOINT
    # ENTRYPOINT ["/docker-entrypoint.sh"]
    # Comment out the original CMD and add the new one below
    # CMD ["zkServer.sh", "start-foreground"]
    CMD ["/docker-entrypoint.sh"]

3. Build the image and push it to the private registry

In the directory containing the Dockerfile, build the image and tag it with the following command:

    docker build --tag 10.16.12.204/ops/zookeeper:custom-v3.6.2 -f Dockerfile .

Push it to the image registry:

    docker push 10.16.12.204/ops/zookeeper:custom-v3.6.2

III. Building the Kafka Image

The Kafka image is based on the wurstmeister image from Docker Hub, which can be pulled with:

    docker pull wurstmeister/kafka:2.13-2.6.0

This image uses the start-kafka.sh script to initialize the Kafka configuration and start the broker, but parts of it do not fit a K8S deployment, so the script is modified.

1. Modify the start-kafka.sh script

The original start-kafka.sh can be found at https://github.com/wurstmeister/kafka-docker. The modified version is as follows:

    #!/bin/bash -e

    # Allow specific kafka versions to perform any unique bootstrap operations
    OVERRIDE_FILE="/opt/overrides/${KAFKA_VERSION}.sh"
    if [[ -x "$OVERRIDE_FILE" ]]; then
        echo "Executing override file $OVERRIDE_FILE"
        eval "$OVERRIDE_FILE"
    fi

    # Store original IFS config, so we can restore it at various stages
    ORIG_IFS=$IFS

    if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
        echo "ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT"
        exit 1
    fi

    if [[ -z "$KAFKA_PORT" ]]; then
        export KAFKA_PORT=9092
    fi

    create-topics.sh &
    unset KAFKA_CREATE_TOPICS

    if [[ -z "$KAFKA_BROKER_ID" ]]; then
        if [[ -n "$BROKER_ID_COMMAND" ]]; then
            KAFKA_BROKER_ID=$(eval "$BROKER_ID_COMMAND")
            export KAFKA_BROKER_ID
        else
            export KAFKA_BROKER_ID=-1
        fi
    fi

    if [[ -z "$KAFKA_LOG_DIRS" ]]; then
        export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
    fi

    if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
        sed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$KAFKA_HEAP_OPTS"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh"
        unset KAFKA_HEAP_OPTS
    fi

    if [[ -n "$HOSTNAME_COMMAND" ]]; then
        HOSTNAME_VALUE=$(eval "$HOSTNAME_COMMAND")

        # Replace any occurrences of _{HOSTNAME_COMMAND} with the value
        IFS=$'\n'
        for VAR in $(env); do
            if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{HOSTNAME_COMMAND}" ]]; then
                eval "export ${VAR//_\{HOSTNAME_COMMAND\}/$HOSTNAME_VALUE}"
            fi
        done
        IFS=$ORIG_IFS
    fi

    if [[ -n "$PORT_COMMAND" ]]; then
        PORT_VALUE=$(eval "$PORT_COMMAND")

        # Replace any occurrences of _{PORT_COMMAND} with the value
        IFS=$'\n'
        for VAR in $(env); do
            if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{PORT_COMMAND}" ]]; then
                eval "export ${VAR//_\{PORT_COMMAND\}/$PORT_VALUE}"
            fi
        done
        IFS=$ORIG_IFS
    fi

    if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then
        KAFKA_BROKER_RACK=$(eval "$RACK_COMMAND")
        export KAFKA_BROKER_RACK
    fi

    if [[ -z "$KAFKA_ADVERTISED_HOST_NAME$KAFKA_LISTENERS" ]]; then
        if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
            echo "ERROR: Missing environment variable KAFKA_LISTENERS. Must be specified when using KAFKA_ADVERTISED_LISTENERS"
            exit 1
        elif [[ -z "$HOSTNAME_VALUE" ]]; then
            echo "ERROR: No listener or advertised hostname configuration provided in environment."
            echo "       Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME"
            exit 1
        fi

        export KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE"
    fi

    echo "" >> "$KAFKA_HOME/config/server.properties"
    (
        function updateConfig() {
            key=$1
            value=$2
            file=$3

            echo "[Configuring] '$key' in '$file'"

            if grep -E -q "^#?$key=" "$file"; then
                sed -r -i "s@^#?$key=.*@$key=$value@g" "$file"
            else
                echo "$key=$value" >> "$file"
            fi
        }

        # KAFKA_VERSION + KAFKA_HOME + grep -rohe KAFKA[A-Z0-9_]* /opt/kafka/bin | sort | uniq | tr '\n' '|'
        EXCLUSIONS="|KAFKA_VERSION|KAFKA_HOME|KAFKA_DEBUG|KAFKA_GC_LOG_OPTS|KAFKA_HEAP_OPTS|KAFKA_JMX_OPTS|KAFKA_JVM_PERFORMANCE_OPTS|KAFKA_LOG|KAFKA_OPTS|"

        IFS=$'\n'
        for VAR in $(env)
        do
            env_var=$(echo "$VAR" | cut -d= -f1)
            if [[ "$EXCLUSIONS" = *"|$env_var|"* ]]; then
                echo "Excluding $env_var from broker config"
                continue
            fi

            if [[ $env_var =~ ^KAFKA_ ]]; then
                kafka_name=$(echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .)
                updateConfig "$kafka_name" "${!env_var}" "$KAFKA_HOME/config/server.properties"
            fi

            if [[ $env_var =~ ^LOG4J_ ]]; then
                log4j_name=$(echo "$env_var" | tr '[:upper:]' '[:lower:]' | tr _ .)
                updateConfig "$log4j_name" "${!env_var}" "$KAFKA_HOME/config/log4j.properties"
            fi
        done

        PODNAME=$(hostname -s | awk -F'-' 'OFS="-"{ $NF="";print}' | sed 's/-$//g')
        for ((i=0;i<$SERVERS;i++))
        do
            BOOTSTRAP_SERVERS+="$PODNAME-$i.$(hostname -d):${KAFKA_PORT},"
        done
        BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS%?}
        echo ${BOOTSTRAP_SERVERS} > /opt/log.txt
        sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/consumer.properties
        sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/producer.properties
    )

    if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
        eval "$CUSTOM_INIT_SCRIPT"
    fi

    exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"
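
The block appended near the end of the script (PODNAME / BOOTSTRAP_SERVERS) is the main custom addition: it rewrites bootstrap.servers in consumer.properties and producer.properties so the bundled CLI tools point at every broker in the StatefulSet. Below is a small stand-alone illustration of that logic using assumed values (pod kafka-1, SERVERS=3, the kafka-service headless domain), not output from a live pod:

    # Illustrative only: derive the StatefulSet base name and build the bootstrap list
    POD_SHORT_NAME=kafka-1                                  # assumed pod name
    POD_DOMAIN=kafka-service.ns-kafka.svc.cluster.local     # assumed headless-service domain
    KAFKA_PORT=9092
    SERVERS=3
    PODNAME=$(echo "$POD_SHORT_NAME" | awk -F'-' 'OFS="-"{ $NF="";print}' | sed 's/-$//g')   # -> kafka
    BOOTSTRAP_SERVERS=""
    for ((i = 0; i < SERVERS; i++)); do
        BOOTSTRAP_SERVERS+="$PODNAME-$i.$POD_DOMAIN:$KAFKA_PORT,"
    done
    echo "${BOOTSTRAP_SERVERS%?}"
    # -> kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1...:9092,kafka-2...:9092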

2. Modify the Dockerfile

The Dockerfile is otherwise unchanged; it only adds the modified start-kafka.sh to the image and runs it with bash (otherwise some commands in the script would fail):

    FROM wurstmeister/kafka:2.13-2.6.0
    ADD start-kafka.sh /
    CMD ["bash","start-kafka.sh"]

3. Build the image and push it to the private registry

Rebuild the image and tag it with the following command:

    docker build --tag 10.16.12.204/ops/kafka:custom-v2.13-2.6.0 -f Dockerfile .

Push the image to the registry:

    docker push 10.16.12.204/ops/kafka:custom-v2.13-2.6.0

IV. Creating the Namespace

The entire Kafka and Zookeeper deployment lives in a single namespace, so create the ns-kafka namespace with the following yaml:

    ---
    apiVersion: v1
    kind: Namespace
    metadata:
      name: ns-kafka
      labels:
        name: ns-kafka
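
Assuming the manifest above is saved as ns-kafka.yaml (the file name is arbitrary), apply it and confirm the namespace exists:

    kubectl apply -f ns-kafka.yaml
    kubectl get namespace ns-kafka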

V. Creating the Secret

The kubelet has to authenticate against the image registry when pulling images, so create a Secret holding the Harbor registry credentials:

    kubectl create secret docker-registry harbor-secret --namespace=ns-kafka --docker-server=http://10.16.12.204 --docker-username=admin --docker-password=Harbor12345
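
As a quick optional check (not in the original write-up), confirm the Secret was created; it is referenced later through imagePullSecrets in both StatefulSets:

    kubectl get secret harbor-secret -n ns-kafka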

VI. Creating the PV and PVC

For this deployment, the Kafka cluster and the Zookeeper cluster are planned to share a single PV. As shown in the Pod initialization scripts earlier, both Kafka and Zookeeper keep their data and log directories under a directory named after their own hostname, so the directories stay separate even on a shared PV. The yaml for the PV and PVC is as follows:

    ---
    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: kafka-data-pv
    spec:
      accessModes:
        - ReadWriteMany
      capacity:
        storage: 500Gi
      local:
        path: /opt/ops_ceph_data/kafka_data
      nodeAffinity:
        required:
          nodeSelectorTerms:
            - matchExpressions:
                - key: kafka-cluster
                  operator: In
                  values:
                    - "true"
      persistentVolumeReclaimPolicy: Retain
    ---
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: kafka-data-pvc
      namespace: ns-kafka
    spec:
      accessModes:
        - ReadWriteMany
      resources:
        requests:
          storage: 500Gi

One thing to note: the storage I am using is cephfs, mounted on every K8S node at /opt/ops_ceph_data, which is why the PV is declared with the local storage type.
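
For orientation, this is a rough sketch (not captured from a live node) of how the shared directory would look on a labelled node once both clusters are running, given the subPath values used in the StatefulSets below and the per-hostname directories created by the startup scripts:

    ls -R /opt/ops_ceph_data/kafka_data
    # kafka-cluster-log/kafka-logs-kafka-0, kafka-logs-kafka-1, kafka-logs-kafka-2
    # zookeeper-cluster-data/data/zk-0, zk-1, zk-2
    # zookeeper-cluster-data/data_log/zk-0, zk-1, zk-2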

VII. Creating Labels

Because the PV above uses the local storage type, it can only be scheduled onto nodes that carry the matching label, so add a label to every node in the cluster:

    for i in 1 2 3 4 5; do kubectl label nodes k8s-node${i} kafka-cluster=true; done
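
To confirm the label was applied (node names follow the k8s-node1 to k8s-node5 naming used in the loop above):

    kubectl get nodes -l kafka-cluster=true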

VIII. Creating the Zookeeper Cluster

1. Create the Services

Create the Services used for Zookeeper inter-node and client communication; the yaml is as follows:

    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: zk-inner-service
      namespace: ns-kafka
      labels:
        app: zk
    spec:
      selector:
        app: zk
      clusterIP: None
      ports:
        - name: server
          port: 2888
        - name: leader-election
          port: 3888
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: zk-client-service
      namespace: ns-kafka
      labels:
        app: zk
    spec:
      selector:
        app: zk
      type: NodePort
      ports:
        - name: client
          port: 2181
          nodePort: 31811
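
The headless zk-inner-service is what gives each Pod a stable DNS name of the form zk-N.zk-inner-service.ns-kafka.svc.cluster.local, which the entrypoint script relies on via hostname -d when it builds the server list. Once the StatefulSet from the next step is running, this can be sanity-checked from inside a pod (an assumed check, not from the original article; getent should be available in the Debian-slim based image):

    kubectl exec -it zk-0 -n ns-kafka -- getent hosts zk-inner-service.ns-kafka.svc.cluster.local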

2. Create the StatefulSet

Zookeeper is a stateful service, so it is deployed with a StatefulSet; the yaml is as follows:

    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: zk
      namespace: ns-kafka
    spec:
      selector:
        matchLabels:
          app: zk
      serviceName: "zk-inner-service"
      replicas: 3
      updateStrategy:
        type: RollingUpdate
      podManagementPolicy: Parallel
      template:
        metadata:
          labels:
            app: zk
        spec:
          containers:
            - name: zk
              imagePullPolicy: Always
              image: 10.16.12.204/ops/zookeeper:custom-v3.6.2
              resources:
                requests:
                  memory: "500Mi"
                  cpu: "0.5"
              ports:
                - containerPort: 2181
                  name: client
                - containerPort: 2888
                  name: server
                - containerPort: 3888
                  name: leader-election
              env:
                - name: SERVERS
                  value: "3"
                - name: ZOO_CONF_DIR
                  value: /opt/conf
                - name: ZOO_DATA_DIR
                  value: /opt/data
                - name: ZOO_DATA_LOG_DIR
                  value: /opt/data_log
              volumeMounts:
                - name: zookeeper-data
                  mountPath: /opt/data
                  subPath: zookeeper-cluster-data/data
                - name: zookeeper-data
                  mountPath: /opt/data_log
                  subPath: zookeeper-cluster-data/data_log
                - name: data-conf
                  mountPath: /etc/localtime
          imagePullSecrets:
            - name: harbor-secret
          volumes:
            - name: zookeeper-data
              persistentVolumeClaim:
                claimName: kafka-data-pvc
            - name: data-conf
              hostPath:
                path: /usr/share/zoneinfo/Asia/Shanghai
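
Assuming the Service and StatefulSet manifests above are saved as zk-service.yaml and zk-statefulset.yaml (the file names are arbitrary), apply them and wait for all three Pods to become Ready:

    kubectl apply -f zk-service.yaml -f zk-statefulset.yaml
    kubectl get pods -n ns-kafka -l app=zk -w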

3. Verify the cluster state

Once the cluster is up, check the current state of each Zookeeper node with the following command:

    [@k8s-master1 /]# for i in 0 1 2; do kubectl exec -it zk-$i -n ns-kafka -- zkServer.sh --config /opt/conf/zk-$i status; done
    ZooKeeper JMX enabled by default
    Using config: /opt/conf/zk-0/zoo.cfg
    Client port found: 2181. Client address: localhost. Client SSL: false.
    Mode: follower
    ZooKeeper JMX enabled by default
    Using config: /opt/conf/zk-1/zoo.cfg
    Client port found: 2181. Client address: localhost. Client SSL: false.
    Mode: leader
    ZooKeeper JMX enabled by default
    Using config: /opt/conf/zk-2/zoo.cfg
    Client port found: 2181. Client address: localhost. Client SSL: false.
    Mode: follower

The cluster currently has one leader and two followers. Next, verify that data is synchronized across the nodes. First create a znode on zk-0:

    [@k8s-master1 /]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh
    [zk: localhost:2181(CONNECTED) 0] create /testMessage Hello
    Created /testMessage

Read this data back on the other two nodes:

    [@k8s-master1 /]# kubectl exec -it zk-1 -n ns-kafka -- zkCli.sh
    [zk: localhost:2181(CONNECTED) 0] get /testMessage
    Hello
    [@k8s-master1 /]# kubectl exec -it zk-2 -n ns-kafka -- zkCli.sh
    [zk: localhost:2181(CONNECTED) 0] get /testMessage
    Hello

The data is visible on all nodes, which means the cluster is running normally.


IX. Creating the Kafka Cluster

1. Create the Service

Create the Service used for Kafka communication; the yaml is as follows:

    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-service
      namespace: ns-kafka
      labels:
        app: kafka
    spec:
      ports:
        - port: 9092
          name: server
      clusterIP: None
      selector:
        app: kafka

2. Create the StatefulSet

Kafka is a stateful service, so it is deployed with a StatefulSet; the yaml is as follows:

    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: kafka
      namespace: ns-kafka
    spec:
      selector:
        matchLabels:
          app: kafka
      serviceName: "kafka-service"
      replicas: 3
      updateStrategy:
        type: RollingUpdate
      podManagementPolicy: Parallel
      template:
        metadata:
          labels:
            app: kafka
        spec:
          imagePullSecrets:
            - name: harbor-secret
          containers:
            - name: kafka
              imagePullPolicy: Always
              image: 10.16.12.204/ops/kafka:custom-v2.13-2.6.0
              resources:
                requests:
                  memory: "500Mi"
                  cpu: "0.5"
              env:
                - name: SERVERS              # make sure SERVERS matches the replica count
                  value: "3"
                - name: KAFKA_LISTENERS
                  value: "PLAINTEXT://:9092"
                - name: KAFKA_ZOOKEEPER_CONNECT   # Zookeeper connection address
                  value: "zk-inner-service.ns-kafka.svc.cluster.local:2181"
                - name: KAFKA_PORT
                  value: "9092"
                - name: KAFKA_MESSAGE_MAX_BYTES
                  value: "20000000"
                - name: BROKER_ID_COMMAND    # used inside the container to generate the broker id
                  value: "hostname | awk -F'-' '{print $NF}'"
              volumeMounts:
                - name: kafka-log            # only the Kafka log directory needs persistent storage
                  mountPath: /kafka
                  subPath: kafka-cluster-log
                - name: data-conf
                  mountPath: /etc/localtime
          volumes:
            - name: kafka-log
              persistentVolumeClaim:
                claimName: kafka-data-pvc
            - name: data-conf
              hostPath:
                path: /usr/share/zoneinfo/Asia/Shanghai
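
Assuming the manifests above are saved as kafka-service.yaml and kafka-statefulset.yaml, apply them and then confirm that each broker derived its id from its Pod ordinal (BROKER_ID_COMMAND keeps only the part after the last "-", so kafka-2 becomes broker 2). The server.properties path below assumes the wurstmeister image layout, where KAFKA_HOME is /opt/kafka:

    kubectl apply -f kafka-service.yaml -f kafka-statefulset.yaml
    for i in 0 1 2; do
        kubectl exec kafka-$i -n ns-kafka -- grep '^broker.id' /opt/kafka/config/server.properties
    done
    # expected roughly: broker.id=0, broker.id=1, broker.id=2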

3. Verify the cluster state

3.1 Check the brokers in Zookeeper

    [@k8s-master1 ~]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh
    Connecting to localhost:2181
    [zk: localhost:2181(CONNECTED) 0] ls /
    [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
    [zk: localhost:2181(CONNECTED) 1] ls /brokers
    [ids, seqid, topics]
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
    [0, 1, 2]
    [zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-0.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074102"}
    [zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-1.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074079"}
    [zk: localhost:2181(CONNECTED) 5] get /brokers/ids/2
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-2.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074009"}

All three brokers are registered in Zookeeper.

3.2 Create a Topic

Create a topic named Message with 3 partitions and 3 replicas from the kafka-0 pod:

    [@k8s-master1 ~]# kubectl exec -it kafka-0 -n ns-kafka -- /bin/bash
    bash-4.4# kafka-topics.sh --create --topic Message --zookeeper zk-inner-service.ns-kafka.svc.cluster.local:2181 --partitions 3 --replication-factor 3
    Created topic Message.

Check on zk-1 whether this topic exists:

    [@k8s-master1 ~]# kubectl exec -it zk-1 -n ns-kafka -- zkCli.sh
    Connecting to localhost:2181
    [zk: localhost:2181(CONNECTED) 0] ls /
    [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
    [zk: localhost:2181(CONNECTED) 1] ls /brokers
    [ids, seqid, topics]
    [zk: localhost:2181(CONNECTED) 3] ls /brokers/topics
    [Message]

The topic now exists in Zookeeper.

3.3 Simulate a producer and a consumer

First, act as a producer on kafka-1 and write messages to the Message topic:

    [@k8s-master1 ~]# kubectl exec -it kafka-1 -n ns-kafka -- /bin/bash
    bash-4.4# kafka-console-producer.sh --topic Message --broker-list kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092
    >This is a test message
    >Welcome to Kafka

Then consume those messages on kafka-2:

    [@k8s-master1 ~]# kubectl exec -it kafka-2 -n ns-kafka -- /bin/bash
    bash-4.4# kafka-console-consumer.sh --topic Message --bootstrap-server kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092 --from-beginning
    This is a test message
    Welcome to Kafka

Messages can be produced and consumed normally, which means the Kafka cluster is running correctly.

X. FAQ

1. How to specify topics to create in the yaml file

Add the following env entry to the yaml file and the topics are created automatically when the Pod starts:

    env:
      - name: KAFKA_CREATE_TOPICS
        value: "Topic1:1:3,Topic2:1:1:compact"

The value above means Topic1 gets 1 partition and 3 replicas, while Topic2 gets 1 partition and 1 replica with its cleanup.policy set to compact.

Automatic topic creation requires the KAFKA_CREATE_TOPICS variable to be set; the create-topics.sh script bundled in the image then creates the topics according to its value.
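
After restarting the Pods with KAFKA_CREATE_TOPICS set, the result can be checked from any broker pod. This is an assumed verification step using the standard Kafka CLI, not output from the original article:

    kubectl exec -it kafka-0 -n ns-kafka -- kafka-topics.sh --describe --topic Topic1 --zookeeper zk-inner-service.ns-kafka.svc.cluster.local:2181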

2. Compaction configured for a Topic does not take effect

See: https://github.com/wurstmeister/kafka-docker/wiki#topic-compaction-does-not-work

