Flink实操 : 广播变量/累加器/分布式缓存

£神魔★判官ぃ 2022-11-04 15:20 364阅读 0赞

.

  • 一 .前言
  • 二 .广播变量使用
    • 2.1.前言
    • 2.2. 使用
  • 三 .累加器
    • 3.1. 前言
    • 3.2. 使用
  • 四 .分布式缓存
    • 4.1. 前言
    • 4.2.使用

一 .前言

二 .广播变量使用

2.1.前言

Flink支持广播。可以将数据广播到TaskManager上,数据存储到内存中。

数据存储在内存中,这样可以减缓大量的shuffle操作;比如在数据join阶段,不可避免的就是大量的shuffle操作,我们可以把其中一个dataStream广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降;

广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。
另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。

在这里插入图片描述

  • 可以理解广播就是一个公共的共享变量
  • 将一个数据集广播后,不同的Task都可以在节点上获取到
  • 每个节点只存一份
  • 如果不使用广播,每一个Task都会拷贝一份数据集,造成内存资源浪费

2.2. 使用

  • 在需要使用广播的操作后,使用withBroadcastSet创建广播
  • 在操作中,使用getRuntimeContext.getBroadcastVariable[广播数据类型](广播名)获取广播变量

    package com.boyi.broadcast

  1. import org.apache.flink.api.common.functions.RichMapFunction
  2. import org.apache.flink.api.scala._
  3. import org.apache.flink.configuration.Configuration
  4. object BroadCastDemo {
  5. def main(args: Array[String]): Unit = {
  6. // 1. 获取`ExecutionEnvironment`运行环境
  7. val env = ExecutionEnvironment.getExecutionEnvironment
  8. // 1. 分别创建两个数据集
  9. val studentDataSet: DataSet[(Int, String)] = env.fromCollection(List((1, "张三"), (2, "李四"), (3, "王五")))
  10. val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))
  11. // 1. 使用`RichMapFunction`对`成绩`数据集进行map转换
  12. // 将成绩数据(学生ID,学科,成绩) -> (学生姓名,学科,成绩)
  13. val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {
  14. var bc_studentList: List[(Int, String)] = null
  15. // - 重写`open`方法中,获取广播数据
  16. override def open(parameters: Configuration): Unit = {
  17. import scala.collection.JavaConverters._
  18. bc_studentList = getRuntimeContext.getBroadcastVariable[(Int, String)]("bc_student").asScala.toList
  19. }
  20. // - 在`map`方法中使用广播进行转换
  21. override def map(value: (Int, String, Int)): (String, String, Int) = {
  22. // 获取学生ID
  23. val studentId: Int = value._1
  24. // 过滤出和学生ID相同的内容
  25. val tuples: List[(Int, String)] = bc_studentList.filter((x: (Int, String)) => x._1 == studentId)
  26. // 构建元组
  27. (tuples(0)._2,value._2,value._3)
  28. }
  29. }).withBroadcastSet(studentDataSet, "bc_student")
  30. // 3. 打印测试
  31. resultDataSet.print()
  32. }
  33. }

三 .累加器

3.1. 前言

Accumulator 即累加器,与 MapReduce counter 的应用场景差不多,都能很好地观察task在运行期间的数据变化
可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。

Flink现在有以下内置累加器。每个累加器都实现了Accumulator接口。

  • IntCounter
  • LongCounter
  • DoubleCounter

3.2. 使用

遍历下列数据, 打印出单词的总数

  1. "a","b","c","d"

开发步骤:

  1. 获取批处理环境
  2. 加载本地集合
  3. map转换

    1. 定义累加器
    2. 注册累加器
    3. 累加数据
  4. 数据写入到文件中
  5. 执行任务,获取任务执行结果对象(JobExecutionResult)
  6. 获取累加器数值
  7. 打印数值

    package com.boyi.broadcast

    import org.apache.flink.api.common.accumulators.IntCounter
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.configuration.Configuration

    /* counter 累加器 */
    object BatchDemoCounter {
    def main(args: Array[String]): Unit = {

    1. //获取执行环境
    2. val env = ExecutionEnvironment.getExecutionEnvironment
    3. import org.apache.flink.api.scala._
    4. val data = env.fromElements("a","b","c","d")
    5. val res = data.map(new RichMapFunction[String,String] {
    6. //1:定义累加器
    7. val numLines = new IntCounter
    8. override def open(parameters: Configuration): Unit = {
    9. super.open(parameters)
    10. //2:注册累加器
    11. getRuntimeContext.addAccumulator("num-lines",this.numLines)
    12. }
    13. var sum = 0;
    14. override def map(value: String) = {
    15. //如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了
    16. sum += 1;
    17. System.out.println("sum:"+sum);
    18. this.numLines.add(1)
    19. value
    20. }
    21. }).setParallelism(1)

    res.writeAsText(“/opt/a/tmp/BatchDemoCounter”)

    1. val jobResult = env.execute("BatchDemoCounterScala")
    2. // //3:获取累加器
    3. val num = jobResult.getAccumulatorResult[Int]("num-lines")
    4. println("num:"+num)

    }
    }

四 .分布式缓存

4.1. 前言

Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。
这个功能可以被使用来分享外部静态的数据.

缓存的使用流程:

使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓存文件。当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!

注意:广播是将变量分发到各个worker节点的内存上,分布式缓存是将文件缓存到各个worker节点上

4.2.使用

遍历下列数据, 并在open方法中获取缓存的文件

  1. a,b,c,d
  2. import org.apache.commons.io.FileUtils
  3. import org.apache.flink.api.common.functions.RichMapFunction
  4. import org.apache.flink.api.scala.ExecutionEnvironment
  5. import org.apache.flink.configuration.Configuration
  6. /** * 分布式缓存 */
  7. object BatchDemoDisCache {
  8. def main(args: Array[String]): Unit = {
  9. //获取执行环境
  10. val env = ExecutionEnvironment.getExecutionEnvironment
  11. //隐式转换
  12. import org.apache.flink.api.scala._
  13. //1:注册文件
  14. env.registerCachedFile("/opt/a/tmp/BatchDemoDisCache.txt","BatchDemoDisCache.txt")
  15. //读取数据
  16. val data = env.fromElements("a","b","c","d")
  17. val result = data.map(new RichMapFunction[String,String] {
  18. override def open(parameters: Configuration): Unit = {
  19. super.open(parameters)
  20. //访问数据
  21. val myFile = getRuntimeContext.getDistributedCache.getFile("BatchDemoDisCache.txt")
  22. val lines = FileUtils.readLines(myFile)
  23. val it = lines.iterator()
  24. while (it.hasNext){
  25. val line = it.next();
  26. println("line:"+line)
  27. }
  28. }
  29. override def map(value: String) = {
  30. value
  31. }
  32. })
  33. result.print()
  34. }
  35. }

地址: https://github.com/BoYiZhang/flink-demo

发表评论

表情:
评论列表 (有 0 条评论,364人围观)

还没有评论,来说两句吧...

相关阅读