Spark 的两种 Shuffle

Bertha 。 2023-02-14 02:37 69阅读 0赞

文章目录

      1. HashShuffle
          1. 优化前
          1. 优化后
      1. SortShuffle
          1. 普通运行机制
          1. bypass运行机制

1. HashShuffle

1. 优化前

在这里插入图片描述

  1. 1. shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),
  2. 而将每个task处理的数据按key进行“划分”。所谓“划分”,就是对相同的key执行hash算法,从而将相同key都写入同
  3. 一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲
  4. 中,当内存缓冲填满之后,才会溢写到磁盘文件中去。
  5. 2. 下一个stagetask有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100task
  6. 那么当前stage的每个task都要创建100份磁盘文件。如果当前stage50task,总共有10Executor,每个
  7. Executor执行5task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。
  8. 由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。
  9. 3. shuffle read阶段,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算
  10. 结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle
  11. write的过程中,map task给下游stage的每个reduce task都创建了一个磁盘文件,因此shuffle read的过程中,每
  12. reduce task只要从上游stage的所有map task所在节点上,拉取属于自己的那一个磁盘文件即可。
  13. shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都
  14. 只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数
  15. 据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

2. 优化后

在这里插入图片描述

  1. 1. 为了优化HashShuffleManager我们可以设置一个参数,spark.shuffle. consolidateFiles,该参数默认值为
  2. false,将其设置为true即可开启优化机制,通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。
  3. 2. 开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了,
  4. 此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage
  5. task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task
  6. 会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
  7. 3. ExecutorCPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup
  8. 包括其中的磁盘文件,也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,
  9. consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,
  10. 从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。
  11. 4. 假设第二个stage100task,第一个stage50task,总共还是有10ExecutorExecutor CPU个数为1),
  12. 每个Executor执行5task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,
  13. 所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:
  14. CPU core的数量 * 下一个stagetask数量,也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor
  15. 只会创建1000个磁盘文件。

2. SortShuffle

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort. bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

1. 普通运行机制

在这里插入图片描述

  1. 1. 在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是
  2. reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join
  3. 这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判
  4. 断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存
  5. 数据结构。
  6. 2. 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默
  7. 认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过
  8. JavaBufferedOutputStream实现的。BufferedOutputStreamJava的缓冲输出流,首先会将数据缓冲在内存中,
  9. 当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
  10. 3. 一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所
  11. 有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的
  12. 磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stagetask准备的数据都在这一个
  13. 文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offsetend offset
  14. 4. SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage50task
  15. 总共有10Executor,每个Executor执行5task,而第二个stage100task。由于每个task最终只有一个磁盘文
  16. 件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

2. bypass运行机制

在这里插入图片描述
bypass运行机制的触发条件如下:

  1. shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
  2. 不是聚合类的shuffle算子。

    1. 此时,每个task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入
      对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临
      时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

    2. 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后
      会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,
      shuffle read的性能会更好。

    3. 而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用
      该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

发表评论

表情:
评论列表 (有 0 条评论,69人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SparkShuffle总结

    Shuffle概念 shuffle,是一种多对多的依赖关系,即每个Reduce Task从每个Map Task产生数的据中读取一片数据,极限情况下可能触发M\R个数据拷贝

    相关 [spark内核]shuffle机制

    1.核心知识点目录: ![70][] 一 概述 Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂 在MapReduce框架,Sh