当前位置: 首页 > news >正文

四川省级建设主管部门网站绥化建设局网站

四川省级建设主管部门网站,绥化建设局网站,网站的头尾和导航的公用文件,学做网站要学什么东西大纲 UDAF入参并非表中一行#xff08;Row#xff09;的集合计算每个人考了几门课计算每门课有几个人考试计算每个人的平均分计算每课的平均分计算每个人的最高分和最低分 入参是表中一行#xff08;Row#xff09;的集合计算每个人的最高分、最低分以及所属的课程计算每课… 大纲 UDAF入参并非表中一行Row的集合计算每个人考了几门课计算每门课有几个人考试计算每个人的平均分计算每课的平均分计算每个人的最高分和最低分 入参是表中一行Row的集合计算每个人的最高分、最低分以及所属的课程计算每课的最高分数、最低分数以及所属人 完整代码入参并非表中一行Row的集合入参是表中一行Row的集合 在前面几篇文章中我们学习了非聚合类的用户自定义函数。这节我们将介绍最简单的聚合函数UDAF。 UDAF 我们对比下UDAF和UDF的定义 def udaf(f: Union[Callable, AggregateFunction, Type] None,input_types: Union[List[DataType], DataType, str, List[str]] None,result_type: Union[DataType, str] None, accumulator_type: Union[DataType, str] None,deterministic: bool None, name: str None,func_type: str general) - Union[UserDefinedAggregateFunctionWrapper, Callable]:def udf(f: Union[Callable, ScalarFunction, Type] None,input_types: Union[List[DataType], DataType, str, List[str]] None,result_type: Union[DataType, str] None,deterministic: bool None, name: str None, func_type: str general,udf_type: str None) - Union[UserDefinedScalarFunctionWrapper, Callable]:可以发现 udaf比udf多了一个参数accumulator_typeudaf比udf少了一个参数udf_type accumulator中文是“累加器”。我们可以将其看成聚合过后比如GroupBy的成批数据每批都要走一次函数。 举一个例子我们对图中左侧的成绩单使用人名name进行聚类然后计算出最高分数。即算出每个人考出的最高分数是多少。 如图所示聚合后的数据每个都会经过accumulator计算。计算出来的值的类型就是accumulator_type。这个类型的数据是中间态它并不是最终UDAF返回的数据类型——result_type。具体这块的知识我们会在后面讲解。 为了方便讲解我们就以上面例子来讲解其使用。先贴出准备的代码 from pyflink.common import Configuration from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema) from pyflink.table.types import DataTypes from pyflink.table.table_descriptor import TableDescriptor from pyflink.table.expressions import lit, col from pyflink.common import Row from pyflink.table.udf import udf,udtf,udaf,udtaf import pandas as pd from pyflink.table.udf import UserDefinedFunctiondef word_count():config Configuration()# write all the data to one fileconfig.set_string(parallelism.default, 1)env_settings EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env TableEnvironment.create(env_settings)row_type_tab_source DataTypes.ROW([DataTypes.FIELD(name, DataTypes.STRING()), DataTypes.FIELD(score, DataTypes.FLOAT()), DataTypes.FIELD(class, DataTypes.STRING())])students_score [(张三, 80.0, English),(李四, 75.0, English),(王五, 90.0, English),(赵六, 85.0, English),(张三, 60.0, Math),(李四, 95.0, Math),(王五, 90.0, Math),(赵六, 70.0, Math),(孙七, 60.0, Math),]tab_source t_env.from_elements(students_score, row_type_tab_source )我们在tab_source表中录入了学生的成绩信息其中包括姓名name、成绩score和科目class。 入参并非表中一行Row的集合 计算每个人考了几门课 按姓名name聚类UDTF统计聚类后集合的个数并返回别名UDTF返回的列名select出数据 udaf(result_typeDataTypes.ROW([DataTypes.FIELD(count, DataTypes.BIGINT())]), func_typepandas)def exam_count(pandas_df: pd.DataFrame):return Row(pandas_df.count())tab_student_exam_count tab_source.group_by(col(name)) \.aggregate(exam_count(col(name)).alias(count)) \.select(col(name), col(count)) tab_student_exam_count.execute().print()------------------------------------------------------ | name | count | ------------------------------------------------------ | 孙七 | 1 | | 张三 | 2 | | 李四 | 2 | | 王五 | 2 | | 赵六 | 2 | ------------------------------------------------------ 5 rows in set计算每门课有几个人考试 按姓名class聚类UDTF统计聚类后集合的个数并返回别名UDTF返回的列名select出数据 udaf(result_typeDataTypes.ROW([DataTypes.FIELD(count, DataTypes.BIGINT())]), func_typepandas)def exam_count(pandas_df: pd.DataFrame):return Row(pandas_df.count())tab_class_exam_count tab_source.group_by(col(class)) \.aggregate(exam_count(col(class)).alias(count)) \.select(col(class), col(count)) tab_class_exam_count.execute().print()------------------------------------------------------ | class | count | ------------------------------------------------------ | English | 4 | | Math | 5 | ------------------------------------------------------ 2 rows in set计算每个人的平均分 按姓名name聚类UDTF统计聚类后集合的均值并返回别名UDTF返回的列名select出数据 udaf(result_typeDataTypes.ROW([DataTypes.FIELD(avg, DataTypes.FLOAT())]), func_typepandas)def avg_score(pandas_df: pd.DataFrame):return Row(pandas_df.mean())tab_student_avg_score tab_source.group_by(col(name)) \.aggregate(avg_score(col(score)).alias(avg)) \.select(col(name), col(avg)) tab_student_avg_score.execute().print()---------------------------------------------------------------- | name | avg | ---------------------------------------------------------------- | 孙七 | 60.0 | | 张三 | 70.0 | | 李四 | 85.0 | | 王五 | 90.0 | | 赵六 | 77.5 | ---------------------------------------------------------------- 5 rows in set计算每课的平均分 按姓名class聚类UDTF统计聚类后集合的均值并返回别名UDTF返回的列名select出数据 udaf(result_typeDataTypes.ROW([DataTypes.FIELD(avg, DataTypes.FLOAT())]), func_typepandas)def avg_score(pandas_df: pd.DataFrame):return Row(pandas_df.mean())tab_class_avg_score tab_source.group_by(col(class)) \.aggregate(avg_score(col(score)).alias(avg)) \.select(col(class), col(avg)) tab_class_avg_score.execute().print()---------------------------------------------------------------- | class | avg | ---------------------------------------------------------------- | English | 82.5 | | Math | 75.0 | ---------------------------------------------------------------- 2 rows in set计算每个人的最高分和最低分 按姓名name聚类UDTF统计聚类后集合的最大值和最小值并返回别名UDTF返回的列名select出数据 udaf(result_typeDataTypes.ROW([DataTypes.FIELD(max, DataTypes.FLOAT()), DataTypes.FIELD(min, DataTypes.FLOAT())]), func_typepandas)def max_min_score(pandas_df: pd.DataFrame):return Row(pandas_df.max(), pandas_df.min())tab_student_max_min_score tab_source.group_by(col(name)) \.aggregate(max_min_score(col(score)).alias(max, min)) \.select(col(name), col(max), col(min)) tab_student_max_min_score.execute().print()------------------------------------------------------------------------------------------------ | name | max | min | ------------------------------------------------------------------------------------------------ | 孙七 | 60.0 | 60.0 | | 张三 | 80.0 | 60.0 | | 李四 | 95.0 | 75.0 | | 王五 | 90.0 | 90.0 | | 赵六 | 85.0 | 70.0 | ------------------------------------------------------------------------------------------------ 5 rows in set入参是表中一行Row的集合 计算每个人的最高分、最低分以及所属的课程 按姓名name聚类UDTF统计聚类后集合中分数最大值、最小值分数最大值所在行的课程名和分数最小值所在行的课程名并返回别名UDTF返回的列名select出数据 udaf(result_typeDataTypes.ROW([DataTypes.FIELD(max, DataTypes.FLOAT()), DataTypes.FIELD(max tag, DataTypes.STRING()), DataTypes.FIELD(min, DataTypes.FLOAT()), DataTypes.FIELD(min tag, DataTypes.STRING())]), func_typepandas)def max_min_score_with_class(pandas_df: pd.DataFrame):return Row(pandas_df[score].max(), pandas_df.loc[pandas_df[score].idxmax(), class], pandas_df[score].min(), pandas_df.loc[pandas_df[score].idxmin(), class])tab_student_max_min_score tab_source.group_by(col(name)) \.aggregate(max_min_score_with_class.alias(max, class(max), min, class(min))) \.select(col(name), col(max), col(class(max)), col(min), col(class(min))) tab_student_max_min_score.execute().print()---------------------------------------------------------------------------------------------------------------------------------------------------------------- | name | max | class(max) | min | class(min) | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- | 孙七 | 60.0 | Math | 60.0 | Math | | 张三 | 80.0 | English | 60.0 | Math | | 李四 | 95.0 | Math | 75.0 | English | | 王五 | 90.0 | English | 90.0 | English | | 赵六 | 85.0 | English | 70.0 | Math | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- 5 rows in set计算每课的最高分数、最低分数以及所属人 按姓名class聚类UDTF统计聚类后集合中分数最大值、最小值分数最大值所在行的人名和分数最小值所在行的人名并返回别名UDTF返回的列名select出数据 udaf(result_typeDataTypes.ROW([DataTypes.FIELD(max, DataTypes.FLOAT()), DataTypes.FIELD(max tag, DataTypes.STRING()), DataTypes.FIELD(min, DataTypes.FLOAT()), DataTypes.FIELD(min tag, DataTypes.STRING())]), func_typepandas)def max_min_score_with_name(pandas_df: pd.DataFrame):return Row(pandas_df[score].max(), pandas_df.loc[pandas_df[score].idxmax(), name], pandas_df[score].min(), pandas_df.loc[pandas_df[score].idxmin(), name])tab_class_max_min_score tab_source.group_by(col(class)) \.aggregate(max_min_score_with_name.alias(max, name(max), min, name(min))) \.select(col(class), col(max), col(name(max)), col(min), col(name(min))) tab_class_max_min_score.execute().print()---------------------------------------------------------------------------------------------------------------------------------------------------------------- | class | max | name(max) | min | name(min) | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- | English | 90.0 | 王五 | 75.0 | 李四 | | Math | 95.0 | 李四 | 60.0 | 张三 | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- 2 rows in set完整代码 入参并非表中一行Row的集合 from pyflink.common import Configuration from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema) from pyflink.table.types import DataTypes from pyflink.table.table_descriptor import TableDescriptor from pyflink.table.expressions import lit, col from pyflink.common import Row from pyflink.table.udf import udf,udtf,udaf,udtaf import pandas as pd from pyflink.table.udf import UserDefinedFunctiondef word_count():config Configuration()# write all the data to one fileconfig.set_string(parallelism.default, 1)env_settings EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env TableEnvironment.create(env_settings)row_type_tab_source DataTypes.ROW([DataTypes.FIELD(name, DataTypes.STRING()), DataTypes.FIELD(score, DataTypes.FLOAT()), DataTypes.FIELD(class, DataTypes.STRING())])students_score [(张三, 80.0, English),(李四, 75.0, English),(王五, 90.0, English),(赵六, 85.0, English),(张三, 60.0, Math),(李四, 95.0, Math),(王五, 90.0, Math),(赵六, 70.0, Math),(孙七, 60.0, Math),]tab_source t_env.from_elements(students_score, row_type_tab_source )udaf(result_typeDataTypes.ROW([DataTypes.FIELD(count, DataTypes.BIGINT())]), func_typepandas)def exam_count(pandas_df: pd.DataFrame):return Row(pandas_df.count())tab_student_exam_count tab_source.group_by(col(name)) \.aggregate(exam_count(col(name)).alias(count)) \.select(col(name), col(count)) tab_student_exam_count.execute().print()tab_class_exam_count tab_source.group_by(col(class)) \.aggregate(exam_count(col(class)).alias(count)) \.select(col(class), col(count)) tab_class_exam_count.execute().print()udaf(result_typeDataTypes.ROW([DataTypes.FIELD(avg, DataTypes.FLOAT())]), func_typepandas)def avg_score(pandas_df: pd.DataFrame):return Row(pandas_df.mean())tab_student_avg_score tab_source.group_by(col(name)) \.aggregate(avg_score(col(score)).alias(avg)) \.select(col(name), col(avg)) tab_student_avg_score.execute().print()tab_class_avg_score tab_source.group_by(col(class)) \.aggregate(avg_score(col(score)).alias(avg)) \.select(col(class), col(avg)) tab_class_avg_score.execute().print()udaf(result_typeDataTypes.ROW([DataTypes.FIELD(max, DataTypes.FLOAT()), DataTypes.FIELD(min, DataTypes.FLOAT())]), func_typepandas)def max_min_score(pandas_df: pd.DataFrame):return Row(pandas_df.max(), pandas_df.min())tab_student_max_min_score tab_source.group_by(col(name)) \.aggregate(max_min_score(col(score)).alias(max, min)) \.select(col(name), col(max), col(min)) tab_student_max_min_score.execute().print()if __name__ __main__:word_count()入参是表中一行Row的集合 from pyflink.common import Configuration from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema) from pyflink.table.types import DataTypes from pyflink.table.table_descriptor import TableDescriptor from pyflink.table.expressions import lit, col from pyflink.common import Row from pyflink.table.udf import udf,udtf,udaf,udtaf import pandas as pd from pyflink.table.udf import UserDefinedFunctiondef word_count():config Configuration()# write all the data to one fileconfig.set_string(parallelism.default, 1)env_settings EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env TableEnvironment.create(env_settings)row_type_tab_source DataTypes.ROW([DataTypes.FIELD(name, DataTypes.STRING()), DataTypes.FIELD(score, DataTypes.FLOAT()), DataTypes.FIELD(class, DataTypes.STRING())])students_score [(张三, 80.0, English),(李四, 75.0, English),(王五, 90.0, English),(赵六, 85.0, English),(张三, 60.0, Math),(李四, 95.0, Math),(王五, 90.0, Math),(赵六, 70.0, Math),(孙七, 60.0, Math),]tab_source t_env.from_elements(students_score, row_type_tab_source )udaf(result_typeDataTypes.ROW([DataTypes.FIELD(max, DataTypes.FLOAT()), DataTypes.FIELD(max tag, DataTypes.STRING()), DataTypes.FIELD(min, DataTypes.FLOAT()), DataTypes.FIELD(min tag, DataTypes.STRING())]), func_typepandas)def max_min_score_with_class(pandas_df: pd.DataFrame):return Row(pandas_df[score].max(), pandas_df.loc[pandas_df[score].idxmax(), class], pandas_df[score].min(), pandas_df.loc[pandas_df[score].idxmin(), class])tab_student_max_min_score tab_source.group_by(col(name)) \.aggregate(max_min_score_with_class.alias(max, class(max), min, class(min))) \.select(col(name), col(max), col(class(max)), col(min), col(class(min))) tab_student_max_min_score.execute().print()udaf(result_typeDataTypes.ROW([DataTypes.FIELD(max, DataTypes.FLOAT()), DataTypes.FIELD(max tag, DataTypes.STRING()), DataTypes.FIELD(min, DataTypes.FLOAT()), DataTypes.FIELD(min tag, DataTypes.STRING())]), func_typepandas)def max_min_score_with_name(pandas_df: pd.DataFrame):return Row(pandas_df[score].max(), pandas_df.loc[pandas_df[score].idxmax(), name], pandas_df[score].min(), pandas_df.loc[pandas_df[score].idxmin(), name])tab_class_max_min_score tab_source.group_by(col(class)) \.aggregate(max_min_score_with_name.alias(max, name(max), min, name(min))) \.select(col(class), col(max), col(name(max)), col(min), col(name(min))) tab_class_max_min_score.execute().print()if __name__ __main__:word_count()
http://www.w-s-a.com/news/816930/

