当前位置: 首页 > news >正文

.net网站建设网站建设价格怎么算

.net网站建设,网站建设价格怎么算,南京市城乡建设局网站,广告投放目录 Shuffle概述 Shuffle执行流程 总体流程 中间文件 ShuffledRDD生成 Stage划分 Task划分 Map端写入(Shuffle Write) Reduce端读取(Shuffle Read) Spark Shuffle演变 SortShuffleManager运行机制 普通运行机制 bypass 运行机制 Tungsten Sort Shuffle 运行机制…目录 Shuffle概述 Shuffle执行流程 总体流程 中间文件 ShuffledRDD生成 Stage划分 Task划分 Map端写入(Shuffle Write) Reduce端读取(Shuffle Read) Spark Shuffle演变 SortShuffleManager运行机制 普通运行机制 bypass 运行机制 Tungsten Sort Shuffle 运行机制 基于Sort的Shuffle机制的优缺点 Shuffle调优 广播变量 shuffle参数调优 spark.shuffle.file.buffer spark.reducer.maxSizeInFlight spark.shuffle.io.maxRetries spark.shuffle.io.retryWait spark.shuffle.memoryFraction spark.shuffle.manager spark.shuffle.sort.bypassMergeThreshold spark.shuffle.consolidateFiles Shuffle概述 我们知道Spark的Shuffle与Hadoop中的MapReduce过程有很多相似之处但也有自己的优势。Spark在Shuffle过程中权衡内存与磁盘间的使用尽最大努力将数据在内存中进行分组、排序等。当内存不足时Spark也可以将数据溢写到磁盘中而且实现相同的功能这也体现了RDD的弹性之处。 Shuffle的本质是数据重组分发的过程。 Shuffle 定义集群范围内跨节点、跨进程的数据分发。 Shuffle过程中集群会需要大量资源进行磁盘和网络的I/O。在DAG的计算链条中Shuffle环节的执行性能往往是最差的。 做个通俗的比喻类比橘子分练机RDD的分练机就是Partitioner。 举个例子: line.flatMap(_.split( )).map((_, 1)) .reduceByKey(__).collect().foreach(println)以Shuffle为边界reduceByKey的计算被切割为两个执行阶段。Shuffle之前的Stage叫作Map阶段而把Shuffle之后的 Stage称作Reduce阶段。在Map阶段每个Executors先把自己负责的数据分区做初步聚合又叫 Map 端聚合、局部聚合在Shuffle环节不同的单词被分发到不同节点的Executors中最后的Reduce阶段Executors以单词为Key做第二次聚合从而完成统计计数的任务。如下图所示。 Shuffle执行流程 总体流程 根据Shuffle(宽依赖即ShuffleDependency)划分前后两个Stage前一个Stage(Stage1)中将数据按key进行分组写入本节点的BlockManager管理的文件中。每个分区Map端输出的保存位置存储在MapOutputTrackerMaster中后一个Stage(Stage2)中计算某个分区的数据时首先会通过MapOutputTrackerMaster找到该分区的数据都在哪些节点上再拉取相应节点的数据完成Stage2中的数据的加载进而执行后续的RDD的转换。 MapOutputTracker组件也是主从架构在Driver中为MapOutputTrackerMaster在Executor中为MapOutputTrackerWorker。Master中保存了每个Shuffle的Map端每个分区的输出信息。Worker通过与Master通信获取某个Shuffle的Reduce端对应的Map端数据保存在哪些节点中。 中间文件 Map阶段与Reduce阶段通过生产与消费Shuffle中间文件的方式来完成集群范围内的数据交换。 在Map执行阶段每个Task以下简称 Map Task都会生成包含data 文件与index文件的Shuffle中间文件。也就是说Shuffle 文件的生成是以Map Task为粒度的Map阶段有多少个Map Task就会生成多少份Shuffle中间文件。 ShuffledRDD生成 scala sc.textFile(/root/tmp/a.txt,3).flatMap(xx.split(,)).map(x(x,1)).reduceByKey((a,b)ab) val res2: org.apache.spark.rdd.RDD[(String, Int)] ShuffledRDD[10] at reduceByKey at console:1 reduceByKey默认使用的是 HashPartitioner 相当于橘子分拣器。除了Partitioner此外生成ShuffledRDD时还需要传入Aggregator可用于Map端聚合和Reduce端聚合Serializer如KryoSerializer等。 ShuffledRDD 调用 getDependencies 方法获取依赖返回的是 ShuffleDependencyShuffleDependency 里依赖的父RDD即为调用算子时的RDD。 ShuffledRDD的计算函数与其他窄依赖的计算函数也不同普通map()函数执行时计算某分区的数据时只需对父RDD的某分区数据进行转换即可。但ShuffledRDD某分区计算时必须到不同的节点拉取对应分区的结果才能完成该分区数据的加载。 Stage划分 Task划分 Stage划分完成后每个Stage会根据计算的RDD的分区数量划分多少个Task每个Task计算RDD的一个分区的数据。ShuffleMapStage中划分的Task为ShuffleMapTaskShuffleMapTask会被序列化到Executor节点中进行执行ShuffleMapTask的执行会将该分区的数据进行分组如果需要Map端聚合在分组过程中则还会进行聚合操作。最终将分组的数据写入到所在节点的文件中。 Map端写入(Shuffle Write) Shuffle写入临时文件的过程叫做Shuffle Write。 Spark现支持三种writer分为BypassMergeSortShuffleWriter SortShuffleWriter 和 UnsafeShuffleWriter。 每种Shuffle witer都有非常复杂的实现机制。如果你对Shuffle的底层实现非常感兴趣可以参考: https://blog.csdn.net/wendelee/article/details/109818711 在生成中间文件的过程中Spark 会借助一种类似于 Map 的数据结构来计算、缓存并排序数据分区中的数据记录。这种 Map 结构的 Key 是Reduce Task Partition IDRecord Key的二元组而 Value 是原数据记录中的数据值。 总结下来Shuffle 中间文件的生成过程分为如下几个步骤: 对于数据分区中的数据记录逐一计算其目标分区然后填充内存数据结构当数据结构填满后如果分区中还有未处理的数据记录就对结构中的数据记录按目标分区 IDKey排序将所有数据溢出到临时文件同时清空数据结构重复前 2 个步骤直到分区中所有的数据记录都被处理为止对所有临时文件和内存数据结构中剩余的数据记录做归并排序生成数据文件和索引文件。 Reduce端读取(Shuffle Read) 对于所有 Map Task 生成的中间文件Reduce Task 需要通过网络从不同节点的硬盘中下载并拉取属于自己的数据内容。不同的 Reduce Task 正是根据 index 文件中的起始索引来确定哪些数据内容是属于自己的。这个拉取数据的过程被叫做Shuffle Read。 Shuffle Reader的实现都被封装在了BlockStoreShuffleReader。 整个Reader的流程主要是 首先新建ShuffleBlockFetcherIterator获取数据迭代器会返回(blockId, inputStream)的数据迭代器对每个block数据进行压缩和加密操作是通过serializerManager进行的对每个block数据进行反序列化反序列化输入流成为K,V数据迭代器对迭代器添加监控和数据处理完成后的清洗函数处理工作如果要进行聚合操作会对各个map的当前reduceId的数据进行聚合如果需要排序对聚合后的数据进行排序操作。 需要特别注意的是Shuffle Reader过程可以从两个地方来读取数据块一个是本地的block一个是远程的block。远程的block读取是通过向BlockTransferService这个服务发送读取数据块请求来获取数据数据。那么如何区分是从本地读还是从远程读取呢 是通过每个块的executorID来区分的本地环境的executorID和块的id相等就是从本地读若不相等就会从远端节点读取数据。 Spark Shuffle演变 我们可以看到从Spark2.0以后Hash Based Shuffle退出了历史舞台本着过时不讲的原则我们来看一下SortShuffleManager的运行机制。 目前Spark2.0及以上的版本Shuffle框架主要包括以下几个部分 ShuffleManager 这是一个接口负责管理shuffle相关的组件比如通过它来注册shuffle的操作函数获取writer和reader等。在sparkenv中注册通过sprkconf进行配置配置参数是spark.shuffle.manager默认是sort也就是SortShuffleManager类。在早期的spark版本中也实现过hashmanager后来全部统一成sort。 ShuffleReader 在reduce任务中去获取来自多个mapper任务的合并记录数据。实现该接口的类只有一个BlockStoreShuffleReader。 ShuffleWriter 在mapper任务中把记录到shuffle系统。这是一个抽象类实现该抽象类的有SortShuffleWriterUnsafeShuffleWriterBypassMergeSortShuffleWriter三个。 ShuffleBlockResolver 该接口的实现类需要理解如何为逻辑的shuffle块标识(map,reduce,shuffle等)获取数据。实现者可以通过文件或文件片段来封装shuffle数据。当获取到shuffle数据时BlockStore使用它来抽象不同的shuffle实现。该接口的实现类为IndexShuffleBlockResolver。 SortShuffleManager运行机制 SortShuffleManager的运行机制分为三种 普通运行机制bypass运行机制 当 shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时默认为 200就会启用 bypass 机制Tungsten Sort运行机制 开启此运行机制需设置配置项spark.shuffle.managertungsten-sort。但是开启此项配置也不能保证就一定采用此运行机制。 普通运行机制 在该模式下数据会先写入一个内存数据结构中此时根据不同的 shuffle 算子可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子那么会选用 Map 数据结构一边通过 Map 进行聚合一边写入内存如果是 join 这种普通的 shuffle 算子那么会选用 Array 数据结构直接写入内存。接着每写一条数据进入内存数据结构之后就会判断一下是否达到了某个临界阈值。如果达到临界阈值的话那么就会尝试将内存数据结构中的数据溢写到磁盘然后清空内存数据结构。 在溢写到磁盘文件之前会先根据 key 对内存数据结构中已有的数据进行排序。排序过后会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条也就是说排序好的数据会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流首先会将数据缓冲在内存中当内存缓冲满溢之后再一次写入磁盘文件中这样可以减少磁盘 IO 次数提升性能。 一个 task 将所有数据写入内存数据结构的过程中会发生多次磁盘溢写操作也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并这就是merge 过程此时会将之前所有临时磁盘文件中的数据读取出来然后依次写入最终的磁盘文件之中。此外由于一个 task 就只对应一个磁盘文件也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中因此还会单独写一份索引文件其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。 SortShuffleManager由于有一个磁盘文件 merge 的过程因此大大减少了文件数量。比如第一个 stage 有 50 个 task总共有 10 个 Executor每个 Executor 执行 5 个 task而第二个 stage 有 100 个 task。由于每个 task 最终只有一个磁盘文件因此此时每个 Executor 上只有 5 个磁盘文件所有 Executor 只有 50 个磁盘文件。 普通运行机制的 SortShuffleManager 工作原理如下图所示 bypass 运行机制 Reducer 端任务数比较少的情况下基于Hash Shuffle实现机制明显比基于Sort Shuffle实现机制要快因此基于Sort huffle实现机制提供了一个回退方案就是 bypass 运行机制。对于 Reducer 端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold设置的个数时使用带 Hash 风格的回退计划。 bypass 运行机制的触发条件如下 shuffle map task 数量小于spark.shuffle.sort.bypassMergeThreshold200参数的值。不是聚合类的 shuffle 算子。 此时每个 task 会为每个下游 task 都创建一个临时磁盘文件并将数据按 key 进行 hash 然后根据 key 的 hash 值将 key 写入对应的磁盘文件之中。当然写入磁盘文件时也是先写入内存缓冲缓冲写满之后再溢写到磁盘文件的。最后同样会将所有临时磁盘文件都合并成一个磁盘文件并创建一个单独的索引文件。 该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的因为都要创建数量惊人的磁盘文件只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件也让该机制相对未经优化的HashShuffleManager来说shuffle read的性能会更好。 而该机制与普通SortShuffleManager运行机制的不同在于第一磁盘写机制不同第二不会进行排序。也就是说启用该机制的最大好处在于shuffle write过程中不需要进行数据的排序操作也就节省掉了这部分的性能开销。 bypass运行机制的SortShuffleManager工作原理如下图所示 Tungsten Sort Shuffle 运行机制 基于 Tungsten Sort 的 Shuffle 实现机制主要是借助 Tungsten 项目所做的优化来高效处理 Shuffle。 Spark 提供了配置属性用于选择具体的 Shuffle 实现机制但需要说明的是虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制但实际上参考 Shuffle 的框架内核部分可知基于 SortShuffle 的实现机制与基于 Tungsten Sort Shuffle 实现机制都是使用 SortShuffleManager而内部使用的具体的实现机制是通过提供的两个方法进行判断的 对应非基于 Tungsten Sort 时通过 SortShuffleWriter.shouldBypassMergeSort 方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制当该方法返回的条件不满足时则通过 SortShuffleManager.canUseSerializedShuffle 方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制而当这两个方法返回都为 false即都不满足对应的条件时会自动采用普通运行机制。 因此当设置了spark.shuffle.managertungsten-sort 时也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。 要实现 Tungsten Sort Shuffle 机制需要满足以下条件 Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求。Shuffle 的序列化器支持序列化值的重定位当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器。Shuffle 过程中的输出分区个数少于 16777216 个。 实际上使用过程中还有其他一些限制如引入 Page 形式的内存管理模型后内部单条记录的长度不能超过 128 MB 具体内存模型可以参考 PackedRecordPointer 类。另外分区个数的限制也是该内存模型导致的。 所以目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的。 基于Sort的Shuffle机制的优缺点 优点 小文件的数量大量减少Mapper 端的内存占用变少Spark 不仅可以处理小规模的数据即使处理大规模的数据也不会很容易达到性能瓶颈。缺点 如果 Mapper 中 Task 的数量过大依旧会产生很多小文件此时在 Shuffle 传数据的过程中到 Reducer 端 Reducer 会需要同时大量地记录进行反序列化导致大量内存消耗和 GC 负担巨大造成系统缓慢甚至崩溃强制了在 Mapper 端必须要排序即使数据本身并不需要排序它要基于记录本身进行排序这就是Sort-Based Shuffle最致命的性能消耗。 Shuffle调优 广播变量 在数据关联场景中广播变量是克制 Shuffle 的杀手锏。 一个形象的图例如下 在广播变量的运行机制下普通变量存储的数据封装成广播变量由 Driver 端以 Executors 为粒度进行分发每一个 Executors 接收到广播变量之后将其交由 BlockManager管理。 当然使用广播变量也有很多的制约例如 当创建完广播变量后续不可以对广播变量进行修改保证所有的节点都能获得相同的广播变量。在数据量较大的情况下Driver可能会成为瓶颈 shuffle参数调优 spark.shuffle.file.buffer 默认值32k参数说明该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前会先写入buffer缓冲中待缓冲写满之后才会溢写到磁盘。调优建议如果作业可用的内存资源较为充足的话可以适当增加这个参数的大小比如64k从而减少shuffle write过程中溢写磁盘文件的次数也就可以减少磁盘IO次数进而提升性能。在实践中发现合理调节该参数性能会有1%~5%的提升。 spark.reducer.maxSizeInFlight 默认值48m参数说明该参数用于设置shuffle read task的buffer缓冲大小而这个buffer缓冲决定了每次能够拉取多少数据。调优建议如果作业可用的内存资源较为充足的话可以适当增加这个参数的大小比如96m从而减少拉取数据的次数也就可以减少网络传输的次数进而提升性能。在实践中发现合理调节该参数性能会有1%~5%的提升。 spark.shuffle.io.maxRetries 默认值3参数说明shuffle read task从shuffle write task所在节点拉取属于自己的数据时如果因为网络异常导致拉取失败是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功就可能会导致作业执行失败。调优建议对于那些包含了特别耗时的shuffle操作的作业建议增加重试最大次数比如60次以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现对于针对超大数据量数十亿~上百亿的shuffle过程调节该参数可以大幅度提升稳定性。 spark.shuffle.io.retryWait 默认值5s参数说明具体解释同上该参数代表了每次重试拉取数据的等待间隔默认是5s。调优建议建议加大间隔时长比如60s以增加shuffle操作的稳定性。 spark.shuffle.memoryFraction 默认值0.2参数说明该参数代表了Executor内存中分配给shuffle read task进行聚合操作的内存比例默认是20%。调优建议在资源参数调优中讲解过这个参数。如果内存充足而且很少使用持久化操作建议调高这个比例给shuffle read的聚合操作更多内存以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现合理调节该参数可以将性能提升10%左右。 spark.shuffle.manager 默认值sort参数说明该参数用于设置ShuffleManager的类型。Spark 1.5以后有三个可选项hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似但是使用了tungsten计划中的堆外内存管理机制内存使用效率更高。调优建议由于SortShuffleManager默认会对数据进行排序因此如果你的业务逻辑中需要该排序机制的话则使用默认的SortShuffleManager就可以而如果你的业务逻辑不需要对数据进行排序那么建议参考后面的几个参数调优通过bypass机制或优化的HashShuffleManager来避免排序操作同时提供较好的磁盘读写性能。这里要注意的是tungsten-sort要慎用因为之前发现了一些相应的bug。 spark.shuffle.sort.bypassMergeThreshold 默认值200参数说明当ShuffleManager为SortShuffleManager时如果shuffle read task的数量小于这个阈值默认是200则shuffle write过程中不会进行排序操作而是直接按照未经优化的HashShuffleManager的方式去写数据但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件并会创建单独的索引文件。调优建议当你使用SortShuffleManager时如果的确不需要排序操作那么建议将这个参数调大一些大于shuffle read task的数量。那么此时就会自动启用bypass机制map-side就不会进行排序了减少了排序的性能开销。但是这种方式下依然会产生大量的磁盘文件因此shuffle write性能有待提高。 spark.shuffle.consolidateFiles 注意Spark 2.0已经看不到HashShuffleManager类了。 默认值false参数说明如果使用HashShuffleManager该参数有效。如果设置为true那么就会开启consolidate机制会大幅度合并shuffle write的输出文件对于shuffle read task数量特别多的情况下这种方法可以极大地减少磁盘IO开销提升性能。调优建议如果的确不需要SortShuffleManager的排序机制那么除了使用bypass机制还可以尝试将spark.shffle.manager参数手动指定为hash使用HashShuffleManager同时开启consolidate机制。在实践中尝试过发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
http://www.w-s-a.com/news/454449/

