离线数仓(二):Flume集群环境的搭建

太过爱你忘了你带给我的痛 2022-12-31 05:18 396阅读 0赞

文章目录

  • 一、业务日志生成
  • 二、采集日志 Flume 集群的搭建
    • 2.1 Flume 集群安装
    • 2.2 Flume Source 和 Channel 说明
    • 2.3 日志采集 Flume 配置
    • 2.4 自定义拦截器的创建
    • 2.5 日志采集 Flume 启动停止脚本
    • 2.6 Flume 内存优化
  • 三、Kafka 集群的搭建
    • 3.1 Kafka 集群安装
    • 3.2 Kafka 压力测试
    • 3.3 Kafka 机器数量计算
    • 3.4 消费 Kafka 数据 Flume 配置

一、业务日志生成

业务日志生成服务规划:hadoop100hadoop101

① 代码参数说明

  1. // 参数一:控制发送每条的延时时间,默认是0
  2. Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;
  3. // 参数二:循环遍历次数
  4. int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

② 将生成日志的服务 jar 包上传至 hadoop100 、 hadoop101 ,执行 jar 包

  1. [root@hadoop100 logs-app]# java -jar log-collector-1.0.jar

jar执行的两种方式:
java -classpath 需要在jar包后面指定全类名;
java -jar 需要查看一下解压的jar包META-INF/ MANIFEST.MF文件中,Main-Class是否有全类名。

  1. [root@hadoop100 logs-app]# java -jar log-collector-1.0.jar 2>/dev/null 1>/dev/null

可简写为

  1. [root@hadoop100 logs-app]# java -jar log-collector-1.0.jar >/dev/null 2>&1

/dev/null 是 Linux 的“黑洞”
标准输入0:从键盘获得输入 /proc/self/fd/0
标准输出1:输出到屏幕(即控制台) /proc/self/fd/1
错误输出2:输出到屏幕(即控制台) /proc/self/fd/2

③ 在 /tmp/logs 路径下查看生成的日志文件

  1. [root@hadoop100 module]# ls /tmp/logs
  2. app-2020-12-21.log

④ 日志生成启动脚本

  1. #! /bin/bash
  2. for i in hadoop102 hadoop103
  3. do
  4. ssh $i "java -jar /opt/module/log-collector-1.0.jar $1 $2 >/dev/null 2>&1 &"
  5. done

二、采集日志 Flume 集群的搭建

2.1 Flume 集群安装

采集日志Flume集群规划:hadoop100hadoop101

① 解压 apache-flume-1.9.0-bin.tar.gz 到 /opt/module/ 目录下

② 将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3

③ 修改配置文件中 JAVA_HOME

  1. [root@hadoop100 module]# mv flume-env.sh.template flume-env.sh
  2. [root@hadoop100 module]# vim flume-env.sh
  3. export JAVA_HOME=/opt/module/jdk1.8.0_212

2.2 Flume Source 和 Channel 说明

① Source






















名称 作用
TailDir Source 断点续传、多目录。Flume1.6 以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传
Exec Source 可以实时搜集数据,但是在 Flume 不运行或者 Shell 命令出错的情况下,数据将会丢失
Spooling Directory Source 监控目录,不支持断点续

② Channel

采用 Kafka Channel ,省去了 Sink ,提高了效率。 KafkaChannel 数据存储在 Kafka 里面,所以数据是存储在磁盘中。

注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flumeheaders中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

2.3 日志采集 Flume 配置

