Spark Internals: Shuffle Flow and Source Code



Table of Contents

  • Spark Internals: Shuffle Flow and Source Code
    • 1. How Shuffle Works and Its Execution Flow
    • 2. HashShuffle Explained
      • 2.1 Unoptimized HashShuffle
      • 2.2 Optimized HashShuffle
    • 3. SortShuffle Explained
      • 3.1 SortShuffle
      • 3.2 bypassShuffle
    • 4. Shuffle Write to Disk
      • 4.1 shuffleWriterProcessor (the write processor)
      • 4.2 Conditions for BypassShuffle
      • 4.3 Conditions for SerializedShuffle
      • 4.4 BaseShuffle
      • 4.5 Inserting Data (Buffering + Spilling)
      • 4.6 Merge
      • 4.7 Writing to Disk
    • 5. Shuffle Read from Disk

The earliest Spark releases used HashShuffle.
Spark 0.8.1 introduced the optimized (consolidated) HashShuffle.
Spark 1.1 added SortShuffle, with HashShuffle still the default.
Spark 1.2 made SortShuffle the default, with HashShuffle still configurable.
Spark 2.0 removed HashShuffle, leaving only SortShuffle.

1. How Shuffle Works and Its Execution Flow

Shuffle always involves writing data to disk.

  • Reducing the amount of data written to disk during the shuffle improves performance.
  • Operators with map-side pre-aggregation (a combiner) improve shuffle performance, as the sketch below shows.
    [Figure: overview of the shuffle execution flow]
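
To make the second point concrete, here is a minimal sketch using the plain RDD API; the input RDD `words` is an assumed parameter, not something taken from the source walkthrough below:

    import org.apache.spark.rdd.RDD

    // reduceByKey pre-aggregates values on the map side, so far fewer records are
    // spilled to disk and sent over the network during the shuffle.
    def countWithCombine(words: RDD[String]): RDD[(String, Int)] =
      words.map(w => (w, 1)).reduceByKey(_ + _)

    // groupByKey has no map-side combine: every (word, 1) record goes through the
    // shuffle unchanged and is only summed afterwards.
    def countWithoutCombine(words: RDD[String]): RDD[(String, Int)] =
      words.map(w => (w, 1)).groupByKey().mapValues(_.sum)

Both functions produce the same result; only the amount of shuffled data differs.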

2. HashShuffle Explained

2.1 Unoptimized HashShuffle

[Figure: unoptimized HashShuffle — each map task writes a separate file for every downstream reduce task]

2.2 Optimized HashShuffle

The optimized HashShuffle enables a consolidation mechanism, which reuses output buffers and file groups across map tasks instead of having every map task create its own file per reducer. Consolidation is controlled by spark.shuffle.consolidateFiles, which defaults to false; set it to true to enable the optimization. In general, if you are using HashShuffleManager, enabling this option is recommended.
Official configuration reference: http://spark.apache.org/docs/0.8.1/configuration.html
[Figure: optimized (consolidated) HashShuffle]
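
A minimal sketch of enabling consolidation, assuming a Spark 1.x build that still ships HashShuffleManager (both settings were removed together with HashShuffle in Spark 2.0):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.shuffle.manager", "hash")           // select HashShuffleManager (pre-2.0 only)
      .set("spark.shuffle.consolidateFiles", "true")  // reuse output file groups instead of one file per reducer per map task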

3. SortShuffle Explained

3.1 SortShuffle

[Figure: SortShuffle write process]

In this mode, records are first written into an in-memory data structure. For aggregating operators such as reduceByKey they go into a map, which performs partial (map-side) aggregation while the data is being written into memory; for non-aggregating operators such as join, records are appended to an array-like buffer directly in memory. Whenever a threshold is reached, the contents of the in-memory structure are spilled to disk and the structure is cleared.
Before spilling, the data is sorted by key. The sorted data is then written to the disk file in batches, 10,000 records per batch by default, going through a write buffer. Each spill produces its own disk file, so a single task can produce multiple temporary files.
Finally, within each task all temporary files are merged: every temp file is read back and the data is written out into one final file, so all of a task's shuffle output ends up in a single data file. A separate index file is also written, recording where each downstream task's data lies in that file (its start offset and end offset).
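
The batch size and the write-buffer size mentioned above are configurable. A hedged sketch of the relevant settings (defaults shown; names and defaults should be checked against the Spark version in use):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.shuffle.spill.batchSize", "10000") // records serialized per batch when spilling
      .set("spark.shuffle.file.buffer", "32k")       // buffer placed in front of each shuffle spill/output file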

3.2 bypassShuffle

The difference between bypassShuffle and SortShuffle is that bypassShuffle does not sort the data.
The bypass mechanism is triggered when both of the following hold (illustrated by the sketch after the figure):

1) The number of shuffle reduce tasks is less than or equal to spark.shuffle.sort.bypassMergeThreshold (default 200).
2) The shuffle operator is not an aggregating one (reduceByKey, for example, does not qualify).
[Figure: bypass SortShuffle write process]
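
A sketch of how these two conditions play out in user code (the threshold itself comes from spark.shuffle.sort.bypassMergeThreshold; the functions below are illustrative and simply take an assumed input RDD):

    import org.apache.spark.rdd.RDD

    // Eligible for bypass: groupByKey declares no map-side combine, and 100 <= 200 reduce partitions.
    def bypassEligible(pairs: RDD[(String, Int)]): RDD[(String, Iterable[Int])] =
      pairs.groupByKey(100)

    // Not eligible: reduceByKey declares a map-side combine, so the regular sorted path is used.
    def notBypassEligible(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
      pairs.reduceByKey(_ + _, 100)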

4. Shuffle Write to Disk

4.1 shuffleWriterProcessor (the write processor)

DAGScheduler.scala

    private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
      ... ...
      val tasks: Seq[Task[_]] = try {
        val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
        stage match {
          // Shuffle write path
          case stage: ShuffleMapStage =>
            stage.pendingPartitions.clear()
            partitionsToCompute.map { id =>
              val locs = taskIdToLocations(id)
              val part = partitions(id)
              stage.pendingPartitions += id
              new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
                taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
                Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
            }
          // Shuffle read path
          case stage: ResultStage =>
            ... ...
        }
      } catch {
        ... ...
      }
    }

Task.scala

    private[spark] abstract class Task[T](... ...) extends Serializable {
      final def run(... ...): T = {
        runTask(context)
      }
    }

Use Ctrl+H to find the implementations of runTask; the relevant one is in ShuffleMapTask.scala.

    private[spark] class ShuffleMapTask(... ...)
      extends Task[MapStatus](... ...) {
      override def runTask(context: TaskContext): MapStatus = {
        ... ...
        dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
      }
    }

ShuffleWriteProcessor.scala

    def write(... ...): MapStatus = {
      var writer: ShuffleWriter[Any, Any] = null
      try {
        val manager = SparkEnv.get.shuffleManager
        writer = manager.getWriter[Any, Any](
          dep.shuffleHandle,
          mapId,
          context,
          createMetricsReporter(context))
        writer.write(
          rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
        writer.stop(success = true).get
      } catch {
        ... ...
      }
    }

Use Ctrl+H to find the implementations of ShuffleManager; the relevant one is SortShuffleManager.
SortShuffleManager.scala

    override def getWriter[K, V](
        handle: ShuffleHandle,
        mapId: Long,
        context: TaskContext,
        metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
      ... ...
      handle match {
        case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
          new UnsafeShuffleWriter(... ...)
        case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
          new BypassMergeSortShuffleWriter(... ...)
        case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
          new SortShuffleWriter(... ...)
      }
    }

Because the first argument passed to getWriter is dep.shuffleHandle, go to the definition of dep.shuffleHandle.
Dependency.scala

    val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, this)

ShuffleManager.scala

    def registerShuffle[K, V, C](shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle

4.2 Conditions for BypassShuffle

Conditions for using BypassMergeSortShuffleHandle:

1) No map-side pre-aggregation.

2) The number of downstream partitions is less than or equal to 200 (configurable).

The table below summarizes which handle (and writer) is chosen under which conditions.

Handle | Writer | Conditions
SerializedShuffleHandle | UnsafeShuffleWriter | 1. The serializer supports relocation of serialized objects (Java serialization does not, Kryo does); 2. no map-side pre-aggregation; 3. the number of downstream partitions is at most 16777216.
BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter | 1. No map-side pre-aggregation; 2. the number of downstream partitions is at most 200 (configurable).
BaseShuffleHandle | SortShuffleWriter | All other cases.

Use Ctrl+H to find the implementations of registerShuffle, in SortShuffleManager.scala.

    override def registerShuffle[K, V, C](
        shuffleId: Int,
        dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
      // Bypass conditions: no map-side pre-aggregation, and by default no more than 200 downstream partitions
      if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
        new BypassMergeSortShuffleHandle[K, V](
          shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
        new SerializedShuffleHandle[K, V](
          shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else {
        new BaseShuffleHandle(shuffleId, dependency)
      }
    }

Go to the definition of shouldBypassMergeSort.
SortShuffleWriter.scala

    private[spark] object SortShuffleWriter {
      def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
        // Is there map-side pre-aggregation? (if so, bypass cannot be used)
        if (dep.mapSideCombine) {
          false
        } else {
          // SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD defaults to 200 partitions
          val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
          // Bypass can be used if the downstream partitioner has at most the threshold number of partitions (200 by default, configurable)
          dep.partitioner.numPartitions <= bypassMergeThreshold
        }
      }
    }

4.3 Conditions for SerializedShuffle

Conditions for using SerializedShuffleHandle (see the Kryo sketch below):

  1. The serializer supports relocation of serialized objects (Java serialization does not; Kryo does).
  2. No map-side pre-aggregation.
  3. The number of downstream partitions is less than or equal to 16777216.
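
Condition 1 is why Kryo is commonly enabled when the serialized shuffle path is desired. A minimal sketch:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Kryo supports relocation of serialized objects; the default Java serializer does not.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")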

Go to the definition of canUseSerializedShuffle.
SortShuffleManager.scala

    def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
      val shufId = dependency.shuffleId
      val numPartitions = dependency.partitioner.numPartitions
      // Can two independently serialized objects be relocated and concatenated?
      // 1. The default Java serialization cannot; Kryo can (so Kryo is usable here)
      if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
        false
      } else if (dependency.mapSideCombine) { // 2. Map-side pre-aggregation rules it out
        false
      } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) { // 3. More than 16777216 downstream partitions rules it out
        false
      } else {
        true
      }
    }

4.4 BaseShuffle

Go to SortShuffleWriter.
SortShuffleWriter.scala

    override def write(records: Iterator[Product2[K, V]]): Unit = {
      // Check whether there is map-side pre-aggregation; if so, the sorter gets an aggregator and a key ordering
      sorter = if (dep.mapSideCombine) {
        new ExternalSorter[K, V, C](
          context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      } else {
        new ExternalSorter[K, V, V](
          context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
      }
      // Insert the records (buffer in memory, spill to disk when needed)
      sorter.insertAll(records)
      val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
        dep.shuffleId, mapId, dep.partitioner.numPartitions)
      // Merge the spill files with the in-memory data and write the partitioned map output
      sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
      val partitionLengths = mapOutputWriter.commitAllPartitions()
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
    }

4.5 Inserting Data (Buffering + Spilling)

ExternalSorter.scala

    def insertAll(records: Iterator[Product2[K, V]]): Unit = {
      val shouldCombine = aggregator.isDefined
      // If pre-aggregation is supported, use a map structure; otherwise use a buffer structure
      if (shouldCombine) {
        val mergeValue = aggregator.get.mergeValue
        val createCombiner = aggregator.get.createCombiner
        var kv: Product2[K, V] = null
        val update = (hadValue: Boolean, oldValue: C) => {
          if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
        }
        while (records.hasNext) {
          addElementsRead()
          kv = records.next()
          // With pre-aggregation, values for the same key are combined on the map side
          map.changeValue((getPartition(kv._1), kv._1), update)
          // Check whether the collection should be spilled
          maybeSpillCollection(usingMap = true)
        }
      } else {
        while (records.hasNext) {
          addElementsRead()
          val kv = records.next()
          // Without pre-aggregation, values are not combined: (key, value1), (key, value2), ...
          buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
          maybeSpillCollection(usingMap = false)
        }
      }
    }

    private def maybeSpillCollection(usingMap: Boolean): Unit = {
      var estimatedSize = 0L
      if (usingMap) {
        estimatedSize = map.estimateSize()
        if (maybeSpill(map, estimatedSize)) {
          map = new PartitionedAppendOnlyMap[K, C]
        }
      } else {
        estimatedSize = buffer.estimateSize()
        if (maybeSpill(buffer, estimatedSize)) {
          buffer = new PartitionedPairBuffer[K, C]
        }
      }
      if (estimatedSize > _peakMemoryUsedBytes) {
        _peakMemoryUsedBytes = estimatedSize
      }
    }
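
To see why the map structure shrinks the data while the buffer does not, here is a plain Scala sketch with no Spark involved (the real structures are PartitionedAppendOnlyMap and PartitionedPairBuffer):

    // Input records for one map task, as (key, value) pairs:
    val records = Seq(("a", 1), ("a", 1), ("b", 1))

    // Map path (aggregator defined, e.g. reduceByKey): values for the same key are merged
    // as records arrive, so at most one entry per key is held in memory and later spilled.
    val combined = records.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
    // combined == Map("a" -> 2, "b" -> 1)

    // Buffer path (no aggregator, e.g. groupByKey/join): every record is kept as-is until spill time.
    val buffered = records
    // buffered == Seq(("a", 1), ("a", 1), ("b", 1))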

Spillable.scala

    protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
      var shouldSpill = false
      // myMemoryThreshold is the memory threshold, 5 MB by default
      if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
        val amountToRequest = 2 * currentMemory - myMemoryThreshold
        // Request more execution memory
        val granted = acquireMemory(amountToRequest)
        myMemoryThreshold += granted
        // If current memory is still at or above the (raised) threshold, the collection must be spilled
        shouldSpill = currentMemory >= myMemoryThreshold
      }
      // Force a spill if the number of elements read exceeds the force-spill threshold (Integer.MAX_VALUE by default)
      shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
      if (shouldSpill) {
        _spillCount += 1
        logSpillage(currentMemory)
        // Spill to disk
        spill(collection)
        _elementsRead = 0
        _memoryBytesSpilled += currentMemory
        // Release the memory back to the memory manager
        releaseMemory()
      }
      shouldSpill
    }

    protected def spill(collection: C): Unit
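
A worked example of the threshold arithmetic above, as a self-contained sketch (5 MB is the default initial threshold, spark.shuffle.spill.initialMemoryThreshold):

    var myMemoryThreshold = 5L * 1024 * 1024   // initial threshold: 5 MB
    val currentMemory = 6L * 1024 * 1024       // estimated size of the in-memory collection: 6 MB

    // Try to double the footprint: request 2 * 6 MB - 5 MB = 7 MB more.
    val amountToRequest = 2 * currentMemory - myMemoryThreshold

    // Suppose the memory manager grants only 1 MB of the 7 MB requested.
    val granted = 1L * 1024 * 1024
    myMemoryThreshold += granted               // threshold is now 6 MB

    // currentMemory (6 MB) >= threshold (6 MB), so the collection must be spilled.
    val shouldSpill = currentMemory >= myMemoryThreshold

Had the full 7 MB been granted, the threshold would have risen to 12 MB and no spill would have occurred.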

Use Ctrl+H to find the implementation of spill in ExternalSorter.

ExternalSorter.scala

    override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
      val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
      val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
      spills += spillFile
    }

    private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
        : SpilledFile = {
      // Create a temporary shuffle file
      val (blockId, file) = diskBlockManager.createTempShuffleBlock()
      var objectsWritten: Long = 0
      val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
      // fileBufferSize is the write buffer in front of the spill file, 32k by default (spark.shuffle.file.buffer)
      val writer: DiskBlockObjectWriter =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
      ... ...
      SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
    }

4.6 Merge

Back in SortShuffleWriter.scala:

    override def write(records: Iterator[Product2[K, V]]): Unit = {
      sorter = if (dep.mapSideCombine) {
        new ExternalSorter[K, V, C](
          context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      } else {
        new ExternalSorter[K, V, V](
          context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
      }
      sorter.insertAll(records)
      val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
        dep.shuffleId, mapId, dep.partitioner.numPartitions)
      // Merge the spill files with the in-memory data
      sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
      val partitionLengths = mapOutputWriter.commitAllPartitions()
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
    }

ExternalSorter.scala

    def writePartitionedMapOutput(
        shuffleId: Int,
        mapId: Long,
        mapOutputWriter: ShuffleMapOutputWriter): Unit = {
      var nextPartitionId = 0
      // If there are no spill files, only the in-memory data needs to be written
      if (spills.isEmpty) {
        // Case where we only have in-memory data
        ... ...
      } else {
        // We must perform merge-sort; get an iterator by partition and write everything directly.
        // If there are spill files, they have to be merged with the in-memory data
        for ((id, elements) <- this.partitionedIterator) {
          val blockId = ShuffleBlockId(shuffleId, mapId, id)
          var partitionWriter: ShufflePartitionWriter = null
          var partitionPairsWriter: ShufflePartitionPairsWriter = null
          // ... ... (the partition's records are written through partitionPairsWriter inside a
          // try/finally wrapper that is elided here)
          {
            if (partitionPairsWriter != null) {
              partitionPairsWriter.close()
            }
          }
          nextPartitionId = id + 1
        }
      }
    }

    def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
      val usingMap = aggregator.isDefined
      val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
      if (spills.isEmpty) {
        if (ordering.isEmpty) {
          groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
        } else {
          groupByPartition(destructiveIterator(
            collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
        }
      } else {
        // Merge the spill files with the in-memory data
        merge(spills, destructiveIterator(
          collection.partitionedDestructiveSortedIterator(comparator)))
      }
    }

    private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
        : Iterator[(Int, Iterator[Product2[K, C]])] = {
      val readers = spills.map(new SpillReader(_))
      val inMemBuffered = inMemory.buffered
      (0 until numPartitions).iterator.map { p =>
        val inMemIterator = new IteratorForPartition(p, inMemBuffered)
        val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
        if (aggregator.isDefined) {
          (p, mergeWithAggregation(
            iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
        } else if (ordering.isDefined) {
          // Merge-sort
          (p, mergeSort(iterators, ordering.get))
        } else {
          (p, iterators.iterator.flatten)
        }
      }
    }

4.7 Writing to Disk

Back in SortShuffleWriter.scala:

    override def write(records: Iterator[Product2[K, V]]): Unit = {
      sorter = if (dep.mapSideCombine) {
        new ExternalSorter[K, V, C](
          context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      } else {
        new ExternalSorter[K, V, V](
          context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
      }
      sorter.insertAll(records)
      val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
        dep.shuffleId, mapId, dep.partitioner.numPartitions)
      // Merge
      sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
      // Write the data and index files to disk
      val partitionLengths = mapOutputWriter.commitAllPartitions()
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
    }

Use Ctrl+H to find the implementations of commitAllPartitions; this leads to LocalDiskShuffleMapOutputWriter.java.

    public long[] commitAllPartitions() throws IOException {
      if (outputFileChannel != null && outputFileChannel.position() != bytesWrittenToMergedFile) {
        ... ...
      }
      cleanUp();
      File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
      blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
      return partitionLengths;
    }

IndexShuffleBlockResolver.scala

    def writeIndexFileAndCommit(
        shuffleId: Int,
        mapId: Long,
        lengths: Array[Long],
        dataTmp: File): Unit = {
      val indexFile = getIndexFile(shuffleId, mapId)
      val indexTmp = Utils.tempFileWith(indexFile)
      try {
        val dataFile = getDataFile(shuffleId, mapId)
        synchronized {
          val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
          if (existingLengths != null) {
            System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
            if (dataTmp != null && dataTmp.exists()) {
              dataTmp.delete()
            }
          } else {
            val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
            Utils.tryWithSafeFinally {
              var offset = 0L
              out.writeLong(offset)
              for (length <- lengths) {
                offset += length
                out.writeLong(offset)
              }
            } {
              out.close()
            }
            if (indexFile.exists()) {
              indexFile.delete()
            }
            if (dataFile.exists()) {
              dataFile.delete()
            }
            if (!indexTmp.renameTo(indexFile)) {
              throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
            }
            if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
              throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
            }
          }
        }
      } finally {
        ... ...
      }
    }
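
A worked example of the offset loop above: for partition lengths of 10, 20 and 30 bytes, the index file ends up holding the running offsets 0, 10, 30, 60, and reduce task i later reads the byte range [offset(i), offset(i+1)) of the data file. A plain Scala sketch:

    val lengths = Array(10L, 20L, 30L)          // bytes written for partitions 0, 1, 2
    val offsets = lengths.scanLeft(0L)(_ + _)   // 0, 10, 30, 60 — exactly what the loop writes
    // Reduce task 1 reads bytes [offsets(1), offsets(2)) = [10, 30) of the data file.
    val (start, end) = (offsets(1), offsets(2))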

5. Shuffle Read from Disk

DAGScheduler.scala

    private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
      ... ...
      val tasks: Seq[Task[_]] = try {
        val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
        stage match {
          case stage: ShuffleMapStage =>
            ... ...
          case stage: ResultStage =>
            partitionsToCompute.map { id =>
              val p: Int = stage.partitions(id)
              val part = partitions(p)
              val locs = taskIdToLocations(id)
              new ResultTask(stage.id, stage.latestInfo.attemptNumber,
                taskBinary, part, locs, id, properties, serializedTaskMetrics,
                Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
                stage.rdd.isBarrier())
            }
        }
      } catch {
        ... ...
      }
    }

ResultTask.scala

    private[spark] class ResultTask[T, U](... ...)
      extends Task[U](... ...)
      with Serializable {
      override def runTask(context: TaskContext): U = {
        func(context, rdd.iterator(partition, context))
      }
    }

RDD.scala

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      if (storageLevel != StorageLevel.NONE) {
        getOrCompute(split, context)
      } else {
        computeOrReadCheckpoint(split, context)
      }
    }

    private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
      ... ...
      computeOrReadCheckpoint(partition, context)
      ... ...
    }

    def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
      if (isCheckpointedAndMaterialized) {
        firstParent[T].iterator(split, context)
      } else {
        compute(split, context)
      }
    }

    def compute(split: Partition, context: TaskContext): Iterator[T]

Search globally for compute. Since the RDD here is a ShuffledRDD, open ShuffledRDD.scala and find its compute method.

    override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
      val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
      val metrics = context.taskMetrics().createTempShuffleReadMetrics()
      SparkEnv.get.shuffleManager.getReader(
        dep.shuffleHandle, split.index, split.index + 1, context, metrics)
        .read()
        .asInstanceOf[Iterator[(K, C)]]
    }

ShuffleManager.scala

    def getReader[K, C](... ...): ShuffleReader[K, C]

Use Ctrl+H to find the implementations of getReader, in SortShuffleManager.scala.

    override def getReader[K, C](... ...): ShuffleReader[K, C] = {
      val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
        handle.shuffleId, startPartition, endPartition)
      new BlockStoreShuffleReader(
        handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
        shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
    }

In BlockStoreShuffleReader.scala, look at the read method.

    override def read(): Iterator[Product2[K, C]] = {
      val wrappedStreams = new ShuffleBlockFetcherIterator(
        ... ...
        // Read buffer size, 48m by default (spark.reducer.maxSizeInFlight)
        SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024,
        SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT),
        ... ...
    }
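
A hedged sketch of the two reader-side limits referenced here (defaults shown; check them against the Spark version in use):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.reducer.maxSizeInFlight", "48m")        // total size of map output fetched concurrently per reduce task
      .set("spark.reducer.maxReqsInFlight", "2147483647") // number of concurrent fetch requests (default Int.MaxValue)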
