当前位置: 首页 > news >正文

廊坊网站建设电话做结构图用什么网站

廊坊网站建设电话,做结构图用什么网站,大连网站建设策划,树莓派 wordpress1. 共享变量 Spark两种共享变量#xff1a;广播变量#xff08;broadcast variable#xff09;与累加器#xff08;accumulator#xff09;。 累加器用来对信息进行聚合#xff0c;相当于mapreduce中的counter#xff1b;而广播变量用来高效分发较大的对象#xff0c…1. 共享变量 Spark两种共享变量广播变量broadcast variable与累加器accumulator。 累加器用来对信息进行聚合相当于mapreduce中的counter而广播变量用来高效分发较大的对象相当于semijoin中的DistributedCache 。 共享变量出现的原因 我们传递给Spark的函数如map()或者filter()的判断条件函数能够利用定义在函数之外的变量但是集群中的每一个task都会得到变量的一个副本并且task在对变量进行的更新不会被返回给driver。 package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object TestAcc {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(test acc)conf.setMaster(local[*])val sc new SparkContext(conf)val rdd sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9),3)val count rdd.map(t 1).reduce(__)println(count)// val acc sc.longAccumulator(count) // // rdd.foreach(t{ // acc.add(1) // }) // // println(acc.value)// println(rdd.count())} } 原因总结 对于executor端driver端的变量是外部变量。 excutor端修改了变量count根本不会让driver端跟着修改。如果想在driver端得到executor端修改的变量需要用累加器实现。 当在Executor端用到了Driver变量不使用广播变量在每个Executor中有多少个task就有多少个Driver端变量副本。如果这个变量中的数据很大的话会产生很高的传输负载导致执行效率降低也可能会造成内存溢出。使用广播变量以后在每个Executor中只有一个Driver端变量副本在一个executor中的并行执行的task任务会引用该一个变量副本即可需要广播变量提高运行效率。 2. 累加器 累加器的执行流程 通过SparkContext创建一个累加器并初始化。当driver端将任务分发给executor时每个executor会接收一个任务和一个引用到该累加器的副本。每个executor上的任务可以调用累加器的add方法来增加累加器的值这些操作是线程安全的因为每个任务都会在自己的executor线程中执行。当每个任务完成executor将累加器的更新值发送到driver端进行聚合过程得到最终的聚合结果。 累加器可以很简便地对各个worker返回给driver的值进行聚合。累加器最常见的用途之一就是对一个job执行期间发生的事件进行计数。 用法 var acc: LongAccumulator sc.longAccumulator // 创建累加器acc.add(1) // 累加器累加acc.value // 获取累加器的值 累加器的简单使用 package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object WordCountWithAcc {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(test acc)conf.setMaster(local[*])val sc new SparkContext(conf)val acc sc.longAccumulator(bad word)sc.textFile(data/a.txt).flatMap(_.split( )).filter(t{if(t.equals(shit)){acc.add(1)false}elsetrue}).map((_,1)).reduceByKey(__).foreach(println)println(invalid words:acc.value)} } 3. 广播变量 ip转换工具 public class IpUtils {public static Long ip2Long(String ip) {String fragments[] ip.split([.]);Long ipNum 0L;for(int i0;ifragments.length;i) {ipNum Long.parseLong(fragments[i]) | ipNum 8L;}return ipNum;} } ip案例代码 package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object IpTest {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(ip)conf.setMaster(local[*])val sc new SparkContext(conf)val accessRDD sc.textFile(data/access.log).map(t{val strs t.split(\\|)IpUtils.ip2Long(strs(1))})val ipArr:Array[(Long,Long,String)] sc.textFile(data/ip.txt).map(t{val strs t.split(\\|)(strs(2).toLong,strs(3).toLong,strs(6)strs(7))}).collect()// accessRDD.map(ip{ // ipRDD.filter(t{ // ip t._1 ip t._2 // }) // }).foreach(println)accessRDD.map(ip{ipArr.find(t{t._1 ip t._2ip}) match {case Some(v) (v._3,1)case None (unknow,1)}//option}).reduceByKey(__).foreach(println)} } 使用广播变量可以使程序高效地将一个很大的只读数据发送到executor节点会将广播变量放到executor的BlockManager中而且对每个executor节点只需要传输一次该executor节点的多个task可以共用这一个。 用法 val broad: Broadcast[List[Int]] sc.broadcast(list) // 把driver端的变量用广播变量包装broad.value // 从广播变量获取包装的数据用于计算 我们可能遇到这样的问题如果我们需要广播的数据为100M如果需要driver端亲自向每个executor端发送100M的数据在工作中executor节点的个数可能是很多的比如是200个这意味着driver端要发送20G的数据这对于driver端的压力太大了。所以要用到比特洪流技术。 就是说driver端不必向每个executor发送一份完整的广播变量的数据而是将一份广播变量切分成200份发送给两百个executor然后200个executor间通过BlockManager中的组件transferService与其他executor通信进行完整的数据。 这样driver端只需要发送一份广播变量的数据压力就会小很多而且其他executor也都拿到了这一份广播变量的数据 。 package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object IpTest {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(ip)conf.setMaster(local[*])val sc new SparkContext(conf)val accessRDD sc.textFile(data/access.log).map(t{val strs t.split(\\|)IpUtils.ip2Long(strs(1))})val ipArr:Array[(Long,Long,String)] sc.textFile(data/ip.txt).map(t{val strs t.split(\\|)(strs(2).toLong,strs(3).toLong,strs(6)strs(7))}).collect()val bs sc.broadcast(ipArr)// accessRDD.map(ip{// ipRDD.filter(t{// ip t._1 ip t._2// })// }).foreach(println)accessRDD.map(ip{bs.value.find(t{t._1 ip t._2ip}) match {case Some(v) (v._3,1)case None (unknow,1)}//option}).reduceByKey(__).foreach(println)} } 为了提高查找的效率可以使用二分法查找代码。将时间复杂度由O(n)优化到了O(logn)。 val start System.currentTimeMillis()val res (binarySearch(ip,bs.value),1) // val res bs.value.find(t{ // t._1 ip t._2ip // }) match { // case Some(v) (v._3,1) // case None (unknow,1) // }val end System.currentTimeMillis()acc.add(end-start) 累加器实现运行时间的统计
http://www.w-s-a.com/news/892140/

