当前位置: 首页 > news >正文

网站 工信部备案 收回重庆明建网络科技有限公司

网站 工信部备案 收回,重庆明建网络科技有限公司,最基本的网站设计,宁波北京网站建设2.2、Spark SQL2.2.1、Execute EngineSparkSql的整体提交执行流程和Hive的执行流程基本上一致。站在通用的角度#xff0c;对于SparkSql来说#xff0c;从Sql到Spark的RDD执行需要经历两个大的阶段#xff1a;逻辑计划和物理计划逻辑计划层面会把用户提交的sql转换成树型结构…2.2、Spark SQL2.2.1、Execute EngineSparkSql的整体提交执行流程和Hive的执行流程基本上一致。站在通用的角度对于SparkSql来说从Sql到Spark的RDD执行需要经历两个大的阶段逻辑计划和物理计划逻辑计划层面会把用户提交的sql转换成树型结构把sql中的逻辑映射到逻辑算子树的不同节点该阶段并不会真正的进行提交执行只是作为中间阶段。在这个过程中会经历三个阶段1、未解析的逻辑算子树(Unresolved LogicalPlan)该阶段只是通过Antlr Parser把sql进行词法分析语法验证得到数据结构并不包含任何数据信息。2、解析后的逻辑算子数(Analyzed LogicalPlan),这个阶段会结合Catalog元数据信息对第一阶段得到的节点进行绑定3、优化逻辑算子树Optimized LogicalPlan,该阶段结合节点数据信息应用一些优化规则对一些低效的逻辑计划进行转换。物理计划层面会把上一步优化后的逻辑算子树进行进一步的转换生成物理算子树物理算子树上的节点会直接生成RDD或者对RDD进行transformation操作并最终执行。那么对物理计划进行细分的话又可以分为三个子阶段1、物理算子树列表(Iterable[PhysicalPlan])根据优化后得到的逻辑算子树进行转换生成物理算子树的列表。2、最优物理算子树(SparkPlan)从物理算子树列表中按照一定的策略选取最优的物理算子树。3、准备算子树(Prepared SparkPlan)得到最优的算子树之后那么就开始准备一些执行工作如执行代码生成、确保分区操作正确、物理算子树节点重用等工作。最后会对生成的RDD执行Action操作进行真正的作业执行。以上所有的流程均是在Spark的Driver端完成的这个时候还不涉及到集群环境。上述的所有流程可以通过SparkSession类的sql方法作为入口调用SessionState各种对象(SparkSqlParser、Analyzer、Optimizer、SparkPlanner)最后封装一个QueryExecution对象。所以上面的每一步流程都有单独独立的类功能实现对于我们日常开发工作中进一步剥离分析进行二次加工提供了很大的。Spark SQL在执行SQL之前会将SQL或者Dataset程序解析成逻辑计划然后经历一系列的优化最后确定一个可执行的物理计划。最终选择的物理计划的不同对性能有很大的影响。如何选择最佳的执行计划这便是Spark SQL的Catalyst优化器的核心工作。Catalyst早期主要是基于规则的优化器RBO在Spark 2.2中又加入了基于代价的优化CBO。2.2.1.1. RBO根据上面的执行流程SparkSql在逻辑优化层面主要是基于规则的优化即RBO(Rule-Based-Optimization)1、每个优化都是以Rule的形式存在每条Rule都是对Analyzed Plan的等价转换2、RBO易于扩展新增规则可以非常方便嵌入到Optimizer中3、RBO优化的主要思路在于减少参与计算的数据量以及计算本身的代价。如常见的谓词下推、常量合并、列裁剪等优化手段2.2.1.2、CBORBO层面的优化主要是针对逻辑计划未考虑到数据本身的特点(数据分布、大小)以及算子执行(中间结果集分布、大小)的代价因此sparksql又引入了CBO优化机制(Cost-Based Optimized)该优化主要在物理计划层面其原理是计算所有可能的物理计划的代价并挑选出代价最小的物理计划其核心在于评估一个给定的物理执行计划的代价其代价等于每个执行节点的代价总和。而每个执行节点的代价又分为两个部分1、该执行节点对数据集的影响或者说该节点输出数据集的大小和分布。2、该执行节点操作算子的代价。操作算子的代价相对比较固定可以用规则来描述。而执行节点输出数据集主要分为两部分1、初始数据集例如原始文件其数据集的大小和分布可以直接统计得到的。2、中间节点输出数据集的大小和分布可以根据输入数据集的信息和操作本身的特点来推算。因此CBO优化最主要需要先解决两个问题1、怎么样子可以获取到原始数据集的统计信息2、如何根据输入数据集估算特定算子的输出数据特征情况2.2.1.2.1、如何统计到原始数据集的信息可以通过Analyze table来分析统计出原始数据集的大小略2.2.1.2.2、算子代价估计SQL中最常见的就是Join操作这里以Join方法为例说明SparkSql的CBO是如何进行估价的。主要是通过以下公式Cost rows * weight size * (1-weight) ;其中rows为行数代表CPU代价Size为大小代表IO代价。Cost CostCpu * weight CostIO * (1-weight)Weight权重的配置可以通过spark.sql.cbo.joinRecorder.card.weight决定默认为0.72.2.1.3、AE2.2.1.3.1、背景在生产环境中往往需要提前配置好分区数以及使用资源然后在运行的过程中或者事后进行不断的调整参数值来达到最优。但是由于每次计算的数据量可能会变化很大那么可能需要每次都会人工干涉进行调优这也意味sql作业很难以最优的性能去运行。而且Catalyst优化器的一些优化工作是在计划阶段一旦优化完成之后在运行期间就不能改变。因此需要在运行期间拿到更多的运行信息不断调整执行计划来达到最优因此在Spark2.3之后引入了一个Adaptive(自适应)执行机制需要通过spark.sql.adaptive.enabled参数来开启其机制2.2.1.3.2、执行原理根据Spark作业执行流程可知是先根据RDD的DAG图进行划分生成Stage然后提交作业执行因此在执行过程中计划是不会发生变化的。那么自适应执行的基本思路是在执行计划中事先划分好stage然后按stage提交执行在运行时收集当前stage的shuffle统计信息以此来优化下一个stage的执行计划然后再提交执行后续的stage。对于图中两表join的执行计划来说会创建3个QueryStage。最后一个QueryStage中的执行计划是join本身它有2个QueryStageInput代表它的输入分别指向2个孩子的QueryStage。在执行QueryStage时我们首先提交它的孩子stage并且收集这些stage运行时的信息。当这些孩子stage运行完毕后我们可以知道它们的大小等信息以此来判断QueryStage中的计划是否可以优化更新。例如当我们获知某一张表的大小是5M它小于broadcast的阈值时我们可以将SortMergeJoin转化成BroadcastHashJoin来优化当前的执行计划。我们也可以根据孩子stage产生的shuffle数据量来动态地调整该stage的reducer个数。在完成一系列的优化处理后最终我们为该QueryStage生成RDD的DAG图并且提交给DAG Scheduler来执行2.2.1.3.3、实现点该机制主要有三个功能点1、自动设置shuffle分区数主要解决的问题有以下几点1.1、如果设置分区数过小可能会导致每个task处理大量的数据会发生溢写磁盘的情况影响性能甚至发生频繁GC或者OOM。1.2、如果设置分区数过大可能会导致每个task处理小量的数据而且会有可能产生小文件甚至会出现资源空闲的情况。1.3、设置分区数是对所有的Stage都会生效而每个Stage所处理的数据量和分布都不太一样所以全局的分区数只能对某些Stage是最优的无法做到全局最优。例如我们设置的shufflepartition个数为5在map stage结束之后我们知道每一个partition的大小分别是70MB30MB20MB10MB和50MB。假设我们设置每一个reducer处理的目标数据量是64MB那么在运行时我们可以实际使用3个reducer。第一个reducer处理partition 0 (70MB)第二个reducer处理连续的partition 1 到3共60MB第三个reducer处理partition 4 (50MB)2、动态调整执行计算以join操作为例在Spark中最常见的策略是BroadcastHashJoin和SortMergeJoin。BroadcastHashJoin属于map side join其原理是当其中一张表存储空间大小小于broadcast阈值时Spark选择将这张小表广播到每一个Executor上然后在map阶段每一个mapper读取大表的一个分片并且和整张小表进行join整个过程中避免了把大表的数据在集群中进行shuffle。而SortMergeJoin在map阶段2张数据表都按相同的分区方式进行shuffle写reduce阶段每个reducer将两张表属于对应partition的数据拉取到同一个任务中做join。CBO根据数据的大小尽可能把join操作优化成BroadcastHashJoin。Spark中使用参数spark.sql.autoBroadcastJoinThreshold来控制选择BroadcastHashJoin的阈值默认是10MB。然而对于复杂的SQL查询它可能使用中间结果来作为join的输入在计划阶段Spark并不能精确地知道join中两表的大小或者会错误地估计它们的大小以致于错失了使用BroadcastHashJoin策略来优化join执行的机会。但是在运行时通过从shuffle写得到的信息我们可以动态地选用BroadcastHashJoin。3、动态处理数据倾斜在SQL作业中数据倾斜是很常见的问题但都是事后人为通过一些手段进行解决的那么能不能在运行时自动处理掉呢假设A表和B表做inner join并且A表中第0个partition是一个倾斜的partition。一般情况下A表和B表中partition 0的数据都会shuffle到同一个reducer中进行处理由于这个reducer需要通过网络拉取大量的数据并且进行处理它会成为一个最慢的任务拖慢整体的性能。在自适应执行框架下一旦我们发现A表的partition 0发生倾斜我们随后使用N个任务去处理该partition每个任务只读取若干个mapper的shuffle 输出文件然后读取B表partition 0的数据做join。最后我们将N个任务join的结果通过Union操作合并起来。为了实现这样的处理我们对shuffle read的接口也做了改变允许它只读取部分mapper中某一个partition的数据。在这样的处理中B表的partition 0会被读取N次虽然这增加了一定的额外代价但是通过N个任务处理倾斜数据带来的收益仍然大于这样的代价。如果B表中partition 0也发生倾斜对于inner join来说我们也可以将B表的partition 0分成若干块分别与A表的partition 0进行join最终union起来。但对于其它的join类型例如Left Semi Join我们暂时不支持将B表的partition 0拆分。4、Left join build left side map对于left join的情况可以对左表进行HashMapBuild。可以实现小左表left join 大右表的情况下进行ShuffledHashJoin调整。原理1、在构建左表Map的时候额外维持一个“是否匹配成功”的映射表。2、在和右表join结束之后把所有没有匹配到的key用null来join填充。
http://www.w-s-a.com/news/582013/

