Flume三:负载均衡和故障转移(SinkProcessor)

小灰灰 2023-06-30 10:55 61阅读 0赞

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzIyMDQ5Nzcz_size_16_color_FFFFFF_t_70

案例一:故障转移

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzIyMDQ5Nzcz_size_16_color_FFFFFF_t_70 1

agent1 : flume.conf(node2)FailoverSinkProcessor

  1. #定义
  2. a1.sources = r1
  3. a1.channels = c1
  4. a1.sinks = k1 k2
  5. #定义sink组
  6. a1.sinkgroups = g1
  7. #source
  8. a1.sources.r1.type = TAILDIR
  9. # 用于断点续传,文件中包含各个路径下各个文件当前读取到的偏移量
  10. a1.sources.r1.positionFile = /opt/flume_conf/tmp/tail_dir.json
  11. # 文件组 如两目录 f1 f2
  12. a1.sources.r1.filegroups = f1
  13. # f1
  14. a1.sources.r1.filegroups.f1 = /opt/data/.*.log
  15. # 将数据流复制给所有 channel , 可以不用设置,默认是复制模式
  16. a1.sources.r1.selector.type = replicating
  17. #channel
  18. a1.channels.c1.type = memory
  19. a1.channels.c1.capacity = 1000
  20. a1.channels.c1.transactionCapacity = 100
  21. #sink
  22. # sink 端的 avro 是一个数据发送者
  23. a1.sinks.k1.type = avro
  24. a1.sinks.k1.hostname = node3
  25. a1.sinks.k1.port = 4141
  26. a1.sinks.k2.type = avro
  27. a1.sinks.k2.hostname = node3
  28. a1.sinks.k2.port = 4142
  29. #sink的故障转移机制,agent2优先级高于agent3
  30. a1.sinkgroups.g1.processor.type = failover
  31. a1.sinkgroups.g1.processor.priority.k1 = 10
  32. a1.sinkgroups.g1.processor.priority.k2 = 5
  33. a1.sinkgroups.g1.processor.maxpenalty = 10000
  34. #关联关系
  35. a1.sources.r1.channels = c1
  36. a1.sinks.k1.channel = c1
  37. a1.sinks.k2.channel = c1
  38. a1.sinkgroups.g1.sinks = k1 k2

agent2 : flume.conf(node3)

  1. #定义
  2. a2.sources = r1
  3. a2.sinks = k1
  4. a2.channels = c1
  5. #source
  6. a2.sources.r1.type = avro
  7. a2.sources.r1.bind = node3
  8. a2.sources.r1.port = 4141
  9. #channel
  10. a2.channels.c1.type = memory
  11. a2.channels.c1.capacity = 1000
  12. a2.channels.c1.transactionCapacity = 100
  13. #sink
  14. #控制台日志打印
  15. a2.sinks.k1.type = logger
  16. #关联关系
  17. a2.sources.r1.channels = c1
  18. a2.sinks.k1.channel = c1

agent3 : flume.conf(node3)

  1. #定义
  2. a3.sources = r1
  3. a3.sinks = k1
  4. a3.channels = c1
  5. #source
  6. a3.sources.r1.type = avro
  7. a3.sources.r1.bind = node3
  8. a3.sources.r1.port = 4142
  9. # channel
  10. a3.channels.c1.type = memory
  11. a3.channels.c1.capacity = 1000
  12. a3.channels.c1.transactionCapacity = 100
  13. #sink
  14. #存储到hdfs
  15. a3.sinks.k1.type = hdfs
  16. a3.sinks.k1.hdfs.path=hdfs://mycluster/log_data/nginx/%Y%m%d/%H
  17. #上传文件的前缀
  18. a3.sinks.k1.hdfs.filePrefix = upload-
  19. #是否按照时间滚动文件夹
  20. a3.sinks.k1.hdfs.round = true
  21. #多少时间单位创建一个新的文件夹
  22. a3.sinks.k1.hdfs.roundValue = 1
  23. #重新定义时间单位
  24. a3.sinks.k1.hdfs.roundUnit = hour
  25. #是否使用本地时间戳
  26. a3.sinks.k1.hdfs.useLocalTimeStamp = true
  27. #积攒多少个 Event 才 flush 到 HDFS 一次
  28. a3.sinks.k1.hdfs.batchSize = 100
  29. #设置文件类型,可支持压缩
  30. a3.sinks.k1.hdfs.fileType = DataStream
  31. #多久生成一个新的文件
  32. a3.sinks.k1.hdfs.rollInterval = 60
  33. #设置每个文件的滚动大小大概是 128M
  34. a3.sinks.k1.hdfs.rollSize = 134217700
  35. #文件的滚动与 Event 数量无关
  36. a3.sinks.k1.hdfs.rollCount = 0
  37. #访问hdfs超时时间,5分钟
  38. a1.sinks.k1.hdfs.callTimeout=300000
  39. # 关联关系
  40. a3.sources.r1.channels = c1
  41. a3.sinks.k1.channel = c1

启动

  1. 依次执行
  2. node3节点
  3. flume-ng agent -n a3 -c conf -f /opt/flume_conf/flume3.conf
  4. flume-ng agent -n a2 -c conf -f /opt/flume_conf/flume2.conf
  5. node2节点
  6. flume-ng agent -n a1 -c conf -f /opt/flume_conf/flume1.conf

测试:

  1. **1、启动后查看 agent2 logger控制台日志被打印出来,同时HDFS中没有新文件创建**

2、kill 掉 agent2 ,此时查看HDFS中已经有了对应的日志存储文件

案例二:负载均衡LoadBalancingSinkProcessor

将案例一中的agent1配置中的sinkgroups的配置改为如下即可

  1. a1.sinkgroups = g1
  2. a1.sinkgroups.g1.sinks = k1 k2
  3. a1.sinkgroups.g1.processor.type = load_balance
  4. a1.sinkgroups.g1.processor.backoff = true
  5. a1.sinkgroups.g1.processor.selector = random

发表评论

表情:
评论列表 (有 0 条评论,61人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Flume ——故障转移

    Sink groups允许组织多个sink到一个实体上。 Sink processors能够提供在组内所有Sink之间实现负载均衡的能力,而且在失败的情况下能够进行故障转移从一