相关文章:

  • 漳浦网站建设网络营销推广策略
  • 龙岗商城网站建设教程百度关键词排名突然没了
  • 深圳网站建设服务哪家有织梦网站模板安装
  • 网站设计与网页制作代码大全网站开发还找到工作吗
  • 给设计网站做图会字体侵权吗站长工具seo综合查询张家界新娘
  • 网站的建设与颜色搭配win7在iis中新建一个网站
  • 单位做网站有哪些功能型类的网站
  • 网站怎样做优惠卷移动互联网开发培训
  • 重庆网站建设帝维科技网站做定向的作用
  • 网站建设工作室wp主题模板做污事网站
  • 网站建设 深圳 凡科重庆家居网站制作公司
  • 自己也可以免费轻松创建一个网站企业收录网站有什么用
  • 帮别人做网站违法导航网站开发工具
  • seo网站外包公司字画价格网站建设方案
  • 网站国内空间价格销售技巧
  • 广安建设企业网站qq互联网站备案号
  • 京东网站建设的要求vs2010做的网站
  • wordpress 新闻杂志主题佛山企业网站排名优化
  • 选服务好的网站建设金华市开发区人才网
  • 广州建站商城南阳高质量建设大城市网站
  • 网站建设合同封面模板做代炼的网站
  • 外贸网站建站要多少钱南昌优化排名推广
  • 做公司网站的尺寸一般是多大企业管理网站
  • 苏州网站设计公司兴田德润i简介做签证宾馆订单用啥网站
  • 网站页面设计工具做网站租空间
  • 做智能网站系统百度提交入口
  • 网站建设代理商电话网站规划和建设方案
  • 双桥区网站制作seo 首页
  • 电子商务网站建设前期准备wordpress域名指向二级目录
  • 汕头建站网站模板淮北做网站电话