当前位置: 首页 > news >正文

推广网站的方法有搜索引擎近境制作官网

推广网站的方法有搜索引擎,近境制作官网,广州 350建网站,网站建设运营必备人员2.3 Streaming 工作原理 SparkStreaming处理流式数据时#xff0c;按照时间间隔划分数据为微批次#xff08;Micro-Batch#xff09;#xff0c;每批次数据当做RDD#xff0c;再进行处理分析。 以上述词频统计WordCount程序为例#xff0c;讲解Streaming工作原理。 创…2.3 Streaming 工作原理 SparkStreaming处理流式数据时按照时间间隔划分数据为微批次Micro-Batch每批次数据当做RDD再进行处理分析。 以上述词频统计WordCount程序为例讲解Streaming工作原理。 创建 StreamingContext 当SparkStreaming流式应用启动streamingContext.start时首先创建StreamingContext流式上下文实例对象整个流式应用环境构建底层还是SparkContext。 当StreamingContext对象构建以后启动接收器Receiver专门从数据源端接收数据此接收器作为Task任务运行在Executor中一直运行Long Runing一直接收数据。 从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器一直在运行以Task方式运行需要1Core CPU。 可以从多个数据源端实时消费数据进行处理例如从多个TCP Socket接收数据对每批次数据进行词频统计使用DStream#union函数合并接收数据流演示代码如下 import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 从TCP Socket 中读取数据对每批次时间为5秒数据进行词频统计将统计结果输出到控制台。 * TODO: 从多个Socket读取流式数据进行union合并 */ object StreamingDStreamUnion { def main(args: Array[String]): Unit { // TODO: 1. 构建StreamingContext流式上下文实例对象 val ssc: StreamingContext { // a. 创建SparkConf对象设置应用配置信息 val sparkConf new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix($)) .setMaster(local[4]) // b.创建流式上下文对象, 传递SparkConf对象TODO: 时间间隔 - 用于划分流式数据为很多批次Batch val context new StreamingContext(sparkConf, Seconds(5)) // c. 返回 context } // TODO: 2. 从数据源端读取数据此处是TCP Socket读取数据 /* def socketTextStream( hostname: String, port: Int, storageLevel: StorageLevel StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[String] */ val inputDStream01: DStream[String] ssc.socketTextStream(node1.itcast.cn, 9999) val inputDStream02: DStream[String] ssc.socketTextStream(node1.itcast.cn, 9988) // 合并两个DStream流 val inputDStream: DStream[String] inputDStream01.union(inputDStream02) // TODO: 3. 对每批次的数据进行词频统计 val resultDStream: DStream[(String, Int)] inputDStream // 过滤不合格的数据 .filter(line null ! line line.trim.length 0) // 按照分隔符划分单词 .flatMap(line line.trim.split(\\s)) // 转换数据为二元组表示每个单词出现一次 .map(word (word, 1)) // 按照单词分组聚合统计 .reduceByKey((tmp, item) tmp item) // TODO: 4. 将结果数据输出 - 将每批次的数据处理以后输出 resultDStream.print(10) // TODO: 5. 对于流式应用来说需要启动应用 ssc.start() // 流式应用启动以后正常情况一直运行接收数据、处理数据和输出数据除非人为终止程序或者程序异常停止 ssc.awaitTermination() // 关闭流式应用(参数一是否关闭SparkContext参数二是否优雅的关闭 ssc.stop(stopSparkContext true, stopGracefully true) } }接收器接收数据 启动每个接收器Receiver以后实时从数据源端接收数据比如TCP Socket也是按照时间间隔将接收的流式数据划分为很多Block块。 接收器 Receiver划分流式数据的时间间隔BlockInterval 默认值为 200ms通过属性【spark.streaming.blockInterval】设置。接收器将接收的数据划分为Block以后按照设置的存储级别对Block进行存储从TCP Socket中接收数据默认的存储级别为MEMORY_AND_DISK_SER_2先存储内存不足再存储磁盘存储2副本。 从TCP Socket消费数据时可以设置Block存储级别演示代码如下 // TODO: 2. 从数据源端读取数据此处是TCP Socket读取数据 /* def socketTextStream( hostname: String, port: Int, storageLevel: StorageLevel StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[String] */ val inputDStream: ReceiverInputDStream[String] ssc.socketTextStream( node1.itcast.cn, // 9999, // // TODO: 设置Block存储级别为先内存不足磁盘副本为1 storageLevel StorageLevel.MEMORY_AND_DISK )汇报接收Block报告 接收器Receiver将实时汇报接收的数据对应的Block信息当BatchInterval时间达到以后StreamingContext将对应时间范围内数据block当做RDD加载SparkContextt处理数据。 以此循环处理流式的数据如下图所示 Streaming 工作原理总述 整个Streaming运行过程中涉及到两个时间间隔 批次时间间隔BatchInterval 每批次数据的时间间隔每隔多久加载一个Job Block时间间隔BlockInterval 接收器划分流式数据的时间间隔可以调整大小哦官方建议最小值不能小于50ms默认值为200ms属性spark.streaming.blockInterval调整设置 官方案例 BatchInterval 1s 1000ms 5 * BlockInterval 每批次RDD数据中有5个Block每个Block就是RDD一个分区数据 从代码层面结合实际数据处理层面来看Streaming处理原理如下左边为代码逻辑右边为实际每批次数据处理过程。 具体运行数据时每批次数据依据代码逻辑执行。 // TODO: 3. 对每批次的数据进行词频统计 val resultDStream: DStream[(String, Int)] inputDStream // 过滤不合格的数据 .filter(line null ! line line.trim.length 0) // 按照分隔符划分单词 .flatMap(line line.trim.split(\\s)) // 转换数据为二元组表示每个单词出现一次 .map(word (word, 1)) // 按照单词分组聚合统计 .reduceByKey((tmp, item) tmp item) // TODO: 4. 将结果数据输出 - 将每批次的数据处理以后输出 resultDStream.print(10)流式数据流图如下
http://www.w-s-a.com/news/326123/