相关文章:

  • 个旧市哪里有做网站wordpress内页php页面
  • 程序员接活的平台网站互联网平台建设方案
  • 网站安全建设模板深圳企业管理咨询公司
  • 做网站 还是淘宝店wordpress分类链接后加
  • wordpress腾讯云 COSseo内容优化心得
  • 特价旅游机票网站建设i营销
  • 如何成立网站深圳创业项目
  • 建设商业网站惠州网站建设推荐乐云seo
  • 如何申请免费域名做网站免费推广神器
  • 自媒体人专用网站安岳网站建设
  • 特乐网站建设做网站推广要多少钱
  • 山东省建设安全生产协会网站义乌跨境电商公司前十名
  • 做网站优化就是发文章吗起飞页自助建站平台的特点
  • 做网站还是做app好慈溪机械加工网
  • 上传下载文件网站开发的php源码腾讯企点
  • 给分管领导网站建设情况汇报怎么写网络运营的岗位职责及任职要求
  • 电线电缆技术支持中山网站建设广告设计培训学校有哪些
  • 如何禁止通过ip访问网站wordpress无法调用主题布局和图片
  • 江西建设工程信息网站重庆网站推广大全
  • 南浔区住房城乡建设局网站网页设计基础学什么
  • 萧山做网站的企业网站建设 西安
  • 江西省城乡建设厅网站百度站长资源平台
  • 本地搭建linux服务器做网站免费查企业信息查询
  • 电商网站建设与运营网上购物哪个网站最好
  • 做app做网站从何学起网站设计需要什么证
  • 设计网站最重要的是要有良好的短网址还原
  • 大连建设银行招聘网站做seo是要先有网站吗
  • 中山做网站的wordpress建站教程百科
  • 湛江专业网站制作做网站需要工具
  • 做音箱木工网站吉林平安建设网站