当前位置: 首页 > news >正文

站长工具排名查询wordpress 屏蔽ftp

站长工具排名查询,wordpress 屏蔽ftp,深圳网站建设哪个好,北京金山办公软件第五章 RDD Checkpoint RDD 数据可以持久化#xff0c;但是持久化/缓存可以把数据放在内存中#xff0c;虽然是快速的#xff0c;但是也是最不可靠的#xff1b;也可以把数据放在磁盘上#xff0c;也不是完全可靠的#xff01;例如磁盘会损坏等。 Checkpoint的产生就是…第五章 RDD Checkpoint RDD 数据可以持久化但是持久化/缓存可以把数据放在内存中虽然是快速的但是也是最不可靠的也可以把数据放在磁盘上也不是完全可靠的例如磁盘会损坏等。 Checkpoint的产生就是为了更加可靠的数据持久化在Checkpoint的时候一般把数据放在HDFS上这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全实现了RDD的容错和高可用。 在Spark Core中对RDD做checkpoint可以切断做checkpoint RDD的依赖关系将RDD数据保存到可靠存储如HDFS以便数据恢复 演示范例代码如下 import org.apache.spark.{SparkConf, SparkContext} /** * RDD数据Checkpoint设置案例演示 */ object SparkCkptTest { def main(args: Array[String]): Unit { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext { // 1.a 创建SparkConf对象设置应用的配置信息 val sparkConf: SparkConf new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix($)) .setMaster(local[2]) // 1.b 传递SparkConf对象构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel(WARN) // TODO: 设置检查点目录将RDD数据保存到那个目录 sc.setCheckpointDir(datas/spark/ckpt/) // 读取文件数据 val datasRDD sc.textFile(datas/wordcount/wordcount.data) // TODO: 调用checkpoint函数将RDD进行备份需要RDD中Action函数触发 datasRDD.checkpoint() datasRDD.count() // TODO: 再次执行count函数, 此时从checkpoint读取数据 datasRDD.count() // 应用程序运行结束关闭资源 Thread.sleep(100000) sc.stop() } }持久化和Checkpoint的区别 1、存储位置 Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存)Checkpoint 可以保存数据到 HDFS 这类可靠的存储上 2、生命周期Cache和Persist的RDD会在程序结束后会被清除或者手动调用unpersist方法Checkpoint的RDD在程序结束后依然存在不会被删除 3、Lineage(血统、依赖链、依赖关系)Persist和Cache不会丢掉RDD间的依赖链/依赖关系因为这种缓存是不可靠的如果出现了一些错误(例如 Executor 宕机)需要通过回溯依赖链重新计算出来Checkpoint会斩断依赖链因为Checkpoint会把结果保存在HDFS这类存储中更加的安全可靠一般不需要回溯依赖链 第六章 外部数据源 Spark可以从外部存储系统读取数据比如RDBMs表中或者HBase表中读写数据这也是企业中常常使用如下两个场景 1、要分析的数据存储在HBase表中需要从其中读取数据数据分析 日志数据电商网站的商家操作日志订单数据保险行业订单数据 2、使用Spark进行离线分析以后往往将报表结果保存到MySQL表中网站基本分析pv、uv。。。。。 6.1 HBase 数据源 Spark可以从HBase表中读写Read/Write数据底层采用TableInputFormat和TableOutputFormat方式与MapReduce与HBase集成完全一样使用输入格式InputFormat和输出格式OutputFoamt。 HBase Sink 回 顾 MapReduce 向 HBase 表 中 写 入 数 据 使 用 TableReducer 其 中 OutputFormat 为TableOutputFormat读取数据KeyImmutableBytesWritableValuePut。 写 入 数 据 时 需 要 将 RDD 转换为 RDD[(ImmutableBytesWritable, Put)] 类 型 调 用saveAsNewAPIHadoopFile方法数据保存至HBase表中。 HBase Client连接时需要设置依赖Zookeeper地址相关信息及表的名称通过Configuration设置属性值进行传递。 范例演示将词频统计结果保存HBase表表的设计 代码如下 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 将RDD数据保存至HBase表中 */ object SparkWriteHBase { def main(args: Array[String]): Unit { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext { // 1.a 创建SparkConf对象设置应用的配置信息 val sparkConf: SparkConf new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix($)) .setMaster(local[2]) // 1.b 传递SparkConf对象构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel(WARN) // TODO: 1、构建RDD val list List((hadoop, 234), (spark, 3454), (hive, 343434), (ml, 8765)) val outputRDD: RDD[(String, Int)] sc.parallelize(list, numSlices 2) // TODO: 2、将数据写入到HBase表中, 使用saveAsNewAPIHadoopFile函数要求RDD是(key, Value) // TODO: 组装RDD[(ImmutableBytesWritable, Put)] /** * HBase表的设计 * 表的名称htb_wordcount * Rowkey: word * 列簇: info * 字段名称 count */ val putsRDD: RDD[(ImmutableBytesWritable, Put)] outputRDD.mapPartitions{ iter iter.map { case (word, count) // 创建Put实例对象 val put new Put(Bytes.toBytes(word)) // 添加列 put.addColumn( // 实际项目中使用HBase时插入数据先将所有字段的值转为String再使用Bytes转换为字节数组 Bytes.toBytes(info), Bytes.toBytes(cout), Bytes.toBytes(count.toString) ) // 返回二元组 (new ImmutableBytesWritable(put.getRow), put) } } // 构建HBase Client配置信息 val conf: Configuration HBaseConfiguration.create() // 设置连接Zookeeper属性 conf.set(hbase.zookeeper.quorum, node1.itcast.cn) conf.set(hbase.zookeeper.property.clientPort, 2181) conf.set(zookeeper.znode.parent, /hbase) // 设置将数据保存的HBase表的名称 conf.set(TableOutputFormat.OUTPUT_TABLE, htb_wordcount) /* def saveAsNewAPIHadoopFile( path: String,// 保存的路径 keyClass: Class[_], // Key类型 valueClass: Class[_], // Value类型 outputFormatClass: Class[_ : NewOutputFormat[_, _]], // 输出格式OutputFormat实现 conf: Configuration self.context.hadoopConfiguration // 配置信息 ): Unit */ putsRDD.saveAsNewAPIHadoopFile( datas/spark/htb-output- System.nanoTime(), // classOf[ImmutableBytesWritable], // classOf[Put], // classOf[TableOutputFormat[ImmutableBytesWritable]], // conf ) // 应用程序运行结束关闭资源 sc.stop() } }运行完成以后使用hbase shell查看数据 HBase Source 回 顾 MapReduce 从 读 HBase 表 中 的 数 据 使 用 TableMapper 其 中 InputFormat 为TableInputFormat读取数据KeyImmutableBytesWritableValueResult。 从HBase表读取数据时同样需要设置依赖Zookeeper地址信息和表的名称使用Configuration设置属性形式如下 此外读取的数据封装到RDD中Key和Value类型分别为ImmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化性能要比Java 序列化要好创建SparkConf对象设置相关属性如下所示 范例演示从HBase表读取词频统计结果代码如下 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration} import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 从HBase 表中读取数据封装到RDD数据集 */ object SparkReadHBase { def main(args: Array[String]): Unit { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext { // 1.a 创建SparkConf对象设置应用的配置信息 val sparkConf: SparkConf new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix($)) .setMaster(local[2]) // TODO: 设置使用Kryo 序列化方式 .set(spark.serializer, org.apache.spark.serializer.KryoSerializer) // TODO: 注册序列化的数据类型 .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result])) // 1.b 传递SparkConf对象构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel(WARN) // TODO: a. 读取HBase Client 配置信息 val conf: Configuration HBaseConfiguration.create() conf.set(hbase.zookeeper.quorum, node1.itcast.cn) conf.set(hbase.zookeeper.property.clientPort, 2181) conf.set(zookeeper.znode.parent, /hbase) // TODO: b. 设置读取的表的名称 conf.set(TableInputFormat.INPUT_TABLE, htb_wordcount) /* def newAPIHadoopRDD[K, V, F : NewInputFormat[K, V]]( conf: Configuration hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V] ): RDD[(K, V)] */ val resultRDD: RDD[(ImmutableBytesWritable, Result)] sc.newAPIHadoopRDD( conf, // classOf[TableInputFormat], // classOf[ImmutableBytesWritable], // classOf[Result] // ) println(sCount ${resultRDD.count()}) resultRDD .take(5) .foreach { case (rowKey, result) println(sRowKey ${Bytes.toString(rowKey.get())}) // HBase表中的每条数据封装在result对象中解析获取每列的值 result.rawCells().foreach { cell val cf Bytes.toString(CellUtil.cloneFamily(cell)) val column Bytes.toString(CellUtil.cloneQualifier(cell)) val value Bytes.toString(CellUtil.cloneValue(cell)) val version cell.getTimestamp println(s\t $cf:$column $value, version $version) } } // 应用程序运行结束关闭资源 sc.stop() } }运行结果 6.2 MySQL 数据源 实际开发中常常将分析结果RDD保存至MySQL表中使用foreachPartition函数此外Spark中提供JdbcRDD用于从MySQL表中读取数据。 调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中保存时考虑降低RDD分区数目和批量插入提升程序性能。 范例演示将词频统计WordCount结果保存MySQL表tb_wordcount。 建表语句 USE db_test ; CREATE TABLE tb_wordcount ( count varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL, word varchar(100) NOT NULL, PRIMARY KEY (word) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COLLATEutf8mb4_0900_ai_ci ;演示代码 import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 将词频统计结果保存到MySQL表中 */ object SparkWriteMySQL { def main(args: Array[String]): Unit { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext { // 1.a 创建SparkConf对象设置应用的配置信息 val sparkConf: SparkConf new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix($)) .setMaster(local[2]) // 1.b 传递SparkConf对象构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel(WARN) // 1. 从HDFS读取文本数据封装集合RDD val inputRDD: RDD[String] sc.textFile(datas/wordcount/wordcount.data) // 2. 处理数据调用RDD中函数 val resultRDD: RDD[(String, Int)] inputRDD // 3.a 每行数据分割为单词 .flatMap(line line.split(\\s)) // 3.b 转换为二元组表示每个单词出现一次 .map(word (word, 1)) // 3.c 按照Key分组聚合 .reduceByKey((tmp, item) tmp item) // 3. 输出结果RDD保存到MySQL数据库 resultRDD // 对结果RDD保存到外部存储系统时考虑降低RDD分区数目 .coalesce(1) // 对分区数据操作 .foreachPartition{iter saveToMySQL(iter)} // 应用程序运行结束关闭资源 sc.stop() } /** * 将每个分区中的数据保存到MySQL表中 * param datas 迭代器封装RDD中每个分区的数据 */ def saveToMySQL(datas: Iterator[(String, Int)]): Unit { // a. 加载驱动类 Class.forName(com.mysql.cj.jdbc.Driver) // 声明变量 var conn: Connection null var pstmt: PreparedStatement null try{ // b. 获取连接 conn DriverManager.getConnection( jdbc:mysql://node1.itcast.cn:3306/?serverTimezoneUTCcharacterEncodingutf8useUnic odetrue, root, 123456 ) // c. 获取PreparedStatement对象 val insertSql INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?) pstmt conn.prepareStatement(insertSql) conn.setAutoCommit(false) // d. 将分区中数据插入到表中批量插入 datas.foreach{case (word, count) pstmt.setString(1, word) pstmt.setLong(2, count.toLong) // 加入批次 pstmt.addBatch() } // TODO: 批量插入 pstmt.executeBatch() conn.commit() }catch { case e: Exception e.printStackTrace() }finally { if(null ! pstmt) pstmt.close() if(null ! conn) conn.close() } } }运行程序查看数据库表的数据
http://www.w-s-a.com/news/225653/