在这里插入图片描述

  1. a1.sources=r1
  2. a1.channels=c1 c2
  3. # configure source
  4. a1.sources.r1.type = TAILDIR
  5. a1.sources.r1.positionFile = /opt/module/flume-1.9.0/test/log_position.json
  6. a1.sources.r1.filegroups = f1
  7. a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
  8. a1.sources.r1.fileHeader = true
  9. a1.sources.r1.channels = c1 c2
  10. #interceptor
  11. a1.sources.r1.interceptors = i1 i2
  12. a1.sources.r1.interceptors.i1.type = com.hucheng.flume.LogETLInterceptor$Builder
  13. a1.sources.r1.interceptors.i2.type = com.hucheng.flume.LogTypeInterceptor$Builder
  14. a1.sources.r1.selector.type = multiplexing
  15. a1.sources.r1.selector.header = topic
  16. a1.sources.r1.selector.mapping.topic_start = c1
  17. a1.sources.r1.selector.mapping.topic_event = c2
  18. # configure channel
  19. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
  20. a1.channels.c1.kafka.bootstrap.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092
  21. a1.channels.c1.kafka.topic = topic_start
  22. a1.channels.c1.parseAsFlumeEvent = false
  23. a1.channels.c1.kafka.consumer.group.id = flume-consumer
  24. a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
  25. a1.channels.c2.kafka.bootstrap.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092
  26. a1.channels.c2.kafka.topic = topic_event
  27. a1.channels.c2.parseAsFlumeEvent = false
  28. a1.channels.c2.kafka.consumer.group.id = flume-consumer

2.4 自定义拦截器的创建

本项目中自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。

  • ETL拦截器主要用于,过滤时间戳不合法和JSON数据不完整的日志;
  • 日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不同Topic

引入依赖:

  1. <dependency>
  2. <groupId>org.apache.flume</groupId>
  3. <artifactId>flume-ng-core</artifactId>
  4. <version>1.9.0</version>
  5. </dependency>

① LogETLInterceptor

  1. public class LogETLInterceptor implements Interceptor {
  2. @Override
  3. public void initialize() { }
  4. @Override
  5. public Event intercept(Event event) {
  6. // 1、获取数据
  7. byte[] body = event.getBody();
  8. String log = new String(body, Charset.forName("UTF-8"));
  9. // 2、判断数据类型冰箱Header中赋值
  10. if (log.contains("start")) {
  11. if (LogUtils.validateStart(log)) {
  12. return event;
  13. }
  14. } else {
  15. if (LogUtils.validateEvent(log)) {
  16. return event;
  17. }
  18. }
  19. return null;
  20. }
  21. @Override
  22. public List<Event> intercept(List<Event> list) {
  23. List<Event> interceptors = new ArrayList<>();
  24. for (Event event : list) {
  25. if (event != null) {
  26. interceptors.add(intercept(event));
  27. }
  28. }
  29. return interceptors;
  30. }
  31. @Override
  32. public void close() { }
  33. public static class Builder implements Interceptor.Builder {
  34. @Override
  35. public Interceptor build() {
  36. return new LogETLInterceptor();
  37. }
  38. @Override
  39. public void configure(Context context) {
  40. }
  41. }
  42. }

② LogTypeInterceptor

  1. public class LogTypeInterceptor implements Interceptor {
  2. @Override
  3. public void initialize() {
  4. }
  5. @Override
  6. public Event intercept(Event event) {
  7. //1、获取body数据
  8. byte[] body = event.getBody();
  9. String log = new String(body, Charset.forName("UTF-8"));
  10. //2、获取headers数据
  11. Map<String, String> headers = event.getHeaders();
  12. if (log.contains("start")) {
  13. headers.put("topic", "topic_start");
  14. } else {
  15. headers.put("topic", "topic_event");
  16. }
  17. return event;
  18. }
  19. @Override
  20. public List<Event> intercept(List<Event> list) {
  21. return list;
  22. }
  23. @Override
  24. public void close() {
  25. }
  26. public static class Builder implements Interceptor.Builder {
  27. @Override
  28. public Interceptor build() {
  29. return new LogTypeInterceptor();
  30. }
  31. @Override
  32. public void configure(Context context) {
  33. }
  34. }
  35. }

