A join links two result sets; it has to match up the data by key.
Let's demonstrate whether join involves a shuffle.
1. If neither RDD has a partitioner and both have the same number of partitions, a shuffle occurs, but the number of partitions does not change:
scala> val arr = Array(("zhangsan",300),("lisi",400),("wangwu",350),("zhaosi",450))
arr: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> val arr1 = Array(("zhangsan",22),("lisi",24),("wangwu",30),("guangkun",5))
arr1: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr,3)
res116: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[108] at makeRDD at <console>:27

scala> sc.makeRDD(arr1,3)
res117: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[109] at makeRDD at <console>:27

scala> res116 join res117
res118: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[112] at join at <console>:28

scala> res118.collect
res119: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))

2. If the partition counts differ, there is still a shuffle, and the resulting RDD takes the larger of the two partition counts; a sketch of this case follows below.
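The original session does not include a run for case 2, so here is a minimal sketch of ours (the vals left, right and joined2 are not from the original shell). It assumes a spark-shell like the one above, with spark.default.parallelism not set explicitly, so the join falls back to the larger parent's partition count.

// Sketch (ours): case 2, no partitioner on either side, different partition counts.
val left  = sc.makeRDD(arr, 3)    // 3 partitions, no partitioner
val right = sc.makeRDD(arr1, 5)   // 5 partitions, no partitioner

val joined2 = left join right
println(joined2.partitions.size)                   // expected: 5 (the larger of the two parents)
println(left.partitioner == joined2.partitioner)   // expected: false, so the left side is shuffled
println(right.partitioner == joined2.partitioner)  // expected: false, so the right side is shuffled too
println(joined2.toDebugString)                     // the lineage shows the shuffle the join introduces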
3. If the partition counts are the same and both RDDs use the same partitioner, there is no shuffle:

scala> sc.makeRDD(arr,3)
res128: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[118] at makeRDD at <console>:27

scala> sc.makeRDD(arr1,3)
res129: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[119] at makeRDD at <console>:27

scala> res128.reduceByKey(_+_)
res130: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[120] at reduceByKey at <console>:26

scala> res129.reduceByKey(_+_)
res131: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[121] at reduceByKey at <console>:26

scala> res130 join res131
res132: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[124] at join at <console>:28

scala> res132.collect
res133: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))

scala> res132.partitions.size
res134: Int = 3
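To double-check that this join really added no shuffle, we can look at the CoGroupedRDD that join builds underneath (join is cogroup followed by mapValues and flatMapValues, so the CoGroupedRDD sits two narrow levels below the joined RDD). This is a sketch of ours continuing the session above; the val cogrouped3 is not from the original shell.

// Sketch (ours): verify that the join in case 3 adds no new shuffle.
// res130 and res131 both carry HashPartitioner(3) after reduceByKey.
val cogrouped3 = res132.dependencies.head.rdd.dependencies.head.rdd
println(cogrouped3)                                 // expected: CoGroupedRDD[...] at join at <console>:...
cogrouped3.dependencies.foreach(dep => println(dep.getClass.getSimpleName))
// expected: OneToOneDependency printed twice, i.e. neither parent is re-shuffled
println(res132.partitioner == res130.partitioner)   // expected: true (the join reuses HashPartitioner(3))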
4. If both RDDs have a partitioner but the partition counts differ, there is still a shuffle:

scala> val arr = Array(("zhangsan",300),("lisi",400),("wangwu",350),("zhaosi",450))
arr: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> val arr1 = Array(("zhangsan",22),("lisi",24),("wangwu",30),("guangkun",5))
arr1: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr,3)
res0: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:27

scala> sc.makeRDD(arr1,4)
res1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:27

scala> res0.reduceByKey(_+_)
res2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:26

scala> res1.reduceByKey(_+_)
res3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:26

scala> res2 join res3
res4: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[6] at join at <console>:28

scala> res4.collect
res5: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))

scala> res4.partitions.size
res6: Int = 4

Why are the reduceByKey and join steps connected within stage 3 here (in the job's DAG)? Because the RDD with more partitions does not need to be shuffled: its data stays in whichever partition it already occupies. It is the RDD with fewer partitions that has to be redistributed for the join.
The resulting partition count follows the larger of the two.
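Continuing the same session, a quick way to confirm which side is shuffled: CoGroupedRDD gives a parent a narrow dependency only when that parent's partitioner equals the join's partitioner, so comparing partitioners tells the story. This check is a sketch of ours, not part of the original post.

// Sketch (ours): in case 4, only the smaller-partitioned parent is shuffled.
// res2 carries HashPartitioner(3), res3 carries HashPartitioner(4).
println(res4.partitioner.get.numPartitions)   // expected: 4, the join adopts the larger partitioner
println(res2.partitioner == res4.partitioner) // expected: false, so res2 (3 partitions) is re-shuffled
println(res3.partitioner == res4.partitioner) // expected: true, so res3 (4 partitions) stays put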
5. If one RDD has a partitioner and the other does not, the result takes the partition count of the RDD that has the partitioner, and there is a shuffle:
scala> arr
res7: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> arr1
res8: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr,3)
res9: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at <console>:27

scala> sc.makeRDD(arr,4)
res10: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at makeRDD at <console>:27

scala> res9.reduceByKey(_+_)
res11: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:26

scala> res10 join res11
res12: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:28

scala> res12.partitions.size
res13: Int = 3

scala> res12.collect
res14: Array[(String, (Int, Int))] = Array((zhangsan,(300,300)), (wangwu,(350,350)), (lisi,(400,400)), (zhaosi,(450,450)))

(Note that this run built both RDDs from arr, which is why every key ends up paired with its own value.) As before, the reduceByKey and join steps are connected within stage 6 because the RDD that has a partitioner does not need to be shuffled: its data stays where it already is. The RDD without a partitioner has to be redistributed for the join, which introduces a shuffle, and that is why there is an edge from stage 4 into stage 6.
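The same partitioner check, as a sketch of ours continuing this last session, shows which side pays for the shuffle here:

// Sketch (ours): case 5. res11 carries HashPartitioner(3) from reduceByKey,
// res10 has 4 partitions and no partitioner.
println(res10.partitioner)                      // expected: None
println(res11.partitioner)                      // expected: a Some(HashPartitioner) with 3 partitions
println(res12.partitioner == res11.partitioner) // expected: true, the join reuses res11's partitioner
println(res12.partitioner == res10.partitioner) // expected: false, so res10 is the side that gets shuffled
println(res12.partitions.size)                  // expected: 3, following the partitioned side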