女装网站建设规划书,小程序到哪里去找,电商货源网站,男科24小时免费咨询Spark 的shuffle机制
一、Spark ShuffleManager 发展历程
Spark 1.1.0 之前 在 Spark 1.1.0 之前#xff0c;Spark 使用 BlockStoreShuffleFetcher 来处理 Shuffle 操作。这个实现主要依赖于直接从 BlockManager 获取 Shuffle 数据#xff0c;并通过网络进行交换。 Spark …Spark 的shuffle机制
一、Spark ShuffleManager 发展历程
Spark 1.1.0 之前 在 Spark 1.1.0 之前Spark 使用 BlockStoreShuffleFetcher 来处理 Shuffle 操作。这个实现主要依赖于直接从 BlockManager 获取 Shuffle 数据并通过网络进行交换。 Spark 1.1.x默认使用 HashShuffleManager HashShuffleManager 使用哈希算法将数据划分到不同的分区。在进行 Shuffle 操作时Spark 会为每个键计算一个哈希值然后根据该值将数据分配到相应的分区。 Spark 1.2.x 及以后版本默认使用 SortShuffleManager Spark 2.0.x 及以后版本不在使用HashShuffleManager,默认使用 SortShuffleManager
二、HashShuffleManager 原理 假设每个 Executor 只有1个CPU core也就是说无论这个 Executor 上分配多少个 task 线程同一时间都只能执行一个 task 线程。 未经优化的HashShuffleManager工作原理 1.Shuffle Write 阶段 将每个task处理的数据按照key进行hash算法从而将相同key都写入同一个磁盘文件而每一个磁盘文件都只属于下游stage的一个task在数据写入磁盘之前会现将数据写入内存缓冲区中当内存缓冲区填满以后才会溢写到磁盘文件中去。下一个stage的task有多少个当前stage的每个task就要创建多少分磁盘文件比如当前stage有20个task,总共有4个Executor,每个Executor执行5个task下一个stage总共有40个task那么每个Executor上就要创建200个磁盘文件所有Executor会创建800个磁盘文件由此可见未经过优化的shuffle write操作所产生的磁盘文件数据是惊人的。 2. Shuffle Read 阶段 将上一个stage的计算结果中所有相同的key从各个节点上通过网络都拉取到自己所在的节点上然后按照key进行聚合或连接等操作。由于shuffle write阶段map task给下游stage的每个reduce task都创建了一个磁盘文件因此shuffle read阶段每个reduce task只要从上游stage的所有map task所在节点上拉取属于自己的那个磁盘文件即可。shuffle read的拉取过程是一边拉取一边聚合的每个shuffle read task都有一个自己的buffer缓冲每次只能拉取与buffer缓冲相同大小的数据然后在内存中进行聚合等操作聚合完一批数据再拉取下一批以此类推直接所有数据拉取完并得到最终结果。 优化后的HashShuffleManager工作原理 为了优化 HashShuffleManager可以启用参数 spark.shuffle.consolidateFiles该参数的默认值为 false启用后设置为 true可以启动优化机制。 开启优化机制后的效果 1.Shuffle Write 阶段 在 shuffle write 过程中task就不是为下游stage的每个task创建一个磁盘文件了此时会出现shuffleFileGroup的概念每个shuffleFileGroup会对应一批磁盘文件磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个 CPU core就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个 shuffleFileGroup并将数据写入对应的磁盘文件内。当执行下一批task时下一批task就会复用之前已有的 shuffleFileGroup包括其中的磁盘文件。consolidate 机制允许不同的task复用同一批磁盘文件这样就可以有效将多个task的磁盘文件进行一定程度上的合并从而大幅度减少磁盘文件的数量进而提升 shuffle write 的性能。比如当前stage有20个task,总共有4个Executor,每个Executor执行5个task下一个stage总共有40个task那么每个Executor上就要创建40个磁盘文件所有Executor会创建160个磁盘文件由此可见优化后shuffle write操作所产生的磁盘文件较优化前明显减少。 2. Shuffle Read 阶段 由于shuffle write阶段每个Executor仅为下游每个reduce task创建一个磁盘文件在shuffle read阶段每个reduce task只要从上游stage的所有map task所在节点上拉取属于自己的那个磁盘文件即可。 三、SortShuffleManager 原理 SortShuffleManager 的运行机制主要分成两种一种是普通运行机制另一种是bypass运行机制。 当 shuffle read task 的数量小于等于 spark.shuffle.sort.bypassMergeThreshold参数(默认为 200)的值且不是聚合类的shuffle算子时就会启用 bypass 机制。 普通运行机制的SortShuffleManager工作原理 在该模式下数据会先写入一个内存数据结构中此时根据不同的 shuffle 算子 可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子那么会 选用 Map 数据结构一边通过 Map 进行聚合一边写入内存;如果是join 这种普通的 shuffle 算子那么会选用 Array 数据结构直接写入内存。接着每写一条数据进入内存数据结构之后就会判断一下是否达到了某个临界阈值。如果达到临界阈值的话那么就会尝试将内存数据结构中的数据溢写到磁盘然后清空内存数据结构。在溢写到磁盘文件之前会先根据 key 对内存数据结构中已有的数据进行排序。 排序过后会分批将数据写入磁盘文件。默认的 batch 数量是10000 条也就是说排序好的数据会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通 过 Java 的BufferedOutputStream 实现的。BufferedOutputStream 是 Java的缓冲输出流首先会将数据缓冲在内存中当内存缓冲填满之后再一次写入磁盘文件中这样可以减少磁盘 IO 次数提升性能。一个 task 将所有数据写入内存数据结构的过程中会发生多次磁盘溢写操作 也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并这就是 merge 过程此时会将之前所有临时磁盘文件中的数据读取出来然后依次写入最 终的磁盘文件之中。此外由于一个 task就只对应一个磁盘文件也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中因此还会单独写一份索引文件其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。SortShuffleManager 由于有一个磁盘文件 merge 的过程因此大大减少了文件数量。比如第一个 stage 有 20 个 task总共有 4 个 Executor每个 Executor 执行 5 个 task而第二个 stage 有 40 个 task。由于每个 task 最终只有一个磁盘文件因此 此时每个 Executor 上只有 5 个磁盘文件所有 Executor 只有 20 个磁盘文件。 bypass运行机制的SortShuffleManager工作原理 每个 task 会为每个下游 task 都创建一个临时磁盘文件并将数据按 key进行 hash 然后根据 key 的 hash 值将 key 写入对应的磁盘文件之中。当然写入磁盘文件时也是先写入内存缓冲缓冲写满之后再溢写到磁盘文件的。最后同样会将所有临时磁盘文件都合并成一个磁盘文件并创建一个单独的索引文件。该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的因为都要创建数量惊人的磁盘文件只是在最后会做一个磁盘文件的合并而已。因此 少量的最终磁盘文件也让该机制相对未经优化的 HashShuffleManager 来说shuffle read 的性能会更好。而该机制与普通 SortShuffleManager 运行机制的不同在于:第一磁盘写机制 不同;第二不会进行排序。也就是说启用该机制的最大好处在于shuffle write 过程中不需要进行数据的排序操作也就节省掉了这部分的性能开销。