当前位置: 首页 > news >正文

宝安做棋牌网站建设找哪家效益快创意设计师个人网站

宝安做棋牌网站建设找哪家效益快,创意设计师个人网站,都匀住房与城乡建设部网站,网站编程培训公司1、RDD操作详解 # 启动spark-shell spark-shell --master local[2] 1.1 基本转换 1) map map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 举例#xff1a; scala val a sc.parallelize(1 …1、RDD操作详解 # 启动spark-shell spark-shell --master local[2] 1.1 基本转换 1) map map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 举例 scala val a sc.parallelize(1 to 9, 3) scala val b a.map(x x2) scala a.collect res10: Array[Int] Array(1, 2, 3, 4, 5, 6, 7, 8, 9) scala b.collect res11: Array[Int] Array(2, 4, 6, 8, 10, 12, 14, 16, 18)上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。 2) filter filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 val rdd sc.parallelize(List(1,2,3,4,5,6)) val filterRdd rdd.filter(_ 5) filterRdd.collect() //返回所有大于5的数据的一个Array Array(6,8,10,12)3) flatMap 与map类似区别是原RDD中的元素经map处理后只能生成一个元素而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 举例对原RDD中的每个元素x产生y个元素从1到yy为元素x的值 scala val a sc.parallelize(1 to 4, 2) scala val b a.flatMap(x 1 to x) scala b.collect res12: Array[Int] Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)⭐️4) mapPartitions mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素而mapPartitions的输入函数是应用于每个分区也就是把每个分区中的内容作为整体来处理的。 它的函数定义为 def mapPartitions[U: ClassTag](f: Iterator[T] Iterator[U],preservesPartitioning: Boolean false): RDD[U]f即为输入函数它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数ff的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。 举例 scala val a sc.parallelize(1 to 9, 3) scala def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] {var res List[(T, T)]() var pre iter.next while (iter.hasNext) {val cur iter.nextres.::(pre, cur)pre cur } res.iterator} scala a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了所以(3,4)和(6,7)不在结果中。 mapPartitions还有些变种比如mapPartitionsWithContext它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex它能把分区的index传递给用户指定的输入函数。 5) mapPartitionsWithIndex def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) Iterator[U], preservesPartitioning: Boolean false)(implicit arg0: ClassTag[U]): RDD[U]函数作用同mapPartitions不过提供了两个参数第一个参数为分区的索引。 scala var rdd1 sc.makeRDD(1 to 5,2) //rdd1有两个分区 scala var rdd2 rdd1.mapPartitionsWithIndex{| (x,iter) {| var result List[String]()| var i 0| while(iter.hasNext){| i iter.next()| }| result.::(x | i).iterator| }| }//rdd2将rdd1中每个分区的数字累加并在每个分区的累加结果前面加了分区索引 scala rdd2.collect res13: Array[String] Array(0|3, 1|12) //p-0(1,2) p-1(3,4,5)️好像没用了️6) mapWith mapWith是map的另外一个变种map只需要一个输入函数而mapWith有两个输入函数。它的定义如下 def mapWith[A: ClassTag, U: ](constructA: Int A, preservesPartitioning: Boolean false)(f: (T, A) U): RDD[U]第一个函数constructA是把RDD的partition indexindex从0开始作为输入输出为新类型A 第二个函数f是把二元组(T, A)作为输入其中T为原RDD中的元素A为第一个函数的输出输出类型为U。 举例把partition index 乘以10加2,作为新的RDD的元素。 val x sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3) x.mapWith(a a * 10)((b, a) (b,a 2)).collect 结果 (1,2) (2,2) (3,2) (4,12) (5,12) (6,12) (7,22) (8,22) (9,22) (10,22) ️好像没用了️7) flatMapWith flatMapWith与mapWith很类似都是接收两个函数一个函数把partitionIndex作为输入输出是一个新类型A另外一个函数是以二元组T,A作为输入输出为一个序列这些序列里面的元素组成了新的RDD。它的定义如下 def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int A, preservesPartitioning: Boolean false)(f: (T, A) Seq[U]): RDD[U]举例 scala val a sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) scala a.flatMapWith(x x, true)((x, y) List(y, x)).collect res58: Array[Int] Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)⭐️8) coalesce def coalesce(numPartitions: Int, shuffle: Boolean false)(implicit ord: Ordering[T] null): RDD[T]该函数用于将RDD进行重分区使用HashPartitioner。 第一个参数为重分区的数目第二个为是否进行shuffle默认为false; 以下面的例子来看 scala var data sc.parallelize(1 to 12, 3) scala data.collect scala data.partitions.size scala var rdd1 data.coalesce(1) scala rdd1.partitions.size scala var rdd1 data.coalesce(4) scala rdd1.partitions.size res2: Int 1 //如果重分区的数目大于原来的分区数那么必须指定shuffle参数为true//否则分区数不便 scala var rdd1 data.coalesce(4,true) scala rdd1.partitions.size res3: Int 4⭐️9) repartition def repartition(numPartitions: Int)(implicit ord: Ordering[T] null): RDD[T]该函数其实就是coalesce函数第二个参数为true的实现 scala var data sc.parallelize(1 to 12, 3) scala data.collect scala data.partitions.size scala var rdd1 data. repartition(1) scala rdd1.partitions.size scala var rdd1 data. repartition(4) scala rdd1.partitions.size res3: Int 410) randomSplit def randomSplit(weights: Array[Double], seed: Long Utils.random.nextLong): Array[RDD[T]]该函数根据weights权重将一个RDD切分成多个RDD。 该权重参数为一个Double数组 第二个参数为random的种子基本可忽略。 scala var rdd sc.makeRDD(1 to 12,12) rdd: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[16] at makeRDD at :21scala rdd.collect res6: Array[Int] Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala var splitRDD rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2)) splitRDD: Array[org.apache.spark.rdd.RDD[Int]] Array(MapPartitionsRDD[17] at randomSplit at :23, MapPartitionsRDD[18] at randomSplit at :23, MapPartitionsRDD[19] at randomSplit at :23, MapPartitionsRDD[20] at randomSplit at :23)//这里注意randomSplit的结果是一个RDD数组 scala splitRDD.size res8: Int 4 //由于randomSplit的第一个参数weights中传入的值有4个因此就会切分成4个RDD, //把原来的rdd按照权重0.5, 0.1, 0.2, 0.2随机划分到这4个RDD中权重高的RDD划分到//的几率就大一些。 //注意权重的总和加起来为1否则会不正常 scala splitRDD(0).collect res10: Array[Int] Array(1, 4)scala splitRDD(1).collect res11: Array[Int] Array(3) scala splitRDD(2).collect res12: Array[Int] Array(5, 9)scala splitRDD(3).collect res13: Array[Int] Array(2, 6, 7, 8, 10)11) glom def glom(): RDD[Array[T]]该函数是将RDD中每一个分区中类型为T的元素转换成Array[T]这样每一个分区就只有一个数组元素。 scala var rdd sc.makeRDD(1 to 10,3) rdd: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[38] at makeRDD at :21 scala rdd.partitions.size res33: Int 3 //该RDD有3个分区 scala rdd.glom().collect res35: Array[Array[Int]] Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10)) //glom将每个分区中的元素放到一个数组中这样结果就变成了3个数组⭐️12) union并集 val rdd1 sc.parallelize(List(5, 6, 4, 3))val rdd2 sc.parallelize(List(1, 2, 3, 4))//求并集val rdd3 rdd1.union(rdd2)rdd3.collect⭐️13) distinct去重 val rdd1 sc.parallelize(List(5, 6, 4, 3))val rdd2 sc.parallelize(List(1, 2, 3, 4))//求并集val rdd3 rdd1.union(rdd2)//去重输出rdd3.distinct.collect⭐️14) intersection交集 val rdd1 sc.parallelize(List(5, 6, 4, 3))val rdd2 sc.parallelize(List(1, 2, 3, 4))//求交集val rdd4 rdd1.intersection(rdd2) rdd4.collect⭐️15) subtract def subtract(other: RDD[T]): RDD[T]def subtract(other: RDD[T], numPartitions: Int): RDD[T]def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] null): RDD[T]该函数返回在RDD中出现并且不在otherRDD中出现的元素不去重。 val rdd1 sc.parallelize(List(5, 6, 6, 4, 3))val rdd2 sc.parallelize(List(1, 2, 3, 4))//求差集val rdd4 rdd1.subtract(rdd2)rdd4.collect16) subtractByKey def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]subtractByKey和基本转换操作中的subtract类似只不过这里是针对K的返回在主RDD中出现并且不在otherRDD中出现的元素。 参数numPartitions用于指定结果的分区数 参数partitioner用于指定分区函数 var rdd1 sc.makeRDD(Array((A,1),(B,2),(C,3)),2)var rdd2 sc.makeRDD(Array((A,a),(C,c),(D,d)),2) scala rdd1.subtractByKey(rdd2).collectres13: Array[(String, String)] Array((B,2))17) groupbyKey val rdd1 sc.parallelize(List((tom, 1), (jerry, 3), (kitty, 2))) val rdd2 sc.parallelize(List((jerry, 2), (tom, 1), (shuke, 2))) //求并集 val rdd4 rdd1 union rdd2 //按key进行分组 val rdd5 rdd4.groupByKe rdd5.collect18) reduceByKey 顾名思义reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce因此Key相同的多个元素的值被reduce为一个值然后与原RDD中的Key组成一个新的KV对。 举例: val rdd1 sc.parallelize(List((tom, 1), (jerry, 3), (kitty, 2))) val rdd2 sc.parallelize(List((jerry, 2), (tom, 1), (shuke, 2))) //求并集 val rdd4 rdd1 union rdd2 //按key进行分组 val rdd6 rdd4.reduceByKey(_ _) rdd6.collect()19) sortByKey 将List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount并按名称排序 val rdd1 sc.parallelize(List((tom, 1), (jerry, 3), (kitty, 2), (shuke, 1)))val rdd2 sc.parallelize(List((jerry, 2), (tom, 3), (shuke, 2), (kitty, 5)))val rdd3 rdd1.union(rdd2)//按key进行聚合val rdd4 rdd3.reduceByKey(_ _)//false降序val rdd5 rdd4.sortByKey(false)rdd5.collect20) sortBy 将List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount并按数值排序 val rdd1 sc.parallelize(List((tom, 1), (jerry, 3), (kitty, 2), (shuke, 1)))val rdd2 sc.parallelize(List((jerry, 2), (tom, 3), (shuke, 2), (kitty, 5)))val rdd3 rdd1.union(rdd2)//按key进行聚合val rdd4 rdd3.reduceByKey(_ _)//false降序val rdd5 rdd4.sortBy(_._2, false)rdd5.collect21) zip def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同否则会抛出异常。 scala var rdd1 sc.makeRDD(1 to 5,2)rdd1: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[1] at makeRDD at :21scala var rdd2 sc.makeRDD(Seq(A,B,C,D,E),2)rdd2: org.apache.spark.rdd.RDD[String] ParallelCollectionRDD[2] at makeRDD at :21scala rdd1.zip(rdd2).collectres0: Array[(Int, String)] Array((1,A), (2,B), (3,C), (4,D), (5,E)) scala rdd2.zip(rdd1).collectres1: Array[(String, Int)] Array((A,1), (B,2), (C,3), (D,4), (E,5))scala var rdd3 sc.makeRDD(Seq(A,B,C,D,E),3)rdd3: org.apache.spark.rdd.RDD[String] ParallelCollectionRDD[5] at makeRDD at :21scala rdd1.zip(rdd3).collectjava.lang.IllegalArgumentException: Cant zip RDDs with unequal numbers of partitions//如果两个RDD分区数不同则抛出异常 22) zipPartitions zipPartitions函数将多个RDD按照partition组合成为新的RDD该函数需要组合的RDD具有相同的分区数但对于每个分区内的元素数量没有要求。 该函数有好几种实现可分为三类 1. 参数是一个RDD def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]这两个区别就是参数preservesPartitioning是否保留父RDD的partitioner分区信息 映射方法f参数为两个RDD的迭代器。 scala var rdd1 sc.makeRDD(1 to 5,2) rdd1: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[22] at makeRDD at :21scala var rdd2 sc.makeRDD(Seq(A,B,C,D,E),2) rdd2: org.apache.spark.rdd.RDD[String] ParallelCollectionRDD[23] at makeRDD at :21//rdd1两个分区中元素分布 scala rdd1.mapPartitionsWithIndex{| (x,iter) {| var result List[String]()| while(iter.hasNext){| result :: (part_ x | iter.next())| }| result.iterator| | }| }.collect res17: Array[String] Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)//rdd2两个分区中元素分布 scala rdd2.mapPartitionsWithIndex{| (x,iter) {| var result List[String]()| while(iter.hasNext){| result :: (part_ x | iter.next())| }| result.iterator| | }| }.collect res18: Array[String] Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)//rdd1和rdd2做zipPartition scala rdd1.zipPartitions(rdd2){| (rdd1Iter,rdd2Iter) {| var result List[String]()| while(rdd1Iter.hasNext rdd2Iter.hasNext) {| result::(rdd1Iter.next() _ rdd2Iter.next())| }| result.iterator| }| }.collect res19: Array[String] Array(2_B, 1_A, 5_E, 4_D, 3_C)2. 参数是两个RDD def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]用法同上面只不过该函数参数为两个RDD映射方法f输入参数为两个RDD的迭代器。 scala var rdd1 sc.makeRDD(1 to 5,2) rdd1: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[27] at makeRDD at :21scala var rdd2 sc.makeRDD(Seq(A,B,C,D,E),2) rdd2: org.apache.spark.rdd.RDD[String] ParallelCollectionRDD[28] at makeRDD at :21scala var rdd3 sc.makeRDD(Seq(a,b,c,d,e),2) rdd3: org.apache.spark.rdd.RDD[String] ParallelCollectionRDD[29] at makeRDD at :21//rdd3中个分区元素分布 scala rdd3.mapPartitionsWithIndex{| (x,iter) {| var result List[String]()| while(iter.hasNext){| result :: (part_ x | iter.next())| }| result.iterator| | }| }.collect res21: Array[String] Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)//三个RDD做zipPartitions scala var rdd4 rdd1.zipPartitions(rdd2,rdd3){| (rdd1Iter,rdd2Iter,rdd3Iter) {| var result List[String]()| while(rdd1Iter.hasNext rdd2Iter.hasNext rdd3Iter.hasNext) {| result::(rdd1Iter.next() _ rdd2Iter.next() _ rdd3Iter.next())| }| result.iterator| }| } rdd4: org.apache.spark.rdd.RDD[String] ZippedPartitionsRDD3[33] at zipPartitions at :27scala rdd4.collect res23: Array[String] Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)3. 参数是三个RDD def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]用法同上面只不过这里又多了个一个RDD而已。 23) zipWithIndex def zipWithIndex(): RDD[(T, Long)]该函数将RDD中的元素和这个元素在RDD中的ID索引号组合成键/值对。 scala var rdd2 sc.makeRDD(Seq(A,B,R,D,F),2)rdd2: org.apache.spark.rdd.RDD[String] ParallelCollectionRDD[34] at makeRDD at :21scala rdd2.zipWithIndex().collectres27: Array[(String, Long)] Array((A,0), (B,1), (R,2), (D,3), (F,4))24) zipWithUniqueId def zipWithUniqueId(): RDD[(T, Long)]该函数将RDD中元素和一个唯一ID组合成键/值对该唯一ID生成算法如下 每个分区中第一个元素的唯一ID值为该分区索引号 每个分区中第N个元素的唯一ID值为(前一个元素的唯一ID值) (该RDD总的分区数) 看下面的例子 scala var rdd1 sc.makeRDD(Seq(A,B,C,D,E,F),2)rdd1: org.apache.spark.rdd.RDD[String] ParallelCollectionRDD[44] at makeRDD at :21//rdd1有两个分区scala rdd1.zipWithUniqueId().collectres32: Array[(String, Long)] Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))//总分区数为2//第一个分区第一个元素ID为0第二个分区第一个元素ID为1//第一个分区第二个元素ID为022第一个分区第三个元素ID为224//第二个分区第二个元素ID为123第二个分区第三个元素ID为3251.2 键值转换 ⭐️25) partitionBy def partitionBy(partitioner: Partitioner): RDD[(K, V)]该函数根据partitioner函数生成新的ShuffleRDD将原RDD重新分区。 scala var rdd1 sc.makeRDD(Array((1,A),(2,B),(3,C),(4,D)),2) rdd1: org.apache.spark.rdd.RDD[(Int, String)] ParallelCollectionRDD[23] at makeRDD at :21scala rdd1.partitions.size res20: Int 2//查看rdd1中每个分区的元素 scala rdd1.mapPartitionsWithIndex{| (partIdx,iter) {| var part_map scala.collection.mutable.Map[String,List[(Int,String)]]()| while(iter.hasNext){| var part_name part_ partIdx;| var elem iter.next()| if(part_map.contains(part_name)) {| var elems part_map(part_name)| elems :: elem| part_map(part_name) elems| } else {| part_map(part_name) List[(Int,String)]{elem}| }| }| part_map.iterator| | }| }.collect res22: Array[(String, List[(Int, String)])] Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))//(2,B),(1,A)在part_0中(4,D),(3,C)在part_1中//使用partitionBy重分区 scala var rdd2 rdd1.partitionBy(new org.apache.spark.HashPartitioner(2)) rdd2: org.apache.spark.rdd.RDD[(Int, String)] ShuffledRDD[25] at partitionBy at :23scala rdd2.partitions.size res23: Int 2//查看rdd2中每个分区的元素 scala rdd2.mapPartitionsWithIndex{| (partIdx,iter) {| var part_map scala.collection.mutable.Map[String,List[(Int,String)]]()| while(iter.hasNext){| var part_name part_ partIdx;| var elem iter.next()| if(part_map.contains(part_name)) {| var elems part_map(part_name)| elems :: elem| part_map(part_name) elems| } else {| part_map(part_name) List[(Int,String)]{elem}| }| }| part_map.iterator| }| }.collect res24: Array[(String, List[(Int, String)])] Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))//(4,D),(2,B)在part_0中(3,C),(1,A)在part_1中26) mapValues mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value原RDD中的Key保持不变与新的Value一起组成新的RDD中的元素。因此该函数只适用于元素为KV对的RDD。 举例 scala val a sc.parallelize(List(dog, tiger, lion, cat, panther, eagle), 2)scala val b a.map(x (x.length, x))scala b.mapValues(x _ x).collectres5: Array[(Int, String)] Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))27) flatMapValues flatMapValues类似于mapValues不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值然后这些值再与原RDD中的Key组成一系列新的KV对。 举例 val a sc.parallelize(List((1, 2), (3, 4), (5, 6)))val b a.flatMapValues(x 1.to(x))b.collect.foreach(println)28) combineByKey def combineByKey[C](createCombiner: (V) C, mergeValue: (C, V) C, mergeCombiners: (C, C) C): RDD[(K, C)]def combineByKey[C](createCombiner: (V) C, mergeValue: (C, V) C, mergeCombiners: (C, C) C, numPartitions: Int): RDD[(K, C)]def combineByKey[C](createCombiner: (V) C, mergeValue: (C, V) C, mergeCombiners: (C, C) C, partitioner: Partitioner, mapSideCombine: Boolean true, serializer: Serializer null): RDD[(K, C)]该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。 其中的参数 createCombiner组合器函数用于将V类型转换成C类型输入参数为RDD[K,V]中的V,输出为C ,分区内相同的key做一次 mergeValue合并值函数将一个C类型和一个V类型值合并成一个C类型输入参数为(C,V)输出为C分区内相同的key循环做 mergeCombiners分区合并组合器函数用于将两个C类型值合并成一个C类型输入参数为(C,C)输出为C分区之间循环做 numPartitions结果RDD分区数默认保持原有的分区数 partitioner分区函数,默认为HashPartitioner mapSideCombine是否需要在Map端进行combine操作类似于MapReduce中的combine默认为true 看下面例子 scala var rdd1 sc.makeRDD(Array((A,1),(A,2),(B,1),(B,2),(C,1))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] ParallelCollectionRDD[64] at makeRDD at :21 scala rdd1.combineByKey(| (v : Int) v _, | (c : String, v : Int) c v, | (c1 : String, c2 : String) c1 $ c2| ).collect res60: Array[(String, String)] Array((A,2_$1_), (B,1_$2_), (C,1_))其中三个映射函数分别为 createCombiner: (V) C (v : Int) v “” //在每一个V值后面加上字符返回C类型(String) mergeValue: (C, V) C (c : String, v : Int) c “” v //合并C类型和V类型中间加字符,返回C(String) mergeCombiners: (C, C) C (c1 : String, c2 : String) c1 “ ” c 2 / / 合并 C 类型和 C 类型中间加 ” c2 //合并C类型和C类型中间加 ”c2//合并C类型和C类型中间加返回C(String) 其他参数为默认值。 最终将RDD[String,Int]转换为RDD[String,String]。 再看例子 rdd1.combineByKey((v : Int) List(v),(c : List[Int], v : Int) v :: c,(c1 : List[Int], c2 : List[Int]) c1 ::: c2 ).collect res65: Array[(String, List[Int])] Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))最终将RDD[String,Int]转换为RDD[String,List[Int]]。 29) foldByKey def foldByKey(zeroValue: V)(func: (V, V) V): RDD[(K, V)]def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) V): RDD[(K, V)]def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) V): RDD[(K, V)] 该函数用于RDD[K,V]根据K将V做折叠、合并处理其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V. 例子 scala var rdd1 sc.makeRDD(Array((A,0),(A,2),(B,1),(B,2),(C,1)))scala rdd1.foldByKey(0)(__).collectres75: Array[(String, Int)] Array((A,2), (B,3), (C,1)) //将rdd1中每个key对应的V进行累加注意zeroValue0,需要先初始化V,映射函数为操//作比如(A,0), (A,2)先将zeroValue应用于每个V,得到(A,00), (A,20)即//(A,0), (A,2)再将映射函数应用于初始化后的V最后得到(A,02),即(A,2)再看 scala rdd1.foldByKey(2)(__).collectres76: Array[(String, Int)] Array((A,6), (B,7), (C,3))//先将zeroValue2应用于每个V,得到(A,02), (A,22)即(A,2), (A,4)再将映射函//数应用于初始化后的V最后得到(A,24)即(A,6)再看乘法操作 scala rdd1.foldByKey(0)(__).collectres77: Array[(String, Int)] Array((A,0), (B,0), (C,0))//先将zeroValue0应用于每个V,注意这次映射函数为乘法得到(A,00), (A,20)//即(A,0), (A,0)再将映射函//数应用于初始化后的V最后得到(A,00)即(A,0)//其他K也一样最终都得到了V0scala rdd1.foldByKey(1)(__).collectres78: Array[(String, Int)] Array((A,0), (B,2), (C,1))//映射函数为乘法时需要将zeroValue设为1才能得到我们想要的结果。在使用foldByKey算子时候要特别注意映射函数及zeroValue的取值。 30) reduceByKeyLocally def reduceByKeyLocally(func: (V, V) V): Map[K, V]该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算运算结果映射到一个Map[K,V]中而不是RDD[K,V]。 scala var rdd1 sc.makeRDD(Array((A,0),(A,2),(B,1),(B,2),(C,1)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] ParallelCollectionRDD[91] at makeRDD at :21scala rdd1.reduceByKeyLocally((x,y) x y)res90: scala.collection.Map[String,Int] Map(B - 3, A - 2, C - 1)31) cogroup和groupByKey的区别 val rdd1 sc.parallelize(List((tom, 1), (tom, 2), (jerry, 3), (kitty, 2)))val rdd2 sc.parallelize(List((jerry, 2), (tom, 1), (shuke, 2)))//cogroupval rdd3 rdd1.cogroup(rdd2)//groupbykeyval rdd4 rdd1.union(rdd2).groupByKey//注意cogroup与groupByKey的区别rdd3.foreach(println)rdd4.foreach(println)⭐️32) join val rdd1 sc.parallelize(List((tom, 1), (jerry, 3), (kitty, 2)))val rdd2 sc.parallelize(List((jerry, 2), (tom, 1), (shuke, 2)))//求jionval rdd3 rdd1.join(rdd2)rdd3.collect33) leftOuterJoin def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]leftOuterJoin类似于SQL中的左外关联left outer join返回结果以前面的RDD为主关联不上的记录为空。只能用于两个RDD之间的关联如果要多个RDD关联多关联几次即可。 参数numPartitions用于指定结果的分区数 参数partitioner用于指定分区函数 var rdd1 sc.makeRDD(Array((A,1),(B,2),(C,3)),2)var rdd2 sc.makeRDD(Array((A,a),(C,c),(D,d)),2)scala rdd1.leftOuterJoin(rdd2).collectres11: Array[(String, (String, Option[String]))] Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))34) rightOuterJoin def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] rightOuterJoin类似于SQL中的有外关联right outer join返回结果以参数中的RDD为主关联不上的记录为空。只能用于两个RDD之间的关联如果要多个RDD关联多关联几次即可。 参数numPartitions用于指定结果的分区数 参数partitioner用于指定分区函数 var rdd1 sc.makeRDD(Array((A,1),(B,2),(C,3)),2)var rdd2 sc.makeRDD(Array((A,a),(C,c),(D,d)),2)scala rdd1.rightOuterJoin(rdd2).collectres12: Array[(String, (Option[String], String))] Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))1.3 Action操作 35) first def first(): Tfirst返回RDD中的第一个元素不排序。 scala var rdd1 sc.makeRDD(Array((A,1),(B,2),(C,3)),2)rdd1: org.apache.spark.rdd.RDD[(String, String)] ParallelCollectionRDD[33] at makeRDD at :21scala rdd1.firstres14: (String, String) (A,1)scala var rdd1 sc.makeRDD(Seq(10, 4, 2, 12, 3))rdd1: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[0] at makeRDD at :21scala rdd1.firstres8: Int 1036) count def count(): Longcount返回RDD中的元素数量。 scala var rdd1 sc.makeRDD(Array((A,1),(B,2),(C,3)),2)rdd1: org.apache.spark.rdd.RDD[(String, String)] ParallelCollectionRDD[34] at makeRDD at :21scala rdd1.countres15: Long 336) reduce def reduce(f: (T, T) ⇒ T): T根据映射函数f对RDD中的元素进行二元计算返回计算结果。 scala var rdd1 sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[36] at makeRDD at :21scala rdd1.reduce(_ _) res18: Int 55scala var rdd2 sc.makeRDD(Array((A,0),(A,2),(B,1),(B,2),(C,1))) rdd2: org.apache.spark.rdd.RDD[(String, Int)] ParallelCollectionRDD[38] at makeRDD at :21scala rdd2.reduce((x,y) {| (x._1 y._1,x._2 y._2)| }) res21: (String, Int) (CBBAA,6)37) collect def collect(): Array[T]collect用于将一个RDD转换成数组。 scala var rdd1 sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[36] at makeRDD at :21scala rdd1.collect res23: Array[Int] Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)38) take def take(num: Int): Array[T]take用于获取RDD中从0到num-1下标的元素不排序。 scala var rdd1 sc.makeRDD(Seq(10, 4, 2, 12, 3))rdd1: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[40] at makeRDD at :21scala rdd1.take(1)res0: Array[Int] Array(10) scala rdd1.take(2)res1: Array[Int] Array(10, 4)39) top def top(num: Int)(implicit ord: Ordering[T]): Array[T]top函数用于从RDD中按照默认降序或者指定的排序规则返回前num个元素。 scala var rdd1 sc.makeRDD(Seq(10, 4, 2, 12, 3)) rdd1: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[40] at makeRDD at :21scala rdd1.top(1) res2: Array[Int] Array(12)scala rdd1.top(2) res3: Array[Int] Array(12, 10)//指定排序规则 scala implicit val myOrd implicitly[Ordering[Int]].reverse myOrd: scala.math.Ordering[Int] scala.math.Ordering$$anon$4767499efscala rdd1.top(1) res4: Array[Int] Array(2)scala rdd1.top(2) res5: Array[Int] Array(2, 3)40) takeOrdered def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]takeOrdered和top类似只不过以和top相反的顺序返回元素。 scala var rdd1 sc.makeRDD(Seq(10, 4, 2, 12, 3)) rdd1: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[40] at makeRDD at :21scala rdd1.top(1) res4: Array[Int] Array(12)scala rdd1.top(2) res5: Array[Int] Array(12, 10)scala rdd1.takeOrdered(1) res6: Array[Int] Array(2)scala rdd1.takeOrdered(2) res7: Array[Int] Array(2, 3)⭐️41) aggregate def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): Uaggregate用户聚合RDD中的元素先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型再使用combOp将之前每个分区聚合后的U类型聚合成U类型特别注意seqOp和combOp都会使用zeroValue的值zeroValue的类型为U。 var rdd1 sc.makeRDD(1 to 10,2) rdd1.mapPartitionsWithIndex{(partIdx,iter) {var part_map scala.collection.mutable.Map[String,List[Int]]()while(iter.hasNext){var part_name part_ partIdx;var elem iter.next()if(part_map.contains(part_name)) {var elems part_map(part_name)elems :: elempart_map(part_name) elems} else {part_map(part_name) List[Int]{elem}}}part_map.iterator}}.collect res16: Array[(String, List[Int])] Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))##第一个分区中包含5,4,3,2,1 ##第二个分区中包含10,9,8,7,6scala rdd1.aggregate(1)(| {(x : Int,y : Int) x y}, | {(a : Int,b : Int) a b}| ) res17: Int 58结果为什么是58看下面的计算过程 ##先在每个分区中迭代执行 (x : Int,y : Int) x y 并且使用zeroValue的值1##即part_0中 zeroValue54321 154321 16##part_1中 zeroValue109876 1109876 41##再将两个分区的结果合并(a : Int,b : Int) a b 并且使用zeroValue的值1##即zeroValuepart_0part_1 1 16 41 58再比如 scala rdd1.aggregate(2)(| {(x : Int,y : Int) x y}, | {(a : Int,b : Int) a b}| ) res18: Int 1428 ##这次zeroValue2 ##part_0中 zeroValue54321 254321 17 ##part_1中 zeroValue109876 2109876 42 ##最后zeroValuepart_0part_1 2 17 42 1428因此zeroValue即确定了U的类型也会对结果产生至关重要的影响使用时候要特别注意。 42) fold def fold(zeroValue: T)(op: (T, T) ⇒ T): Tfold是aggregate的简化将aggregate中的seqOp和combOp使用同一个函数op。 var rdd1 sc.makeRDD(1 to 10, 2) scala rdd1.fold(1)(| (x,y) x y | ) res19: Int 58 ##结果同上面使用aggregate的第一个例子一样即 scala rdd1.aggregate(1)(| {(x,y) x y}, | {(a,b) a b}| ) res20: Int 5843) lookup def lookup(key: K): Seq[V]lookup用于(K,V)类型的RDD,指定K值返回RDD中该K对应的所有V值。 scala var rdd1 sc.makeRDD(Array((A,0),(A,2),(B,1),(B,2),(C,1))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] ParallelCollectionRDD[0] at makeRDD at :21scala rdd1.lookup(A) res0: Seq[Int] WrappedArray(0, 2)scala rdd1.lookup(B) res1: Seq[Int] WrappedArray(1, 2)44) countByKey def countByKey(): Map[K, Long]countByKey用于统计RDD[K,V]中每个K的数量。 scala var rdd1 sc.makeRDD(Array((A,0),(A,2),(B,1),(B,2),(B,3))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] ParallelCollectionRDD[7] at makeRDD at :21scala rdd1.countByKey res5: scala.collection.Map[String,Long] Map(A - 2, B - 3)45) foreach def foreach(f: (T) ⇒ Unit): Unitforeach用于遍历RDD,将函数f应用于每一个元素。 但要注意如果对RDD执行foreach只会在Executor端有效而并不是Driver端。 比如rdd.foreach(println)只会在Executor的stdout中打印出来Driver端是看不到的。 我在Spark1.4中是这样不知道是否真如此。 这时候使用accumulator共享变量与foreach结合倒是个不错的选择。 scala var cnt sc.accumulator(0) cnt: org.apache.spark.Accumulator[Int] 0scala var rdd1 sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[5] at makeRDD at :21scala rdd1.foreach(x cnt x)scala cnt.value res51: Int 55scala rdd1.collect.foreach(println) ⭐️46) foreachPartition def foreachPartition(f: (Iterator[T]) ⇒ Unit): UnitforeachPartition和foreach类似只不过是对每一个分区使用f。 scala var rdd1 sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] ParallelCollectionRDD[5] at makeRDD at :21scala var allsize sc.accumulator(0) size: org.apache.spark.Accumulator[Int] 0scala rdd1.foreachPartition { x {| allsize x.size| }} scala println(allsize.value) #10⭐️47) sortBy def sortBy[K](f: (T) ⇒ K, ascending: Boolean true, numPartitions: Int this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]sortBy根据给定的排序k函数将RDD中的元素进行排序。 scala var rdd1 sc.makeRDD(Seq(3,6,7,1,2,0),2)scala rdd1.sortBy(x x).collectres1: Array[Int] Array(0, 1, 2, 3, 6, 7) //默认升序scala rdd1.sortBy(x x,false).collectres2: Array[Int] Array(7, 6, 3, 2, 1, 0) //降序//RDD[K,V]类型scalavar rdd1 sc.makeRDD(Array((A,2),(A,1),(B,6),(B,3),(B,7)))scala rdd1.sortBy(x x).collectres3: Array[(String, Int)] Array((A,1), (A,2), (B,3), (B,6), (B,7))//按照V进行降序排序scala rdd1.sortBy(x x._2,false).collectres4: Array[(String, Int)] Array((B,7), (B,6), (B,3), (A,2), (A,1))48) saveAsTextFile def saveAsTextFile(path: String): Unitdef saveAsTextFile(path: String, codec: Class[_ : CompressionCodec]): UnitsaveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。 codec参数可以指定压缩的类名。 var rdd1 sc.makeRDD(1 to 10,2) scala rdd1.saveAsTextFile(hdfs://cdh5/tmp/lxw1234.com/) //保存到HDFShadoop fs -ls /tmp/lxw1234.com Found 2 items -rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS -rw-r--r-- 2 lxw1234 supergroup 21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000hadoop fs -cat /tmp/lxw1234.com/part-00000注意如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)将文件保存到本地文件系统那么只会保存在Executor所在机器的本地目录。 ## 指定压缩格式保存rdd1.saveAsTextFile(hdfs://cdh5/tmp/lxw1234.com/,classOf[com.hadoop.compression.lzo.LzopCodec])hadoop fs -ls /tmp/lxw1234.com -rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS -rw-r--r-- 2 lxw1234 supergroup 71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzohadoop fs -text /tmp/lxw1234.com/part-00000.lzo49) saveAsSequenceFile saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。 用法同saveAsTextFile。 50) saveAsObjectFile def saveAsObjectFile(path: String): UnitsaveAsObjectFile用于将RDD中的元素序列化成对象存储到文件中。 对于HDFS默认采用SequenceFile保存。 var rdd1 sc.makeRDD(1 to 10,2) rdd1.saveAsObjectFile(hdfs://cdh5/tmp/lxw1234.com/)hadoop fs -cat /tmp/lxw1234.com/part-00000 SEQ !org.apache.hadoop.io.NullWritableorg.apache.hadoop.io.BytesWritableTsaveAsHadoopFile def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ : OutputFormat[_, _]], codec: Class[_ : CompressionCodec]): Unitdef saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ : OutputFormat[_, _]], conf: JobConf …, codec: Option[Class[_ : CompressionCodec]] None): UnitsaveAsHadoopFile是将RDD存储在HDFS上的文件中支持老版本Hadoop API。 可以指定outputKeyClass、outputValueClass以及压缩格式。 每个分区输出一个文件。 var rdd1 sc.makeRDD(Array((A,2),(A,1),(B,6),(B,3),(B,7)))import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritablerdd1.saveAsHadoopFile(/tmp/lxw1234.com/,classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])rdd1.saveAsHadoopFile(/tmp/lxw1234.com/,classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],classOf[com.hadoop.compression.lzo.LzopCodec])52) saveAsHadoopDataset def saveAsHadoopDataset(conf: JobConf): UnitsaveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中比如HBase。 在JobConf中通常需要关注或者设置五个参数 文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。 ##使用saveAsHadoopDataset将RDD保存到HDFS中import org.apache.spark.SparkConf import org.apache.spark.SparkContext import SparkContext._ import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.mapred.JobConfvar rdd1 sc.makeRDD(Array((A,2),(A,1),(B,6),(B,3),(B,7))) var jobConf new JobConf() jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]]) jobConf.setOutputKeyClass(classOf[Text]) jobConf.setOutputValueClass(classOf[IntWritable]) jobConf.set(mapred.output.dir,/tmp/lxw1234/) rdd1.saveAsHadoopDataset(jobConf)结果 hadoop fs -cat /tmp/lxw1234/part-00000 A 2 A 1hadoop fs -cat /tmp/lxw1234/part-00001 B 6 B 3 B 7##保存数据到HBASE ##HBase建表 create ‘lxw1234′,{NAME ‘f1′,VERSIONS 1},{NAME ‘f2′,VERSIONS 1},{NAME ‘f3′,VERSIONS 1}import org.apache.spark.SparkConf import org.apache.spark.SparkContext import SparkContext._ import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.io.ImmutableBytesWritablevar conf HBaseConfiguration.create()var jobConf new JobConf(conf)jobConf.set(hbase.zookeeper.quorum,zkNode1,zkNode2,zkNode3)jobConf.set(zookeeper.znode.parent,/hbase)jobConf.set(TableOutputFormat.OUTPUT_TABLE,lxw1234)jobConf.setOutputFormat(classOf[TableOutputFormat])var rdd1 sc.makeRDD(Array((A,2),(B,6),(C,7)))rdd1.map(x {var put new Put(Bytes.toBytes(x._1))put.add(Bytes.toBytes(f1), Bytes.toBytes(c1), Bytes.toBytes(x._2))(new ImmutableBytesWritable,put)}).saveAsHadoopDataset(jobConf)##结果 hbase(main):005:0 scan lxw1234 ROW COLUMNCELL A columnf1:c1, timestamp1436504941187, value\x00\x00\x00\x02 B columnf1:c1, timestamp1436504941187, value\x00\x00\x00\x06 C columnf1:c1, timestamp1436504941187, value\x00\x00\x00\x07 3 row(s) in 0.0550 seconds注意保存到HBase运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。 53) saveAsNewAPIHadoopFile def saveAsNewAPIHadoopFile[F : OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unitdef saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ : OutputFormat[_, _]], conf: Configuration self.context.hadoopConfiguration): UnitsaveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上使用新版本Hadoop API。 用法基本同saveAsHadoopFile。 import org.apache.spark.SparkConf import org.apache.spark.SparkContext import SparkContext._ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritablevar rdd1 sc.makeRDD(Array((A,2),(A,1),(B,6),(B,3),(B,7))) rdd1.saveAsNewAPIHadoopFile(/tmp/lxw1234/,classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])54) saveAsNewAPIHadoopDataset def saveAsNewAPIHadoopDataset(conf: Configuration): Unit作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。 以写入HBase为例 HBase建表 create ‘lxw1234′,{NAME ‘f1′,VERSIONS 1},{NAME ‘f2′,VERSIONS 1},{NAME ‘f3′,VERSIONS 1}完整的Spark应用程序 package com.lxw1234.testimport org.apache.spark.SparkConf import org.apache.spark.SparkContext import SparkContext._ import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Putobject Test {def main(args : Array[String]) {val sparkConf new SparkConf().setMaster(spark://lxw1234.com:7077).setAppName(lxw1234.com)val sc new SparkContext(sparkConf);var rdd1 sc.makeRDD(Array((A,2),(B,6),(C,7)))sc.hadoopConfiguration.set(hbase.zookeeper.quorum ,zkNode1,zkNode2,zkNode3)sc.hadoopConfiguration.set(zookeeper.znode.parent,/hbase)sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,lxw1234)var job new Job(sc.hadoopConfiguration)job.setOutputKeyClass(classOf[ImmutableBytesWritable])job.setOutputValueClass(classOf[Result])job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])rdd1.map(x {var put new Put(Bytes.toBytes(x._1))put.add(Bytes.toBytes(f1), Bytes.toBytes(c1), Bytes.toBytes(x._2))(new ImmutableBytesWritable,put)} ).saveAsNewAPIHadoopDataset(job.getConfiguration)sc.stop() } }注意保存到HBase运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。
http://www.w-s-a.com/news/309302/

