Spark Internals: Shuffle Flow and Source Code
Table of Contents
- Spark Internals: Shuffle Flow and Source Code
- 1. Shuffle Principles and Execution
- 2. HashShuffle Explained
- 2.1 Unoptimized HashShuffle
- 2.2 Optimized HashShuffle
- 3. SortShuffle Explained
- 3.1 SortShuffle
- 3.2 bypassShuffle
- 4. Shuffle Write to Disk
- 4.1 shuffleWriterProcessor (the write processor)
- 4.2 Conditions for BypassShuffle
- 4.3 Conditions for SerializedShuffle
- 4.4 Using BaseShuffle
- 4.5 Inserting Data (buffering + spilling)
- 4.6 Merging Spill Files
- 4.7 Writing to Disk
- 5. Shuffle Read from Disk
Spark's earliest versions used HashShuffle.
Spark 0.8.1 introduced the optimized (consolidated) HashShuffle.
Spark 1.1 added SortShuffle, but HashShuffle remained the default.
Spark 1.2 made SortShuffle the default, with HashShuffle still configurable.
Spark 2.0 removed HashShuffle entirely, leaving only SortShuffle.
1. Shuffle Principles and Execution
Shuffle always involves writing data to disk.
- If the amount of data spilled to disk during shuffle is reduced, performance improves.
- Operators that support map-side pre-aggregation improve shuffle performance (see the sketch below).
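As a quick illustration of the second point, here is a minimal sketch contrasting reduceByKey (which declares a map-side combine, so each map task pre-aggregates before spilling) with groupByKey (which does not); it assumes an existing SparkContext named sc:
val words = sc.parallelize(Seq("a", "b", "a", "c", "a", "b")).map(word => (word, 1))
// reduceByKey has an aggregator (mapSideCombine = true): each map task first
// combines ("a", 1), ("a", 1), ("a", 1) into ("a", 3) locally, so less data
// is written to the shuffle files and sent over the network.
val preAggregated = words.reduceByKey(_ + _)
// groupByKey has no map-side combine: every ("a", 1) record is shuffled as-is.
val notPreAggregated = words.groupByKey().mapValues(_.sum)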
2. HashShuffle Explained
2.1 Unoptimized HashShuffle
2.2 Optimized HashShuffle
The optimized HashShuffle enables the consolidation mechanism, which reuses buffers (and therefore shuffle output files). It is controlled by spark.shuffle.consolidateFiles; the default is false, and setting it to true turns the optimization on. In general, if HashShuffleManager is used, this option should be enabled.
Official parameter documentation: http://spark.apache.org/docs/0.8.1/configuration.html
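A minimal sketch of enabling it (only meaningful on the old versions that still ship HashShuffleManager; the property name comes from the paragraph above):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("consolidate-demo")
  // reuse output files per core instead of one file per map task per reducer
  .set("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext(conf)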
3. SortShuffle Explained
3.1 SortShuffle
In this mode, data is first written into an in-memory data structure. For aggregating operators such as reduceByKey, records go into a map and are partially aggregated as they are inserted; for non-aggregating operators such as join, records go straight into an array-backed buffer in memory. The sorter then checks whether a threshold has been reached; if so, the contents of the in-memory structure are spilled to disk and the structure is cleared.
Before spilling, the data is sorted by key. The sorted data is then written to a disk file in batches, 10,000 records per batch by default, through a write buffer. Each spill produces a new disk file, so a single task may produce many temporary files.
Finally, each task merges all of its temporary files: they are read back and written out once into a single final data file, so all of a task's output ends up in one file. A separate index file is also written, recording the start offset and end offset of each downstream task's data within that file.
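For example, if a task's final data file holds three reduce partitions of 10, 20 and 30 bytes, the index file stores the cumulative offsets 0, 10, 30, 60, and the data for reduce task i is the byte range [offset(i), offset(i+1)) of the data file.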
3.2 bypassShuffle
The only difference between bypassShuffle and regular SortShuffle is that bypass does not sort the data.
The bypass mechanism is triggered when both of the following hold (see the sketch after this list):
1) The number of shuffle reduce tasks is less than or equal to spark.shuffle.sort.bypassMergeThreshold (default 200).
2) The shuffle operator is not an aggregating one (reduceByKey, for example, does not qualify).
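For instance, groupByKey performs no map-side combine, so with few enough output partitions it can take the bypass path, while reduceByKey never can. A hedged sketch of raising the threshold (400 is just an illustrative value):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("bypass-demo")
  // default is 200; shuffles with at most this many reducers and no map-side
  // combine are handled by BypassMergeSortShuffleWriter
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")
val sc = new SparkContext(conf)

// no map-side combine and only 10 output partitions, so this shuffle can bypass sorting
val grouped = sc.parallelize(1 to 100).map(i => (i % 10, i)).groupByKey(10)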
4. Shuffle Write to Disk
4.1 shuffleWriterProcessor (the write processor)
DAGScheduler.scala
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
... ...
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
// shuffle write path
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
// shuffle read path
case stage: ResultStage =>
... ...
}
} catch {
... ...
}
}
Task.scala
private[spark] abstract class Task[T](... ...) extends Serializable {
final def run(... ...): T = {
runTask(context)
}
}
Use Ctrl+H to find the implementations of runTask; the one we need is in ShuffleMapTask.scala.
private[spark] class ShuffleMapTask(... ...)
extends Task[MapStatus](... ...){
override def runTask(context: TaskContext): MapStatus = {
... ...
dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
}
}
ShuffleWriteProcessor.scala
def write(... ...): MapStatus = {
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](
dep.shuffleHandle,
mapId,
context,
createMetricsReporter(context))
writer.write(
rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
... ...
}
}
Search (Ctrl+H) for the implementation of ShuffleManager: SortShuffleManager.
SortShuffleManager.scala
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
... ...
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(... ...)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(... ...)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(... ...)
}
}
Because the first argument passed to getWriter is dep.shuffleHandle, click through dep.shuffleHandle.
Dependency.scala
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, this)
ShuffleManager.scala
def registerShuffle[K, V, C](shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle
4.2 Conditions for BypassShuffle
BypassMergeSortShuffleHandle is used when:
1) there is no map-side pre-aggregation, and
2) the number of downstream partitions is at most 200 (configurable).
| Handle | Writer | Conditions |
| --- | --- | --- |
| SerializedShuffleHandle | UnsafeShuffleWriter | 1. The serializer supports relocation of serialized objects (Java serialization does not, Kryo does). 2. No map-side pre-aggregation. 3. The number of downstream partitions is at most 16777216. |
| BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter | 1. No map-side pre-aggregation. 2. The number of downstream partitions is at most 200 (configurable). |
| BaseShuffleHandle | SortShuffleWriter | All other cases. |
Search (Ctrl+H) for the implementation of registerShuffle: SortShuffleManager.scala.
override def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
// Bypass conditions: no map-side combine, and at most 200 downstream partitions by default
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
new BypassMergeSortShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
new SerializedShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
new BaseShuffleHandle(shuffleId, dependency)
}
}
Click through to shouldBypassMergeSort.
SortShuffleWriter.scala
private[spark] object SortShuffleWriter {
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
// Is there map-side pre-aggregation? If so, bypass cannot be used
if (dep.mapSideCombine) {
false
} else {
// SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD defaults to 200 partitions
val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
// bypass can be used if the downstream partitioner has at most 200 (configurable) partitions
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}
}
4.3 Conditions for SerializedShuffle
SerializedShuffleHandle is used when:
- the serializer supports relocation of serialized objects (Java serialization does not, Kryo does);
- there is no map-side pre-aggregation;
- the number of downstream partitions is at most 16777216.
Click through to canUseSerializedShuffle.
SortShuffleManager.scala
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
// 1. Does the serializer support relocating two independently serialized objects?
//    The default Java serialization does not; Kryo does, so Kryo can be used
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
false
} else if (dependency.mapSideCombine) { // 2. map-side pre-aggregation also rules it out
false
} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) { // 3. more than 16777216 downstream partitions also rules it out
false
} else {
true
}
}
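In practice the first condition means the job must run with a serializer whose supportsRelocationOfSerializedObjects is true, which for user code usually means Kryo. A minimal sketch, assuming nothing beyond the standard spark.serializer property:
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("kryo-demo")
  // Kryo supports relocation of serialized objects, so a shuffle with no
  // map-side combine and at most 16777216 partitions can use UnsafeShuffleWriter
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)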
4.4 Using BaseShuffle
Click through to SortShuffleWriter.
SortShuffleWriter.scala
override def write(records: Iterator[Product2[K, V]]): Unit = {
// Check for map-side pre-aggregation: if present, pass the aggregator and the key ordering
sorter = if (dep.mapSideCombine) {
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
// insert the records (buffer in memory, spill to disk when needed)
sorter.insertAll(records)
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
dep.shuffleId, mapId, dep.partitioner.numPartitions)
// write the partitioned output (merges spill files and in-memory data)
sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
val partitionLengths = mapOutputWriter.commitAllPartitions()
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
4.5 Inserting Data (buffering + spilling)
ExternalSorter.scala
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
val shouldCombine = aggregator.isDefined
// With pre-aggregation a map structure is used; without it, a buffer structure
if (shouldCombine) {
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
addElementsRead()
kv = records.next()
// With pre-aggregation, values of the same key are combined on the map side
map.changeValue((getPartition(kv._1), kv._1), update)
// check whether a spill is needed
maybeSpillCollection(usingMap = true)
}
} else {
while (records.hasNext) {
addElementsRead()
val kv = records.next()
// Without pre-aggregation, values are not combined here; they are only grouped later, as (key, (value1, value2))
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
maybeSpillCollection(usingMap = false)
}
}
}
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) {
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) {
map = new PartitionedAppendOnlyMap[K, C]
}
} else {
estimatedSize = buffer.estimateSize()
if (maybeSpill(buffer, estimatedSize)) {
buffer = new PartitionedPairBuffer[K, C]
}
}
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
}
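To make the update closure in insertAll concrete: for reduceByKey(_ + _), createCombiner keeps the first value as-is and mergeValue adds each subsequent value, so map.changeValue folds every (partition, key) slot as records stream in. A minimal sketch of the same logic in plain Scala (the names mirror the aggregator fields above; this is an illustration, not the framework code):
// hypothetical stand-ins for aggregator.createCombiner / aggregator.mergeValue
// as they would be defined for reduceByKey(_ + _)
val createCombiner: Int => Int = v => v
val mergeValue: (Int, Int) => Int = (c, v) => c + v

val records = Iterator(("a", 1), ("b", 1), ("a", 1), ("a", 1))
val combined = scala.collection.mutable.Map[String, Int]()

for ((k, v) <- records) {
  // same shape as the update closure: merge if the key was seen before, else create
  combined(k) = combined.get(k) match {
    case Some(old) => mergeValue(old, v)
    case None      => createCombiner(v)
  }
}
// combined now holds ("a" -> 3, "b" -> 1): map-side pre-aggregation before any spill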
Spillable.scala
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
// myMemoryThreshold: the initial in-memory threshold, 5 MB by default
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
val amountToRequest = 2 * currentMemory - myMemoryThreshold
// request more execution memory
val granted = acquireMemory(amountToRequest)
myMemoryThreshold += granted
// if current memory is still at or above the (raised) threshold, a spill is needed
shouldSpill = currentMemory >= myMemoryThreshold
}
// force a spill when the number of elements read exceeds the force-spill threshold (Int.MaxValue by default)
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
// spill the collection to disk
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
// release the acquired memory
releaseMemory()
}
shouldSpill
}
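A worked example of this logic: suppose myMemoryThreshold is the default 5 MB and the collection has grown to currentMemory = 6 MB. On an element count divisible by 32 the sorter requests 2 * 6 - 5 = 7 MB more memory. If the memory manager grants all 7 MB, the threshold rises to 12 MB and nothing is spilled; if it grants nothing, currentMemory (6 MB) is still at or above the threshold (5 MB), shouldSpill becomes true, the collection is spilled, and the memory is released.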
protected def spill(collection: C): Unit
Search (Ctrl+H) for the implementation of spill: ExternalSorter.
ExternalSorter.scala
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
spills += spillFile
}
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
: SpilledFile = {
// create a temporary spill file
val (blockId, file) = diskBlockManager.createTempShuffleBlock()
var objectsWritten: Long = 0
val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
// fileBufferSize: the write buffer used while spilling, 32k by default (spark.shuffle.file.buffer)
val writer: DiskBlockObjectWriter =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
… …
SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
4.6 Merging Spill Files
Back in SortShuffleWriter.scala:
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter = if (dep.mapSideCombine) {
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
dep.shuffleId, mapId, dep.partitioner.numPartitions)
// merge spill files and in-memory data
sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
val partitionLengths = mapOutputWriter.commitAllPartitions()
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
ExternalSorter.scala
def writePartitionedMapOutput(
shuffleId: Int,
mapId: Long,
mapOutputWriter: ShuffleMapOutputWriter): Unit = {
var nextPartitionId = 0
// if there are no spill files, only the in-memory data needs to be handled
if (spills.isEmpty) {
// Case where we only have in-memory data
... ...
} else {
// We must perform merge-sort; get an iterator by partition and write everything directly.
// if there are spill files, they must be merged together with the in-memory data
for ((id, elements) <- this.partitionedIterator) {
val blockId = ShuffleBlockId(shuffleId, mapId, id)
var partitionWriter: ShufflePartitionWriter = null
var partitionPairsWriter: ShufflePartitionPairsWriter = null
… …
} {
if (partitionPairsWriter != null) {
partitionPairsWriter.close()
}
}
nextPartitionId = id + 1
}
}
… …
}
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
val usingMap = aggregator.isDefined
val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
if (spills.isEmpty) {
if (ordering.isEmpty) {
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
} else {
groupByPartition(destructiveIterator(
collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
}
} else {
// merge the spill files with the in-memory data
merge(spills, destructiveIterator(
collection.partitionedDestructiveSortedIterator(comparator)))
}
}
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
: Iterator[(Int, Iterator[Product2[K, C]])] = {
val readers = spills.map(new SpillReader(_))
val inMemBuffered = inMemory.buffered
(0 until numPartitions).iterator.map { p =>
val inMemIterator = new IteratorForPartition(p, inMemBuffered)
val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
if (aggregator.isDefined) {
(p, mergeWithAggregation(
iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
} else if (ordering.isDefined) {
// merge sort across the per-partition sorted iterators
(p, mergeSort(iterators, ordering.get))
} else {
(p, iterators.iterator.flatten)
}
}
}
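The mergeSort branch is a k-way merge over iterators that are each already sorted by key (one per spill file, plus the in-memory one). A minimal sketch of that idea with plain Scala iterators (an illustration of the technique, not the ExternalSorter implementation):
import scala.collection.mutable

// Merge several key-sorted iterators into one key-sorted iterator --
// the same idea as merging spill-file readers with the in-memory iterator.
def kWayMerge[K, V](iters: Seq[Iterator[(K, V)]])(implicit ord: Ordering[K]): Iterator[(K, V)] = {
  // priority queue of buffered iterators, ordered so the smallest head key comes out first
  val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, V)]](
    Ordering.by[BufferedIterator[(K, V)], K](_.head._1).reverse)
  iters.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))

  new Iterator[(K, V)] {
    def hasNext: Boolean = heap.nonEmpty
    def next(): (K, V) = {
      val it = heap.dequeue()
      val kv = it.next()
      if (it.hasNext) heap.enqueue(it) // re-insert with its new head key
      kv
    }
  }
}

// usage: two "spill file" runs and one "in-memory" run, each sorted by key
val merged = kWayMerge(Seq(
  Iterator(("a", 1), ("c", 3)),
  Iterator(("b", 2), ("d", 4)),
  Iterator(("a", 5))))
// merged yields keys in order: a, a, b, c, d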
4.7 Writing to Disk
Again in SortShuffleWriter.scala:
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter = if (dep.mapSideCombine) {
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
dep.shuffleId, mapId, dep.partitioner.numPartitions)
// merge
sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
// write to disk: commit the data file and write the index file
val partitionLengths = mapOutputWriter.commitAllPartitions()
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
Search (Ctrl+H) for the implementation of commitAllPartitions: LocalDiskShuffleMapOutputWriter.java.
public long[] commitAllPartitions() throws IOException {
if (outputFileChannel != null && outputFileChannel.position() != bytesWrittenToMergedFile) {
... ...
}
cleanUp();
File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
return partitionLengths;
}
IndexShuffleBlockResolver.scala
def writeIndexFileAndCommit(
shuffleId: Int,
mapId: Long,
lengths: Array[Long],
dataTmp: File): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
val dataFile = getDataFile(shuffleId, mapId)
synchronized {
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
if (existingLengths != null) {
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
} else {
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
offset += length
out.writeLong(offset)
}
} {
out.close()
}
if (indexFile.exists()) {
indexFile.delete()
}
if (dataFile.exists()) {
dataFile.delete()
}
if (!indexTmp.renameTo(indexFile)) {
throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
}
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
}
}
}
} finally {
... ...
}
}
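On the read side, a reduce task only needs two consecutive longs from this index file to locate its slice of the data file. The sketch below is a simplified illustration of how such an index is consumed (not the actual IndexShuffleBlockResolver code):
import java.io.{DataInputStream, File, FileInputStream}

// Given the index file written above, return the (startOffset, endOffset)
// byte range of the data file that belongs to one reduce partition.
def readPartitionRange(indexFile: File, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skipBytes(reduceId * 8) // each index entry is one 8-byte long offset
    val start = in.readLong()
    val end = in.readLong()
    (start, end)
  } finally {
    in.close()
  }
}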
5. Shuffle Read from Disk
DAGScheduler.scala
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
... ...
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
... ...
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch {
... ...
}
}
ResultTask.scala
private[spark] class ResultTask[T, U](... ...)
extends Task[U](... ...)
with Serializable {
override def runTask(context: TaskContext): U = {
func(context, rdd.iterator(partition, context))
}
}
RDD.scala
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
... ...
computeOrReadCheckpoint(partition, context)
... ...
}
def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] ={
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
def compute(split: Partition, context: TaskContext): Iterator[T]
Do a global search for compute; since we are dealing with a ShuffledRDD, open ShuffledRDD.scala and look at its compute.
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
val metrics = context.taskMetrics().createTempShuffleReadMetrics()
SparkEnv.get.shuffleManager.getReader(
dep.shuffleHandle, split.index, split.index + 1, context, metrics)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
ShuffleManager.scala
def getReader[K, C](... ...): ShuffleReader[K, C]
Search (Ctrl+H) for the implementation of getReader: SortShuffleManager.scala.
override def getReader[K, C](... ...): ShuffleReader[K, C] = {
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startPartition, endPartition)
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}
In BlockStoreShuffleReader.scala, look at the read method:
override def read(): Iterator[Product2[K, C]] = {
val wrappedStreams = new ShuffleBlockFetcherIterator(
... ...
// read buffer size, 48m by default (spark.reducer.maxSizeInFlight)
SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024,
SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT),
... ...
}
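Both values are configurable. A hedged sketch of tuning them (48m is the documented default of spark.reducer.maxSizeInFlight; spark.reducer.maxReqsInFlight defaults to Int.MaxValue):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // total size of map outputs a reduce task fetches concurrently (default 48m)
  .set("spark.reducer.maxSizeInFlight", "96m")
  // cap on the number of outstanding remote fetch requests (default Int.MaxValue)
  .set("spark.reducer.maxReqsInFlight", "1024")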