无锡网站建设哪家好,微指数官网,怎么才能让网站图文展示,武清网站开发文章目录1. 简介2. Hash Shuffle和Sort Shuffle2.1 Hash Shuffle2.1.1 未经优化的hashShuffleManager2.1.2 经优化的hashShuffleManager2.1.3 优化前后磁盘文件数对比2.2 Srot Shuffle Manager3. Shuffle配置选项1. 简介
Spark在DAG调度阶段会将一个Job划分为多个Stage#x…
文章目录1. 简介2. Hash Shuffle和Sort Shuffle2.1 Hash Shuffle2.1.1 未经优化的hashShuffleManager2.1.2 经优化的hashShuffleManager2.1.3 优化前后磁盘文件数对比2.2 Srot Shuffle Manager3. Shuffle配置选项1. 简介
Spark在DAG调度阶段会将一个Job划分为多个Stage上游Stage做map工作下游Stage做reduce工作其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁它将map的输出对应到reduce输入中涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等。 Spark的Shuffle分为Write和Read两个阶段分属于两个不同的Stage前者是Parent Stage的最后一步后者是Child Stage的第一步。 执行Shuffle的主体是Stage中的并发任务这些任务分ShuffleMapTask和ResultTask两种ShuffleMapTask要进行ShuffleResultTask负责返回计算结果一个Job中只有最后的Stage采用ResultTask其他的均为ShuffleMapTask。如果要按照map端和reduce端来分析的话ShuffleMapTask可以即是map端任务又是reduce端任务因为Spark中的Shuffle是可以串行的ResultTask则只能充当reduce端任务的角色。
Spark在1.1以前的版本一直是采用Hash Shuffle的实现的方式到1.1版本时参考Hadoop MapReduce的实现开始引入Sort Shuffle在1.5版本时开始Tungsten钨丝计划引入UnSafe Shuffle优化内存及CPU的使用在1.6中将Tungsten统一到Sort Shuffle中实现自我感知选择最佳Shuffle方式到的2.0版本Hash Shuffle已被删除所有 Shuffle方式全部统一到Sort Shuffle一个实现中。
2. Hash Shuffle和Sort Shuffle
在Spark的中负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager也即shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式分别为HashShuffleManager和SortShuffleManager因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种。
在Spark 1.2以前默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端就是会产生大量的中间磁盘文件进而由大量的磁盘IO操作影响了性能。因此在Spark 1.2以后的版本中默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说有了一定的改进。主要就在于每个Task在进行shuffle操作时虽然也会产生较多的临时磁盘文件但是最后会将所有的临时文件合并(merge)成一个磁盘文件因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉 取自己的数据时只要根据索引读取每个磁盘文件中的部分数据即可。
2.1 Hash Shuffle
Shuffle阶段划分
shuffle writemapper阶段上一个stage得到最后的结果写出shuffle read reduce阶段下一个stage拉取上一个stage进行合并
2.1.1 未经优化的hashShuffleManager
HashShuffle是根据task的计算结果的key值的hashcode%ReduceTask来决定放入哪一个区分这样保证相同的数据一定放入一个分区。 上图是Hash Shuffle的过程根据下游的task决定生成几个文件先生成缓冲区文件在写入磁盘文件再将block文件进行合并。未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。
2.1.2 经优化的hashShuffleManager
在shuffle write过程中task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念每个shuffleFileGroup会对应一批磁盘文件每一个Group磁盘文件的数量与下游stage的task数量是相同的。 2.1.3 优化前后磁盘文件数对比
未经优化 上游的task数量m 下游的task数量n 上游的executor数量k (mk) 总共的磁盘文件m*n 优化之后的 上游的task数量m 下游的task数量n 上游的executor数量k (mk) 总共的磁盘文件k*n
2.2 Srot Shuffle Manager
SortShuffle对比HashShuffle可以减少很多的磁盘文件以节省网络IO的开销。
SortShuffleManager的运行机制主要分成两种一种是普通运行机制另一种是bypass运行机制。当shuffle write task的数量小于等spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200)就会启用bypass机制。 1该模式下数据会先写入一个内存数据结构中(默认5M)此时根据不同的shuffle算子可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子那么会选用Map数据结构一边通过Map进行聚合一边写入内存如果是join这种普通的shuffle算子那么会选用Array数据结构直接写入内存。 2接着每写一条数据进入内存数据结构之后就会判断一下是否达到了某个临界阈值。如果达到临界阈值的话那么就会尝试将内存数据结构中的数据溢写到磁盘然后清空内存数据结构。 3排序 在溢写到磁盘文件之前会先根据key对内存数据结构中已有的数据进行排序。 4溢写 排序过后会分批将数据写入磁盘文件。默认的batch数量是10000条也就是说排序好的数据会以每批1万条数据的形式分批写入磁盘文件。 5merge 一个task将所有数据写入内存数据结构的过程中会发生多次磁盘溢写操作也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并成1个磁盘文件这就是merge过程。由于一个task就只对应一个磁盘文件也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中因此还会单独写一份索引文件其中标识了下游各个task的数据在文件中的start offset与end offset。
Sort Shuffle bypass运行机制
运行条件
shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold200参数的值。不是map combine聚合的shuffle算子(比如reduceByKey有map combie)。 此时task会为每个reduce端的task都创建一个临时磁盘文件并将数据按key进行hash然后根据key的hash值将key写入对应的磁盘文件之中。当然写入磁盘文件时也是先写入内存缓冲缓冲写满之后再溢写到磁盘文件的。最后同样会将所有临时磁盘文件都合并成一个磁盘文件并创建一个单独的索引文件。该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的因为都要创建数量惊人的磁盘文件只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件也让该机制相对未经优化的HashShuffleManager来说shuffle read的性能会更好。
bypass运行机制和SortShuffleManager运行机制区别
磁盘写机制不同;不会进行排序。也就是说启用该机制的最大好处在于shuffle write过程中不需要进行数据的排序操作 也就节省掉了这部分的性能开销。
总结
SortShuffle也分为普通机制和bypass机制普通机制在内存数据结构(默认为5M)完成排序会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制SortShuffle的bypass机制不会进行排序极大的提高了其性能。
SortShuffle主要是对磁盘文件进行合并来进行文件数量的减少同时两类Shuffle都需要经过内存缓冲区溢写磁盘的场景。所以可以得知尽管Spark是内存迭代计算框架但是内存迭代主要在窄依赖中。 在宽依赖(Shuffle)中磁盘交互还是一个无可避免的情况。所以我们要尽量减少Shuffle的出现不要进行无意义的Shuffle计算。
3. Shuffle配置选项
spark 的shuffle调优主要是调整缓冲的大小拉取次数重试重试次数与等待时间内存比例分配是否进行排序操作等等
Shuffle阶段划分
shuffle writemapper阶段上一个stage得到最后的结果写出shuffle read reduce阶段下一个stage拉取上一个stage进行合并
spark.shuffle.file.buffer
参数说明该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小默认是32K。将数据写到磁盘文件之前会先写入buffer缓冲中待缓冲写满之后才会溢写到磁盘。调优建议如果作业可用的内存资源较为充足的话可以适当增加这个参数的大小比如64k从而减少shuffle write过程中溢写磁盘文件的次数也就可以减少磁盘IO次数进而提升性能。在实践中发现合理调节该参数性能会有1%~5%的提升。
spark.reducer.maxSizeInFlight
参数说明该参数用于设置shuffle read task的buffer缓冲大小而这个buffer缓冲决定了每次能够拉取多少数据。(默认48M)调优建议如果作业可用的内存资源较为充足的话可以适当增加这个参数的大小比如96m从而减少拉取数据的次数也就可以减少网络传输的次数进而提升性能。在实践中发现合理调节该参数性能会有1%~5%的提升。
spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait
spark.shuffle.io.maxRetries shuffle read task从shuffle write task所在节点拉取属于自己的数据时如果因为网络异常导致拉取失败是会自动进行重试的。该参数就代表了可以重试的最大次数。默认是3次spark.shuffle.io.retryWait该参数代表了每次重试拉取数据的等待间隔。默认为5s调优建议一般的调优都是将重试次数调高不调整时间间隔。
spark.shuffle.memoryFraction
参数说明该参数代表了Executor内存中分配给shuffle read task进行聚合操作内存比例。
spark.shuffle.manager
参数说明该参数用于设置shufflemanager的类型默认为sort。Spark1.5x以后有三个可选项Hashspark1.x版本的默认值HashShuffleManagerSortspark2.x版本的默认值普通机制当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数自动开启bypass 机制。
spark.shuffle.sort.bypassMergeThreshold
参数说明当ShuffleManager为SortShuffleManager时如果shuffle read task的数量小于这个阈值默认是200则shuffle write过程中不会进行排序操作。调优建议当使用SortShuffleManager时如果的确不需要排序操作那么建议将这个参数调大一些