美团网站建设,重庆自助建站模板,建设网站的编程过程,网站设计的国际专业流程是什么Spark 配置项硬件资源类CPU内存堆外内User Memory/Spark 可用内存Execution/Storage Memory磁盘ShuffleSpark SQLJoin 策略调整自动分区合并自动倾斜处理配置项分为 3 类:
硬件资源类 : 与 CPU、内存、磁盘有关的配置项Shuffle 类 : Shuffle 计算过程的配置项Spark SQL : Spar…
Spark 配置项硬件资源类CPU内存堆外内User Memory/Spark 可用内存Execution/Storage Memory磁盘ShuffleSpark SQLJoin 策略调整自动分区合并自动倾斜处理配置项分为 3 类:
硬件资源类 : 与 CPU、内存、磁盘有关的配置项Shuffle 类 : Shuffle 计算过程的配置项Spark SQL : Spark SQL 优化配置项
读取配置项顺序 SparkConf 对象 - 命令行参数 - 配置文件
硬件资源类
资源类别配置项含义CPUspark.cores.max集群满 CPU 核spark.executor.cores每个 Executors 可用的 CPU Coresspark.default.parallelism默认并行度spark.sql.shuffle.partitionsReduce 的默认并行度spark.task.cpus每个任务可用的 CPU 核spark.executor.instances集群内 Executors 的个数内存spark.executor.memory单个 Executor 的堆内内存总大小spark.memory.offHeap.enabled是否启动堆外内存spark.memory.offHeap.size单个 Executorp 的堆外内存总大小spark.memory.fraction除 User Memory 外的内存空间占比spark.memory.storageFraction缓存RDD的内存占比执行内存占比 1 - spark.memory.storageFractionspark.rdd.compressRDD缓存是否压缩默认不压缩磁盘spark.local.dir存储 Shuffle 中间文件/RDD Cache 的磁盘目录
CPU
配置项
spark.cores.max集群满 CPU 核spark.executor.cores每个 Executors 可用的 CPU 核spark.task.cpus每个任务可用的 CPU 核spark.executor.instances集群内 Executors 的个数
并行度 : 定义分布式数据集划分的份数/粒度决定了分布式任务的计算负载。并行度越高数据的粒度越细数据分片越多数据越分散
并行度的配置项 :
spark.default.parallelism默认并行度spark.sql.shuffle.partitionsReduce 的默认并行度
并行计算任务在任一时刻整个集群能够同时计算的任务数量
整个集群的并行计算任务数 spark.executor.instances * spark.executor.cores
达到 CPU、内存、数据之间的平衡的约定 :
spark.executor.cores 指定 CPU Cores 记为 cExecution Memory 内存大小 记为 m分布式数据集的大小记为 D 并行度记为 PD/P 每个数据分片大小一个数据分片对应着一个 Task分布式任务而一个 Task 又对应着一个 CPU Core
公式量化
# D/P 数据分片大小m/c 每个 Task 分到的可用内存
D/P ~ m/c内存
内存配置项 :
spark.executor.memory单个 Executor 的堆内内存总大小spark.memory.offHeap.size单个 Executorp 的堆外内存总大小(spark.memory.offHeap.enabledtrue)spark.memory.fraction堆内内存中用于缓存RDD和执行计算的内存比例spark.memory.storageFraction缓存RDD的内存占比执行内存占比 1 - spark.memory.storageFractionspark.rdd.compressRDD缓存是否压缩默认不压缩Reserved Memory 大小固定为 300MBM 指定了 Executor 进程的 JVM Heap 大小 ( Executor Memory )Execution Memory 的组成 Execution Memory、Storage Memory 、UserMemoryUser Memory : 存储用户自定义的数据结构如 : RDD 的各类实例化对象或集合类型如: 数组、列表等Spark 1.6 后推出了动态内存管理模式Execution Memory/Storage Memory 能互相抢占
堆外内
堆外存储
int 的用户 ID、String 的姓名、int的年龄、Char 的性别 处理数据集
数据模式比较扁平而且字段多是定长数据类型就更多使用堆外内存数据模式很复杂嵌套结构/变长字段很多就更多使用 JVM 堆内内存
User Memory/Spark 可用内存
User Memory 存储开发者自定义的数据结构这些数据结构需要协助分布式数据集的处理
spark.memory.fraction : 明确 Spark 可支配内存占比即 User Memory 堆内占比 1 - spark.memory.fraction
spark.memory.fraction 系数越大Spark 可支配的内存越多User Memory 占比越小spark.memory.fraction 默认值是 0.6JVM 堆内的 60% 给 Spark支配40% 给 User Memory
调整内存相对占比
自定义数据结构多spark.memory.fraction 调低用于分布式计算和缓存分布式数据集自定义数据结构少spark.memory.fraction 调高用于分布式计算和缓存分布式数据集
Execution/Storage Memory
sf 的设置情况
ETL RDD Cache 使用少。就能将 sf 设低点让 Execution Memory 大点缓存密集型 机器学习RDD Cache 使用较多就能把 sf 设高点让 Storage Memory 大点过多的缓存会引发 GCGarbage Collection垃圾回收
JVM 把 Heap 堆内内存分为
年轻代存储生命周期较短、引用次数较低的对象会引发 Young GC老年代存储生命周期较长、引用次数高的对象会引发 Full GCRDDcache 会存在老年代
Full GC时会引发 STW
抢占应用程序执行线程把所有 CPU 线程都做垃圾回收应用程序的暂时不执行(Stop the world)等 Full GC 完事后才把 CPU 线程释放应用程序才能继续执行Full GC 弊端远大于 Young GC
为了 RDD cache 访问效率用 RDD/DataFrame/Dataset.cache 以对象值形式缓存到内存 (避免序列化消耗)
用对象值形式缓存数据每条数据都要构成一个对象 (自定义Case class, Row 对象)当大量的 RDD cache 时会引发 Full GC当应用是缓存密集型需要大量缓存为了执行效率可以改用序列化
spark.rdd.compress RDD 缓存默认不压缩
启用压缩后能节省缓存内存的占用把更多的内存空间留给分布式任务执行启用压缩后会引入额外的计算开销、牺牲 CPU
磁盘
磁盘的配置项
spark.local.dir 任意的本地文件系统目录默认值是 /tmp 。 用于存储各种各样的临时数据如 Shuffle 中间文件、RDD Cache。
有条件可以设置个大而性能好的文件系统如空间足够大的 SSD 文件系统目录
Shuffle
spark.shuffle.file.bufferMap 输出端的写缓冲区的大小spark.reducer.maxSizeInFlightReduce 输入端的读缓冲区的大小spark.shuffle.sort.bypassMergeThresholdMap 阶段不进行排序的分区阈值
Shuffle 的计算的两个阶段
Map 阶段执行映射逻辑并按 Reducer 的分区规则将中间数据写入到本地磁盘Reduce 阶段从各个节点下载数据分片并根据需要实现聚合计算Map 阶段的计算结果(中间文件)会存储到写缓冲区Write Buffer)满后再写入到磁盘文件系统Reduce 阶段通过网络从不同节点的磁盘中拉取中间文件以数据块暂存到计算节点的读缓冲区Read Buffer满后再写入到磁盘文件系统
自 Spark 1.6 后全用 Sort shuffle manager 管理 Shuffle
Sort shuffle manager 会把 Map/Reduce 都引入排序
repartition、groupBy 就没有排序的需求引入的排序就是额外的计算开销
不需要聚合/排序时调整 spark.shuffle.sort.bypassMergeThreshold 改变 Reduce 端的并行度默认值 200。当 Reduce 的分区数 该值时Shuffle 就不会引入排序
Spark SQL
作用配置项含义AQEspark.sql.adaptive.enabled是否启用 AQEJoin 策略spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin非空分区比例 该值,调整Join策略spark.sql.autoBroadcastJoinThreshold基表 该值, 触发Broadcast Join自动分区合并spark.sql.adaptive.coalescePartitions.enabled是否启用合并分区spark.sql.adaptive.advisoryPartitionSizelnBytes合并后的目标分区大小spark.sql.adaptive.coalescePartitions.minPartitionNum分区合并后并行度 该值自动倾斜处理spark.sql.adaptive.skewJoin.enabled是否自动处理数据倾斜spark.sql.adaptive.skewJoin.skewedPartitionFactor倾斜分区的比例系数spark.sql.adaptive.skewJoin.skewedPartitionThresholdlnBytes倾斜分区的最低阀值spark.sql.adaptive.advisoryPartitionSizeInBytes拆分倾斜分区粒度 (字节)
Spark 3.0 推出 AQE (Adaptive Query Execution, 自适应查询执行) 的 3 个动态优化特性 Join 策略调整、自动分区合并、自动倾斜处理
# 启用 AQE
spark.sql.adaptive.enabled trueJoin 策略调整
Join 策略调整 : Spark SQL 在运行时动态调整为 Broadcast Join
每当 DAG 中的 Map 阶段执行完毕会结合 Shuffle 中间文件的统计信息重新计算 Reduce 数据表的存储大小。当基表 autoBroadcastJoinThreshold时下个阶段就可能变为 Broadcast Join
动态 Join 策略的条件二 大表过滤后非空分区比例 nonEmptyPartitionRatioForBroadcastJoin才能成功触发 Broadcast Join 降级
例子 大表有 100 个分区过滤后只有 15 个分区有数据非空分区比例 : 15 / 100 15% 20% , 就触发 Broadcast Join 降级
配置项
# AQE前基表 该值就会触发 Broadcast Join
spark.sql.autoBroadcastJoinThreshold 10m# AQE后非空分区比例 该值就调整动态 Join 策略
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 0.5Spark SQL 的广播阈值对比的两种情况
基表来自文件系统用基表在磁盘的存储大小与广播阈值对比基表来自 DAG 的中间文件用 DataFrame 执行计划中的统计值与广播阈值对比
DataFrame 执行计划中的统计值
val df: DataFrame _
// 先对分布式数据集加Cache
df.cache.count// 获取执行计划
val plan df.queryExecution.logical// 获取执行计划对于数据集大小的精确预估
val estimated: BigInt spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes自动分区合并
自动分区合并 解决 Reduce 过小的分区而导致的数据的不均衡问题
分区合并示意图 :
依序扫描数据分区当相邻分区的尺寸之和 实际大小时就把扫描过的分区做一次合并 # 是否启用自动分区合并默认启用
spark.sql.adaptive.coalescePartitions.enabled true# 合并后的目标分区大小
spark.sql.adaptive.advisoryPartitionSizelnBytes 256MB# 分区合并后并行度 该值
spark.sql.adaptive.coalescePartitions.minPartitionNum 1每个分区的平均大小 数据集大小/最低并行度实际大小 min(advisoryPartitionSizeInBytes , 分区的平均大小)
例子 Shuffle 中间文件 20GBminPartitionNum 200
每个分区的尺寸 20GB / 200 100MB设 advisoryPartitionSizeInBytes 200MB最终分区 min(100MB200MB) 100MB
自动倾斜处理
自动倾斜处理把倾斜的数据分区拆分成小分区
对所有数据分区按大小做排序取中位数。将 中位数 * skewedPartitionFactor 得到判定阈值。凡是 阈值的数据分区就可能认为倾斜分区当可能倾斜分区 skewedPartitionThresholdInBytes就会判定为倾斜分区
配置项
# 开启自动倾斜处理
spark.sql.adaptive.skewJoin.enabled true# 判断大分区倾斜分区的比例系数
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5# 判断大分区倾斜分区的最低阔值
spark.sql.adaptive.skewJoin.skewedPartitionThresholdinBytes 256MB# 拆分大分区 , 倾斜分区的拆分单位
spark.sql.adaptive.advisoryPartitionSizelnBytes 256MB例子数据表有 3 个分区90MB、100MB 、512MB。中位数是 100MB
判定阈值 中位数 * skewedPartitionFactor 100MB * 5 500MB512MB 为候选分区512MB skewedPartitionThresholdInBytes(256MB) 就认为该分区是倾斜分区512MB skewedPartitionThresholdInBytes(1GB) 就不是倾斜分区再根据 advisoryPartitionSizeInBytes(256MB) 对大分区进行拆分512MB 被拆成两个小分区512MB / 2 256MB