网站怎样设计网页,网站建设实训室介绍,东莞网站设计公司排名,怎么做能打不开漫画网站一、概览
Spark SQL and DataFrames - Spark 3.5.2 Documentation
我们先看下官网的描述#xff1a;
SparkSQL是用于结构化数据处理的Spark模块#xff0c;与基本的Spark RDD API不同。Spark SQL提供的接口为Spark提供了更多关于正在执行的数据和计算结构的信息。在内部
SparkSQL是用于结构化数据处理的Spark模块与基本的Spark RDD API不同。Spark SQL提供的接口为Spark提供了更多关于正在执行的数据和计算结构的信息。在内部Spark SQL使用这些额外信息来执行额外的优化。
Spark SQL的一个用途是执行SQL查询。Spark SQL还可以用于从现有的Hive库表中读取数据。在另一种编程语言中运行SQL时结果将作为Dataset/DataFrame返回。也可以使用命令行或通过JDBC/ODBC与执行SQL。
二、什么是Dataset
Dataset是Spark 1.6中添加的一个新接口是数据的分布式集合。它兼容RDD和SparkSQL的优点 1、RDD强类型、使用强大lambda函数的能力可以使用map()、flatMap()、filter()等转换算子 2、Spark SQL优化执行引擎可以使用select()、where()、groupBy()等DSL语法
Dataset是惰性的只有在调用action算子时才会触发计算。在内部Dataset表示一个逻辑计划描述了生成数据所需的计算。当调用一个操作时Spark的查询优化器会优化逻辑计划并生成一个物理计划以并行和分布式的方式高效执行。
从源码中我们可以看到需要给定一个特定于域的类型“T”映射到Spark的内部类型系统。
class Dataset[T] private[sql](DeveloperApi Unstable transient val queryExecution: QueryExecution,DeveloperApi Unstable transient val encoder: Encoder[T])extends Serializable {//...........}
如果时基本类型可以通过导入spark.implicits来支持如果是对象类型需要自己定义比如
case class Person(name: String, age: Long)
encoder会用于告诉Spark在运行时生成代码将“Person”对象序列化为二进制结构。这种二进制结构通常具有更低的内存占用并且针对数据处理的效率进行了优化例如以列格式。
三、什么是DataFrame
通常调用spark.sql(select * from xxxxx) 或者 spark.read.json(xxx/xxx.json)时会返回DataFrame
下面我们看下DataFrame的类型是什么
package org.apache.spark
package object sql {type DataFrame Dataset[Row]}
从源码中我们可以看到 DataFrame只是Dataset[Row]的一个类型别名
DataFrame是一个组织成命名列的数据集。它在概念上相当于关系数据库中的表或R/Python中的DataFrame但底层有更丰富的优化。DataFrames可以从各种来源构建例如结构化数据文件、Hive中的表、外部数据库或现有的RDD。DataFrame API在Scala、Java、Python和R中可用。
四、SparkSession
它是使用Dataset和DataFrame API对Spark编程的入口点
创建一个新SparkSession
SparkSession.builder.master(local).appName(Word Count).config(spark.some.config.option, some-value).getOrCreate()
我们来看下它的部分源码
class SparkSession private(transient val sparkContext: SparkContext,transient private val existingSharedState: Option[SharedState],transient private val parentSessionState: Option[SessionState],transient private[sql] val extensions: SparkSessionExtensions,transient private[sql] val initialSessionOptions: Map[String, String])extends Serializable with Closeable with Logging { self //此会话的封装版本为[[SQLContext]]形式以实现向后兼容性val sqlContext: SQLContext new SQLContext(this)//向 QueryExecutionListener 注册来监听查询度量def listenerManager: ExecutionListenerManager sessionState.listenerManager//用于注册用户定义函数UDF//以下示例将Scala闭包注册为UDF//sparkSession.udf.register(myUDF, (arg1: Int, arg2: String) arg2 arg1)def udf: UDFRegistration sessionState.udfRegistrationdef streams: StreamingQueryManager sessionState.streamingQueryManager//以各种方式创建DataFramedef createDataFrame[A : Product : TypeTag](rdd: RDD[A]): DataFrame withActive {//........}def createDataFrame[A : Product : TypeTag](data: Seq[A]): DataFrame withActive {//........}def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame withActive {//........}def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame withActive {//........}def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame withActive {//........}//以各种方式创建Datasetdef createDataset[T : Encoder](data: Seq[T]): Dataset[T] {//........}//用户可以通过该界面创建、删除、更改或查询底层数据库、表、函数等transient lazy val catalog: Catalog new CatalogImpl(self)def table(tableName: String): DataFrame {read.table(tableName)}//使用Spark执行SQL查询将结果作为“DataFrame”返回。这个API急切地运行DDL/DML命令但不用于SELECT查询。def sql(sqlText: String): DataFrame withActive {//........}//在外部执行引擎而不是Spark中执行任意字符串命令。//当用户想在Spark外执行某些命令时这可能很有用。//例如为JDBC执行自定义DDL/DML命令为ElasticSearch创建索引为Solr创建内核等等。//调用此方法后命令将被热切执行返回的DataFrame将包含命令的输出如果有的话。def executeCommand(runner: String, command: String, options: Map[String, String]): DataFrame {//........}//返回一个DataFrameReader可用于将非流数据作为“DataFrame”读取。//示例// sparkSession.read.parquet(/path/to/file.parquet)// sparkSession.read.schema(schema).json(/path/to/file.json)def read: DataFrameReader new DataFrameReader(self)//禁用样式检查器以便“隐含”对象可以以小写i开头//特定于ScalaScala中提供的隐式方法用于将常见的Scala对象转换为DataFrame。object implicits extends SQLImplicits with Serializable {protected override def _sqlContext: SQLContext SparkSession.this.sqlContext}}object SparkSession extends Logging {class Builder extends Logging {//为应用程序设置一个名称该名称将显示在Spark web UI中。//如果没有设置应用程序名称将使用随机生成的名称。def appName(name: String): Builder config(spark.app.name, name)//设置配置选项。使用此方法设置的选项会自动传播到“SparkConf”和SparkSession自己的配置中。def config(key: String, value: String): Builder synchronized {options key - valuethis}def config(conf: SparkConf): Builder synchronized {conf.getAll.foreach { case (k, v) options k - v }this}//设置要连接的Spark主URL//例如“local”在本地运行“local[4]”在4核本地运行//或“spark://master:7077“在Spark独立集群上运行。def master(master: String): Builder config(spark.master, master)//启用Hive支持包括连接到持久Hive元存储、支持Hive服务器和Hive用户定义函数。def enableHiveSupport(): Builder synchronized {if (hiveClassesArePresent) {config(CATALOG_IMPLEMENTATION.key, hive)} else {抛异常 无法使用Hive支持实例化SparkSession因为找不到Hive相关类}}//将扩展注入[[SparkSession]]。这允许用户添加分析器规则、优化器规则、//计划策略或自定义解析器。def withExtensions(f: SparkSessionExtensions Unit): Builder synchronized {f(extensions)this}//获取一个现有的 SparkSession如果没有现有的则创建一个新的。def getOrCreate(): SparkSession synchronized {//......省略.......}}//创建一个SparkSession.Builder来构造一个SparkSessiondef builder(): Builder new Builder}
五、使用示例
我们以Spark源码中的examples为例来看下SparkSQL是如何使用的 examples/src/main/resources/people.json {name:Michael} {name:Andy, age:30} {name:Justin, age:19} 1、创建DataFrame
val df spark.read.json(examples/src/main/resources/people.json)//将DataFrame的内容显示到stdout
df.show()
// -----------
// | age| name|
// -----------
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// -----------//使用$符号需要此导入
import spark.implicits._
//以树格式打印schema
df.printSchema()
// root
// |-- age: long (nullable true)
// |-- name: string (nullable true)
//只选择name列进行打印
df.select(name).show()
// -------
// | name|
// -------
// |Michael|
// | Andy|
// | Justin|
// -------// 选择所有的列,但是对age列分别进行加1
df.select($name, $age 1).show()
// ----------------
// | name|(age 1)|
// ----------------
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// ----------------// 选择年龄大于21的人
df.filter($age 21).show()
// -------
// |age|name|
// -------
// | 30|Andy|
// -------// 按年龄统计人数
df.groupBy(age).count().show()
// ---------
// | age|count|
// ---------
// | 19| 1|
// |null| 1|
// | 30| 1|
// ---------//将DataFrame注册为SQL临时视图
df.createOrReplaceTempView(people)val sqlDF spark.sql(SELECT * FROM people)
sqlDF.show()
// -----------
// | age| name|
// -----------
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// -----------2、创建Dataset
import spark.implicits._
// 创建编码器
val caseClassDS Seq(Person(Andy, 32)).toDS()
caseClassDS.show()
// -------
// |name|age|
// -------
// |Andy| 32|
// -------// 大多数常见类型的编码器是通过导入spark.implicits自动提供的_
val primitiveDS Seq(1, 2, 3).toDS()
primitiveDS.map(_ 1).collect() // Returns: Array(2, 3, 4)// 通过提供类DataFrames可以转换为Dataset。映射将按名称完成
val path examples/src/main/resources/people.json
val peopleDS spark.read.json(path).as[Person]
peopleDS.show()
// -----------
// | age| name|
// -----------
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// -----------
3、用户自定义函数
// 定义并注册零参数非确定性UDF
// 默认情况下UDF是确定性的即对相同的输入产生相同的结果。
val random udf(() Math.random())
spark.udf.register(random, random.asNondeterministic())
spark.sql(SELECT random()).show()
// -------
// |UDF() |
// -------
// |xxxxxxx|
// -------// 定义并注册一个单参数UDF
val plusOne udf((x: Int) x 1)
spark.udf.register(plusOne, plusOne)
spark.sql(SELECT plusOne(5)).show()
// ------
// |UDF(5)|
// ------
// | 6|
// ------// 定义一个双参数UDF并在一个步骤中向Spark注册
spark.udf.register(strLenScala, (_: String).length (_: Int))
spark.sql(SELECT strLenScala(test, 1)).show()
// --------------------
// |strLenScala(test, 1)|
// --------------------
// | 5|
// --------------------//WHERE子句中的UDF
spark.udf.register(oneArgFilter, (n: Int) { n 5 })
spark.range(1, 10).createOrReplaceTempView(test)
spark.sql(SELECT * FROM test WHERE oneArgFilter(id)).show()
// ---
// | id|
// ---
// | 6|
// | 7|
// | 8|
// | 9|
// ---
4、基于hive使用
使用Hive时必须使用Hive支持实例化“SparkSession”即enableHiveSupport()。包括连接到持久Hive元存储、支持Hive服务器和Hive用户定义函数。没有现有Hive部署的用户仍然可以启用Hive支持。当未由hive-site.xml配置时上下文会自动在当前目录中创建“metastore_db”并创建一个由“spark.sql.house.dir”配置的目录该目录默认为启动spark应用程序的当前目录中的“spark warehouse”目录。
import spark.implicits._
import spark.sqlsql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive)
sql(LOAD DATA LOCAL INPATH examples/src/main/resources/kv1.txt INTO TABLE src)// 普通查询
sql(SELECT * FROM src).show()
// ----------
// |key| value|
// ----------
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...// 聚合查询
sql(SELECT COUNT(*) FROM src).show()
// --------
// |count(1)|
// --------
// | 500 |
// --------