巩义做网站的,互联网公司排名前1000个,自己建网站有什么用,wordpress分类显示文章上面的是SparkSQL的API操作。
1. 将RDD转化为DataFrame对象
DataFrame#xff1a;
DataFrame是一种以RDD为基础的分布式数据集#xff0c;类似于传统数据库中的二维表格。带有schema元信息#xff0c;即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这样的数…
上面的是SparkSQL的API操作。
1. 将RDD转化为DataFrame对象
DataFrame
DataFrame是一种以RDD为基础的分布式数据集类似于传统数据库中的二维表格。带有schema元信息即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这样的数据集可以用SQL查询。 创建方式
准备数据
1 zhangsan 20 male
2 lisi 30 female
3 wangwu 35 male
4 zhaosi 40 female
toDF方式。
package com.hainiu.sparkimport org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}object TestSparkSql{def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(test sql)conf.setMaster(local[*])val sc new SparkContext(conf)val sqlSc new SQLContext(sc)//环境对象包装import sqlSc.implicits._//引入环境信息val rdd sc.textFile(data/a.txt).map(t {val strs t.split( )(strs(0).toInt, strs(1), strs(2).toInt)})//增加字段信息val df rdd.toDF(id, name, age)df.show() //展示表数据df.printSchema() //展示表格字段信息}
}
使用样例类定义schema
object TestSparkSql{def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(test sql)conf.setMaster(local[*])val sc new SparkContext(conf)val sqlSc new SQLContext(sc)import sqlSc.implicits._val rdd sc.textFile(data/a.txt).map(t {val strs t.split( )Student(strs(0).toInt, strs(1), strs(2).toInt)})// val df rdd.toDF(id, name, age)val df rdd.toDF()df.show() //打印数据以表格的形式打印数据df.printSchema() //打印表的结构信息}
}
case class Student(id:Int,name:String,age:Int)
createDataFrame方式
这种方式需要将rdd和schema信息进行合并得出一个新的DataFrame对象
package com.hainiu.sparkimport org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}object TestSparkSqlWithCreate {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(test create)conf.setMaster(local[*])val sc new SparkContext(conf)val sqlSc new SQLContext(sc)val rdd sc.textFile(data/a.txt).map(t {val strs t.split( )Row(strs(0).toInt, strs(1), strs(2).toInt)})
// rdd schemaval schema StructType(Array(StructField(id,IntegerType),StructField(name,StringType),StructField(age,IntegerType)))val df sqlSc.createDataFrame(rdd, schema)df.show()df.printSchema()}
}
2. SparkSQL的查询方式推荐第二种写法 第二个部分关于df的查询
第一种sql api的方式查询
使用的方式方法的形式编程但是思想还是sql形式和rdd编程特别相似的一种写法
object TestSql {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(test sql)conf.setMaster(local[*])val sc new SparkContext(conf)val sqlSc new SQLContext(sc)import sqlSc.implicits._val rdd sc.textFile(data/a.txt).map(t {val strs t.split( )(strs(0).toInt, strs(1), strs(2).toInt,strs(3))})val df rdd.toDF(id, name, age,gender)//select * from student where age 20//df.where(age 20)//分组聚合//df.groupby(gender).sum(age)//几个问题//聚合函数不能增加别名 聚合函数不能多次聚合 orderby不识别desc // df.groupBy(gender).agg(count(id).as(id),sum(age).as(age)).orderBy($age.desc) //字段标识可以是字符串也可以是字段对象//df.orderBy($age.desc) //df.orderBy(col(age).desc) //df.orderBy(df(age).desc) //增加字段对象可以实现高端操作//df.select($age.(1)) //join问题//val df1 sc.makeRDD(Array(// (1,100,98),// (2,100,95),// (3,90,92),//(4,90,93)//)).toDF(id,chinese,math)//df.join(df1,id) //字段相同 //df.join(df1,df(id)df1(id)) //窗口函数//普通函数 聚合函数 窗口函数 sum|count|rowkey over (partition by gender order by age desc)//按照条件分割完毕进行数据截取//班级的前两名 每个性别年龄最高的前两个//select *,row_number() over (partition by gender order by age desc) rn from tableimport sqlSc.implicits._import org.apache.spark.sql.functions._df.withColumn(rn,row_number().over(Window.partitionBy(gender).orderBy($age.desc))).where(rn 1).show()}
}
第二种纯sql形式的查询
首先注册表然后使用sql查询最终得出的还是dataFrame的对象其中和rdd的编程没有任何的区别只不过现在使用sql形式进行处理了而已
package com.hainiu.sparkimport org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}object TestSparkSqlWithCreate {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(test create)conf.setMaster(local[*])val sc new SparkContext(conf)val sqlSc new SQLContext(sc)val rdd sc.textFile(data/a.txt).map(t {val strs t.split( )Row(strs(0).toInt, strs(1), strs(2).toInt,strs(3))})
// rdd schemaval schema StructType(Array(StructField(id,IntegerType),StructField(name,StringType),StructField(age,IntegerType),StructField(gender,StringType),))val df sqlSc.createDataFrame(rdd, schema)//sql形式查询//select col from tabledf.createTempView(student)val df1 sqlSc.sql(|select count(1) cnt,gender from student group by gender|.stripMargin)df1.createTempView(student1)val df2 sqlSc.sql(|select * from student1 where cnt1|.stripMargin)df2.show()df2.printSchema()}
}