Spark任务性能调优之shuffle调优

超、凢脫俗 2023-10-16 12:18 153阅读 0赞

4ec8f4365d2040ed9ece56b1e4f044cf.png

大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。

因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。

但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此大家务必把握住调优的基本原则,千万不要舍本逐末。

下面我们就给大家详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。

而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进。

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。

该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。

SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。

在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

Spark会产生shuffle的算子

去重

  • def distinct()
  • def distinct(numPartitions: Int)

聚合

  • def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  • def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  • def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
  • def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
  • def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
  • def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
  • def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  • def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
  • def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

排序

  • def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  • def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

重分区

  • def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
  • def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)

集合或者表操作

  • def intersection(other: RDD[T]): RDD[T]
  • def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
  • def intersection(other: RDD[T], numPartitions: Int): RDD[T]
  • def subtract(other: RDD[T], numPartitions: Int): RDD[T]
  • def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
  • def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
  • def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
  • def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]

  • def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

  • def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  • def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
  • def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

HashShuffleManager运行原理

未经优化的HashShuffleManager

这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

我们先从shuffle write开始说起。shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。

所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。

在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?

很简单,下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。

比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。

由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

接着我们来说说shuffle read。

shuffle read,通常就是一个stage刚开始时要做的事情。

此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。

由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。

聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

优化后的HashShuffleManager

这里说的优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles。

该参数默认值为false,将其设置为true即可开启优化机制。

通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。

此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。

一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。

也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。

因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。

那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。

但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。

也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

最后:下方这份完整的软件测试视频教程已经整理上传完成,需要的朋友们可以自行领取【保证100%免费】

8b0171187dce405e878f274a8cdd9ec0.png

软件测试面试文档

我们学习必然是为了找到高薪的工作,下面这些面试题是来自阿里、腾讯、字节等一线互联网大厂最新的面试资料,并且有字节大佬给出了权威的解答,刷完这一套面试资料相信大家都能找到满意的工作。

a3e4226c935c4242804cbdf26b571bc6.png

58df6cd288094ad8874e814f2477620b.gif

发表评论

表情:
评论列表 (有 0 条评论,153人围观)

还没有评论,来说两句吧...

相关阅读