离线数仓(二):Flume集群环境的搭建
文章目录
- 一、业务日志生成
- 二、采集日志 Flume 集群的搭建
- 2.1 Flume 集群安装
- 2.2 Flume Source 和 Channel 说明
- 2.3 日志采集 Flume 配置
- 2.4 自定义拦截器的创建
- 2.5 日志采集 Flume 启动停止脚本
- 2.6 Flume 内存优化
- 三、Kafka 集群的搭建
- 3.1 Kafka 集群安装
- 3.2 Kafka 压力测试
- 3.3 Kafka 机器数量计算
- 3.4 消费 Kafka 数据 Flume 配置
一、业务日志生成
业务日志生成服务规划:hadoop100
、hadoop101
① 代码参数说明
// 参数一:控制发送每条的延时时间,默认是0
Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;
// 参数二:循环遍历次数
int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;
② 将生成日志的服务 jar 包上传至 hadoop100 、 hadoop101 ,执行 jar 包
[root@hadoop100 logs-app]# java -jar log-collector-1.0.jar
jar执行的两种方式:
java -classpath 需要在jar包后面指定全类名;
java -jar 需要查看一下解压的jar包META-INF/ MANIFEST.MF文件中,Main-Class是否有全类名。
[root@hadoop100 logs-app]# java -jar log-collector-1.0.jar 2>/dev/null 1>/dev/null
可简写为
[root@hadoop100 logs-app]# java -jar log-collector-1.0.jar >/dev/null 2>&1
/dev/null 是 Linux 的“黑洞”
标准输入0:从键盘获得输入 /proc/self/fd/0
标准输出1:输出到屏幕(即控制台) /proc/self/fd/1
错误输出2:输出到屏幕(即控制台) /proc/self/fd/2
③ 在 /tmp/logs 路径下查看生成的日志文件
[root@hadoop100 module]# ls /tmp/logs
app-2020-12-21.log
④ 日志生成启动脚本
#! /bin/bash
for i in hadoop102 hadoop103
do
ssh $i "java -jar /opt/module/log-collector-1.0.jar $1 $2 >/dev/null 2>&1 &"
done
二、采集日志 Flume 集群的搭建
2.1 Flume 集群安装
采集日志Flume
集群规划:hadoop100
、hadoop101
① 解压 apache-flume-1.9.0-bin.tar.gz 到 /opt/module/ 目录下
② 将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3
③ 修改配置文件中 JAVA_HOME
[root@hadoop100 module]# mv flume-env.sh.template flume-env.sh
[root@hadoop100 module]# vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212
2.2 Flume Source 和 Channel 说明
① Source
名称 | 作用 |
---|---|
TailDir Source | 断点续传、多目录。Flume1.6 以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传 |
Exec Source | 可以实时搜集数据,但是在 Flume 不运行或者 Shell 命令出错的情况下,数据将会丢失 |
Spooling Directory Source | 监控目录,不支持断点续 |
② Channel
采用 Kafka Channel ,省去了 Sink ,提高了效率。 KafkaChannel 数据存储在 Kafka 里面,所以数据是存储在磁盘中。
注意在Flume1.7
以前,Kafka Channel
很少有人使用,因为发现parseAsFlumeEvent
这个配置起不了作用。也就是无论parseAsFlumeEvent
配置为true
还是false
,都会转为Flume Event
。这样的话,造成的结果是,会始终都把Flume
的headers
中的信息混合着内容一起写入Kafka
的消息中,这显然不是我所需要的,我只是需要把内容写入即可。
2.3 日志采集 Flume 配置
a1.sources=r1
a1.channels=c1 c2
# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.hucheng.flume.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.hucheng.flume.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
2.4 自定义拦截器的创建
本项目中自定义了两个拦截器,分别是:ETL
拦截器、日志类型区分拦截器。
ETL
拦截器主要用于,过滤时间戳不合法和JSON
数据不完整的日志;- 日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往
Kafka
的不同Topic
。
引入依赖:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
① LogETLInterceptor
public class LogETLInterceptor implements Interceptor {
@Override
public void initialize() { }
@Override
public Event intercept(Event event) {
// 1、获取数据
byte[] body = event.getBody();
String log = new String(body, Charset.forName("UTF-8"));
// 2、判断数据类型冰箱Header中赋值
if (log.contains("start")) {
if (LogUtils.validateStart(log)) {
return event;
}
} else {
if (LogUtils.validateEvent(log)) {
return event;
}
}
return null;
}
@Override
public List<Event> intercept(List<Event> list) {
List<Event> interceptors = new ArrayList<>();
for (Event event : list) {
if (event != null) {
interceptors.add(intercept(event));
}
}
return interceptors;
}
@Override
public void close() { }
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new LogETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
② LogTypeInterceptor
public class LogTypeInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//1、获取body数据
byte[] body = event.getBody();
String log = new String(body, Charset.forName("UTF-8"));
//2、获取headers数据
Map<String, String> headers = event.getHeaders();
if (log.contains("start")) {
headers.put("topic", "topic_start");
} else {
headers.put("topic", "topic_event");
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
return list;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new LogTypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
2.5 日志采集 Flume 启动停止脚本
#! /bin/bash
case $1 in
"start"){
for i in hadoop100 hadoop101
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent --conf-file /opt/module/flume-1.9.0/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume-1.9.0/logs 2>&1 &"
done
};;
"stop"){
for i in hadoop100 hadoop101
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
done
};;
esac
2.6 Flume 内存优化
问题描述:如果启动消费Flume
抛出如下异常
ERROR hdfs.HDFSEventSink: process failed java.lang.OutOfMemoryError: GC overhead limit exceeded
**解决方案:**修改目录下/conf/flume-env.sh
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
三、Kafka 集群的搭建
3.1 Kafka 集群安装
Kafka
集群规划:hadoop100
、hadoop101
、hadoop102
① 解压安装包并修改文件夹名
[root@hadoop100 software]# tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
[root@hadoop100 module]# mv kafka_2.11-2.4.1/ kafka-2.4.1/
② 在 Kafka 文件下创建 log 文件夹
[root@hadoop100 kafka-2.4.1]# mkdir log
③ 修改配置文件
[root@hadoop100 config]# vim server.properties
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka-2.4.1/log
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop100:2181,hadoop101:2181,hadoop102:2181/kafka
④ 分发 Kafka 文件夹以及修改不同的 broker.id
⑤ Kafka 群启脚本
#! /bin/bash
case $1 in
"start"){
for i in hadoop100 hadoop101 hadoop102
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka-2.4.1/bin/kafka-server-start.sh -daemon /opt/module/kafka-2.4.1/config/server.properties "
done
};;
"stop"){
for i in hadoop100 hadoop101 hadoop102
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka-2.4.1/bin/kafka-server-stop.sh stop"
done
};;
esac
3.2 Kafka 压力测试
用Kafka
官方自带的脚本,对Kafka
进行压测。Kafka
压测时,可以查看到哪个地方出现了瓶颈(CPU
,内存,网络IO
)。一般都是网络IO
达到瓶颈。
① Kafka Producer 压力测试
[root@hadoop100 kafka-2.4.1]# bin/kafka-producer-perf-test.sh --topic test \
--record-size 100 --num-records 100000 --throughput -1 \
--producer-props bootstrap.servers=hadoop100:9092,hadoop101:9092,hadoop102:9092
100000 records sent, 81632.653061 records/sec (7.79 MB/sec), 307.21 ms avg
latency, 466.00 ms max latency, 333 ms 50th, 442 ms 95th, 464 ms 99th, 466 ms 99.9th.
参数说明:
record-size
是一条信息有多大,单位是字节num-records
是总共发送多少条信息throughput
是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量
打印说明:本例中一共写入 10w 条消息,吞吐量为9.14 MB/sec ,每次写入的平均延迟为307.21毫秒,最大的延迟为466.00毫秒。
② Kafka Consumer 压力测试
[root@hadoop100 kafka-2.4.1]# bin/kafka-consumer-perf-test.sh \
--broker-list hadoop100:9092,hadoop101:9092,hadoop102:9092 \
--topic test --fetch-size 10000 --messages 10000000 --threads 1
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec,
rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2020-12-29 19:42:31:728, 2020-12-29 19:42:42:975, 9.5367, 0.8479, 100000, 8891.2599,
1609242152253, -1609242141006, -0.0000, -0.0001
参数说明:
- –zookeeper:指定
zookeeper
的链接信息 - –topic:指定
topic
的名称 - –fetch-size:指定每次
fetch
的数据的大小 - –messages:总共要消费的消息个数
打印说明:
- data.consumed.in.MB:共消费数据
- MB.sec:吞吐量
- data.consumed.in.nMsg:共消费多少条数据
- nMsg.sec:平均每秒消费多少条
3.3 Kafka 机器数量计算
Kafka
机器数量(经验公式)= 2*(峰值生产速度*副本数/100)+1
先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka
的数量。比如我们的峰值生产速度是50M/s。副本数为2。
Kafka
机器数量=2*(50*2/100)+ 1=3台
3.4 消费 Kafka 数据 Flume 配置
集群规划:Hadoop102
配置分析:
配置详情:
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092
a1.sources.r1.kafka.topics=topic_start
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume-1.9.0/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop100:8020/origin_data/mall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://hadoop100:8020/origin_data/mall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
还没有评论,来说两句吧...