诸暨网站建设书生商友,app 门户网站,山东德州网站建设哪家最好,手机制作广告的app写在前面
内容如何选择
本翻译只翻译本人认为精华的部分#xff0c;本人认为的Spark的一些核心理念#xff0c;编程思想。一些特别基础的操作包括但不限于搭建环境就不在此赘述了。
配套版本
本系列基于Spark 3.3.1#xff0c;Scala 2.12.10#xff0c;进行翻译总结
原…写在前面
内容如何选择
本翻译只翻译本人认为精华的部分本人认为的Spark的一些核心理念编程思想。一些特别基础的操作包括但不限于搭建环境就不在此赘述了。
配套版本
本系列基于Spark 3.3.1Scala 2.12.10进行翻译总结
原文链接
https://spark.apache.org/docs/3.3.1/rdd-programming-guide.html
思维导图
整篇文章思维导图
翻译正文
RDD编程指南
概览
总的来说每个 Spark 应用程序由一个驱动程序driver program组成该程序运行用户的主函数并在集群上执行各种并行操作。 Spark 提供的主要抽象基本核心概念是弹性分布式数据集RDD这是一组元素分布在集群的节点上可以并行操作。RDD 可以通过从 Hadoop 文件系统或任何其他支持 Hadoop 的文件系统中的文件开始或在驱动程序中使用现有的 Scala 集合并对其进行转换来创建。用户还可以要求 Spark 在内存中持久化 RDD从而使其能够在并行操作中高效地重用。最后RDD 会自动从节点故障中恢复。 Spark 的第二个抽象基本核心概念是共享变量可以在并行操作中使用。默认情况下当 Spark 在不同节点上以一组任务并行运行一个函数时它会将函数中使用的每个变量的副本发送到每个任务。有时变量需要在任务之间或在任务与驱动程序之间共享。Spark 支持两种类型的共享变量广播变量broadcast variables可以在所有节点的内存中缓存一个
弹性分布式数据集RDD
Spark 的核心概念是弹性分布式数据集RDD它是一种容错的元素集合可以并行操作。创建 RDD 有两种方式一种是在驱动程序中将现有集合并行化另一种是引用外部存储系统中的数据集例如共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源。
并行化集合
并行化集合是通过在驱动程序中对现有集合一个 Scala Seq调用 SparkContext 的 parallelize 方法来创建的。集合的元素会被复制以形成一个可以并行操作的分布式数据集。例如以下是如何创建一个包含数字 1 到 5 的并行化集合的示例
val data Array(1, 2, 3, 4, 5)
val distData sc.parallelize(data)一旦创建分布式数据集distData就可以并行操作。例如我们可以调用 distData.reduce((a, b) a b) 来对数组的元素进行求和。我们稍后会描述对分布式数据集的操作。
并行集合的一个重要参数是将数据集切分为的分区数量。Spark 会为集群的每个分区运行一个任务。通常每个 CPU 需要 2 到 4 个分区。通常Spark 会根据你的集群自动设置分区数量。不过你也可以通过将其作为第二个参数传递给 parallelize 方法手动设置分区数量例如sc.parallelize(data, 10)。注意代码中的某些地方使用“切片”slices这个术语与分区同义以保持向后兼容性。
外部数据集
Spark 可以从任何支持 Hadoop 的存储源创建分布式数据集包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 以及任何其他 Hadoop InputFormat。
文本文件 RDD 可以使用 SparkContext 的 textFile 方法创建。此方法接受文件的 URI可以是机器上的本地路径也可以是 hdfs://、s3a:// 等 URI并将其作为行的集合读取。以下是一个示例调用
scala val distFile sc.textFile(data.txt)
distFile: org.apache.spark.rdd.RDD[String] data.txt MapPartitionsRDD[10] at textFile at console:26一旦创建distFile 可以通过数据集操作进行处理。例如我们可以使用 map 和 reduce 操作来计算所有行的长度总和如下所示
distFile.map(s s.length).reduce((a, b) a b)关于使用 Spark 读取文件的一些注意事项 如果使用本地文件系统上的路径则该文件在工作节点上也必须可以通过相同路径访问。要么将文件复制到所有工作节点要么使用网络挂载的共享文件系统。 Spark 的所有基于文件的输入方法包括 textFile都支持在目录、压缩文件和通配符上运行。例如您可以使用 textFile(“/my/directory”)、textFile(“/my/directory/.txt) 和 textFile(/my/directory/.gz”)。当读取多个文件时分区的顺序取决于文件系统返回文件的顺序。它可能会或可能不会遵循文件路径的字典顺序。在一个分区内元素的顺序根据它们在底层文件中的顺序进行排列。 textFile 方法还接受一个可选的第二个参数用于控制文件的分区数量。默认情况下Spark 为文件的每个块创建一个分区在 HDFS 中默认块大小为 128MB但您也可以通过传递更大的值请求更多的分区。请注意分区数量不能少于块的数量。 除了文本文件Spark 的 Scala API 还支持几种其他数据格式 SparkContext.wholeTextFiles 允许您读取包含多个小文本文件的目录并将每个文件作为 (filename, content) 对返回。这与 textFile 不同后者会在每个文件中返回每行一个记录。分区由数据局部性决定在某些情况下这可能导致分区过少。在这些情况下wholeTextFiles 提供了一个可选的第二个参数用于控制最小分区数。 对于 SequenceFiles使用 SparkContext 的 sequenceFile[K, V] 方法其中 K 和 V 是文件中键和值的类型。这些应该是 Hadoop 的 Writable 接口的子类如 IntWritable 和 Text。此外Spark 允许您为一些常见的 Writables 指定原生类型例如sequenceFile[Int, String] 将自动读取 IntWritables 和 Texts。 对于其他 Hadoop InputFormats您可以使用 SparkContext.hadoopRDD 方法该方法接受任意 JobConf 和输入格式类、键类和值类。以与 Hadoop 作业相同的方式设置这些内容以便处理您的输入源。您还可以使用 SparkContext.newAPIHadoopRDD 处理基于“新”MapReduce APIorg.apache.hadoop.mapreduce的 InputFormats。 RDD.saveAsObjectFile 和 SparkContext.objectFile 支持以包含序列化 Java 对象的简单格式保存 RDD。虽然这不如 Avro 等专门格式高效但它提供了一种轻松保存任何 RDD 的方法。
RDD 操作
RDD 支持两种类型的操作
转换transformations它们从现有数据集中创建一个新数据集动作actions它们在数据集上运行计算后将值返回给驱动程序。
例如map 是一种转换它将每个数据集元素传递给一个函数并返回一个新的 RDD表示结果。另一方面reduce 是一种动作它使用某个函数聚合 RDD 中的所有元素并将最终结果返回给驱动程序尽管还有一个并行的 reduceByKey它返回一个分布式数据集。
Spark 中的所有转换都是惰性执行的这意味着它们不会立即计算结果。相反它们只是记住对某个基础数据集例如文件应用的转换。当一个动作需要将结果返回给驱动程序时转换才会被计算。这种设计使 Spark 能够更高效地运行。例如我们可以意识到通过 map 创建的数据集将用于 reduce并仅将 reduce 的结果返回给驱动程序而不是返回更大的映射数据集。
默认情况下每次对 RDD 执行动作时转换后的 RDD 可能会被重新计算。然而您也可以使用 persist或 cache方法将 RDD 持久化到内存中这样 Spark 将在集群中保留元素以便在下次查询时更快地访问。Spark 还支持将 RDD 持久化到磁盘或在多个节点之间进行复制。
基本操作
为了说明 RDD 的基础知识考虑以下简单程序
val lines sc.textFile(data.txt)
val lineLengths lines.map(s s.length)
val totalLength lineLengths.reduce((a, b) a b)第一行从外部文件定义了一个基础 RDD。此数据集并未加载到内存中或以其他方式进行操作lines 仅仅是指向文件的指针。第二行将 lineLengths 定义为 map 转换的结果。同样由于惰性执行lineLengths 也不会立即计算。最后我们运行 reduce这是一种动作。在这一点上Spark 将计算拆分为任务(task)以便在不同的机器上运行每台机器运行其部分的 map 和本地的 reduce仅将结果返回给驱动程序。
如果我们还希望稍后再次使用 lineLengths可以在 reduce 之前添加
lineLengths.persist()这将导致 lineLengths 在第一次计算后保存在内存中。这样在后续的操作中如果再使用 lineLengths就不需要重新计算而是可以直接从内存中获取结果从而提高性能。
将函数传递给 Spark
Spark 的 API 在驱动程序中大量依赖于传递函数以便在集群上运行。推荐有两种方法来实现这一点
匿名函数语法适用于简短的代码段。全局单例对象中的静态方法例如您可以定义一个对象 MyFunctions然后像下面这样传递 MyFunctions.func1
object MyFunctions {def func1(s: String): String { ... }
}myRdd.map(MyFunctions.func1)需要注意的是尽管也可以传递类实例中方法的引用与单例对象相对但这需要将包含该类的方法的对象一起发送。例如考虑以下代码
class MyClass {def func1(s: String): String { ... }def doStuff(rdd: RDD[String]): RDD[String] { rdd.map(func1) }
}在这里如果我们创建一个新的 MyClass 实例并调用 doStuff那么内部的 map 将引用该 MyClass 实例的 func1 方法因此整个对象需要发送到集群。这类似于编写 rdd.map(x this.func1(x))。
以类似的方式访问外部对象的字段也会引用整个对象
class MyClass {val field Hellodef doStuff(rdd: RDD[String]): RDD[String] { rdd.map(x field x) }
}这相当于编写 rdd.map(x this.field x)这会引用整个对象 this。为了避免这个问题最简单的方法是将 field 复制到一个局部变量中而不是外部访问它
def doStuff(rdd: RDD[String]): RDD[String] {val field_ this.fieldrdd.map(x field_ x)
}这样field_ 是一个局部变量它只会被传递到集群而不会发送整个对象从而提高了效率并避免了不必要的开销。
理解闭包
在 Spark 中理解在集群上执行代码时变量和方法的作用域和生命周期是较为困难的事情之一。RDD 操作如果修改其作用域外的变量往往会引发混淆。以下示例展示了使用 foreach() 来递增计数器的代码但类似的问题也可能出现在其他操作中。
例子
考虑下面的朴素RDD元素求和它的行为可能会有所不同具体取决于执行是否在同一个JVM中。一个常见的例子是在本地模式下运行Spark–master local[n]与将Spark应用程序部署到集群例如通过spark-submit到YARN时
var counter 0
var rdd sc.parallelize(data)// Wrong: Dont do this!!
rdd.foreach(x counter x)println(Counter value: counter)本地模式 vs 集群模式
上述代码的行为是未定义的并且可能无法按预期工作。为了执行作业Spark将RDD操作的处理分解为任务每个任务由一个执行器执行。在执行之前Spark计算任务的闭包。闭包是那些必须对执行器可见的变量和方法以便执行器能够对RDD在这种情况下是foreach()执行计算。该闭包会被序列化并发送到每个执行器。
发送到每个执行器的闭包中的变量现在是副本因此当在foreach函数中引用counter时它不再是驱动节点上的counter。驱动节点的内存中仍然有一个counter但这对执行器来说不再可见执行器只能看到来自序列化闭包的副本。因此counter的最终值仍然是零因为对counter的所有操作都引用了序列化闭包中的值。
在本地模式下在某些情况下foreach函数实际上会在与驱动相同的JVM中执行并且会引用同一个原始counter并可能会更新它。
为了确保在这些场景中行为明确应使用累加器。Spark中的累加器专门用于提供一种机制以便在执行分散到集群中的工作节点时安全地更新变量。本指南的累加器部分对此进行了更详细的讨论。
一般来说闭包——像循环或局部定义的方法这样的构造不应被用于改变某种全局状态。Spark不定义或保证从闭包外部引用的对象的突变行为。一些这样做的代码可能在本地模式下有效但这只是偶然的而这样的代码在分布式模式下不会按预期工作。如果需要某种全局聚合请使用累加器。
打印RDD的元素
另一个常见的用法是尝试使用rdd.foreach(println)或rdd.map(println)打印RDD的元素。在单台机器上这将生成预期的输出并打印出所有RDD的元素。然而在集群模式下由执行器调用的stdout输出现在写入执行器的stdout而不是驱动程序上的stdout因此驱动程序上的stdout将不会显示这些内容要在驱动程序上打印所有元素可以使用collect()方法将RDD带到驱动节点如下所示rdd.collect().foreach(println)。不过这可能导致驱动程序内存不足因为collect()会将整个RDD提取到单台机器上如果只需要打印RDD中的几个元素更安全的方法是使用take()rdd.take(100).foreach(println)。
处理键值对
虽然大多数Spark操作适用于包含任何类型对象的RDD但有一些特殊操作仅适用于键值对的RDD。最常见的操作是分布式“洗牌”操作例如按键对元素进行分组或聚合。
在Scala中这些操作会自动应用于包含Tuple2对象的RDD语言中的内置元组只需写作(a, b)即可创建。键值对操作在PairRDDFunctions类中可用该类会自动包装元组的RDD。
例如以下代码使用reduceByKey操作对键值对进行处理以计算文件中每行文本出现的次数
val lines sc.textFile(data.txt)
val pairs lines.map(s (s, 1))
val counts pairs.reduceByKey((a, b) a b)我们还可以使用counts.sortByKey()例如按字母顺序对对进行排序最后使用counts.collect()将它们作为对象数组带回驱动程序。
注意当使用自定义对象作为键进行键值对操作时必须确保自定义的equals()方法与匹配的hashCode()方法一起使用。有关详细信息请参见Object.hashCode()文档中概述的契约。
转换算子
以下表格列出了一些Spark支持的常见转换操作。有关详细信息请参考RDD API文档Scala、Java、Python、R和配对RDD函数文档Scala、Java。
方法说明map(func)返回一个新的分布式数据集通过将源中的每个元素传递给函数func生成。filter(func)返回一个新的数据集通过选择源中func返回true的那些元素生成。flatMap(func)类似于map但每个输入项可以映射到0个或多个输出项因此func应返回一个Seq而不是单个项。mapPartitions(func)类似于map但在RDD的每个分区块上单独运行因此func在运行在类型为T的RDD时必须为Iterator Iterator。mapPartitionsWithIndex(func)类似于mapPartitions但还提供一个整数值表示分区的索引因此func在运行在类型为T的RDD时必须为(Int, Iterator) Iterator。sample(withReplacement, fraction, seed)以给定的随机数生成器seed对数据进行抽样抽取数据的比例为fraction可以选择是否有放回。union(otherDataset)返回一个新的数据集包含源数据集和参数中的元素的并集。intersection(otherDataset)返回一个新的RDD包含源数据集和参数中元素的交集。distinct([numPartitions])返回一个新的数据集包含源数据集的唯一元素。groupByKey([numPartitions])当在(K, V)对的数据集上调用时返回(K, Iterable)对的数据集。注意如果您是为了对每个键执行聚合如求和或平均使用reduceByKey或aggregateByKey将获得更好的性能。注意默认情况下输出的并行度取决于父RDD的分区数量。您可以传递可选的numPartitions参数以设置不同数量的任务。reduceByKey(func, [numPartitions])当在(K, V)对的数据集上调用时返回(K, V)对的数据集其中每个键的值使用给定的reduce函数func进行聚合func必须为(V,V) V。与groupByKey一样reduce任务的数量可以通过可选的第二个参数进行配置。aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])当在(K, V)对的数据集上调用时返回(K, U)对的数据集其中每个键的值使用给定的组合函数和中性“零”值进行聚合。允许聚合值的类型与输入值的类型不同同时避免不必要的分配。与groupByKey一样reduce任务的数量可以通过可选的第二个参数进行配置。sortByKey([ascending], [numPartitions])当在(K, V)对的数据集上调用时其中K实现了Ordered返回按键升序或降序排序的(K, V)对的数据集如布尔值ascending参数所指定。join(otherDataset, [numPartitions])当在类型为(K, V)和(K, W)的数据集上调用时返回(K, (V, W))对的数据集其中每个键的所有元素对。通过leftOuterJoin、rightOuterJoin和fullOuterJoin支持外连接。cogroup(otherDataset, [numPartitions])当在类型为(K, V)和(K, W)的数据集上调用时返回(K, (Iterable, Iterable))元组的数据集。此操作也称为groupWith。cartesian(otherDataset)当在类型为T和U的数据集上调用时返回(T, U)对的数据集所有元素对。pipe(command, [envVars])通过shell命令例如Perl或bash脚本对RDD的每个分区进行处理。RDD元素被写入进程的stdin输出到其stdout的行作为字符串的RDD返回。coalesce(numPartitions)将RDD中的分区数量减少到numPartitions。在对大数据集进行过滤后更有效地运行操作。repartition(numPartitions)随机重新洗牌RDD中的数据以创建更多或更少的分区并在其间进行平衡。这始终会通过网络洗牌所有数据。repartitionAndSortWithinPartitions(partitioner)根据给定的分区器对RDD进行重新分区并在每个结果分区内按键对记录进行排序。这比先重新分区再在每个分区内排序更有效因为它可以将排序操作下推到洗牌机制中。
行动算子
以下表格列出了一些Spark支持的常见操作。有关详细信息请参考RDD API文档Scala、Java、Python、R和配对RDD函数文档Scala、Java。
方法说明reduce(func)使用函数func接受两个参数并返回一个结果对数据集的元素进行聚合。该函数应具有交换性和结合性以便能够正确地并行计算。collect()将数据集的所有元素作为数组返回到驱动程序。这通常在过滤或其他返回足够小的数据子集的操作后很有用。count()返回数据集中的元素数量。first()返回数据集的第一个元素类似于take(1)。take(n)返回数据集的前n个元素的数组。takeSample(withReplacement, num, [seed])返回数据集的随机样本数量为num可以选择是否有放回选项上还可以预先指定随机数生成器的种子。takeOrdered(n, [ordering])使用自然顺序或自定义比较器返回RDD的前n个元素。saveAsTextFile(path)将数据集的元素写入本地文件系统、HDFS或任何其他Hadoop支持的文件系统中指定目录的文本文件或文本文件集。Spark会对每个元素调用toString将其转换为文件中的一行文本。saveAsSequenceFile(path)Java和Scala将数据集的元素写入本地文件系统、HDFS或任何其他Hadoop支持的文件系统中指定路径的Hadoop SequenceFile。仅适用于实现Hadoop的Writable接口的键值对RDD。在Scala中它也适用于隐式可转换为Writable的类型Spark包括对基本类型如Int、Double、String等的转换。saveAsObjectFile(path)Java和Scala使用Java序列化将数据集的元素写入简单格式随后可以使用SparkContext.objectFile()加载。countByKey()仅适用于类型为(K, V)的RDD。返回一个包含每个键计数的(K, Int)对的哈希映射。foreach(func)对数据集的每个元素运行函数func。通常用于副作用例如更新累加器或与外部存储系统交互。注意在foreach()之外修改累加器以外的变量可能会导致未定义行为。有关更多详细信息请参见理解闭包。
Spark RDD API还提供了一些操作的异步版本比如foreachAsync它会立即返回一个FutureAction给调用者而不是在操作完成时阻塞。这可以用于管理或等待操作的异步执行。
Shuffle 操作
在Spark中某些操作会触发一个称为shuffle的事件。shuffle是Spark重新分配数据的机制以便在分区中以不同的方式对数据进行分组。这通常涉及在executor和机器之间复制数据因此洗牌是一项复杂且代价高昂的操作。
背景
为了理解洗牌期间发生的事情我们可以考虑reduceByKey操作的例子。reduceByKey操作生成一个新的RDD其中单个键的所有值会组合成一个元组——键和对与该键相关的所有值执行reduce函数的结果。挑战在于单个键的所有值不一定都位于同一个分区甚至同一台机器上但它们必须在一起才能计算结果。
在Spark中数据通常并不是分布在分区中以满足特定操作的要求。在计算过程中单个任务将操作于单个分区——因此为了组织单个reduceByKey减小任务要执行的所有数据Spark需要执行一个全到全的操作。它必须从所有分区读取数据以查找所有键的所有值然后跨分区汇集这些值以计算每个键的最终结果——这就是shuffle。
尽管新shuffle数据的每个分区中的元素集合是确定性的分区本身的顺序也是如此但这些元素的顺序却不是。如果希望在洗牌后获得可预测的有序数据可以使用
mapPartitions来对每个分区进行排序例如使用.sortedrepartitionAndSortWithinPartitions来有效地对分区进行排序同时重新分区sortBy来生成一个全局有序的RDD 可能导致shuffle的操作包括重新分区操作如repartition和coalesce基于键的操作计数操作除外如groupByKey和reduceByKey以及连接操作如cogroup和join。
性能影响
shuffle是一项昂贵的操作因为它涉及磁盘I/O、数据序列化和网络I/O。为了组织数据进行洗牌Spark生成了一组任务——映射任务用于组织数据以及一组减少任务用于聚合数据。这种命名源自MapReduce并不直接与Spark的map和reduce操作相关。
在内部来自单个映射任务的结果会保留在内存中直到无法容纳为止。然后这些结果根据目标分区进行排序并写入单个文件。在减少阶段任务读取相关的排序块。
某些shuffle操作可能会消耗大量的堆内存因为它们使用内存中的数据结构来组织记录在传输之前或之后。具体而言reduceByKey和aggregateByKey在映射端创建这些结构而基于键的操作则在减少端生成这些结构。当数据无法适应内存时Spark会将这些表溢出到磁盘从而造成额外的磁盘I/O开销和增加的垃圾回收。
shuffle还会在磁盘上生成大量中间文件。从Spark 1.3开始这些文件会被保留直到相应的RDD不再使用并被垃圾回收。这是为了避免在重新计算血缘时需要重新创建洗牌文件。垃圾回收可能会在很长一段时间后才发生如果应用程序保留对这些RDD的引用或者如果GC不频繁触发。这意味着长期运行的Spark作业可能会消耗大量的磁盘空间。临时存储目录由配置Spark上下文时的spark.local.dir配置参数指定。
shuffle行为可以通过调整各种配置参数进行调优。有关详细信息请参阅Spark配置指南中的“Shuffle Behavior”部分。
RDD持久化
Spark中最重要的功能之一是在操作之间将数据集持久化或缓存到内存中。当您持久化一个RDD时每个节点会将计算出的它的分区存储在内存中并在对该数据集或从中派生的数据集的其他操作中重用它们。这使得未来的操作可以更快通常超过10倍。缓存是迭代算法和快速交互使用的关键工具。
您可以使用persist()或cache()方法标记一个RDD进行持久化。在操作中第一次计算时它将被保留在节点的内存中。Spark的缓存是容错的 - 如果RDD的任何分区丢失它将自动使用最初创建它的转换重新计算。
此外每个持久化的RDD可以使用不同的存储级别进行存储允许您例如将数据集持久化到磁盘以序列化的Java对象的形式持久化到内存中以节省空间跨节点复制。这些级别通过将StorageLevel对象Scala、Java、Python传递给persist()来设置。cache()方法是一个使用默认存储级别的简写即StorageLevel.MEMORY_ONLY在内存中存储反序列化的对象。完整的存储级别如下
存储级别含义MEMORY_ONLY将RDD作为反序列化的Java对象存储在JVM中。如果RDD无法完全放入内存一些分区将不会被缓存每次需要时都会即时重新计算。这是默认级别。MEMORY_AND_DISK将RDD作为反序列化的Java对象存储在JVM中。如果RDD无法完全放入内存将不适合的分区存储在磁盘上并在需要时从那里读取。MEMORY_ONLY_SERJava和Scala将RDD作为序列化的Java对象每个分区一个字节数组存储。这通常比反序列化对象更节省空间尤其是当使用快速序列化器时但读取时更占用CPU。MEMORY_AND_DISK_SERJava和Scala与MEMORY_ONLY_SER类似但是将不适合放入内存的分区溢写到磁盘上而不是每次需要时即时重新计算。DISK_ONLY仅在磁盘上存储RDD分区。MEMORY_ONLY_2, MEMORY_AND_DISK_2等与上述级别相同但是在两个集群节点上复制每个分区。OFF_HEAP 实验性类似于MEMORY_ONLY_SER但是将数据存储在非堆内存中。这需要启用非堆内存。
注意在Python中存储的对象总是使用Pickle库进行序列化因此选择序列化级别并不重要。Python中可用的存储级别包括MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY、DISK_ONLY_2和DISK_ONLY_3。
即使用户没有调用persistSpark也会自动持久化shuffle操作例如reduceByKey中的一些中间数据。这是为了避免在洗牌期间某个节点失败时重新计算整个输入。如果用户计划重用结果RDD我们仍然建议调用persist。
选择哪个存储级别
Spark的存储级别旨在在内存使用和CPU效率之间提供不同的权衡。选择存储级别的建议过程如下 如果您的RDD在默认存储级别MEMORY_ONLY下能够舒适地适应就保持这样。这是最节省CPU的选项允许对RDD的操作尽可能快地运行。 如果不适合尝试使用MEMORY_ONLY_SER并选择一个快速的序列化库使对象更加节省空间但访问速度仍然合理。适用于Java和Scala 除非计算数据集的函数很昂贵或者它们过滤了大量的数据否则不要将数据溢写到磁盘上。否则重新计算一个分区可能和从磁盘读取一样快。 如果您想要快速的容错恢复例如使用Spark来服务Web应用程序中的请求请使用复制的存储级别。所有存储级别都通过重新计算丢失的数据提供完整的容错能力但复制的级别允许您在不等待重新计算丢失的分区的情况下继续运行RDD上的任务。
删除数据
Spark会自动监控每个节点上的缓存使用情况并以最近最少使用LRU的方式丢弃旧的数据分区。如果您想手动移除一个RDD而不是等待它从缓存中掉出可以使用RDD.unpersist()方法。请注意这个方法默认情况下不会阻塞。如果您希望在资源释放后阻塞等待可以在调用这个方法时指定blockingtrue。
共享变量
通常当传递给Spark操作如map或reduce的函数在远程集群节点上执行时它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上远程机器上对变量的任何更新都不会传播回驱动程序。支持跨任务的通用读写共享变量将是低效的。然而Spark确实提供了两种有限类型的共享变量用于两种常见的使用模式广播变量和累加器。
广播变量
广播变量允许程序员在每台机器上保持一个只读变量的缓存而不是随着任务传送它的副本。它们可以被用来例如以高效的方式给每个节点一份大型输入数据集的副本。Spark还尝试使用高效的广播算法分发广播变量以减少通信成本。
Spark操作是通过一系列阶段执行的这些阶段由分布式“洗牌”操作分隔。Spark自动广播每个阶段任务所需的公共数据。这样广播的数据以序列化形式缓存并在运行每个任务前进行反序列化。这意味着显式创建广播变量仅在多个阶段的任务需要相同的数据时才有用或者当以反序列化形式缓存数据很重要时。
广播变量是通过调用SparkContext.broadcast(v)从变量v创建的。广播变量是围绕v的包装器其值可以通过调用value方法来访问。下面的代码展示了这一点
scala val broadcastVar sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] Broadcast(0)scala broadcastVar.value
res0: Array[Int] Array(1, 2, 3)创建广播变量后应在集群上运行的任何函数中使用它来替代值v以避免多次将v传送到节点。此外一旦广播了对象v就不应再修改它以确保所有节点获得相同的广播变量值例如如果稍后将变量传送到新节点。
要释放广播变量复制到执行器上的资源请调用.unpersist()。如果之后再次使用广播它将被重新广播。要永久释放广播变量使用的所有资源请调用.destroy()。此后不能再使用广播变量。请注意这些方法默认情况下不会阻塞。如果在调用它们时指定blockingtrue则会阻塞直到资源被释放。
累加器
累加器是通过一种结合律和交换律操作“加”到变量上的因此可以高效地在并行中支持。它们可以用来实现计数器如MapReduce中的或求和。Spark原生支持数值类型的累加器程序员也可以为新类型添加支持。
作为用户您可以创建命名或未命名的累加器。如下图所见命名的累加器在此示例中为计数器将在修改该累加器的阶段的Web UI中显示。Spark在“任务”表中显示每个任务修改的累加器的值。 在用户界面中跟踪累加器对于理解运行阶段的进度很有用注意Python中尚未支持此功能。
可以通过调用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()分别创建用于累加Long或Double类型的数值累加器。然后在集群上运行的任务可以使用add方法向其添加值。然而它们不能读取累加器的值。只有驱动程序可以使用其value方法读取累加器的值。
以下代码展示了使用累加器累加数组元素的示例
scala val accum sc.longAccumulator(My Accumulator)
accum: org.apache.spark.util.LongAccumulator LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)scala sc.parallelize(Array(1, 2, 3, 4)).foreach(x accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 sscala accum.value
res2: Long 10虽然这段代码使用了对Long类型的累加器的内置支持但程序员也可以通过继承AccumulatorV2来创建自己的类型。AccumulatorV2抽象类有几个必须重写的方法reset用于将累加器重置为零add用于向累加器中添加另一个值merge用于将另一个相同类型的累加器合并到这个累加器中。其他必须重写的方法包含在API文档中。例如假设我们有一个表示数学向量的MyVector类我们可以这样编写
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {private val myVector: MyVector MyVector.createZeroVectordef reset(): Unit {myVector.reset()}def add(v: MyVector): Unit {myVector.add(v)}...
}// Then, create an Accumulator of this type:
val myVectorAcc new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, MyVectorAcc1)请注意当程序员定义自己的AccumulatorV2类型时最终的类型可能与添加的元素的类型不同。
对于仅在行动中执行的累加器更新Spark保证每个任务对累加器的更新只会应用一次即重新启动的任务不会更新值。在转换中用户应该注意如果任务或作业阶段重新执行每个任务的更新可能会被应用多次。
累加器不会改变Spark的延迟评估模型。如果它们在RDD上的操作中被更新它们的值只有在作为行动的一部分计算RDD时才会更新一次。因此当在像map()这样的延迟转换中进行更新时不保证会执行累加器更新。以下代码片段演示了这个属性
val accum sc.longAccumulator
data.map { x accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.