Spark Broadcast Variables and Accumulators
Broadcast Variables
Code that Spark executes in a distributed fashion must be shipped to the Tasks on each Executor. For read-only, fixed data (for example, data loaded from a database), shipping a copy from the Driver with every Task is inefficient. A broadcast variable lets the Driver send the variable to each Executor only once (ahead of time); the Tasks on that Executor then fetch it from the local node's BlockManager instead of from the Driver, which improves efficiency.
An Executor only needs to fetch one copy of the broadcast data when its first Task starts; all subsequent Tasks read it from the local BlockManager.
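A minimal sketch of the typical pattern (the RDD, the lookup map and its contents are hypothetical, made up for illustration): the Driver broadcasts a read-only map once, and each task reads it through .value instead of closing over the raw map.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BroadcastSketch").setMaster("local[2]"))
    // Read-only lookup data that every task needs (hypothetical contents).
    val lookup = Map("31390107408" -> "userA", "31700077403" -> "userB")
    // Ship it to each Executor once, instead of once per task.
    val bcLookup = sc.broadcast(lookup)
    val accounts = sc.parallelize(Seq("31390107408", "31700077403", "31390107408"))
    // Inside the task, read the broadcast value from the local BlockManager.
    val named = accounts.map(a => bcLookup.value.getOrElse(a, "unknown"))
    named.collect().foreach(println)
    sc.stop()
  }
}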
Accumulators
An accumulator is created with SparkContext.accumulator(v); running tasks can then add to it with the "+=" operator. Tasks cannot read the accumulator, however; only the driver program can read its value (via .value), presumably to avoid heavy use of read/write locks.
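A minimal sketch of that pattern, using the pre-2.0 accumulator API that the examples below also use (the RDD contents are made up): tasks call += inside foreach, and only the driver reads .value.

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AccumulatorSketch").setMaster("local[2]"))
    // Counter that tasks add to; only the driver may read it.
    val blankLines = sc.accumulator(0, "blankLines")
    val lines = sc.parallelize(Seq("a", "", "b", "", ""))
    lines.foreach(line => if (line.isEmpty) blankLines += 1) // updated inside tasks
    println(blankLines.value)                                // read on the driver: 3
    sc.stop()
  }
}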
Custom Accumulator Types
Besides the Int, Long, Float and Double accumulators that Spark provides out of the box, developers can define their own accumulator types. The way to do this is to extend AccumulatorParam[T] (the Spark documentation uses AccumulatorParam[Vector] as an example); the AccumulatorDemo further below does this for Array[Long].
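As a small sketch of the shape such a definition takes (a hypothetical accumulator that collects the distinct strings seen by tasks; not from the original article):

import org.apache.spark.AccumulatorParam

// Merge by set union; zero is the empty set.
object StringSetAccumulatorParam extends AccumulatorParam[Set[String]] {
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = s1 ++ s2
  def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
}

// Usage on the driver (sketch):
// val seen = sc.accumulator(Set.empty[String], "seen")(StringSetAccumulatorParam)
// rdd.foreach(x => seen += Set(x)); println(seen.value)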
Example of Spark broadcast usage:
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable.HashMap

object BroadcastDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("CacheRadius")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)

    val input = "E://sparkData/cacheAndPersist.txt"
    // Each line looks like: privateIP|account|timeFormat|timeType
    val data = sc.textFile(input).map(_.split("\\|", 100)).map(line => {
      val Array(privateIP, account, timeFormat, timeType) = line
      (privateIP, (account, timeFormat.toLong, timeType.toInt))
    })

    // Group the records by IP on the driver, then broadcast the resulting map.
    var accountHash = new HashMap[String, Set[(String, Long, Int)]]()
    data.groupByKey().collect().foreach(x => {
      accountHash += (x._1 -> x._2.toSet)
    })

    val broadcast = sc.broadcast(accountHash)
    println(broadcast.id)

    // Read the broadcast value back on the driver and print each entry.
    val hashValue = broadcast.value
    for (entry <- hashValue) {
      println(entry._1 + "|" + entry._2)
    }
  }
}
Input data (cacheAndPersist.txt):
10.174.97.42|31390107408|1476881928|1
10.199.26.197|31700077403|1476939505|1
10.222.227.122|15226578865|1476944785|1
10.196.126.12|31190162047|1476935964|1
10.199.119.20|31700073759|1476863280|3
10.200.4.220|31390148532|1476923569|1
10.170.239.161|31090089699|1476930496|3
10.172.4.109|31590369821|1476946852|1
10.190.124.146|31390129535|1476274156|1
10.190.142.184|31390059311|1476665662|1
10.170.242.158|31090170423|1476491831|1
10.199.119.20|31700073753|1476863283|1
10.200.4.220|31390348533|1476923569|3
10.170.239.161|31090389699|1476930496|3
10.172.4.109|31590363821|1476946852|1
10.190.124.146|31390329535|1476274156|1
10.190.142.184|31393059311|1476665662|3
10.170.242.158|31093170423|1476491831|3
Run result: the program prints the broadcast variable's id, followed by one privateIP|recordSet line for each entry of the broadcast map.
Source: broadcast is a method on the SparkContext class:
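In the Spark 1.x API its signature is roughly the following (quoted from memory of the Spark source, body omitted):

// In org.apache.spark.SparkContext: wraps the value in a Broadcast[T] handle
// that is registered with the BroadcastManager and shipped to Executors on demand.
def broadcast[T: ClassTag](value: T): Broadcast[T]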
Example of Spark accumulator usage:
import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

/**
  * Created by User on 2016/11/2.
  */
object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("AccumulatorDemo")
    val sc: SparkContext = new SparkContext(conf)

    // The Array accumulator needs a custom AccumulatorParam, passed explicitly here.
    val arrAccu = Array(0L, 0L, 0L, 0L, 0L)
    val accumulatorArr = sc.accumulator(arrAccu, "HADOOP")(MyAccumulatorParam)
    // Int and Long accumulators use Spark's built-in implicit AccumulatorParams.
    val accumulatorMl = sc.accumulator(0, "ML")
    val accumulatorDl = sc.accumulator(0L, "DL")

    val arr = Array("ML", "DL", "CNN", "RNN", "ML", "HADOOP", "SPARK", "ML")
    for (i <- 0 until arr.length) {
      if (arr(i).equals("ML")) {
        accumulatorMl += 1
      } else if (arr(i).equals("DL")) {
        accumulatorDl += 1
      } else if (arr(i).equals("HADOOP")) {
        accumulatorArr += Array(1L, 1L, 1L, 1L, 1L)
      }
    }

    println("ML=" + accumulatorMl.name.get + "、" + accumulatorMl.value)
    println("DL=" + accumulatorDl.name.get + "、" + accumulatorDl.value)
    println("HADOOP=" + accumulatorArr.name.get + "、" + accumulatorArr.value.mkString(","))
  }

  // Custom AccumulatorParam: element-wise sum of Long arrays.
  object MyAccumulatorParam extends AccumulatorParam[Array[Long]] {
    override def addInPlace(r1: Array[Long], r2: Array[Long]): Array[Long] = {
      r1.zip(r2).map(x => x._1 + x._2)
    }
    def zero(initialValue: Array[Long]): Array[Long] = {
      new Array[Long](initialValue.length)
    }
  }
}
Output:
ML=ML、3
DL=DL、1
HADOOP=HADOOP、1,1,1,1,1
Without passing (MyAccumulatorParam) explicitly, and with no implicit AccumulatorParam[Array[Long]] in scope, the code fails to compile with:
Error:(15, 38) could not find implicit value for parameter param: org.apache.spark.AccumulatorParam[Array[Long]]
val accumulatorArr=sc.accumulator(arrAccu,"HADOOP")
Error:(15, 38) not enough arguments for method accumulator: (implicit param: org.apache.spark.AccumulatorParam[Array[Long]])org.apache.spark.Accumulator[Array[Long]].
Unspecified value parameter param.
val accumulatorArr=sc.accumulator(arrAccu,"HADOOP")
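An alternative (a sketch, not from the original example) is to declare the param as an implicit object inside AccumulatorDemo, so the compiler supplies it automatically:

// If the param is implicit and in scope, sc.accumulator finds it by itself.
implicit object MyAccumulatorParam extends AccumulatorParam[Array[Long]] {
  def addInPlace(r1: Array[Long], r2: Array[Long]): Array[Long] =
    r1.zip(r2).map(x => x._1 + x._2)
  def zero(initialValue: Array[Long]): Array[Long] =
    new Array[Long](initialValue.length)
}

// The explicit parameter list can then be dropped:
// val accumulatorArr = sc.accumulator(arrAccu, "HADOOP")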
Source (org.apache.spark.AccumulatorParam):
/**
 * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
 * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
 * available when you create Accumulators of a specific type.
 *
 * @tparam T type of value to accumulate
 */
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}

object AccumulatorParam {

  // The following implicit objects were in SparkContext before 1.2 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, as there are duplicate codes in SparkContext for backward
  // compatibility, please update them accordingly if you modify the following implicit objects.

  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
    def zero(initialValue: Double): Double = 0.0
  }

  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }

  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }

  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
    def zero(initialValue: Float): Float = 0f
  }

  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
}