Flink实操 : 广播变量/累加器/分布式缓存
.
- 一 .前言
- 二 .广播变量使用
- 2.1.前言
- 2.2. 使用
- 三 .累加器
- 3.1. 前言
- 3.2. 使用
- 四 .分布式缓存
- 4.1. 前言
- 4.2.使用
一 .前言
二 .广播变量使用
2.1.前言
Flink支持广播。可以将数据广播到TaskManager上,数据存储到内存中。
数据存储在内存中,这样可以减缓大量的shuffle操作;比如在数据join阶段,不可避免的就是大量的shuffle操作,我们可以把其中一个dataStream广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降;
广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。
另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。
- 可以理解广播就是一个公共的共享变量
- 将一个数据集广播后,不同的Task都可以在节点上获取到
- 每个节点
只存一份
- 如果不使用广播,每一个Task都会拷贝一份数据集,造成内存资源浪费
2.2. 使用
- 在需要使用广播的操作后,使用
withBroadcastSet
创建广播 在操作中,使用getRuntimeContext.getBroadcastVariable
[广播数据类型]
(广播名
)获取广播变量package com.boyi.broadcast
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
object BroadCastDemo {
def main(args: Array[String]): Unit = {
// 1. 获取`ExecutionEnvironment`运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 1. 分别创建两个数据集
val studentDataSet: DataSet[(Int, String)] = env.fromCollection(List((1, "张三"), (2, "李四"), (3, "王五")))
val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))
// 1. 使用`RichMapFunction`对`成绩`数据集进行map转换
// 将成绩数据(学生ID,学科,成绩) -> (学生姓名,学科,成绩)
val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {
var bc_studentList: List[(Int, String)] = null
// - 重写`open`方法中,获取广播数据
override def open(parameters: Configuration): Unit = {
import scala.collection.JavaConverters._
bc_studentList = getRuntimeContext.getBroadcastVariable[(Int, String)]("bc_student").asScala.toList
}
// - 在`map`方法中使用广播进行转换
override def map(value: (Int, String, Int)): (String, String, Int) = {
// 获取学生ID
val studentId: Int = value._1
// 过滤出和学生ID相同的内容
val tuples: List[(Int, String)] = bc_studentList.filter((x: (Int, String)) => x._1 == studentId)
// 构建元组
(tuples(0)._2,value._2,value._3)
}
}).withBroadcastSet(studentDataSet, "bc_student")
// 3. 打印测试
resultDataSet.print()
}
}
三 .累加器
3.1. 前言
Accumulator 即累加器,与 MapReduce counter 的应用场景差不多,都能很好地观察task在运行期间的数据变化
可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
Flink现在有以下内置累加器。每个累加器都实现了Accumulator接口。
- IntCounter
- LongCounter
- DoubleCounter
3.2. 使用
遍历下列数据, 打印出单词的总数
"a","b","c","d"
开发步骤:
- 获取批处理环境
- 加载本地集合
map转换
- 定义累加器
- 注册累加器
- 累加数据
- 数据写入到文件中
- 执行任务,获取任务执行结果对象(JobExecutionResult)
- 获取累加器数值
打印数值
package com.boyi.broadcast
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/* counter 累加器 */
object BatchDemoCounter {
def main(args: Array[String]): Unit = {//获取执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data = env.fromElements("a","b","c","d")
val res = data.map(new RichMapFunction[String,String] {
//1:定义累加器
val numLines = new IntCounter
override def open(parameters: Configuration): Unit = {
super.open(parameters)
//2:注册累加器
getRuntimeContext.addAccumulator("num-lines",this.numLines)
}
var sum = 0;
override def map(value: String) = {
//如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了
sum += 1;
System.out.println("sum:"+sum);
this.numLines.add(1)
value
}
}).setParallelism(1)
res.writeAsText(“/opt/a/tmp/BatchDemoCounter”)
val jobResult = env.execute("BatchDemoCounterScala")
// //3:获取累加器
val num = jobResult.getAccumulatorResult[Int]("num-lines")
println("num:"+num)
}
}
四 .分布式缓存
4.1. 前言
Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。
这个功能可以被使用来分享外部静态的数据.
缓存的使用流程:
使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓存文件。当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!
注意:广播是将变量分发到各个worker节点的内存上,分布式缓存是将文件缓存到各个worker节点上
4.2.使用
遍历下列数据, 并在open方法中获取缓存的文件
a,b,c,d
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
/** * 分布式缓存 */
object BatchDemoDisCache {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//隐式转换
import org.apache.flink.api.scala._
//1:注册文件
env.registerCachedFile("/opt/a/tmp/BatchDemoDisCache.txt","BatchDemoDisCache.txt")
//读取数据
val data = env.fromElements("a","b","c","d")
val result = data.map(new RichMapFunction[String,String] {
override def open(parameters: Configuration): Unit = {
super.open(parameters)
//访问数据
val myFile = getRuntimeContext.getDistributedCache.getFile("BatchDemoDisCache.txt")
val lines = FileUtils.readLines(myFile)
val it = lines.iterator()
while (it.hasNext){
val line = it.next();
println("line:"+line)
}
}
override def map(value: String) = {
value
}
})
result.print()
}
}
地址: https://github.com/BoYiZhang/flink-demo
还没有评论,来说两句吧...