阳江网站关键字优化,海外服务器ip免费,模板做的网站如何下载,农家乐怎么做网站115、Spark的任务执行流程
driver和executor#xff0c;结构式一主多从模式#xff0c; driver#xff1a;spark的驱动节点#xff0c;用于执行spark任务中的main方法#xff0c;负责实际代码的执行工作#xff1b;主要负责#xff1a;将代码逻辑转换为任务、在executo…115、Spark的任务执行流程
driver和executor结构式一主多从模式 driverspark的驱动节点用于执行spark任务中的main方法负责实际代码的执行工作主要负责将代码逻辑转换为任务、在executor之间调度任务、跟踪executor的执行情况。
Executorspark的执行节点是jvm的一个进程负责在spark作业中运行具体的任务任务之间相互独立spark应用启动时候executor节点被同时启动伴随整个spark应用的生命周期而存在
主要功能负责运行组成spark应用的任务并将结果返回给驱动进程、 在任务提交之后都先启动driver然后driver向集群管理中注册应用程序根据任务的配置文件分配executor并启动然后driver等待资源满足执行main函数spark为懒加载当执行到action算子时候才开始真正执行开始反向推算根据宽依赖进行stage的划分随后每一个stage对应一个个taskset一个taskset中有多个tasktask会被指定到executor中执行
116、Spark提交job的流程
117、Spark的阶段划分
spark的阶段划分分为两个阶段转换阶段和动作阶段分别对应转换算子和行动算子 每遇到一个宽依赖就划分一个stage 在一个stage内部会有很多task被执行同一个stage中所有的task结束之后才能根据DAG依赖执行下一个stage中的task
阶段划分stage的依据就是RDD之间的宽窄依赖遇到宽依赖就划分stage每个stage包含一个或者多个task任务stage是由一组并行的task组成切割规则遇到宽依赖就切割stage遇到一个shuffle就转为一个新的阶段
阶段的划分等于shuffle依赖的数量1 根据行动算子划分job、根据shuffle划分stage、根据RDD的分区数划分TaskJobstagetask一个job中可以有多个stage一个stage中可以多个task
118、Sparkjoin的分类
根据join操作方式进行分类分为shuffle join和broadcastjoin Shuffle joinspark将参与join操作的数据集按照join的条件进行分区并将具有相同键的数据分发到同一个节点上进行join操作 Broadcast joinspark将一个较小的数据集复制到每一个节点的内存中然后将参与join操作的大数据集分发到各个节点上进行join操作通常进行大表join小表
119、spark mapjoin的实现原理
Map join在内存中将两个数据集进行连接从而避免磁盘io的开销 1、数据划分spark将两个数据集划分为多个分区每个分区的数据流尽可能均匀 2、数据广播spark将其中一个较小的数据集广播到每一个节点的内存中 3、分区处理每个节点接收到广播的数据后将其本地的另一个数据集进行联接操作 4、结果汇总每个节点将自己的结果发送到驱动节点由驱动节点进行最终的节点汇总map join适用于两个数据集至少有一个可以完全放入内存中
120、spark shuffle以及优点
可以用于在数据分区过程中重新分配和重组数据在spark执行对数据进行重分区或者聚合操作时候将数据重新发送到不同的节点上进行下一步的计算 优点 数据本地性shuffle可以在节点之间移动数据以便在计算过程中最大限度地利用数据本地性减少数据传输过程中的开销 分布式计算shuffle运行spark在多个节点之间执行计算的时候从而实现了分布式计算的能力 补充Spark的shuffle怎么了解能讲讲Spark的shuffle的过程吗
121、什么时候会产生shuffle
数据重分区需要将数据重新分区进行后续的数据处理操作时候 聚合操作当需要对数据进行聚合操作时候会使用到shuffle操作 排序操作需要对数据进行排序的时候使用shuffle
122、spark为什么适合迭代处理
spark是基于内存计算的存储的数据在内存中而不是在磁盘上从而提高了数据处理的速度 可以保留中间结果RDD可以内存中保留中间结果对于迭代处理来说每次迭代都是可以重用中间结果而不是重新计算基于DAG执行引擎
123、Spark为什么快?
1、spark是基于内存计算MR是基于磁盘计算 2、spark中具有DAG有向无环图在此过程中减少了shuffle以及落地磁盘的次数 3、spark是粗粒度的资源申请也就是当提交了spark application时候application会将所有资源申请完毕task在执行的时候就不需要申请资源task执行快当最后一个task执行完之后才会被释放MR是细粒度的资源申请task需要自己申请资源并释放故application执行比较缓慢
124、Spark数据倾斜问题如何定位解决方案
spark中数据倾斜主要是指shuffle过程中出现的数据倾斜问题不同的key对应的数据量不同导致不同的task处理的数据量不同的问题 数据倾斜是指少数的task被分配了极大量的数据少数task运行缓慢 解决方案 增加分区如果数据分布不均匀可以增加分区数使得数据能够更加均匀地分配到不同的分区中 重新分桶/哈希对于键值对冲突的情况尝试重新分桶或者通过哈希函数重新计算键值使得数据分布均匀 增加缓存对于某些数据可以将其缓存到内存中减少重复计算 随机前缀/后缀对于简直冲突的情况增加键的前缀或者后缀降低冲突 倾斜数据单独处理
补充美团梳理spark解决数据倾斜的问题
数据倾斜原理简单在进行shuffle的时候必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理如果某一个key对应的数据量特别大的话就会发生数据倾斜大部分key对应10条数据个别key对应的100W条数据导致运行task结束事件不同因此整个saprk作业的运行进度是由运行时间最长的那个task决定的。 了解了spark的stage划分原理有助于快速定位数据倾斜发生的位置查看导致数据倾斜的key的数据分布情况 1、如果是spark SQL中的group by、join语句导致的数据倾斜那么就查询一下SQL中使用的表的key的分布情况。 2、如果是对SparkRDD执行的shuffle算子导致的数据倾斜那么可以在Spark作业中加入查看key分布的代码比如RDD.countByKey()。然后对统计出来的各个key出现的次数collect/take到客户端打印一下就可以看到key的分布情况。 解决方案 1、使用hive ETL预处理数据评估是否可以通过hive进行数据预处理从根源上解决了数据倾斜问题彻底避免了spark中执行的shuffle类算子我们只是把数据倾斜提升到了hive中避免了spark中发生数据倾斜。 2、过滤少数导致倾斜的key如果发现倾斜的key就少数几个并且对计算本身的影响不大就可以直接过滤少数几个key如果每次执行作业时候动态判定哪些key的数据量最多然后再进行过滤可以使用sample算子对RDD进行采样计算每个key的数量取数据量最多的key过滤即可。 3、提高shuffle操作的并行度这是最简单的一种方案对RDD执行shuffle算子时候给shuffle传入一个参数该参数就设置了shuffle算子执行时候的reduce task数量在实际时候治标不治本无法彻底解决数据倾斜的问题。 4、局部聚合和全局聚合第一次是局部聚合先给每个key都打上一个随机数然后对打上随机数后的数据执行reducebykey的操作进行局部聚合然后将各个key的前缀去掉再进行全局聚合操作对于聚合类的shuffle操作导致的数据倾斜效果很好可以大幅度甚至解决数据倾斜问题。 5、将reduce join转换为map join不使用join算子进行连接操作而使用Broadcast变量与map类算子实现join操作进而完全规避掉shuffle类的操作彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来然后对其创建一个Broadcast变量接着对另外一个RDD执行map类算子在算子函数内从Broadcast变量中获取较小RDD的全量数据与当前RDD的每一条数据按照连接key进行比对如果连接key相同的话那么就将两个RDD的数据用你需要的方式连接起来普通的join是会走shuffle过程的而一旦shuffle就相当于会将相同key的数据拉取到一个shuffle read task中再进行join此时就是reduce join。但是如果一个RDD是比较小的则可以采用广播小RDD全量数据map算子来实现与join同样的效果也就是map join此时就不会发生shuffle操作也就不会发生数据倾斜。 6、采样倾斜key并分拆join操作 7、随机前缀和扩容RDD进行join 125、spark中的宽窄依赖
两个相邻的RDD之间的依赖关系宽、窄依赖是根据上下游RDD的分区而言的 宽依赖上游一个RDD的partition可以被下游RDD的多个partition依赖 窄依赖上游一个RDD的partition可以被下游RDD的多个partition依赖 RDD不会保存数据只会保存血缘关系。提高容错性将RDD之间的关系恢复重新进行读取
126、spark join在什么情况下会变成窄依赖
当两个RDD进行join时候分区方式以及分区数目相同并且每个分区中的数据量也相当这样就将每个分区数据进行一对一匹配形成窄依赖 当进行shuffle操作的key值较少时候通过增大分区来减少每个分区中数据量使得每个分区的数据量相对较少
127、spark的内存模型
spark是基于分布式内存计算的由dirver和executor spark内存分为堆内内存和堆外内存堆内内存基于JVM内存模型堆外内存则通过调用底层JDK unsafeAPI 1、堆内内存 其大小由spark应用程序启动时候的-executor-momery参数配置executor运行的并发任务共享JVM堆内内存该任务在缓存RDD数据和广播数据时占用的内存被规划为存储内存Storage这些任务在执行shuffle时占用的内存被规划为执行内存Execution 2、堆外内存 可以直接在工作节点的系统内存中开辟空间spark可以直接操作堆外内存减少了不必要的内存开销和频繁的垃圾扫描默认情况下不开启
128、为什么要划分宽窄依赖
目的在于执行计算中进行优化。spark通过识别窄依赖来执行一些优化在同一个节点上对多个窄依赖的转化操作进行合并从而减少网络传输的开销。对于宽依赖spakr会根据分区的数量和大小来据欸的那个是否进行数据重分区。
129、spark中的转换算子和行动算子的区分
转换算子得到的是一个新的RDD但不会立即执行计算只是记录下当前的操作 行动算子是指触发RDD进行计算的操作所以spark中作业的划分是根据行动算子来确定的
130、Spark的哪些算子会有shuffle过程?
groupByKey将具有相同键的键值对分组到一起必须进行shuffle以重新分配数据到不同的分区。 reduceByKey对具有相同键的键值对进行聚合操作需要将具有相同键的数据重新分配到不同的分区。 sortByKey按照键对数据进行排序需要将数据重新分区以进行排序。 join将两个具有相同键的数据集进行连接操作需要将具有相同键的数据重新分配到不同的分区。 distinct去除数据集中的重复元素需要对元素进行重新分区以进行重复元素的合并。 cogroup将具有相同键的数据集进行分组需要将具有相同键的数据重新分配到不同的分区。
131、Spark有了RDD为什么还要有Dataform和DataSet
引入DF和DS是为了实现更高级的数据处理和优化 RDD是强类型的它在编译时候无法检查数据类型的准确性如果在运行过程中类型不匹配只能在运行时抛出。DF和DS是基于RDD的抽象提供了更加攻击的类型安全性允许编译器在编译时候检查数据类型的准确性 RDD是基于函数式编程的需要手动编写复杂的转换和操作逻辑。DF和DS提供了基于SQL的高级抽象可以使用sql语句进行数据操作
132、Spark的RDD、DataFrame、DataSet、DataStream区别
RDD式弹性分布式数据集是基于分区进行操作通过转换算子和行动算子来进行数据处理 DF是一种以结构化数据为中心的数据抽象概念DF是一个分布式数据具有类似关系型数据库表的结构 DS式DF的扩展提供类型安全和更高级的API强类型的数据集合
补充spark中dataframe表格的类型
createGlobalTempView全局临时视图spark中sql的临时视图是session级别的会随着session的消失而消失如果希望一个临时视图跨session而存在可以建立一个全局临时视图全局临时视图存在于系统数据库global_temp中必须加上库名引用它 createOrReplaceGlobalTempView创建一个可替换的全局视图 createTempView临时视图 createOrReplaceTempView创建一个临时视图如果该视图已经存在则替换它session级别的
补充RDD的弹性体现在哪些方面 自动进行内存和磁盘切换 基于lineage血缘关系的高效容错出错时候可以进行恢复task如果失败会特定次数的重试 stage如果失败会自动进行特定次数的重试而且只会计算失败的分片 checkpoint每次对RDD操作都会产生新的RDD如果链条比较长就算笨重就把数据放在磁盘中 persist内存或磁盘中对数据进行复用
133、Spark的Spark Application、Job、Stage、Task分别介绍下如何划分
application应用一个独立的spark作业是由一系列的tasks组成的一个application通常包含多个任务每个作业由一个或者多个RDD转换和操作组成。提交一个任务就是一个application Job作业job是一组相互依赖的RDD转化和动作操作的有向无环图一个job代表了一个完整的作业执行流程一个action算子就会生成一个job Stage阶段stage是job的划分一个job可以由多个stage组成stage是根据RDD之间的宽窄依赖划分的一个stage中的所有任务都可以并行执行不同的stage之间的任务需要等待前一个stage的任务完成 Task任务task是最小的作业单元每个stage包含多个任务每个任务负责处理一个RDD分区的数据一个stage中最后一个RDD的分区个数就是task的个数 Job代表一个完整的作业执行过程Stage是Job的划分根据RDD之间的宽依赖关系划分Task是Stage的执行单元负责对RDD进行实际的操作和计算 注意Application-Job-Stage-Task每一层都是1对n的关系。
134、Stage的内部逻辑
stage是由一个具有相同宽依赖关系的RDD组成的一个stage可以看作一个逻辑的划分 内部逻辑 1、DAG生成在stage内部spark会根据RDD之间的依赖关系生成一个有向无环图 2、任务划分会将每个stage划分为多个task每个task对应的RDD的分区 3、任务调度spark会将task调度到集群中的执行器上执行 4、任务执行 5、数据传输
135、spark为什么要划分stage
划分satge的目的是为了优化任务的执行过程提高计算性能和效率
136、stage的数量等于什么
stage的数量等于宽依赖的个数1
137、Spark容错机制?
138、RDD的容错机制
RDD的容错性是指其发射发生故障能够自动恢复并且不会丢失任何数据 容错实现方式 1、数据复制RDD将数据划分为多个分区并将每个分区的数据复制到集群的多个节点上
139、Spark广播变量的实现和原理?
广播变量是一种分布式共享变量允许开发者在每个节点上缓存一个只读变量而不是将其复制到每个任务中用于在每个节点上缓存一个较大的数据集方便在执行任务期间共享数据 在多个并行操作中使用同一个变量但是 Spark 会为每个任务分别发送。
140、转换算子
1、map 将处理的数据逐条进行映射转换可以是类型的转换也可以是值的转换。不会减少或者增多数据 2、mapPartitions 将待处理的数据以分区为单位发送到计算节点上进行处理可以减少或者增多数据 Map 算子是分区内一个数据一个数据的执行类似于串行操作。而 mapPartitions 算子 是以分区为单位进行批处理操作。 比如将RDD中的所有数据通过JDBC连接写入数据库如果使用map函数可能要为每一个元素都创建一个connection这样开销很大如果使用mapPartitions那么只需要针对每一个分区建立一个connection 3、mapPartitionsWithIndex 将待处理的数据以分区为单位发送到计算节点进行处理并且可以获取当前分区索引 4、flatmap 将待处理的数据进行扁平化后再映射 5、glom 将RDD中的分区数据直接转换为相同类型的RDD分区不变 将每一个分区形成一个数组形成新的RDD类型时RDD[Array[T]] 6、Groupby 根据指定的规则进行分组分区默认不变数据会被打乱重新组合一个组的数据在一个分区中涉及shuffle 7、filter 根据指定规则进行筛选过滤符合规则的数据保留不符合的数据丢弃 8、sample
从数据集中抽取数据采样从大规模数据中抽取数据第一个参数抽取数据后是否将数据放回第二个参数数据源中每条数据被抽取的概率如果抽取不放回表示数据源中每条数据被抽取的概率如果抽取放回的场合表示数据源中每条数据被抽取的可能次数第三个参数表示随机算法的种子 9、distinct 数据去重。 10、coalesce 缩减分区用于大数据集过滤以后提高小数据集的执行效率 11、repartition 扩大分区内部还是执行的coalesce算子只是默认执行shuffle操作没有shuffle操作的话就没有意义 12、sortBy算子 按照一定的规则进行排序 13、交集并集差集拉链 intersection、union、subtract、zip都是针对两个value类型的 在交集、并集、补集、差集中数据类型必须一致 在zip中数据类型可以不一致但是数据的个数一定要一样 14、partitionBy 15、reduceByKey a,1a,1b,1b,1b,1b,1 按照指定的键对value做聚合a,2,(b,3) 支持分区内预聚合可以有效减少shuffle时落盘的数据量 分区内和分区间计算规则是相同的 16、groupByKey 按照指定的键将value聚合成一个迭代器 (a,(1,2,3)) 类比groupBy(a,((a,1), (a,2), (a,3))) reduceByKey对比groupByKey 都存在shuffle的操作但是reduceByKey可以在shuffle前对分区内的数据进行预聚合这样会减少落盘的数据量。 groupByKey只是进行分组不存在数据量减少的问题 reduceByKey性能高 reduceByKey包含分组和聚合的功能GroupByKey只能分组不能聚合 17、aggregateByKey 分区内计算和分区间的计算规则可以不同自己定义 第一个参数表示计算的初始值 第二个参数列表 分区内的计算规则 分区间的计算规则 18、foldByKey 如果分区内和分区间的计算规则相同了那么就是用foldByKey算子 19、combinByKey 是一个通用的聚合操作 reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别 行动算子 count、countbykey、countbyvalue
141、reduceByKey和groupByKey的区别和作用?
reducebykey将具有相同的键的值进行聚合并返回一个新的键值RDD groupbykey将具有相同的键的所有值分组并返回一个新的键值对RDD 从shuffle的角度两者都存在shuffle操作但是reducebykey可以在shuffle前对分区内相同的key的数据进行预聚合减少落盘的数量groupbykey只是进行分组不存在数据量减少的问题前者性能较高 功能角度reducebykey只包含了分组和聚合的功能后者只能分组
142、reducebykey和reduce的区别
两者都是进行聚合操作的方法 reducebykey是转换算子将RDD中具有相同键的元素进行聚合返回一个新的RDD并将结果作为新的键 reduce是一个行动算子它将RDD中所有元素进行聚合并返回一个单个的结果 reduceByKey适用于对键值对RDD进行聚合操作返回一个新的键值对RDD而reduce操作适用于对整个RDD进行聚合返回一个单一结果。 reduceByKey可以在分区上并行地进行聚合操作而reduce操作是在整个RDD上进行的。 reduceByKey需要指定一个聚合函数来合并具有相同键的元素而reduce操作只需要指定一个聚合函数即可。
143、使用reduceByKey出现数据倾斜怎么办?
144、Spark SQL的执行原理?
和RDD不同sparksql的DS和sql并不是直接生成计划交给集群执行而是经过了一个叫Catalyst的优化器帮助开发者优化代码 回答 1、首先SparkSQL底层解析成RDD通过两个阶段RBO和CBO 2、RBO就是通过逻辑执行计划通过常见的优化达到逻辑执行计划 3、CBO就是从优化后的逻辑计划到物理执行计划
145、Spark SQL的优化?
1、catalyst优化器自动推断查询计划的最优执行方式 2、列式存储采用列式存储的方式来存储和处理数据 3、数据划分和分区可以将大规模的数据集划分成多个小块进行处理 4、数据裁剪和推测执行数据裁剪可以根据查询条件不想不关的数据过滤掉减少数据的传输和处理量 5、并行执行和动态分配资源
146、Spark RDD持久化
checkpoint 用于将spark应用程序的中间数据保存到持久存储中以便在发生故障或者重启时候恢复应用程序的状态当用户启动checkpoint后spark会将DAG的中间数据保存到可靠的存储系统中即使发生故障也可以从checkpoint中恢复数据执行方法: sparkcontext.setCheckpointDir( ) Cache缓存 RDD通过cache或者persist方法将前面的计算结果缓存默认情况下会把数据以缓存在JVM的堆内存中 缓存和检查点区别 cache只是将数据保存起来不切断血缘依赖checkpoint检查点切断血缘依赖 cache缓存的数据通常在磁盘内存地方checkpoint数据通常存储在hdfs
145、DF和DS的创建
补充RDD、DF、DS三者之间的转换 1、DF和DS——RDD只需要调用.rdd()就能实现 2、RDD——DS将RDD的每一行封装成样例类再调用toDS() 3、RDD——DF调用.toDF() 3、DF——DSDF就是DS的特例是可以相互转换使用.as() 4、DS——DF使用.toDF()
146、HashPartitioner和RangePartitioner的实现
两者都是spark的分区函数继承于partitioner HashPartitioner哈希分区对于给定的key计算其hashCode并除于分区的个数取余会后返回的余数就是这个key所属的分区ID RangePartitioner将一定范围内的数据映射到一个分区中尽量保证每个分区的数据均匀而且分区间有序也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大但是分区内的元素是不能保证顺序的
147、为什么spark比MR快
1、内存计算 spark将数据存储在内存中进行计算和处理而hadoop则将数据存储在磁盘上速度更慢 2、DAG执行引擎 spark使用DAG执行引擎通过将任务划分为多个阶段进行优化可以有效地减少任务之间的数据传输和磁盘读写 3、运行模式 spark支持多种运行模式本地、yarn。独立模式更根据需求进行选择 4、缓存机制 spark具有强大的缓存机制可以将结果存储在内存中避免了重复计算和磁盘读写操作 5、数据流水线 spark可以将多个数据处理操作连接成一个数据流水线减少了中间数据的存储的传输 6、资源调度 spark是粗粒度的资源申请也就是当提交了spark application时候application会将所有资源申请完毕task在执行的时候就不需要申请资源task执行快当会后一个task执行完之后才会被释放MR是细粒度的资源申请task需要自己申请资源并释放故application执行比较缓慢。
补充Hive On Spark 和 Spark SQL的区别
Hive on spark 将spark作为hive的计算引擎通过将hive的查询作为spark任务提交到spark集群上进行计算 继承了hive的数据仓库功能包括元数据管理数据存储查询优化等还支持UDF函数和存储过程 sparkSQL是spark项目的一部分用于处理结构化的数据基于Dataframe sparkSQL可以更直接利用spark引擎的优化器和执行引擎可以更紧密地集成到spark生态系统中更好利用集群资源优化器和执行引擎、数据格式和存储、内存计算、执行计划