网站图片尺寸,怎么在百度上添加自己的店铺地址,wordpress 删除仪表盘,营业执照注销文章目录
Flink SQL自定义函数#xff08;UDF#xff09;
一、概述
二、自定义标量函数#xff08;UDSF#xff09;
三、自定义聚合函数(UDAF)
四、 自定义表值函数(UDTF) Flink SQL自定义函数…
文章目录
Flink SQL自定义函数UDF
一、概述
二、自定义标量函数UDSF
三、自定义聚合函数(UDAF)
四、 自定义表值函数(UDTF) Flink SQL自定义函数UDF
Flink全托管支持在SQL作业中使用Python自定义函数Flink支持以下3类自定义函数UDSFUser Defined Scalar Function、UDAFUser Defined Aggregation Function、UDTFUser Defined Table-valued Function。 一、概述
在资料udf函数中可以看到udx.zip压缩包将其解压后可以看到有以下文件
其中udfs.py udafs.py udtfs.py分别对应了UDSF、UDAF、UDTF三个函数的示例。
进入阿里云Flink开发平台点击左侧导航栏SQL开发点击左侧的函数页签单击注册UDF将udx.zip上传如下图所示。 点击确定后Flink开发控制台会解析UDF文件中是否使用了Flink UDF、UDAF和UDTF接口的类并自动提取类名填充到Function Name字段中。可以看到这里识别出了三个函数。
点击创建函数可以看到函数页签下出现了udx目录下面有三个自定义函数此时自定义函数创建完成。 二、自定义标量函数UDSF
自定义标量函数UDSF将0个、1个或多个标量值映射到一个新的标量值。输入与输出是一对一的关系即读入一行数据写出一条输出值。
udfs.py内容如下
from pyflink.table import DataTypes
from pyflink.table.udf import udfudf(result_typeDataTypes.STRING())
def sub_string(s: str, begin: int, end: int):return s[begin:end] 说明
sub_string定义了获取每条数据中从begin~end位的字符的代码;需要通过名字为 “ udf ” 的装饰器声明这是一个 scalar function需要通过装饰器中的 result_type 参数声明 scalar function 的结果类型
进入阿里云Flink开发平台,在test作业草稿下,进行建表,语句如下
CREATE TABLE function_udf(a VARCHAR,b INT,c INT
) WITH (connector socket,hostname 178.23.146.213,port 9999,format csv
);
查询语句如下
SELECT sub_string(a,2,5)
FROM function_udf;
在ecs监听9999端口:nc -lk 9999,然后选中查询语句,点击调试.在ecs向9999发送数据
123|456,4,2
12|3456,7,1
结果如下 查询结果是function_udf表中a字段每行字符串的第3-5个字符。 三、自定义聚合函数(UDAF)
自定义聚合函数UDAF将多条记录聚合成1条记录。其输入与输出是多对一的关系即将多条输入记录聚合成一条输出值。
udafs.py内容如下:
from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udafclass WeightedAvg(AggregateFunction):def create_accumulator(self):# Row(sum, count)return Row(0, 0)def get_value(self, accumulator: Row) - float:if accumulator[1] 0:return 0else:return accumulator[0] / accumulator[1]def accumulate(self, accumulator: Row, value, weight):accumulator[0] value * weightaccumulator[1] weightdef retract(self, accumulator: Row, value, weight):accumulator[0] - value * weightaccumulator[1] - weightweighted_avg udaf(fWeightedAvg(),result_typeDataTypes.DOUBLE(),accumulator_typeDataTypes.ROW([DataTypes.FIELD(f0, DataTypes.BIGINT()),DataTypes.FIELD(f1, DataTypes.BIGINT())]))
说明:
该示例中weighted_avg定义了当前数据和历史数据求含权重的均值的代码。需要通过名字为 “ udaf ” 的装饰器声明这是一个 aggregate function需要分别通过装饰器中的 result_type 及 accumulator_type 参数声明 aggregate function 的结果类型及 accumulator 类型create_accumulatorget_value 和 accumulate 这 3 个方法必须要定义retract 方法可以根据需要定义需要注意的是由于必须定义 create_accumulatorget_value 和 accumulate 这 3 个方法Python UDAF 只能通过继承AggregateFunction 的方式进行定义。
仍然使用function_udf表,查询语句如下
SELECT weighted_avg(b,c)
FROM function_udf;
选中查询语句运行之后向9999端口依次发送数据如下
123|456,4,2 12|3456,7,1 查询结果是以c字段为权重的b字段当前数据和历史数据的均值。 四、 自定义表值函数(UDTF)
自定义表值函数UDTF将0个、1个或多个标量值作为输入参数可以是变长参数。表值函数可以返回任意数量的行作为输出而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。与自定义的标量函数类似但与标量函数不同。
udtfs.py内容如下
from pyflink.table import DataTypes
from pyflink.table.udf import udtfudtf(result_types[DataTypes.STRING(), DataTypes.STRING()])
def split(s: str):splits s.split(|)yield splits[0], splits[1]
说明
该示例中split定义了将一行字符串按照竖线|分割成多列字符串的代码。需要通过名字为 “ udtf ” 的装饰器声明这是一个 table function。需要通过装饰器中的 result_types 参数声明 table function 的结果类型。由于 table function 每条输出可以包含多个列result_types 需要指定所有输出列的类型。 仍然使用function_udf表,查询语句如下
SELECT a,b,c,d,e
FROM function_udf,lateral table(split(a)) as T(d,e);
选中查询语句运行之后,向9999端口发送数据如下
123|456,4,2
12|3456,7,1
结果 查询结果中会将function_udf表中每行字符串的a字段按照竖线|分割成d,e两列。 博客主页https://lansonli.blog.csdn.net欢迎点赞 收藏 ⭐留言 如有错误敬请指正本文由 Lansonli 原创首发于 CSDN博客停下休息的时候不要忘了别人还在奔跑希望大家抓紧时间学习全力奔赴更美好的生活✨