2.5 日志采集 Flume 启动停止脚本

  1. #! /bin/bash
  2. case $1 in
  3. "start"){
  4. for i in hadoop100 hadoop101
  5. do
  6. echo " --------启动 $i 采集flume-------"
  7. ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent --conf-file /opt/module/flume-1.9.0/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume-1.9.0/logs 2>&1 &"
  8. done
  9. };;
  10. "stop"){
  11. for i in hadoop100 hadoop101
  12. do
  13. echo " --------停止 $i 采集flume-------"
  14. ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
  15. done
  16. };;
  17. esac

2.6 Flume 内存优化

问题描述:如果启动消费Flume抛出如下异常

  1. ERROR hdfs.HDFSEventSink: process failed java.lang.OutOfMemoryError: GC overhead limit exceeded

**解决方案:**修改目录下/conf/flume-env.sh

  1. export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

三、Kafka 集群的搭建

3.1 Kafka 集群安装

Kafka集群规划:hadoop100hadoop101hadoop102

① 解压安装包并修改文件夹名

  1. [root@hadoop100 software]# tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
  2. [root@hadoop100 module]# mv kafka_2.11-2.4.1/ kafka-2.4.1/

② 在 Kafka 文件下创建 log 文件夹

  1. [root@hadoop100 kafka-2.4.1]# mkdir log

③ 修改配置文件

  1. [root@hadoop100 config]# vim server.properties
  2. #broker的全局唯一编号,不能重复
  3. broker.id=0
  4. #删除topic功能使能
  5. delete.topic.enable=true
  6. #kafka运行日志存放的路径
  7. log.dirs=/opt/module/kafka-2.4.1/log
  8. #配置连接Zookeeper集群地址
  9. zookeeper.connect=hadoop100:2181,hadoop101:2181,hadoop102:2181/kafka

④ 分发 Kafka 文件夹以及修改不同的 broker.id

⑤ Kafka 群启脚本

  1. #! /bin/bash
  2. case $1 in
  3. "start"){
  4. for i in hadoop100 hadoop101 hadoop102
  5. do
  6. echo " --------启动 $i Kafka-------"
  7. ssh $i "/opt/module/kafka-2.4.1/bin/kafka-server-start.sh -daemon /opt/module/kafka-2.4.1/config/server.properties "
  8. done
  9. };;
  10. "stop"){
  11. for i in hadoop100 hadoop101 hadoop102
  12. do
  13. echo " --------停止 $i Kafka-------"
  14. ssh $i "/opt/module/kafka-2.4.1/bin/kafka-server-stop.sh stop"
  15. done
  16. };;
  17. esac

3.2 Kafka 压力测试

Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

① Kafka Producer 压力测试

  1. [root@hadoop100 kafka-2.4.1]# bin/kafka-producer-perf-test.sh --topic test \
  2. --record-size 100 --num-records 100000 --throughput -1 \
  3. --producer-props bootstrap.servers=hadoop100:9092,hadoop101:9092,hadoop102:9092
  4. 100000 records sent, 81632.653061 records/sec (7.79 MB/sec), 307.21 ms avg
  5. latency, 466.00 ms max latency, 333 ms 50th, 442 ms 95th, 464 ms 99th, 466 ms 99.9th.

参数说明:

  • record-size是一条信息有多大,单位是字节
  • num-records是总共发送多少条信息
  • throughput是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量

打印说明:本例中一共写入 10w 条消息,吞吐量为9.14 MB/sec ,每次写入的平均延迟为307.21毫秒,最大的延迟为466.00毫秒。

② Kafka Consumer 压力测试

  1. [root@hadoop100 kafka-2.4.1]# bin/kafka-consumer-perf-test.sh \
  2. --broker-list hadoop100:9092,hadoop101:9092,hadoop102:9092 \
  3. --topic test --fetch-size 10000 --messages 10000000 --threads 1
  4. start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec,
  5. rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
  6. 2020-12-29 19:42:31:728, 2020-12-29 19:42:42:975, 9.5367, 0.8479, 100000, 8891.2599,
  7. 1609242152253, -1609242141006, -0.0000, -0.0001