相关文章:

  • 个人网站的设计与开发网站建设流程中哪些部分比较重要
  • 招聘网站如何建设中国计算机网络公司排名
  • 工信部网站备案规定厦门在线制作网站
  • 商丘网站公司智联招聘手机app下载
  • 江西专业南昌网站建设中国专业的网站建设
  • 物流企业网站建设方案招标网站有哪些
  • 网站建设服务中企动力建筑工程网络进度计划备注填写范例
  • 电子商务网站开发与建设试卷php网站开发专业
  • 运城网站制作路90江苏省网站备案系统
  • 唐山做企业网站实体门店管理系统
  • 网站优化推广教程深圳网站建设世纪前线
  • 网站建设专家哪家好兰州网络推广执行
  • 广东住房和城乡建设厅网站王芃增加网站收录
  • 北京网站建设手机app电子商务网红营销的劣势
  • 网站 营销型wordpress获取4条文章标题
  • 浦东区建设工程监督网站建立全国统一的突发事件信息系统
  • 做网站需要基础吗重庆市造价信息网
  • 我要建设公司网站大连培训网站建设
  • 网站建设校长信箱设计方案小程序报价开发
  • 电子网站建设ppt模板营销策划方案怎么写?
  • 什么网站收录排名最高济南能源建设网站
  • 深圳移动网站建设公司价格桂林做网站哪家公司好
  • 互联网网站名字网站合作建设合同
  • 舟山高端网站设计广州优化排名推广
  • 哪个网站做免费广告好上海人才网站
  • cn域名做网站竞价推广代理
  • 省建设干部培训中心网站网站地图1 500 怎么做
  • 制作一个网站需要哪些人网站建设经营服务合同
  • 山东省住房和城乡建设厅官方网站网易发布广州
  • 长沙设计网站效果设计师灵感网站