相关文章:

  • 备案网站忘记密码乐装网
  • 电商扶贫网站建设淄博网站建设小程序
  • 网站群建设代理丰城网站建设公司
  • 青岛网站建设服务器wordpress迁移跳转原网站
  • 泰安网站建设哪里有公司如何注册网站
  • 做网站开专票税钱是多少个点上海市有哪些公司
  • 寿县有做网站开发的吗宁波网站建设方式
  • 网站建设和网站推广服务器怎么发布网站
  • 比较好的摄影网站雅安市政建设公司网站
  • 网站与微信区别wordpress 站内信
  • 宁夏网站开发设计说明书源码下载脚本之家
  • 邱县做网站百度搜索排名机制
  • 运城个人网站建设智慧团建系统官方网站登录
  • 公司营业执照可以做几个网站一家专门做母婴的网站
  • 网站建设商标属于哪个类别搜狗seo快速排名公司
  • 织梦做商城网站企业网络建站
  • 网站后期维护都有什么wordpress首页加图片
  • 展会网站怎么做网页设计与制作教程版徐洪亮课后答案
  • 石景山网站建设设计公司建设网站怎么建立服务器
  • 本地生活服务平台app网站关键词优化原理
  • 建网站的公司叫什么重庆论坛建站模板
  • 湖北网站制作公司银川网站建设哪家不错
  • 网站后台演示地址服装网站建设公司推荐
  • 湖北钟祥建设局网站旅游哪个网站最好
  • 浙江建设工程信息网站辽宁建设工程信息网场内业绩什么意思
  • 郑州做网站公司 汉狮网络专业图片搜集网站怎么做
  • 网站托管是什么品牌推广营销平台
  • 制作网站的难度贵州省兴义市建设局网站
  • 永春建设局网站室内设计师培训班学费多少
  • 做仿站如何获取网站源码windows2012做网站