网站页面确认书,新网站如何做网站优化,建网站的目的,湖南省建设厅建筑业信息网第一步#xff1a;创建RDD
Spark提供三种创建RDD方式#xff1a;** 集合、本地文件、HDFS文件**
使用程序中的集合创建RDD#xff0c;主要用于进行测试#xff0c;可以在实际部署到集群运行之前#xff0c;自己使用集合构造一些测试数据#xff0c;来测试后面的spark应…第一步创建RDD
Spark提供三种创建RDD方式** 集合、本地文件、HDFS文件**
使用程序中的集合创建RDD主要用于进行测试可以在实际部署到集群运行之前自己使用集合构造一些测试数据来测试后面的spark应用程序的流程。使用本地文件创建RDD主要用于临时性地处理一些存储了大量数据的文件使用HDFS文件创建RDD是最常用的生产环境的处理方式主要可以针对HDFS上存储的数据进行离线批处理操作。
使用集合创建RDD
如果要通过集合来创建RDD需要针对程序中的集合调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上形成一个分布式的数据集合也就是一个RDD。相当于集合中的部分数据会到一个节点上而另一部分数据会到其它节点上。然后就可以用并行的方式来操作这个分布式数据集合了 object CreateRddByArrayscala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(CreateRddByArrayscala).setMaster(local)val sc new SparkContext(conf)//创建集合 driver中执行val arr Array(1,2,3,4,5)//基于集合创建RDDval rdd sc.parallelize(arr)//对集合数据求和val sum rdd.reduce(_ _)//这行代码再driver中执行println(sum)** 注意** val arr Array(1,2,3,4,5)还有println(sum)代码是在driver进程中执行的这些代码不会并行执行parallelize还有reduce之类的操作是在worker节点中执行的
使用本地文件和HDFS文件创建RDD
通过SparkContext的textFile()方法可以针对本地文件或HDFS文件创建RDDRDD中的每个元素就是文件中的一行文本内容。textFile()方法支持针对目录、压缩文件以及通配符创建RDD
/*** 通过文件创建RDD*/
object CreateRddByFilescala {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(CreateRddByArrayscala).setMaster(local)val sc new SparkContext(conf)var path D:\\hello.txt//path hdfs://bigdata01:9000/test/hello.txtvar rdd sc.textFile(path,minPartitions 2)//获取每一行数据的长度计算文件内数据的总长度val length rdd.map(_.length).reduce(__)println(length);sc.stop() }
}** Spark中对RDD的操作** Spark对RDD的操作可以整体分为两类Transformation和Action
Transformation可以翻译为转换表示是针对RDD中数据的转换操作主要会针对已有的RDD创建一个新的RDD常见的有map、flatMap、filter等等. Action可以翻译为执行表示是触发任务执行的操作主要对RDD进行最后的操作比如遍历、reduce、保存到文件等并且还可以把结果返回给Driver程序. 不管是Transformation里面的操作还是Action里面的操作我们一般会把它们称之为算子 其中Transformation算子有一个特性** lazy ** lazy特性在这里指的是如果一个spark任务中只定义了transformation算子那么即使你执行这个任务任务中的算子也不会执行. 只有当transformation之后接着执行了一个action操作那么所有的transformation才会执行。 Spark通过lazy这种特性来进行底层的spark任务执行的优化避免产生过多中间结果。 Action的特性执行Action操作才会触发一个Spark 任务的运行从而触发这个Action之前所有的Transformation的执行
算子 介绍
map 将RDD中的每个元素进行处理一进一出
filter 对RDD中每个元素进行判断返回true则保留
flatMap 与map类似但是每个元素都可以返回一个或多个新元素
groupByKey 根据key进行分组每个key对应一个Iterablevalue
reduceByKey 对每个相同key对应的value进行reduce操作
sortByKey 对每个相同key对应的value进行排序操作(全局排序)
join 对两个包含key,value对的RDD进行join操作
distinct 对RDD中的元素进行全局去重Transformation操作开发实战
map对集合中每个元素乘以2filter过滤出集合中的偶数flatMap将行拆分为单词groupByKey对每个大区的主播进行分组reduceByKey统计每个大区的主播数量sortByKey对主播的音浪收入排序join打印每个主播的大区信息和音浪收入distinct统计当天开播的大区信息
scala代码如下
object TransformationOpScala {def main(args: Array[String]): Unit {val sc getSparkContextgroupByKeyOp(sc)}//flatMap将行拆分为单词def flatMapOp(sc: SparkContext): Unit {val dataRdd sc.parallelize(Array( good good study,day day up))dataRdd.flatMap(_.split( )).foreach(println(_))}//groupbyKey 对每个大区主播进行分组def groupByKeyOp(sc: SparkContext): Unit {val dataRdd sc.parallelize(Array((150001,us),(1500002,CN),(150003,CN),(1500004,IN)))//需要使用map对tuple中的数据位置进行互换因为需要把大区作为key进行分组操作dataRdd.map(tup(tup._2,tup._1)).groupByKey().foreach(tup{//获取大区val areatup._1println(area:)//获取同一个大区对应的所有用户idval it tup._2for(uid - it){println(uid )}println()})}//filter过滤出集合中的偶数def filterOp(sc: SparkContext): Unit {val dataRdd sc.parallelize(Array(1,2,3,4,5))dataRdd.filter(_ %2 0).foreach(println(_))}
//map对集合中每个元素乘以2def mapOp(sc: SparkContext): Unit {val dataRdd sc.parallelize(Array(1,2,3,4,5))dataRdd.map(_ * 2).foreach(println(_))}private def getSparkContext {val conf new SparkConf()conf.setAppName(CreateRddByArrayscala).setMaster(local)new SparkContext(conf)}
}常用Action介绍
算子 介绍
reduce 将RDD中的所有元素进行聚合操作
collect 将RDD中所有元素获取到本地客户端(Driver)
count 获取RDD中元素总数
take(n) 获取RDD中前n个元素
saveAsTextFile 将RDD中元素保存到文件中对每个元素调用toString
countByKey 对每个key对应的值进行count计数
foreach 遍历RDD中的每个元素scala代码
object ActionOpScala {def main(args: Array[String]): Unit {val sc getSparkContext//reduce聚合计算//reduceOp(sc)//collect获取元素集合//colletOp(sc)// count获取元素总数//countOp(sc)//saveAsTextFile保存文件//saveAsTextFileOp(sc)//countByKey统计相同的key出现多少次//countByKeyOp(sc)//foreach迭代遍历元素foreachOp(sc)sc.stop()}//foreach迭代遍历元素def foreachOp(sc: SparkContext): Unit {val dataRdd sc.parallelize(Array(1,2,3,4,5))dataRdd.foreach(println(_))}//countByKey统计相同的key出现多少次def countByKeyOp(sc: SparkContext): Unit {val dataRdd sc.parallelize(Array((A,1001),(B,1002),(A,1003),(C,1004)))val res dataRdd.countByKey()for((k,v) - res){println(k,v)}}//saveAsTextFile保存文件def saveAsTextFileOp(sc: SparkContext): Unit {val dataRdd sc.parallelize(Array(1,2,3,4,5))dataRdd.saveAsTextFile(hdfs://bigdata01:9000/out001)}
// count获取元素总数def countOp(sc: SparkContext): Unit {val dataRdd sc.parallelize(Array(1,2,3,4,5))val res dataRdd.count()println(res)}//collect获取元素集合def colletOp(sc: SparkContext): Unit {val dataRdd sc.parallelize(Array(1,2,3,4,5))//collect 返回的是一个Array数组val res dataRdd.collect()for(item - res){println(item)}}
//reduce聚合计算def reduceOp(sc: SparkContext): Unit {val dataRdd sc.parallelize(Array(1,2,3,4,5))val num dataRdd.reduce(_ _)println(num)}private def getSparkContext {val conf new SparkConf()conf.setAppName(CreateRddByArrayscala).setMaster(local)new SparkContext(conf)}
}