营销网站系统,辽阳网站网站建设,黄冈如何创建免费网站,上海网站备案中心一、问题引出
/**
案例#xff1a;对同一份数据文件分别做 WordCount 聚合操作和 Word 分组操作
期望#xff1a;针对数据文件只进行一次分词、转换操作得到 RDD 对象#xff0c;然后再对该对象分别进行聚合和分组#xff0c;实现数据重用
*/
object TestRDDPersist {def …一、问题引出
/**
案例对同一份数据文件分别做 WordCount 聚合操作和 Word 分组操作
期望针对数据文件只进行一次分词、转换操作得到 RDD 对象然后再对该对象分别进行聚合和分组实现数据重用
*/
object TestRDDPersist {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(persist)val sc new SparkContext(conf)val rdd sc.makeRDD(List(hello world, hello spark))val flatRdd rdd.flatMap(_.split( ))val mapRdd flatRdd.map(word {println()(word, 1)})// 聚合操作val reduceRdd mapRdd.reduceByKey(_ _)reduceRdd.collect().foreach(println)println(**********)// 分组操作val groupRdd mapRdd.groupByKey()groupRdd.collect().foreach(println)}
}/**
结果flatRdd.map过程在聚合时和分组时分别都执行了说明针对数据文件的分词、转换操作被重复执行了只有对象被重用而数据没有被重用
解析1.RDD是不会存储数据的当某个 RDD 转换成新的 RDD 后该 RDD 中的数据就没有了2.如果需要再次用到该 RDD 的数据则需要从数据源开始重新执行到该 RDD 来获取数据
解决针对某个需要被重复使用的 RDD 对象在其进行下一步操作时先将数据进行缓存持久化或checkpoint后续的其它操作从缓存持久化或checkpoint中获取数据
*/二、RDD Cache
/**
缓存或持久化方法1.rdd.cache()底层调用 persist() 方法默认是将数据保存到 JVM 堆内存中2.rdd.persist(StorageLevel)可以指定数据的保存级别
说明1.持久化方法被调用时不会立即进行缓存而是在触发action算子时数据才会被缓存在计算节点的内存中2.缓存除了用于数据重用还可以提高容错性
*/
object TestRDDPersist {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(persist)val sc new SparkContext(conf)val rdd sc.makeRDD(List(hello world, hello spark))val flatRdd rdd.flatMap(_.split( ))val mapRdd flatRdd.map(word {println()(word, 1)})//mapRdd.cache()mapRdd.persist()// 聚合操作val reduceRdd mapRdd.reduceByKey(_ _)reduceRdd.collect().foreach(println)println(**********)// 分组操作val groupRdd mapRdd.groupByKey()groupRdd.collect().foreach(println)/*结果聚合和分组前的操作过程只执行了一遍实现了数据重用*/}
}// 存储级别
object StorageLevel {val NONE new StorageLevel(false, false, false, false)val DISK_ONLY new StorageLevel(true, false, false, false)val DISK_ONLY_2 new StorageLevel(true, false, false, false, 2) // 副本val MEMORY_ONLY new StorageLevel(false, true, false, true) // 内存不足丢弃数据val MEMORY_ONLY_2 new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK new StorageLevel(true, true, false, true) // 内存不足溢写磁盘val MEMORY_AND_DISK_2 new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 new StorageLevel(true, true, false, false, 2)val OFF_HEAP new StorageLevel(true, true, true, false, 1)
}三、RDD CheckPoint
/**
方法rdd.checkpoint()将 RDD 中间结果写入磁盘
说明1.对 RDD 进行 checkpoint 操作并不会马上被执行必须执行 Action 操作才能触发2.checkpoint保存由于在job执行完不会被删除所以必须指定保存路径一般保存在分布式文件系统
*/
object TestRDDPersist {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(persist)val sc new SparkContext(conf)// 指定checkpoint保存路径sc.setCheckpointDir(checkpoint)val rdd sc.makeRDD(List(hello world, hello spark))val flatRdd rdd.flatMap(_.split( ))val mapRdd flatRdd.map(word {println()(word, 1)})mapRdd.checkpoint()// 聚合操作val reduceRdd mapRdd.reduceByKey(_ _)reduceRdd.collect().foreach(println)println(**********)// 分组操作val groupRdd mapRdd.groupByKey()groupRdd.collect().foreach(println)/*结果聚合和分组前的操作过程只执行了一遍实现了数据重用*/}
}四、缓存和检查点区别
cache 和 persist 会在原有的血缘关系中添加新的依赖一旦数据出错可以重头读取数据checkpoint 检查点会切断原有的血缘关系重新建立新的血缘关系相当于改变数据源cache 是将数据临时存储在 JVM 堆内存中性能较高但安全性低persist 可以指定存储级别将数据临时存储在磁盘文件中涉及到 IO性能较低作业执行完毕后临时文件会被删除checkpoint 是将数据长久地存储分布式文件系统中安全性较高但涉及 IO 且会独立开启一个作业从数据源开始获取数据所以性能较低一般在 checkpoint 前先进行 cache当 checkpoint 时 job 只需从缓存中读取数据即可可以提高性能