相关文章:

  • 做线上网站需要多少钱系统开发板价格
  • 建筑企业登录哪个网站wordpress feed地址
  • 网站建设流程百科提升seo搜索排名
  • 杭州网站建设 巴零做销售怎么和客户聊天
  • 北京自己怎样做网站wordpress oauth2插件
  • 上海800做网站wordpress建站的好处
  • 婚纱摄影网站设计模板如何做好网站内容
  • cdn网站加速招商计划书模板ppt
  • 我在某网站网站做代理开发小程序外包
  • 设计网站国外商城网站的建设费用
  • 网站开发工作需要什么专业学做网站游戏教程
  • 电子商务网站规划 分析 设计杭州网站优化平台
  • 汕头企业自助建站系统网站后台登陆验证码无法显示
  • 宁波网站制作服务做外贸推广自己网站
  • php 微信 网站开发青岛网站互联网公司
  • 网站软件免费下载大全网站建设开发价格高吗
  • asp网站制作软件上海做网站制作
  • 福田区住房和建设局网站好搜搜索引擎
  • 平面设计师看的网站济南机场建设
  • 俄文网站开发翻译平台页面设计模板
  • 建设在线购物网站淮南电商网站建设价格
  • 龙泉市旅游门户网站建设wordpress faq插件
  • 网站的流程图贵阳做网站方舟网络
  • c 做网站开发实例wordpress 加上index
  • 济南seo网站推广搜索广告推广
  • 有关于网站建设的参考文献宁波seo网络推广公司
  • 网站设配色个人主页介绍文案
  • 网站seo相关设置优化网站建设的好处
  • 上海市建设工程安全生产协会网站郴州网站设计公司
  • 网站大型网页游戏自己搭建服务器做视频网站