Spark Broadcast Variables and Accumulators

叁歲伎倆 2022-07-13 14:41


Broadcast variables

Code that Spark executes in a distributed fashion is shipped to the tasks running on each Executor. For data that is read-only and fixed (for example, a table loaded from a database), having the Driver send it along with every task is inefficient. A broadcast variable instead ships the value once, up front, to each Executor; the tasks on that Executor then fetch it from the node's local BlockManager rather than from the Driver, which improves efficiency.

An Executor only needs to fetch one copy of the broadcast data, when its first task starts; every later task reads it from the local BlockManager.
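
The typical pattern, then, is to read the broadcast value inside a transformation rather than only on the driver. A minimal sketch (not from the original post; the lookup map and input values are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("BroadcastSketch").setMaster("local[2]"))

    // Small, read-only reference data (e.g. loaded from a DB on the driver)
    val lookup = Map("1" -> "start", "3" -> "stop")
    val lookupBc = sc.broadcast(lookup) // shipped to each executor once

    // Tasks read the broadcast copy via .value instead of pulling it from the driver
    val labeled = sc.parallelize(Seq("1", "2", "3"))
      .map(code => (code, lookupBc.value.getOrElse(code, "unknown")))

    labeled.collect().foreach(println)
    sc.stop()
  }
}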

Accumulators

An accumulator is created with SparkContext.accumulator(v). Running tasks can then add to it with the += operator, but tasks cannot read its value; only the driver program can, via .value. (This restriction presumably spares Spark from needing read/write locks across the cluster.)
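
A minimal sketch of that API (not from the original post; counting blank lines is an illustrative use case):

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("AccumulatorSketch").setMaster("local[2]"))

    // Int accumulator; Spark supplies the implicit IntAccumulatorParam
    val blankLines = sc.accumulator(0, "blankLines")

    val lines = sc.parallelize(Seq("a", "", "b", ""))
    lines.foreach(line => if (line.isEmpty) blankLines += 1) // tasks only add

    println(blankLines.value) // only the driver may read the value; prints 2
    sc.stop()
  }
}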

Custom accumulator types

Besides the types Spark ships with (Int, Long, Float, Double), accumulators also support developer-defined element types. The approach is to extend AccumulatorParam[T] for the type you need, e.g. AccumulatorParam[Vector].
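
For example, a sketch of an AccumulatorParam for element-wise sums over a Vector[Double] (the name VectorAccumulatorParam is made up here; the full Array[Long] example further down works the same way):

import org.apache.spark.AccumulatorParam

// Element-wise addition over scala.collection.immutable.Vector[Double]
object VectorAccumulatorParam extends AccumulatorParam[Vector[Double]] {
  // zero: the identity element, a vector of zeros of the same length
  def zero(initialValue: Vector[Double]): Vector[Double] =
    Vector.fill(initialValue.length)(0.0)

  // addInPlace: how two partial results are merged
  def addInPlace(v1: Vector[Double], v2: Vector[Double]): Vector[Double] =
    v1.zip(v2).map { case (a, b) => a + b }
}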

Broadcast variable example:

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.HashMap

object BroadcastDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("CacheRadius")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val input = "E://sparkData/cacheAndPersist.txt"

    // Parse each "privateIP|account|time|timeType" line into (privateIP, (account, time, type))
    val data = sc.textFile(input).map(_.split("\\|", 100)).map(line => {
      val Array(privateIP, account, timeFormat, timeType) = line
      (privateIP, (account, timeFormat.toLong, timeType.toInt))
    })

    // Collect the grouped records onto the driver and build a lookup map
    var accountHash = new HashMap[String, Set[(String, Long, Int)]]()
    data.groupByKey().collect().foreach(x => {
      accountHash += (x._1 -> x._2.toSet)
    })

    // Broadcast the map to the executors, then read it back via .value
    val broadcast = sc.broadcast(accountHash)
    println(broadcast.id)
    val hashValue = broadcast.value
    for (entry <- hashValue) {
      println(entry._1 + "|" + entry._2)
    }
  }
}

Input data (cacheAndPersist.txt):

10.174.97.42|31390107408|1476881928|1
10.199.26.197|31700077403|1476939505|1
10.222.227.122|15226578865|1476944785|1
10.196.126.12|31190162047|1476935964|1
10.199.119.20|31700073759|1476863280|3
10.200.4.220|31390148532|1476923569|1
10.170.239.161|31090089699|1476930496|3
10.172.4.109|31590369821|1476946852|1
10.190.124.146|31390129535|1476274156|1
10.190.142.184|31390059311|1476665662|1
10.170.242.158|31090170423|1476491831|1
10.199.119.20|31700073753|1476863283|1
10.200.4.220|31390348533|1476923569|3
10.170.239.161|31090389699|1476930496|3
10.172.4.109|31590363821|1476946852|1
10.190.124.146|31390329535|1476274156|1
10.190.142.184|31393059311|1476665662|3
10.170.242.158|31093170423|1476491831|3

Output:

[The original post showed a screenshot of the console output here: the broadcast id, followed by one privateIP|Set(...) line per key.]

Source:

broadcast is a method of the SparkContext class (the original post included a screenshot of its definition here).
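
For reference, the signature (paraphrased from Spark 1.x; check your version's SparkContext.scala for the exact definition):

// Broadcast a read-only variable to the cluster, returning a Broadcast handle
// whose value is read on executors with .value.
def broadcast[T: ClassTag](value: T): Broadcast[T]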

Accumulator example:

import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("AccumulatorDemo")
    val sc: SparkContext = new SparkContext(conf)

    // Array accumulator: needs a custom AccumulatorParam, passed explicitly here
    val arrAccu = Array(0L, 0L, 0L, 0L, 0L)
    val accumulatorArr = sc.accumulator(arrAccu, "HADOOP")(MyAccumulatorParam)

    // Int and Long accumulators use the implicit AccumulatorParams Spark provides
    val accumulatorMl = sc.accumulator(0, "ML")
    val accumulatorDl = sc.accumulator(0L, "DL")

    val arr = Array("ML", "DL", "CNN", "RNN", "ML", "HADOOP", "SPARK", "ML")
    for (i <- 0 until arr.length) {
      if (arr(i).equals("ML")) {
        accumulatorMl += 1
      } else if (arr(i).equals("DL")) {
        accumulatorDl += 1
      } else if (arr(i).equals("HADOOP")) {
        accumulatorArr += Array(1L, 1L, 1L, 1L, 1L)
      }
    }

    println("ML=" + accumulatorMl.name.get + "、" + accumulatorMl.value)
    println("DL=" + accumulatorDl.name.get + "、" + accumulatorDl.value)
    println("HADOOP=" + accumulatorArr.name.get + "、" + accumulatorArr.value.mkString(","))
  }

  // Custom AccumulatorParam for Array[Long]: element-wise addition
  object MyAccumulatorParam extends AccumulatorParam[Array[Long]] {
    override def addInPlace(r1: Array[Long], r2: Array[Long]): Array[Long] = {
      r1.zip(r2).map(x => x._1 + x._2)
    }
    def zero(initialValue: Array[Long]): Array[Long] = {
      new Array[Long](initialValue.length)
    }
  }
}

Output:

[The original post showed a screenshot here; from the code, the expected lines are ML=ML、3, DL=DL、1, and HADOOP=HADOOP、1,1,1,1,1.]

Without passing (MyAccumulatorParam), the program does not compile; the compiler reports:

Error:(15, 38) could not find implicit value for parameter param: org.apache.spark.AccumulatorParam[Array[Long]]
    val accumulatorArr = sc.accumulator(arrAccu, "HADOOP")

Error:(15, 38) not enough arguments for method accumulator: (implicit param: org.apache.spark.AccumulatorParam[Array[Long]])org.apache.spark.Accumulator[Array[Long]].
Unspecified value parameter param.
    val accumulatorArr = sc.accumulator(arrAccu, "HADOOP")
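
One way to avoid passing the parameter explicitly is to declare the AccumulatorParam as an implicit object (a sketch, assuming it is declared inside the enclosing AccumulatorDemo object so it is in implicit scope where sc.accumulator is called):

// Marking the object implicit lets the compiler supply it automatically
implicit object MyAccumulatorParam extends AccumulatorParam[Array[Long]] {
  def addInPlace(r1: Array[Long], r2: Array[Long]): Array[Long] =
    r1.zip(r2).map { case (a, b) => a + b }
  def zero(initialValue: Array[Long]): Array[Long] =
    new Array[Long](initialValue.length)
}

// Now this compiles without the explicit (MyAccumulatorParam) argument list:
val accumulatorArr = sc.accumulator(Array(0L, 0L, 0L, 0L, 0L), "HADOOP")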

Source (Spark's AccumulatorParam, pre-2.0 API):

/**
 * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
 * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
 * available when you create Accumulators of a specific type.
 *
 * @tparam T type of value to accumulate
 */
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}

object AccumulatorParam {

  // The following implicit objects were in SparkContext before 1.2 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, as there are duplicate codes in SparkContext for backward
  // compatibility, please update them accordingly if you modify the following implicit objects.

  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
    def zero(initialValue: Double): Double = 0.0
  }

  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }

  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }

  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
    def zero(initialValue: Float): Float = 0f
  }

  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
}
