推荐网站网页,济南网站建设q479185700惠,重庆网站推广效果,旅游网站开发与设计论文DataFrame提供统一接口加载和保存数据源中的数据#xff0c;包括#xff1a;结构化数据、Parquet文件、JSON文件、Hive表#xff0c;以及通过JDBC连接外部数据源。一个DataFrame可以作为普通的RDD操作#xff0c;也可以通过#xff08;registerTempTable#xff09;注册成…DataFrame提供统一接口加载和保存数据源中的数据包括结构化数据、Parquet文件、JSON文件、Hive表以及通过JDBC连接外部数据源。一个DataFrame可以作为普通的RDD操作也可以通过registerTempTable注册成一个临时表支持在临时表的数据上运行SQL查询操作。
一、数据源加载保存操作
DataFrame数据源默认文件为Parquet格式可以通过spark.sql.sources.default参数进行重新修改。 不论何种格式的数据源均采取统一API、read和write进行操作代码如下
// 读取parquet格式数据
val df sqlContext.read.load(file:///$SPARK_HOME/examples/src/main/resources/users.parquet)
// 从DataFrame写数据并保存成Parquet格式
df.write.save(saveusers.parquet)1指定选项
Spark支持通过完全限定名称如org.apache.spark.sql.parquet指定数据源的附加选项内置数据源可以使用短名称json、parquet、jdbcSpark SQL支持通过format将任何类型的DataFrames转换成其他类型。
val df sqlContext.read.format(json).load(file:///$SPARK_HOME examples/src/main/resources/people.json)
df.select(name, age).write.format(parquet).save(namesAndAges.parquet)2保存模式
可以通过配置SaveMode指定如何处理现有数据实现保存模式不使用任何锁定而且不是原子操作因此多路数据写入相同位置是不安全的。当执行overwrite时写入新数据之前原来数据将被删除。
3保存持久表
当使用HiveContext时DataFrames通过saveAsTable命令保存为持久表使用与registerTempTable命令不同saveAsTable实现Dataframe的内容并创建一个指向Hive Metastore中数据的指针。即使Spark程序重新启动连接相同Metastore的数据不会发生变化。 默认情况下saveAsTable将创建一个“管理表”这意味着数据的位置将由Metastore控制当表被删除时管理表将表数据自动删除。
二、Parquet文件
Parquet是一种支持多种数据处理系统的存储格式Spark SQL提供了读写Parquet文件并且自动保存原始数据的模式。
1Parquet文件优点
1高效Parquet采取列式存储避免读入不需要的数据具有极好的性能和GC。 2方便的压缩和解压缩并具有极好的压缩比例。 3可以直接固化为Parquet文件也可以直接读取Parquet文件具有比磁盘更好的缓存效果。 Spark SQL对读写Parquet文件提供支持方便加载Parquet文件数据到DataFrame供Spark SQL操作也可以将DataFrame写入Parquet文件并自动保留原始Scheme架构。 在外部数据源方面Spark对Parquet的支持有了很大的加强更快的metadata discovery和schema merging同时能够读取其他工具或者库生成的非标准合法的Parquet文件以及更快、更鲁棒的动态分区插入。
2加载数据编程
通过sqlContext.implicits._隐式转换一个RDD为DataFrame并将DataFrame保存为Parquet文件加载保存的Parquet文件重新构建一个DataFrame注册成临时表供SQL查询使用。
// 创建sqlContextval
sqlContext new org.apache.spark.sql.SQLContext(sc)
// 隐式转换为一个DataFrame
import sqlContext.implicits._
// 使用case定义schema实现Person接口
case class Person(name: String, age: Int)
// 读取文件创建一个MappedRDD并将数据写入Person模式类隐式转换为DataFrame
val peopleDF sc.textFile(file:///$SPARK_HOME/examples/src/main/resources/people.txt).map(_.split(,)).map(p Person(p(0), p(1).trim.toInt)).toDF()
// 保存DataFrame保存为Parquet格式
peopleDF.write.parquet(people.parquet)
// 加载Parquet文件作为DataFrame
val parquetFile sqlContext.read.parquet(people.parquet)
// 将DataFrame注册为临时表供SQL查询使用
parquetFile.registerTempTable(parquetTable)
val result sqlContext.sql(SELECT name FROM parquetTable WHERE age 13 AND age 19)
result.map(t Name: t(0)).collect().foreach(println)3分区发现partition discovery
表分区table partitioning是一种常见的优化方法用于像Hive一样的系统。对于分区表数据通常存储在不同的目录中在每个分区目录路径中对分区列的值进行编码。 Parquet数据源能够自动发现和推断分区信息使用以下目录结构存储以前使用的人口数据到一个分区表以gender和country作为分区列
path└──table├── gendermale│ ├── ...│ ├── countryUS│ │ └── data.parquet│ ├── countryCN│ │ └── data.parquet│ └── ...└── genderfemale├── ...├── countryUS│ └── data.parquet├── countryCN│ └── data.parquet└── ...通过路径path/table使用SQLContext.read的parquet或load命令Spark SQL自动提取分区信息返回的DataFrame模式如下
root
|-- name: string (nullable true)
|-- age: long (nullable true)
|-- gender: string (nullable true)
|-- country: string (nullable true)分区列的数据类型是自动映射支持numeric数据类型和string类型自动推断。
4模式合并schema merging
如同ProtocolBuffer、Avro、ThriftParquet也支持模式演进用户可以从一个简单的模式开始逐步根据需要添加更多的列。通过这种方式用户最终得到多个不同但是能相互兼容模式的Parquet文件Parquet数据源能够自动检测这种情况进而合并这些文件。 由于模式合并是相对昂贵的操作在很多情况下并非必须为了提升性能在1.5.0版本中默认关闭。
// 隐式转换一个RDD为DataFrame
import sqlContext.implicits._
// 创建一个DataFrame存储数据到一个分区目录
val df1 sc.makeRDD(1 to 5).map(i (i, i * 2)).toDF(single, double)
df1.write.parquet(data/test_table/key1)
// 创建一个新DataFrame存储在一个新的分区目录
val df2 sc.makeRDD(6 to 10).map(i (i, i * 3)).toDF(single, triple)
df2.write.parquet(data/test_table/key2)
// 读取分区表
val df3 sqlContext.read.option(mergeSchema, true).parquet(data/test_table)
df3.printSchema()
// 通过基础DataFrame函数以树格式打印Schema包含分区目录下全部的分区列
df3.printSchema()
// root
// |-- single: int (nullable true)
// |-- double: int (nullable true)
// |-- triple: int (nullable true)
// |-- key: int (nullable true)Parquet数据源自动从文件路径中发现了key这个分区列并且正确合并了两个不相同但相容的Schema。值得注意的是如果最后的查询中查询条件跳过了key1这个分区Spark SQL的查询优化器会根据这个查询条件将该分区目录剪掉完全不扫描该目录中的数据从而提升查询性能。
5配置
在SQLContext中使用setConf方法或在运行时使用SQL命令SET keyvalue实现对Parquet文件的配置
三、JSON数据集
Spark SQL可以自动推断出一个JSON数据集的Schema并作为一个DataFrame加载通过SQLContext.read.json方法使用JSON文件创建DataFrame。
// 创建sqlContextval
sqlContext new org.apache.spark.sql.SQLContext(sc)
// 设置JSON数据集的路径可以是单个文件或者一个目录
val path file:///Spark_Home/examples/src/main/resources/people.json
val people sqlContext.read.json(path)
// 打印schema并显示推断的schema
people.printSchema()
// root
// |-- age: integer (nullable true)
// |-- name: string (nullable true)
// 注册DataFrame作为一个临时表
people.registerTempTable(jsonTable)
// 使用sql运行SQL表达式
val teenagers sqlContext.sql(SELECT name FROM jsonTable WHERE age 13 AND age 19)或者通过转换一个JSON对象的RDD[String]创建DataFrame。
val anotherRDD sc.parallelize({name:Yin,address:{city:Columbus,state:Ohio}} :: Nil)
val anotherPeople sqlContext.read.json(anotherRDD)四、Hive表
Spark SQL支持从Hive表中读写数据然而默认版本Spark组件并不包括Hive大量的依赖关系。Hive支持通过添加-Phive和-Phive-thriftserver标志对Spark重新构建一个包括Hive的新组件Hive的新组件必须分发到所有的Worker节点上因为Worker节点需要访问Hive的serialization和deserialization库SerDes以便于访问存储在Hive中的数据所以该Hive集合Jar包必须拷贝到所有的Worker节点。 除了基本的SQLContextSpark SQL还可以创建一个HiveContext该HiveContext通过基本的SQLContext提供了一系列的方法集可以使用更完整的HiveQL解析器查询访问Hive的UDF并从Hive表读取数据以及SerDe支持。
1示例数据
新建一个kv1.txt文件数据如下
238 val_238
86 val_86
311 val_311
27 val_27
165 val_165
409 val_409
255 val_255
278 val_278
98 val_982创建HiveContext
使用Hive必须先构建一个继承SQLContext的HiveContext对象并加入在MetaStore中查找表和使用HiveQL写查询功能的支持可以在conf目录hive-site.xml文件中添加Hive的配置文件当运行一个YARN集群时datanucleus jars和hive-site.xml必须在Driver和全部的Executors启动。 一个简单的方法如下在spark-submit命令行通过–jars参数和–file参数加载即使hive-site.xml文件没有配置仍然可以创建一个HiveContext并会在当前目录下自动地创建metastore_db和warehouse。
使用Scala语言说明HiveContext创建方式
// SparkContext实例
val sc: SparkContext ...
// 通过sc创建HiveContext的实例hiveContext
val hiveContext new org.apache.spark.sql.hive.HiveContext(sc)3使用Hive操作数据
使用HiveContext无需单独安装Hive可以使用spark.sql.dialect选项选择解析查询语句的SQL的特定转化这个参数可以使用SQLContext上的setConf方法也可以使用SQL上的SETkeyvalue命令进行修改。
// 通过HiveContext的sql命令创建表
hiveContext.sql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING))
// 加载数据, $SPARK_HOME指Spark文件安装目录使用“file:// ...”标识的本地文件使用“hdfs:// ...”标识的HDFS存储系统的文件
hiveContext.sql(LOAD DATA LOCAL INPATH file:///$Spark_Home/examples/src/main/resources/kv1.txt INTO TABLE src)
// HiveQL的查询表达
hiveContext.sql(FROM src SELECT key,value).collect().foreach(println)
// 使用HiveContext创建表命令
CREATE [EXTERNAL] TABLE[IF NOT EXISTS] table_name
(col_name data_type,…)
[PARTITIONED BY(col_name data_type,…)]
[[ROW FORMAT row_format]]
[STORED AS file_format]
[LOCATION hdfs_path]4Spark支持的Hive特性
1Hive查询语句包括SELECT、GROUP BY、ORDER BY、CLUSTER BY、SORT BY 2Hive运算符包括关系运算符、、、、、、、等、算术运算符、-、*、/、%等、逻辑运算符AND、、OR、||等、复杂类型构造函数、数据函数sign、ln、cos等、字符串函数instr、length、printf等 3用户自定义函数UDF 4用户自定义聚合函数UDAF 5用户定义的序列化格式SerDes 6连接操作包括JOIN、{LEFT|RIGHT|FULL}OUTER JOIN、LEFT SEMI JOIN、CROSS JOIN 7联合操作Unions 8子查询SELECT col FROMSELECT ab AS col from t1t2 9抽样Sampling 10解释Explain 11分区表Partitioned tables 12所有的HiveDDL操作函数包括CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE 13大多数Hive数据类型TINYINT、SMALLINT、INT、BIGINT、BOOLEAN、FLOAT、DOUBLE、STRING、BINARY、TIMESTAMP、DATE、ARRAY、MAP、STRUCT。
五、通过JDBC连接数据库
Spark SQL还包括一个可以通过JDBC从其他数据库读取数据的数据源并返回一个DataFrame在Spark SQL很容易处理或者Join其他的数据源。除了Scala语言Java或Python语言也很容易操作而不需要提供一个Class Tag。不同于Spark SQL JDBC server允许其他应用程序使用Spark SQL运行查询。
在Spark类路径中包含特定数据库的JDBC驱动程序如通过Spark Shell连接postgresql命令
SPARK_CLASSPATHpostgresql-9.3-1102-jdbc41.jar bin/spark-shell
val jdbcDF sqlContext.load(jdbc, Map(url - jdbc:postgresql:dbserver,dbtable - schema.tablename))使用数据源API加载远程数据库的表作为一个DataFrame和Spark SQL临时表 文章来源《Spark核心技术与高级应用》 作者于俊向海代其锋马海平 文章内容仅供学习交流如有侵犯联系删除哦