网站板块建设的重要性,网络营销都有哪些内容,衡水网络营销公司,网站备案 查询用户可以根据需求自己封装计算的逻辑#xff0c;对字段数据进行计算 内置函数#xff0c;是spark提供的对字段操作的方法 #xff0c;split(字段) 对字段中的数进行切割#xff0c;F.sum(字段) 会将该字段下的数据进行求和 实际业务中又能内置函数不满足计算需求#xff0… 用户可以根据需求自己封装计算的逻辑对字段数据进行计算 内置函数是spark提供的对字段操作的方法 split(字段) 对字段中的数进行切割F.sum(字段) 会将该字段下的数据进行求和 实际业务中又能内置函数不满足计算需求此时就需要自定义行数完成字段数据的业务处 函数分类
udf 自定义一进一出udaf 聚合自定义多进一出udtf 爆炸一进多出
UDF函数
对每一行数据以此进行计算返回每一行的结果
1不带装饰器 # UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss SparkSession.builder.getOrCreate()# 读取文件数据转为df
df ss.read.csv(hdfs://node1:8020/data/students.csv,headerTrue,sep,)df.show()# 自定义字符串长度计算函数
# F.udf(returnTypeIntegerType()) # 使用装饰器注册函数只能在DSL方法中使用不能在SQL中使用
def len_func(field):自定义函数函数名可以自己指定:param field: 是用来结构处理的字段数据可以定义多个。根据实际处理的字段数量决定定义多少个接收参数:return: 返回处理后的数据if field is None:return 0else:data len(field)return data# 将自定义的函数注册到spark中使用
# 参数一 指定spark中使用函数的名
# 参数二 指定自定义函数的名
# 参数三 指定函数的返回值类型
# 接收参数 定义和函数名一样的值
len_func ss.udf.register(len_func,len_func,returnType IntegerType())# 在spark中使用
df2 df.select(id,name,gender,len_func(name))
df2.show()# 使用sql语句
df.createTempView(stu)df3 ss.sql(select * ,len_FUNC(name) from stu)
df3.show() 2带有装饰器
# UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss SparkSession.builder.getOrCreate()# 读取文件数据转为df
df ss.read.csv(hdfs://node1:8020/data/students.csv,headerTrue,sep,)df.show()# 自定义字符串长度计算函数
F.udf(returnTypeIntegerType()) # 使用装饰器注册函数只能在DSL方法中使用不能在SQL中使用
def len_func(field):自定义函数函数名可以自己指定:param field: 是用来结构处理的字段数据可以定义多个。根据实际处理的字段数量决定定义多少个接收参数:return: 返回处理后的数据if field is None:return 0else:data len(field)return data# 在spark中使用
df2 df.select(id,name,gender,len_func(name))
df2.show()
装饰器注册 只能在DSL方法中使用在sql语句中无法使用
UDAF函数
多进一出 主要是聚合
使用pandas中的series实现可以读取一列数据存储在pandas的seriess中进行数据的聚合
# 读取文件数据转为df
df ss.read.csv(hdfs://node1:8020/data/students.csv,headerTrue,sep,,schemaid int,name string,gender string,age int,cls string)df.show()# 自定义udaf函数
# 装饰器注册
F.pandas_udf(returnTypeIntegerType())
# 自定义udaf函数
# fileds:pd.Series 给数据字段指定一个类型
# - float 指定返回值类型
# udaf函数注册需要两步第一步现指定装饰器
def sub(filed:pd.Series) - int:自定义udaf函数实现累减:param field: 接收的字段列数据 pd.Series声明字段数据的类型,接收一列数据可以使用pandas的series类型:return:# field是series就按照series方式操作n filed[0] # 取出第一个值作为初始值for i in filed[1::]:n-ireturn n# regidter方法注册
sub ss.udf.register(sub,sub)# 使用udaf函数 缺少 PyArrow pandas中series类型交个spark程序无法识别spark是有scala实现scala中没有对应的series类型
# 可以使用 PyArrow框架将series转为scale能识别的数据类型
df2 df.select(sub(age))
df2.show() arrow框架 pyarrow Apache Arrow 是一种内存中的列式数据格式用于Spark中以在JVM和Python进程之间有效地传输数据。目前这对使用 Pandas/NumPy 数据的 Python 用户最有益提升传输速度。 在线安装 三台机器安装 进入虚拟环境 conda activate base 在线安装 pip install pyspark[sql] -i Verifying - USTC Mirrors 离线安装 三台机器安装 pip install pyarrow-10.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl 安装pyarrow
conda activate base
pip install pyspark[sql] -i https://pypi.mirrors.ustc.edu.cn/simple/