做网站用盗版PS,wordpress 总提示更新,html网页制作期末大作业成品,plone vs wordpress目录
1.说明
1.1 什么是累加器
1.2 累加器的功能
2. 使用累加器
3. 累加器和reduce、fold算子的区别 1.说明
1.1 什么是累加器
累加器是Spark提供的一个共享变量(Shared Variables) 默认情况下#xff0c;如果Executor节点上使用到了Driver端定义的变量(通过算子传…目录
1.说明
1.1 什么是累加器
1.2 累加器的功能
2. 使用累加器
3. 累加器和reduce、fold算子的区别 1.说明
1.1 什么是累加器
累加器是Spark提供的一个共享变量(Shared Variables) 默认情况下如果Executor节点上使用到了Driver端定义的变量(通过算子传递) 算子会将该变量的副本发送的每个Task任务但是并不会将Task任务对副本变量的修改返回给Driver端 但是Spark为我们提供了一个共享变量(累加器)允许Driver端和Task之间共享一个变量
1.2 累加器的功能 累加器用来将Executor端变量的信息聚合到Driver端 在Driver程序中定义的变量在Executor端的每个Task都会得到这个变量的一个新的副本每个Task更新这些副本的值以后会再返回给Driver端进行merge得到最终的值 2. 使用累加器
spark中为我们提供了三个常用的累加器并且支持我们根据自己业务需求来实现自定义累加器类
代码示例 test(使用spark自带的累加器) {// 初始化 spark配置实例val sparkconf: SparkConf new SparkConf().setMaster(local[4]).setAppName()// 初始化 spark环境对象val sc: SparkContext new SparkContext(sparkconf)/** TODO 使用 LongAccumulator* 功能:* 对 整数类型的元素做累加* */val intRdd: RDD[Int] sc.makeRDD(List(1, 2, 2, 3, 3, 4, 5, 6, 7, 8, 9))val accum: LongAccumulator sc.longAccumulator(My LongAccumulator)intRdd.foreach(x accum.add(x))println(sLongAccumulator${accum.value})/** TODO 使用 DoubleAccumulator* 功能:* 对 浮点类型的元素做累加** */val doubleRdd: RDD[Double] sc.makeRDD(List(1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1))val doubleAccumulator: DoubleAccumulator sc.doubleAccumulator(My DoubleAccumulator)doubleRdd.foreach(x doubleAccumulator.add(x))println(sDoubleAccumulator${doubleAccumulator.value})/** TODO 使用 CollectionAccumulator* 将元素添加到list中去* */val collectAccumulator: CollectionAccumulator[Int] sc.collectionAccumulator[Int](My )intRdd.foreach(x collectAccumulator.add(x))println(sCollectionAccumulator${collectAccumulator.value})/** TODO 使用自定义累加器* 将元素添加到Set中去** 实现步骤* 1.根据业务逻辑实现自定义累加器实现类* 2.向spark环境中注册自定义累加器* 3.使用自定义累加器** */val setAccumulator new SetAccumulator[Int]()sc.register(setAccumulator, My SetAccumulator)intRdd.foreach(x setAccumulator.add(x))println(sSetAccumulator${setAccumulator.value})sc.stop()}自定义累加器
/*
* 自定义累加器
* TODO 并未考虑线程安全的问题实际使用时需添加这部分的判断
*
* */
class SetAccumulator[T] extends AccumulatorV2[T, collection.mutable.Set[T]] {/* 定义可变Set */var set collection.mutable.Set[T]()/* 判断 累加器是否为初始状态 */override def isZero: Boolean set.isEmpty/** 获取当前累加器的 新副本* 每个变量(累加器)的副本会发送到每个Task* */override def copy(): AccumulatorV2[T, mutable.Set[T]] new SetAccumulator/** 重置累加器(清空累加器)* */override def reset(): Unit Nil/** TODO 分区内累加规则(Task内)* 获取数据并进行累加* 根据指定的规则,向累加器中添加元素* */override def add(v: T): Unit {set v}/** TODO 分区间累加规则* 合并多个累加器副本* */override def merge(other: AccumulatorV2[T, mutable.Set[T]]): Unit {this.value other.value}override def value: mutable.Set[T] set
}执行结果 3. 累加器和reduce、fold算子的区别
重点关注 1.累加器并不是调优操作并不会带来效率上的提升 2.累加器在Executor端做add操作(累加器副本做更新)在Driver端做merge操作(合并多个Task中的累加器副本)
示例代码 test(对比累加器和reduce、fold算子效率问题) {/** TODO 思考: 累加器和reduce、fold算子的区别* */// 初始化 spark配置实例val sparkconf: SparkConf new SparkConf().setMaster(local[4]).setAppName()// 初始化 spark环境对象val sc: SparkContext new SparkContext(sparkconf)val intRdd sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))// 查看每个分区的内容intRdd.mapPartitionsWithIndex((i, iter) {println(s分区编号$i :${iter.mkString( )});iter}).collect()val accum: LongAccumulator sc.longAccumulator(My Accumulator)intRdd.foreach(x accum.add(x))println(s累加器结果${accum.value})println(----reduce算子----------------------)val resultByReduce intRdd.reduce((v1, v2) {println(s$v1 $v2 ${v1 v2})v1 v2})println(sreduce算子结果${resultByReduce})println(----reduce算子----------------------)val resultByFlod intRdd.fold(0)((v1, v2) {println(s$v1 $v2 ${v1 v2})v1 v2})println(sresultByFlod${resultByFlod})while (true) {}// http://localhost:4040/stages/stage/?id1attempt0sc.stop()}执行结果 累加器并未对计算效率带来提升 参考链接
传送门1
传送门2
官网链接