天津建站模板,郑州比较厉害的男科中医,常州公司网站建设,怎么做手机网站文章目录 Pyspark dataframe创建DataFrame使用Row对象使用元组与scheam使用字典与scheam注意 agg 聚合操作alias 设置别名字段设置别名设置dataframe别名 cache 缓存checkpoint RDD持久化到外部存储coalesce 设置dataframe分区数量collect 拉取数据columns 获取dataframe列 Pys… 文章目录 Pyspark dataframe创建DataFrame使用Row对象使用元组与scheam使用字典与scheam注意 agg 聚合操作alias 设置别名字段设置别名设置dataframe别名 cache 缓存checkpoint RDD持久化到外部存储coalesce 设置dataframe分区数量collect 拉取数据columns 获取dataframe列 Pyspark dataframe
创建DataFrame
from pyspark.sql import SparkSession,Row
from pyspark.sql.types import *def init_spark():spark SparkSession.builder.appName(LDSX_TEST_DATAFrame) \.config(hive.metastore.uris, thrift://hadoop01:9083) \.config(spark.master, local[2]) \.enableHiveSupport().getOrCreate()return spark
spark init_spark()# 设置字段类型
schema StructType([StructField(name, StringType(), True),StructField(age, StringType(), True),StructField(id, StringType(), True),StructField(gender, StringType(), True),
])使用Row对象
cs Row(name,age,id,gender)
row_list [ cs(ldsx,12,1,男),cs(test1,20,1,女),cs(test2,26,1,男),cs(test3,19,1,女),cs(test4,51,1,女),cs(test5,13,1,男)]
data spark.createDataFrame(row_list)
data.show()--------------
| name|age| id|gender|
--------------
| ldsx| 12| 1| 男|
|test1| 20| 1| 女|
|test2| 26| 1| 男|
|test3| 19| 1| 女|
|test4| 51| 1| 女|
|test5| 13| 1| 男|
--------------
data.printSchema()
root|-- name: string (nullable true)|-- age: string (nullable true)|-- id: string (nullable true)|-- gender: string (nullable true)使用元组与scheam
park.createDataFrame([(ldsx1,12,1,男),(ldsx2,12,1,男)],schema).show()
-----------------
| name|age| id|gender|
-----------------
|ldsx1| 12| 1| 男|
|ldsx2| 12| 1| 男|
-----------------使用字典与scheam
spark.createDataFrame([{name:ldsx,age:12,id:1,gender:女}]).show()
----------------
|age|gender| id|name|
----------------
| 12| 女| 1|ldsx|
----------------注意
scheam设置优先级高于row设置dict设置的key
schema StructType([StructField(name, StringType(), True),StructField(age, StringType(), True),StructField(id, StringType(), True),StructField(测试, StringType(), True),
])
spark.createDataFrame([{name:ldsx,age:12,id:1,gender:女}],schema).show()
--------------
|name|age| id|测试|
--------------
|ldsx| 12| 1|null|
--------------agg 聚合操作
在 PySpark 中aggaggregate函数用于对 DataFrame 进行聚合操作。它允许你在一个或多个列上应用一个或多个聚合函数并返回计算后的结果。可以结合groupby使用。
from pyspark.sql import functions as sf
data.show()
-----------------
| name|age| id|gender|
-----------------
| ldsx| 12| 1| 男|
|test1| 20| 1| 女|
|test2| 26| 1| 男|
|test3| 19| 1| 女|
|test4| 51| 1| 女|
|test5| 13| 1| 男|
-----------------
data.agg({age:max}).show()
--------
|max(age)|
--------
| 51|
--------
data.agg({age:max,gender:max}).show()
-------------------
|max(gender)|max(age)|
-------------------
| 男| 51|
-------------------data.agg(sf.min(data.age)).show()
--------
|min(age)|
--------
| 12|
--------
data.agg(sf.min(data.age),sf.min(data.name)).show()
-----------------
|min(age)|min(name)|
-----------------
| 12| ldsx|
-----------------
结合groupby使用
data.groupBy(gender).agg(sf.min(age)).show()--------------
|gender|min(age)|
--------------
| 女| 19|
| 男| 12|
--------------
data.groupBy(gender).agg(sf.min(age),sf.max(name)).show()
-----------------------
|gender|min(age)|max(name)|
-----------------------
| 女| 19| test4|
| 男| 12| test5|
-----------------------
alias 设置别名
字段设置别名
#字段设置别名
data.select(data[name].alias(rename_name)).show()
-----------
|rename_name|
-----------
| ldsx|
| test1|
| test2|
| test3|
| test4|
| test5|
-----------设置dataframe别名
d1 data.alias(ldsx1)
d2 data2.alias(ldsx2)
d1.show()
-----------------
| name|age| id|gender|
-----------------
| ldsx| 12| 1| 男|
|test1| 20| 1| 女|
|test2| 26| 1| 男|
|test3| 19| 1| 女|
|test4| 51| 1| 女|
|test5| 13| 1| 男|
-----------------
d2.show()
-----------------
| name|age| id|gender|
-----------------
|测试1| 12| 1| 男|
|测试2| 20| 1| 男|
-----------------d3 d1.join(d2,col(ldsx1.gender)col(ldsx2.gender),inner)
d3.show()
----------------------------------
| name|age| id|gender| name|age| id|gender|
----------------------------------
| ldsx| 12| 1| 男|测试1| 12| 1| 男|
| ldsx| 12| 1| 男|测试2| 20| 1| 男|
|test2| 26| 1| 男|测试1| 12| 1| 男|
|test2| 26| 1| 男|测试2| 20| 1| 男|
|test5| 13| 1| 男|测试1| 12| 1| 男|
|test5| 13| 1| 男|测试2| 20| 1| 男|
----------------------------------d3[[name]].show()
#报错提示
pyspark.errors.exceptions.captured.AnalysisException: [AMBIGUOUS_REFERENCE] Reference name is ambiguous, could be: [ldsx1.name, ldsx2.name].
# 使用别名前缀获取
d3[[ldsx1.name]].show()
-----
| name|
-----
| ldsx|
| ldsx|
|test2|
|test2|
|test5|
|test5|
-----d3[[ldsx2.name]].show()
-----
| name|
-----
|测试1|
|测试2|
|测试1|
|测试2|
|测试1|
|测试2|
-----
d3.select(ldsx1.name,ldsx2.name).show()
----------
| name| name|
----------
| ldsx|测试1|
| ldsx|测试2|
|test2|测试1|
|test2|测试2|
|test5|测试1|
|test5|测试2|
----------
cache 缓存
dataframe缓存默认缓存级别MEMORY_AND_DISK_DESER
df.cache()
# 查看逻辑计划和物理计划
df.explain()checkpoint RDD持久化到外部存储
Checkpoint是一种重量级的使用也就是RDD的重新计算成本很高的时候我们采用Checkpoint比较合适或者数据量很大的时候采用Checkpoint比较合适。如果数据量小或者RDD重新计算也是非常快的直接使用缓存即可。 CheckPoint支持写入HDFS。CheckPoint被认为是安全的
sc spark.sparkContext
# 设置检查存储目录
sc.setCheckpointDir(hdfs:///ldsx_checkpoint)
d3.count()
# 保存会在hdfs上进行存储
d3.checkpoint()
# 从hdfs读取
d3.count()coalesce 设置dataframe分区数量
# 设置dataframe分区数量
d3 d3.coalesce(3)
# 获取分区数量
d3.rdd.getNumPartitions()collect 拉取数据
当任务提交到集群的时候collect()操作是用来将所有结点中的数据收集到dirver节点数据量很大慎用防止dirver炸掉。
d3.collect()
[Row(nameldsx, age12, id1, gender男, name测试1, age12, id1, gender男), Row(nameldsx, age12, id1, gender男, name测试2, age20, id1, gender男), Row(nametest2, age26, id1, gender男, name测试1, age12, id1, gender男), Row(nametest2, age26, id1, gender男, name测试2, age20, id1, gender男), Row(nametest5, age13, id1, gender男, name测试1, age12, id1, gender男), Row(nametest5, age13, id1, gender男, name测试2, age20, id1, gender男)]columns 获取dataframe列 d3.columns
[name, age, id, gender, name, age, id, gender]d3.withColumn(ldsx1.name_1,col(ldsx1.name)).show()
----------------------------------------------
| name|age| id|gender| name|age| id|gender|ldsx1.name_1|
----------------------------------------------
| ldsx| 12| 1| 男|测试1| 12| 1| 男| ldsx|
| ldsx| 12| 1| 男|测试2| 20| 1| 男| ldsx|
|test2| 26| 1| 男|测试1| 12| 1| 男| test2|
|test2| 26| 1| 男|测试2| 20| 1| 男| test2|
|test5| 13| 1| 男|测试1| 12| 1| 男| test5|
|test5| 13| 1| 男|测试2| 20| 1| 男| test5|
----------------------------------------------# 重命名列名
d3.withColumnRenamed(ldsx1.name_1,col(ldsx1.name)).show()