相关文章:

  • 快递网站策划怎么做ppt长春建设信息网站
  • 做服装搭配图的网站有哪些经营一个网站要怎么做
  • 呼市品牌网站建设那家好增城住房和建设局网站
  • 网站首页布局设计代码太仓网站开发建设服务
  • 学校网站建设与管理porto wordpress模板
  • 余姚做网站公司网站建设有哪些基本流程
  • 门户网站建设的报价百度医生在线问诊
  • 北京公司注册在哪个网站浏览器打开网址404
  • 廊坊做网站公司绣花图案设计网站
  • 网站空间租用哪个好购物网站建设模板图片
  • 建设银行包头分行网站泰安网签成交量最新
  • 手机微网站与微官网现在去成都需要隔离吗
  • 学校的二级网站怎么建设深圳企业网站制作设计
  • 自己做qq头像静态的网站网站建设是属于软件开发费吗
  • 举报网站建设做网站之前的工作
  • 用QQ群做网站排名个人网站制作协议
  • 做茶叶网站的素材天津网站营销
  • 网站设计建设流程图微信端的网站开发python
  • 湖州网站seo优化网站改域名备案
  • dedecms怎么制作网站合肥电商网站开发
  • 网站开发通用流程图做flash的网站
  • 营销型网站有哪些平台网站建设藤设计
  • 网站需求分析网站建设美食网站建设多少钱
  • 有专门做网站的吗建德网站
  • 做网站要买服务器吗单页设计思路
  • 一 电子商务网站建设规划网站开发前端框架和后端框架
  • 自助网站建设系统软件自己免费建设网站
  • 百度微建站access如何与网站连接数据库
  • ppt素材免费网站网站正能量晚上免费软件
  • 个人淘宝客网站如何备案搭建一个平台要多少钱