相关文章:

  • 网站开发认证考试石家庄高端网站开发
  • 网站建设第一步怎么弄站酷网页
  • 设备网站模板江西的赣州网站建设
  • 邯郸营销型网站国际招聘人才网
  • hexo wordpress 主题织梦网站优化教程
  • 网站建设方案及上海市建设协会网站
  • 轴承外贸网站怎么做南宁网站排名优化公司哪家好
  • 沈阳企业网站建站郴州优化公司
  • cctv5+手机在线直播观看seo关键词排名优化方法
  • 网站建设公司怎么谈单怎么开通微信小程序商店
  • 深圳做网站案例一个服务器可以备案几个网站
  • 网络营销策划名词解释泉州百度推广排名优化
  • 一键生成网站的软件互联网营销师是干什么
  • 网站后台管理水印怎么做手机优化设置
  • 哪个网站做图文素材多wordpress++优化
  • 建设网站就选用什么样的公司网站类型分类有哪些
  • 找平面设计师网站网站建设须知
  • 建设联结是不是正规网站wordpress 微博同步
  • 瑞安微网站建设广州推广
  • 做旅游宣传网站的流程图中国企业集成网电子商务
  • 开发商城网站开发成交功能网站
  • 网站建设公司专业公司排名搭建网站的企业
  • 网站建设难吗海南智能网站建设报价
  • 企业网站建设选题的依据及意义校园网站建设的论文
  • 网站版面设计方案水电维修在哪个网站上做推广好些
  • 邹平建设局官方网站企业宣传片广告公司
  • 南京建设集团网站建站极速通
  • 网站建设与推广员岗位职责网站开发应如何入账
  • 企业网站的作用和目的手机回收站
  • 大连零基础网站建设培训电话郎溪做网站