北京十大网站建设公司,小程序代理模板,河池网站优化,陈田村拆车件网上商城目录 说明RDD学习RDD介绍RDD案例基于集合创建RDDRDD存入外部文件中 转换算子 操作map 操作说明案例 flatMap操作说明案例 filter 操作说明案例 groupBy 操作说明案例 distinct 操作说明案例 sortBy 操作说明案例 mapToPair 操作说明案例 mapValues操作说明案例 groupByKey操作说… 目录 说明RDD学习RDD介绍RDD案例基于集合创建RDDRDD存入外部文件中 转换算子 操作map 操作说明案例 flatMap操作说明案例 filter 操作说明案例 groupBy 操作说明案例 distinct 操作说明案例 sortBy 操作说明案例 mapToPair 操作说明案例 mapValues操作说明案例 groupByKey操作说明案例 reduceByKey操作说明案例 sortByKey操作说明案例 行动算子 操作collect 操作说明案例 count 操作说明案例 first操作说明案例 take操作说明案例 countByKey操作说明案例 saveAsTextFile 操作说明案例 foreach操作说明案例 说明
本文依赖于上一篇文章【spark学习】 spring boot 整合 spark 的相关内容请先阅读上一篇文章将spark需要的环境先配置好并测试通过之后再进行此文章的阅读和操作。
RDD学习
RDD介绍
想象一下你有一个大大的数据表里面包含了很多很多的信息。如果你想对这些数据进行操作比如筛选出符合条件的数据、或者对数据做一些计算RDD 就是 Spark 用来存储和操作这些数据的一种方式。它有两个基本操作转换操作就像是加工数据和 行动操作获取结果
RDD案例
基于集合创建RDD
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;/*** 创建 RDD 算子*/PostConstructpublic void createRDD() {// 从集合创建 RDDListString lists Arrays.asList(hello, spark, hi, spark, hadoop);JavaRDDString parallelize javaSparkContext.parallelize(lists); // 创建RDDparallelize.collect().forEach(System.out::println); // 打印}
}输出结果
RDD存入外部文件中
将RDD存入外部文件用于观察文件结果和分区结果。txt文件数等于默认分区数。从结果中看出默认分区为4个。
注意存放结果的文件夹路径必须没有被创建。
以下案例中将基于集合生成的RDD保存到saveRddDemo文件夹中。
package www.zhangxiaosan.top.timer;import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;/*** 创建 RDD 算子*/PostConstructpublic void createRDD() {// 从集合创建 RDDListString lists Arrays.asList(hello, spark, hi, spark, hadoop);// parallelize(): 创建RDDRDD中创建默认分区数// parallelize( 元素, 分区数) 创建RDDRDD中指定分区数JavaRDDString parallelize javaSparkContext.parallelize(lists); // 创建RDDparallelize.collect().forEach(System.out::println); // 打印// 存储的目录文件夹路径。此处为项目中的路径且目录必须为不存在的路径。String fileSavePathI:\\zhang\\SpringBootSparkDemo\\src\\main\\resources\\saveRddDemo;parallelize.saveAsTextFile(fileSavePath);// 开始将RDD保存到文件中 }
}
运行结果 在文件夹中存放的是运行程序生成的文件。如下图。 _SUCCESS文件成功标记 part-XXX 文件保存的数据文件 结果中有4个文件说明默认分区为4个。
转换算子 操作
转换算子Transformation是指那些返回一个新的 RDD 的操作。转换算子不会立即执行计算而是构建一个执行计划只有当行动算子Action触发计算时转换算子的操作才会实际执行。转换算子可以用来处理和转换数据比如映射、过滤、聚合等。
map 操作
说明
map() 方法传入一个函数作为参数。map() 方法会将RDD中的元素逐一进行调用函数的参数即是RDD的元素。 如map( 函数( RDD元素 ) )。 map() 方法会返回一个新的RDD。
案例
将RDD中的每个元素都拼接上字符串 “ say hi ”
package www.zhangxiaosan.top.timer;import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;/*** 创建 RDD 算子*/PostConstructpublic void createRDD() {// 从集合创建 RDDListString lists Arrays.asList(张三, 李四, 王五);JavaRDDString parallelize javaSparkContext.parallelize(lists); // 创建RDD// lambda表达式传入匿名函数进行元素拼接字符。// item 表示RDD中的元素parallelize parallelize.map(item - item say hi); parallelize.collect().forEach(System.out::println); // 打印}
}
输出结果
flatMap操作
说明
flatMap() 方法传入一个函数作为参数 。flatMap() 方法会将RDD中的元素逐一进行调用函数的参数即是RDD的元素。 如flatMap( 函数( RDD元素 ) )。 flatMap() 方法会返回一个新的RDD。 flatMap() 方法 会将 RDD的元素扁平化处理成一个集合。
例如 假设你有一个箱子箱子里面放着几个小盒子每个小盒子里又有一些玩具。flatMap 就是一个工具它能帮你把每个小盒子里的玩具拿出来直接放进一个大盒子里最终把所有玩具放在一个地方。 每个小盒子里的玩具可以不止一个甚至可能没有玩具比如有的盒子是空的。flatMap 会把每个盒子里的玩具都拿出来放到一个大盒子里最终得到一个扁平的大盒子里面是所有玩具。
案例
将集合[ [1, 2, 3, 4, 5],[hello, spark],[张三, 李四] ] 扁平化处理成[ 1, 2, 3, 4, 5, hello, spark, 张三, 李四]
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void flatMapDemo() {// 声明集合[ [1, 2, 3, 4, 5],[hello, spark],[张三, 李四] ]ListListString arrs Arrays.asList(Arrays.asList(1, 2, 3, 4, 5),Arrays.asList(hello, spark),Arrays.asList(张三, 李四));//输出声明的集合呢容System.out.println(原始集合打印);arrs.forEach(item - System.out.print( item));// 分隔符System.out.println();System.out.println(-----------);System.out.println(flatMap操作后);// 创建集合的RDDJavaRDDListString parallelize javaSparkContext.parallelize(arrs);// flatMap操作ListString collect parallelize.flatMap(i - i.iterator()).collect(); // 打印 flatMap操作 后的集合collect.forEach(item - System.out.print( item));System.out.println();}
}
filter 操作
说明
filter() 方法传入一个函数作为参数函数返回值只能为Boolean值。filter() 方法会将RDD中的元素逐一进行调用函数的参数即是RDD的元素。 如filter( 函数( RDD元素 ) )。 filter() 方法会返回一个新的RDD。filter() 将RDD元素中满足条件的元素保留即函数返回为true的元素RDD中不满足条件的元素即函数返回为false的元素过滤。
案例
过滤单数保留双数 import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void filterDemo() {// 声明集合ListInteger arrs Arrays.asList(-1, 0, 1, 2, 3, 4, 5);System.out.println(原始集合打印);arrs.forEach(item - System.out.print( item));// 输出过滤前的数据System.out.println();System.out.println(-----------);System.out.println(filter操作后);JavaRDDInteger parallelize javaSparkContext.parallelize(arrs);// 过滤掉 单数 取模不等于0的数字为单数否则为偶数。item % 2 0 的数字为双数返回有true保留ListInteger collect parallelize.filter(item - item % 2 0).collect();// 输出过滤后的数据collect.forEach(item - System.out.print( item));System.out.println();}
}运行结果
groupBy 操作
说明
将数据按某些条件进行分组。每个组由键值对组成。 groupBy() 常和以下聚合函数一起使用来对分组数据进行统计分析。 常用的聚合操作包括
count(): 统计每个组的元素数量sum(): 计算每个组的元素总和avg(): 计算每个组的平均值max()计算每个组的最大值min(): 计算每个组的最小值agg(): 自定义聚合操作可以结合多个聚合函数
案例
给写生成绩按照分数来分组。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void groupByDemo() {// 定义学生成绩数据ListTuple2String, Integer students Arrays.asList(new Tuple2(张三, 85),new Tuple2(李四, 90),new Tuple2(王五, 85),new Tuple2(赵六, 95),new Tuple2(孙七, 90));// 创建RDDJavaRDDTuple2String, Integer parallelize javaSparkContext.parallelize(students);// 使用 groupBy 进行分组按成绩分组JavaPairRDDObject, IterableTuple2String, Integer integerIterableJavaPairRDD parallelize.groupBy(tuple - tuple._2());// 使用 groupBy 按成绩分组。_2表示元组的第二个元素即分数// 打印结果integerIterableJavaPairRDD.sortByKey().foreach(item - System.out.println(成绩 item._1() 学生 item._2()));}
}结果如下
distinct 操作
说明
对RDD的元素进行分布式去重。返回新的RDD新的RDD内的元素不重复出现。
案例
将集合中重复的元素去除。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void distinctDemo() {// 声明集合ListInteger arrs Arrays.asList(1,2,3,4,5,6,1,4,6);// 创建RDDJavaRDDInteger parallelize javaSparkContext.parallelize(arrs);// distinct()方法用于去除重复元素。JavaRDDInteger distinct parallelize.distinct();// 打印结果distinct.collect().forEach(System.out::println);}
}运行结果
sortBy 操作
说明
对RDD的元素进行排序返回新的RDD。 sortBy() 可传入3个参数 参数1函数每个RDD元素都会传入此函数中此函数定义排序规则。 参数2boolean值。定义排序顺序。true为升序false降序。 参数3Integer 整数指定分区数量。
案例
将集合中的元素降序排序。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void sortByDemo() {// 创建无序集合ListInteger arrs Arrays.asList(7,2,5,4,1,9,6,3,8);// 创建RDDJavaRDDInteger parallelize javaSparkContext.parallelize(arrs);// 使用 sortBy 进行排序按元素值排序并返回一个RDD。JavaRDDInteger sortBy parallelize.sortBy(item - item, false, 1);// 打印结果sortBy.collect().forEach(System.out::println);}
}运行结果
mapToPair 操作
说明
mapToPair()是将一个普通的 RDD 转换为 JavaPairRDD 的一个方法。 JavaPairRDD 中的每个元素都是一个键值对。 mapToPair对RDD的元素进行映射成一个由键值对组成的 RDD即映射成JavaPairRDD即将每个元素转换成一个 Tuple2 对象其中第一个元素是键key第二个元素是值value。
案例
将集合 [“张三:85”, “李四:90”, “王五:85”, “赵六:95”, “孙七:90”] 中的学生和成绩按照冒号分隔形成键值对姓名为键值为成绩。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void mapToPairDemo() {// 创建包含学生成绩的列表ListString students Arrays.asList(张三:85, 李四:90, 王五:85, 赵六:95, 孙七:90);// 将数据并行化为 RDDJavaRDDString studentsRDD javaSparkContext.parallelize(students);// 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对JavaPairRDDString, Integer studentsPairRDD studentsRDD.mapToPair((PairFunctionString, String, Integer) s - {// 根据冒号分隔学生姓名和成绩返回一个 (姓名, 成绩) 的元组String[] parts s.split(:);// 返回一个 (姓名, 成绩) 的元组return new Tuple2(parts[0], Integer.parseInt(parts[1]));});// 打印结果studentsPairRDD.collect().forEach(System.out::println);}
}运行结果
mapValues操作
说明
mapValues() 是一个用于处理 JavaPairRDD 的方法。它可以对 RDD 中的每个键值对的 值value 进行转换同时保留原来的 键key 不变。
案例
将集合 [“张三:85”, “李四:90”, “王五:85”, “赵六:95”, “孙七:90”] 中的学生和成绩按照冒号分隔形成键值对姓名为键值为成绩。并将成绩在原分数上5分。 import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void mapToPairDemo() {// 创建包含学生成绩的列表ListString students Arrays.asList(张三:85, 李四:90, 王五:85, 赵六:95, 孙七:90);// 将数据并行化为 RDDJavaRDDString studentsRDD javaSparkContext.parallelize(students);// 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对JavaPairRDDString, Integer studentsPairRDD studentsRDD.mapToPair((PairFunctionString, String, Integer) s - {// 根据冒号分隔学生姓名和成绩返回一个 (姓名, 成绩) 的元组String[] parts s.split(:);// 返回一个 (姓名, 成绩) 的元组return new Tuple2(parts[0], Integer.parseInt(parts[1]));});// 打印结果System.out.println(原始分数);studentsPairRDD.collect().forEach(System.out::println);System.out.println(\n5分后);// item 表示每个键值对中的值即分数JavaPairRDDString, Integer newValeRdd studentsPairRDD.mapValues(item - item 5);// 打印结果newValeRdd.collect().forEach(System.out::println);}
}运行结果
groupByKey操作
说明
groupByKey() 是一个用于处理 JavaPairRDD 的方法。它根据键对数据进行分组将所有具有相同键的元素聚集到一起生成一个新的 RDD其中每个键对应一个包含所有相同键值的集合。 每个键对应的值收集到一个 Iterable 容器中然后返回一个新的 RDD其中每个键对应一个包含该键的所有值的 Iterable。
案例
将集合 [“张三:85”, “李四:90”, “陈八:95”, “王五:85”, “黄六:92”] 中的学生和成绩按照冒号分隔形成键值对成绩为键值为姓名。并将键值对中的键进行分类将相同分数的学生归成一组。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void groupByKeyDemo(){// 创建包含学生成绩的列表ListString students Arrays.asList(张三:85, 李四:90, 陈八:95, 王五:85, 黄六:92);// 将数据并行化为 RDDJavaRDDString studentsRDD javaSparkContext.parallelize(students);// 使用 mapToPair 将每个元素转换为 (成绩, 姓名) 的键值对JavaPairRDDInteger, String studentsPairRDD studentsRDD.mapToPair(s - {String[] parts s.split(:);return new Tuple2Integer, String(Integer.parseInt(parts[1]), parts[0]);});// 根据键值对中的键进行分类JavaPairRDDInteger, IterableString groupedRDD studentsPairRDD.groupByKey();// 打印结果groupedRDD.collect().forEach(pair - {System.out.println(成绩: pair._1() , 姓名: pair._2());});}
}执行结果
reduceByKey操作
说明
reduceByKey()用于对 JavaPairRDD 数据进行聚合的一个方法。它根据键对值进行合并并在分区内进行局部聚合从而减少了跨节点的数据传输通常比 groupByKey() 更高效。
案例
计算学生的总分。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void groupByKeyDemo(){// 创建包含学生成绩的列表ListString students Arrays.asList(张三:85, 李四:90, 张三:95, 王五:85, 李四:92);// 将数据并行化为 RDDJavaRDDString studentsRDD javaSparkContext.parallelize(students);// 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对JavaPairRDDString, Integer studentsPairRDD studentsRDD.mapToPair(s - {String[] parts s.split(:);return new Tuple2(parts[0], Integer.parseInt(parts[1]));});// 使用 groupByKey 按学生姓名进行成绩求和JavaPairRDDString, Integer groupedRDD studentsPairRDD.reduceByKey((a, b) - a b);// 打印结果groupedRDD.collect().forEach(pair - {System.out.println(姓名: pair._1() , 成绩: pair._2());});}
}运行结果
sortByKey操作
说明
对 JavaPairRDD 数据按键进行排序的方法。默认为升序。 传入参数为Boolean值true 升序 false降序
案例
将学生成绩按照成绩从高到低降序排序
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void sortByKeyDemo(){// 定义原始数据ListTuple2String, Integer students Arrays.asList(new Tuple2(张三, 85),new Tuple2(李四, 90),new Tuple2(王五, 65),new Tuple2(赵六, 95),new Tuple2(孙七, 90));// 生成RDDJavaPairRDDString, Integer studentsPairRDD javaSparkContext.parallelizePairs(students);// 将学生原集合姓名成绩格式的数据转换为成绩姓名格式的键值对JavaPairRDDInteger, String integerStringJavaPairRDD studentsPairRDD.mapToPair(item - new Tuple2(item._2(), item._1()));// 将成绩姓名格式的键值对按照键降序排序即按照成绩降序排序JavaPairRDDInteger, String stringIntegerJavaPairRDD integerStringJavaPairRDD.sortByKey(false);// 打印结果stringIntegerJavaPairRDD .collect() .forEach(pair - {System.out.println(成绩: pair._1() , 姓名: pair._2());});}
}运行结果
行动算子 操作
行动算子Action是指会触发 Spark 作业的执行并且会产生一个结果或者副作用的操作。与 转换算子Transformation不同转换算子只会定义数据转换的计算逻辑而不会立即执行。只有在遇到行动算子时Spark 才会真正开始计算并将结果返回给用户或写入外部存储。
当你调用一个行动算子时Spark 会从头开始执行所有必要的转换操作并将结果返回给你或者存储到外部系统如 HDFS、数据库等。
行动算子通常会返回一个具体的结果例如一个列表、一个数值或者在某些情况下可能会执行一些副作用操作例如将数据写入磁盘。
collect 操作
说明
将分布式 RDD 中的所有数据项拉取到本地驱动程序Driver中通常作为一个数组、列表或其他集合类型。因为 collect() 会将整个 RDD 数据集拉到本地所以如果数据量非常大可能会导致内存溢出OutOfMemoryError。
注意事项 数据量大时的风险如果 RDD 中包含的数据量非常大调用 collect() 会导致所有数据被加载到本地驱动程序的内存中这可能会导致内存溢出错误OutOfMemoryError。因此建议在数据集非常大的情况下谨慎使用 collect()。
建议对于大规模数据集通常会使用其他行动算子如 take()获取一个数据的子集避免一次性加载所有数据。
并行计算的代价尽管 collect() 会将所有数据从分布式环境中拉回到单个节点但它不会对数据进行额外的计算只会执行之前定义的转换操作。因此它本质上是将整个数据集的计算结果从集群中汇总回来。
案例
收集RDD数据并打印。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void collectDemo(){// 定义一个整数集合ListInteger arrs Arrays.asList(1, 2, 3, 4, 5);// 创建RDDJavaRDDInteger parallelize javaSparkContext.parallelize(arrs);// collect()方法用于将RDD中的数据收集到Driver端并返回一个List。ListInteger collect parallelize.collect();// 打印结果collect.forEach(item - System.out.println(item));}
}执行结果
count 操作
说明
统计RDD中的元素数量。
案例
统计RDD中的元素数量并输出 import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void countDemo(){// 定义一个整数集合ListInteger arrs Arrays.asList(1, 2, 3, 4, 5);// 创建RDDJavaRDDInteger parallelize javaSparkContext.parallelize(arrs);// count()方法统计数量。Long total parallelize.count();// 打印结果System.out.println(元素数量 total);}
}运行结果
first操作
说明
返回RDD中的第一个元素
案例
获取RDD中的第一个元素并打印
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void firstDemo(){// 创建一个集合ListInteger arrs Arrays.asList(1, 2, 3, 4, 5);// 创建RDDJavaRDDInteger parallelize javaSparkContext.parallelize(arrs);// 获取第一个元素Integer first parallelize.first();// 打印结果System.out.println(第一个元素 first);}
}运行结果
take操作
说明
在RDD中从头获取指定数量的元素返回获取的元素集合。
案例
获取前3个元素
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void takeDemo(){// 创建一个集合ListInteger arrs Arrays.asList(1, 2, 3, 4, 5);// 创建RDDJavaRDDInteger parallelize javaSparkContext.parallelize(arrs);// 获取前三个元素ListInteger takes parallelize.take(3);// 打印结果System.out.println(前三个元素 takes);}
}运行结果
countByKey操作
说明
统计RDD中每种键的数量。
案例
根据键值对统计键值对中不同键的数量。并打印。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;PostConstructpublic void countByKeyDemo(){// 创建一个集合ListTuple2String,Integer arrs Arrays.asList(new Tuple2String,Integer(张三,15),new Tuple2String,Integer(张三,20),new Tuple2String,Integer(李四,20),new Tuple2String,Integer(李四,30),new Tuple2String,Integer(李四,50),new Tuple2String,Integer(王五,10));// 创建JavaPairRDD 对象JavaPairRDD对象的元素为键值对。JavaPairRDDString, Integer parallelize javaSparkContext.parallelizePairs(arrs);// 使用 countByKey() 方法统计相同键的数量。MapString, Long countByKey parallelize.countByKey();// 打印结果countByKey.forEach((key, value) - System.out.println(键: key , 数量: value));}
}运行结果
saveAsTextFile 操作
说明
将 RDD 的数据以txt文件保存到外部存储系统。传入指定路径文件会生成到该路径下。
注意 输出路径不能存在如果输出路径已经存在saveAsTextFile 会抛出异常 文件分区Spark 会将每个分区的数据写入一个独立的文件。因此如果 RDD 有多个分区它会生成多个文件每个文件对应一个分区的数据 。 文件名会根据分区编号进行自动命名通常形式是part-00*** 文本格式保存时RDD 中的每个元素会被转换为文本行。默认情况下Spark 会把 RDD 中的每个元素的 toString() 输出到文件中。 路径支持分布式存储saveAsTextFile 支持将数据保存到本地文件系统、HDFS、S3 等分布式存储中。路径的格式取决于存储系统的类型。例如如果要保存到 HDFS路径应该以 hdfs:// 开头。
案例
将RDD存入外部文件。
以下案例中将基于集合生成的RDD保存到saveRddDemo文件夹中。具体参考本文中的 RDD存入外部文件中 内容
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
Slf4j
Component
public class DemoTimer {AutowiredJavaSparkContext javaSparkContext;AutowiredSparkSession sparkSession;/*** 创建 RDD 算子*/PostConstructpublic void createRDD() {// 从集合创建 RDDListString lists Arrays.asList(hello, spark, hi, spark, hadoop);// parallelize(): 创建RDDRDD中创建默认分区数// parallelize( 元素, 分区数) 创建RDDRDD中指定分区数JavaRDDString parallelize javaSparkContext.parallelize(lists); // 创建RDDparallelize.collect().forEach(System.out::println); // 打印// 存储的目录文件夹路径。此处为项目中的路径且目录必须为不存在的路径。String fileSavePathI:\\zhang\\SpringBootSparkDemo\\src\\main\\resources\\saveRddDemo;parallelize.saveAsTextFile(fileSavePath);// 开始将RDD保存到文件中 }
}
foreach操作
说明
循环遍历RDD中的元素
案例
以上案例中大部分用到此方法是否用方式看以上案例。