参数说明:

  • –zookeeper:指定zookeeper的链接信息
  • –topic:指定topic的名称
  • –fetch-size:指定每次fetch的数据的大小
  • –messages:总共要消费的消息个数

打印说明:

  • data.consumed.in.MB:共消费数据
  • MB.sec:吞吐量
  • data.consumed.in.nMsg:共消费多少条数据
  • nMsg.sec:平均每秒消费多少条

3.3 Kafka 机器数量计算

Kafka机器数量(经验公式)= 2*(峰值生产速度*副本数/100)+1

先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。比如我们的峰值生产速度是50M/s。副本数为2。

Kafka机器数量=2*(50*2/100)+ 1=3台

3.4 消费 Kafka 数据 Flume 配置

集群规划Hadoop102

配置分析
在这里插入图片描述

配置详情

  1. ## 组件
  2. a1.sources=r1 r2
  3. a1.channels=c1 c2
  4. a1.sinks=k1 k2
  5. ## source1
  6. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
  7. a1.sources.r1.batchSize = 5000
  8. a1.sources.r1.batchDurationMillis = 2000
  9. a1.sources.r1.kafka.bootstrap.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092
  10. a1.sources.r1.kafka.topics=topic_start
  11. ## source2
  12. a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
  13. a1.sources.r2.batchSize = 5000
  14. a1.sources.r2.batchDurationMillis = 2000
  15. a1.sources.r2.kafka.bootstrap.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092
  16. a1.sources.r2.kafka.topics=topic_event
  17. ## channel1
  18. a1.channels.c1.type = file
  19. a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1
  20. a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1/
  21. a1.channels.c1.maxFileSize = 2146435071
  22. a1.channels.c1.capacity = 1000000
  23. a1.channels.c1.keep-alive = 6
  24. ## channel2
  25. a1.channels.c2.type = file
  26. a1.channels.c2.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior2
  27. a1.channels.c2.dataDirs = /opt/module/flume-1.9.0/data/behavior2/
  28. a1.channels.c2.maxFileSize = 2146435071
  29. a1.channels.c2.capacity = 1000000
  30. a1.channels.c2.keep-alive = 6
  31. ## sink1
  32. a1.sinks.k1.type = hdfs
  33. a1.sinks.k1.hdfs.path = hdfs://hadoop100:8020/origin_data/mall/log/topic_start/%Y-%m-%d
  34. a1.sinks.k1.hdfs.filePrefix = logstart-
  35. ##sink2
  36. a1.sinks.k2.type = hdfs
  37. a1.sinks.k2.hdfs.path = hdfs://hadoop100:8020/origin_data/mall/log/topic_event/%Y-%m-%d
  38. a1.sinks.k2.hdfs.filePrefix = logevent-
  39. ## 不要产生大量小文件
  40. a1.sinks.k1.hdfs.rollInterval = 10
  41. a1.sinks.k1.hdfs.rollSize = 134217728
  42. a1.sinks.k1.hdfs.rollCount = 0
  43. a1.sinks.k2.hdfs.rollInterval = 10
  44. a1.sinks.k2.hdfs.rollSize = 134217728
  45. a1.sinks.k2.hdfs.rollCount = 0
  46. ## 控制输出文件是原生文件。
  47. a1.sinks.k1.hdfs.fileType = CompressedStream
  48. a1.sinks.k2.hdfs.fileType = CompressedStream
  49. a1.sinks.k1.hdfs.codeC = lzop
  50. a1.sinks.k2.hdfs.codeC = lzop
  51. ## 拼装
  52. a1.sources.r1.channels = c1
  53. a1.sinks.k1.channel= c1
  54. a1.sources.r2.channels = c2
  55. a1.sinks.k2.channel= c2

发表评论

表情:
评论列表 (有 0 条评论,396人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Flume

    1. 概念 集群的意思是多台机器,最少有2台机器,一台机器从数据源中获取数据,将数据传送到另一台机器上,然后输出。接下来就要实现Flume集群搭建。集群如下图所示。 !