dede网站搬家教程,旅游网站开发答辩ppt,广州企业建站模板,网站建设贵不贵Java Table API的使用 使用Java Table API开发添加依赖创建表环境创建表查询表输出表使用示例 表和流的转换流DataStream转换成表Table表Table转换成流DataStream示例数据类型 自定义函数UDF标量函数表函数聚合函数表聚合函数 API方法汇总基本方法列操作聚合操作Joins合并操作排… Java Table API的使用 使用Java Table API开发添加依赖创建表环境创建表查询表输出表使用示例 表和流的转换流DataStream转换成表Table表Table转换成流DataStream示例数据类型 自定义函数UDF标量函数表函数聚合函数表聚合函数 API方法汇总基本方法列操作聚合操作Joins合并操作排序Group WindowsOver Windows 使用Java Table API开发
添加依赖
在代码中使用Table API必须引入相关的依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version
/dependency!--负责Table API和下层DataStream API的连接支持--
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge/artifactIdversion${flink.version}/version
/dependency!--在本地的集成开发环境里运行Table API和SQL的支持--
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-loader/artifactIdversion${flink.version}/version
/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-runtime/artifactIdversion${flink.version}/version
/dependency作者CodeDevMaster
链接https://juejin.cn/spost/7299353390764163122
来源稀土掘金
著作权归作者所有。商业转载请联系作者获得授权非商业转载请注明出处。创建表环境 使用Table API和SQL需要一个特别的运行时环境就是表环境TableEnvironment。 主要作用
1.注册Catalog和表2.执行 SQL 查询3.注册用户自定义函数UDF4.DataStream 和表之间的转换每个表和SQL的执行都必须绑定在一个表环境中。 TableEnvironment是Table API中提供的基本接口类通过调用静态的create()方法来创建一个表环境实例。传入一个环境的配置参数EnvironmentSettings它可以指定当前表环境的执行模式和计划器。执行模式有批处理和流处理两种选择默认是流处理模式计划器默认使用blink planner。 EnvironmentSettings settings EnvironmentSettings.newInstance().inStreamingMode() // 使用流处理模式.build();TableEnvironment tableEnv TableEnvironment.create(setting);另一种更加简单的方式创建表环境
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);创建表 创建表的方式通过连接器connector、虚拟表virtual tables 1.连接器表 最直观的创建表的方式就是通过连接器连接到一个外部系统然后定义出对应的表结构。在代码中调用表环境的executeSql()方法传入一个DDL作为参数执行SQL操作。 例如传入一个CREATE语句进行表的创建并通过WITH关键字指定连接到外部系统的连接器
tableEnv.executeSql(CREATE [TEMPORARY] TABLE MyTable ... WITH ( connector ... ));tableEnv.executeSql(create table tb_test (f_sequence int,f_random int) WITH (connector print););2.虚拟表
在SQL中直接使用这张表进行查询转换调用表环境的sqlQuery()方法传入一条SQL语句作为参数执行查询得到的结果是一个Table对象。
Table newTable tableEnv.sqlQuery(SELECT ... FROM MyTable... );Table是Table API中提供的核心接口类代表一个Java中定义的表实例。由于该Table对象并没有在表环境中注册如果希望直接在SQL中使用还需要将这个中间结果表注册到环境中
tableEnv.createTemporaryView(new_table, newTable);tableEnv.sqlQuery(select * from new_table;);查询表 对一个表的查询操作就对应着流数据的转换处理。Flink提供了两种查询方式SQL和Table API 1.执行SQL进行查询
查询会得到一个新的Table对象
Table table tableEnv.sqlQuery(select * from tb;);可以再次将它注册为虚拟表继续在SQL中调用
tableEnv.createTemporaryView(new_table, table);
tableEnv.sqlQuery(select * from new_table;);也可以直接将查询的结果写入到已经注册的另一张表中需要调用表环境的executeSql()方法来执行DDL传入一个INSERT语句
tableEnv.executeSql(insert into tb select * from new_table);2.调用Table API进行查询 Table API是嵌入在Java和Scala语言内的查询API核心就是Table接口类通过一步步链式调用Table的方法就可以定义出所有的查询转换操作。 基于环境中已注册的表通过表环境的from()方法传入参数就是注册的表名得到一个Table对象调用API进行各种转换操作得到的是一个新的Table对象 Table source tableEnv.from(tb);Table result source// 查询条件.where($(f_sequence).isEqual(2))// 分组.groupBy($(f_sequence))// 聚合 起别名.aggregate($(f_random).sum().as(sumValue))// 查询返回字段.select($(f_sequence), $(sumValue)); 输出表 表的创建和查询对应流处理中的读取数据源Source和转换Transform。最后将结果数据输出到外部系统就对应着表的输出操作。 将结果表写入已注册的输出表中
// SQL方式
tableEnv.executeSql(insert into tb_test select f_sequence,f_random from tb_tmp);// Table API方式
result.executeInsert(tb_test);使用示例 public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();/*** 创建表环境* 方式一*/
// EnvironmentSettings settings EnvironmentSettings.newInstance().inStreamingMode().build();
// StreamTableEnvironment tableEnv TableEnvironment.create(settings);/*** 创建表环境* 方式二*/StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 创建表tableEnv.executeSql(CREATE TABLE datagen (\n f_sequence INT,\n f_random INT,\n f_random_str STRING,\n ts AS localtimestamp,\n WATERMARK FOR ts AS ts\n ) WITH (\n connector datagen,\n rows-per-second5,\n fields.f_sequence.kindsequence,\n fields.f_sequence.start1,\n fields.f_sequence.end1000,\n fields.f_random.min1,\n fields.f_random.max1000,\n fields.f_random_str.length10\n ););tableEnv.executeSql(create table tb_test (f_sequence int,f_random int) WITH (connector print););// 使用sql方式查询Table table tableEnv.sqlQuery(select f_sequence,f_random,f_random_str from datagen where f_sequence 5;);// 把table对象注册成表名tableEnv.createTemporaryView(tb_tmp, table);tableEnv.sqlQuery(select * from tb_tmp where f_sequence 3);// 输出到另一张表
// tableEnv.executeSql(insert into tb_test select f_sequence,f_random from tb_tmp);// table api方式Table source tableEnv.from(datagen);Table result source// 查询条件.where($(f_sequence).isEqual(2))// 分组.groupBy($(f_sequence))// 聚合 起别名.aggregate($(f_random).sum().as(sumValue))// 查询返回字段.select($(f_sequence), $(sumValue));// 输出到另一张表result.executeInsert(tb_test);}如下一个更加复杂的Table API程序扫描Orders表过滤空值使字符串类型的字段标准化并且每个小时进行一次计算并返回a的平均账单金额b。
// 指定表程序
Table orders tEnv.from(Orders);Table result orders.filter(and($(a).isNotNull(),$(b).isNotNull(),$(c).isNotNull())).select($(a).lowerCase().as(a), $(b), $(rowtime)).window(Tumble.over(lit(1).hours()).on($(rowtime)).as(hourlyWindow)).groupBy($(hourlyWindow), $(a)).select($(a), $(hourlyWindow).end().as(hour), $(b).avg().as(avgBillingAmount));表和流的转换
流DataStream转换成表Table
1.调用fromDataStream()方法 // 将数据流转换成表Table table tableEnv.fromDataStream(dataStreamSource);// 指定提取哪些属性作为表中的字段名Table table1 tableEnv.fromDataStream(dataStreamSource, $(id), $(name));// 通过表达式的as()方法对字段进行重命名Table table2 tableEnv.fromDataStream(dataStreamSource, $(id).as(uid), $(name).as(username));2.调用createTemporaryView() 直接在SQL中引用这张表调用表环境的createTemporaryView()方法创建虚拟视图 tableEnv.createTemporaryView(new_table, table);/*** String 注册的表名* DataStreamT DataStream* Expression 指定表中的字段*/tableEnv.createTemporaryView(new_table,dataStreamSource, $(id).as(uid),$(name));表Table转换成流DataStream
1.调用toDataStream()
// 将数据流转换成表
Table table tableEnv.fromDataStream(dataStreamSource);DataStreamRow dataStream tableEnv.toDataStream(table);
dataStream.print();2.调用toChangelogStream()
// 将表转换成更新日志流DataStreamRow dataStream tableEnv.toChangelogStream(table);
dataStream.print();示例 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceTuple2String, Integer dataStreamSource env.fromElements(Tuple2.of(a, 1), Tuple2.of(b, 2), Tuple2.of(c, 3), Tuple2.of(d, 4));StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 流转表Table table tableEnv.fromDataStream(dataStreamSource);tableEnv.createTemporaryView(newTable, table);Table filterTable tableEnv.sqlQuery(select f0,f1 from newTable where f12);Table sumTable tableEnv.sqlQuery(select f0,sum(f1) from newTable group by f0);// 表转流// 追加流tableEnv.toDataStream(filterTable, Row.class).print(stream-1);// 将表转换成更新日志流tableEnv.toChangelogStream(sumTable).print(stream-2);// 注意:只要调用DataStreamAPI就需要调用executeenv.execute();}stream-1:6 I[d, 4]
stream-1:5 I[c, 3]
stream-2:2 I[d, 4]
stream-2:1 I[a, 1]
stream-2:7 I[c, 3]
stream-2:1 I[b, 2]数据类型 DataStream与Table之间存在数据类型转换问题以下是一些常见的数据类型参考文档 数据类型 1.Tuple类型
Table支持Flink中定义的元组类型Tuple对应在表中字段名默认就是元组中元素的属性名f0、f1、f2...所有字段都可以重新排序 可以提取其中的一部分字段 可以通过调用表达式的as()方法来进行重命名2.POJO类型
如果不指定字段名称直接使用原始POJO类型中的字段名称POJO中的字段同样可以被重新排序、提却和重命名3.ROW类型
通用的数据类型行Row它是Table中数据的基本组织形式// 流转表
Table table tableEnv.fromDataStream(dataStreamSource, $(f1).as(num), $(f0).as(key));
tableEnv.createTemporaryView(newTable, table);Table filterTable tableEnv.sqlQuery(select num,key from newTable where num 2);// 表转流
tableEnv.toDataStream(filterTable, Row.class).print(stream-1); // 流转表
Table table tableEnv.fromDataStream(dataStreamSource, $(id).as(uid), $(name).as(username));
tableEnv.createTemporaryView(newTable, table);
Table filterTable tableEnv.sqlQuery(select uid,name from newTable where uid 2);// 表转流
tableEnv.toDataStream(filterTable, User.class).print(stream-1);自定义函数UDF 系统函数不可能涵盖所有的功能当系统函数不支持需求就需要用自定义函数来实现。 Flink的Table API和SQL提供了多种自定义函数的接口以抽象类的形式定义。
主要有以下几类
标量函数Scalar Functions将输入的标量值转换成一个新的标量值表函数Table Functions将标量值转换成一个或多个新的行数据也就是扩展成一个表聚合函数Aggregate Functions将多行数据里的标量值转换成一个新的标量值表聚合函数Table Aggregate Functions将多行数据里的标量值转换成一个或多个新的行数据1.注册函数
tableEnv.createTemporarySystemFunction(MyFunction, MyFunction.class);2.使用Table API调用函数
# 使用call()方法来调用自定义函数 1.注册的函数名 2.函数调用时本身的参数
tableEnv.from(MyTable).select(call(MyFunction, $(myField)));3.在SQL中调用函数
tableEnv.sqlQuery(SELECT MyFunction(myField) FROM MyTable);标量函数 标量函数Scalar Functions是一种在输入参数上执行计算并返回单个值的函数。在 Flink SQL 中可以使用内置的标量函数或自定义的标量函数来处理表达式和数据转换操作。 内置标量函数示例 使用内置的 UPPER 标量函数将 name 字段转换为大写并使用 LENGTH 标量函数计算 description 字段的长度。 SELECT id, UPPER(name) as uppercase_name, LENGTH(description) as description_length
FROM myTable自定义标量函数示例
自定义标量函数可以把0个、1个或多个标量值转换成一个标量值它对应的输入是一行数据中的字段输出则是唯一的值。是一个一对一的转换关系。自定义一个类来继承抽象类ScalarFunction并实现叫作eval() 的求值方法。标量函数的行为就取决于求值方法的定义它必须是公有的public而且名字必须是eval。求值方法eval可以重载多次任何数据类型都可作为求值方法的参数和返回值类型。注意ScalarFunction抽象类中并没有定义eval()方法所以不能直接在代码中重写override但Table API的框架底层又要求了求值方法必须名字为eval()。public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceTuple2String, Integer dataStreamSource env.fromElements(Tuple2.of(a, 1), Tuple2.of(b, 2), Tuple2.of(c, 3), Tuple2.of(d, 4));StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);Table table tableEnv.fromDataStream(dataStreamSource);tableEnv.createTemporaryView(newTable, table);// 注册函数tableEnv.createTemporaryFunction(MyScalarFunction, MyScalarFunction.class);// 调用自定义函数// sql用法tableEnv.sqlQuery(select MyScalarFunction(f0,1) from newTable).execute() // 调用execute就不需要调用env.execute().print();// table api用法table.select(call(MyScalarFunction, $(f1), 1)).execute().print();}/*** 自定义函数的实现类* DataTypeHint(inputGroup InputGroup.ANY)对输入参数的类型做标注表示参数可以是任意类型* 参数1接受任意类型的输入 参数2要求int类型* 返回INT型的求和输出*/public static class MyScalarFunction extends ScalarFunction {public int eval(DataTypeHint(inputGroup InputGroup.ANY) Object a, int b) {return a.hashCode() b;}}-----------------
| op | EXPR$0 |
-----------------
| I | 98 |
| I | 99 |
| I | 100 |
| I | 101 |
-----------------
4 rows in set
-----------------
| op | _c0 |
-----------------
| I | 2 |
| I | 3 |
| I | 4 |
| I | 5 |
-----------------
4 rows in set表函数 表函数Table Functions是一种可以接受一行或多行输入并返回一个新的表作为结果的函数。在 Flink SQL 中您可以使用内置的表函数或自定义的表函数来进行表操作和数据转换。 内置表函数示例 使用内置的 EXPLODE 函数将 sentence 字段按空格分割并生成一个新的表作为 T其中每个单词都是一行。然后可以将单词和原始表的其他字段进行组合或过滤。 SELECT id, word
FROM myTable, LATERAL TABLE(EXPLODE(split(sentence, ))) as T(word)自定义表函数示例
表函数的输入参数可以是 0个、1个或多个标量值它可以返回任意多行数据。表函数可以认为就是返回一个表的函数是一个一对多的转换关系。自定义表函数需要自定义类来继承抽象类TableFunction内部必须要实现一个名为eval 的求值方法。TableFunction类本身是有一个泛型参数T是表函数返回数据的类型eval()方法没有返回类型内部也没有return语句是通过调用collect()方法来发送想要输出的行数据在SQL中调用表函数需要使用LATERAL TABLE(TableFunction)来生成扩展的侧向表然后与原始表进行联结public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString dataStreamSource env.fromElements(java, python, flink);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);Table table tableEnv.fromDataStream(dataStreamSource, $(world));tableEnv.createTemporaryView(wordTable, table);// 注册函数tableEnv.createTemporaryFunction(MyTableFunction, MyTableFunction.class);// 调用自定义函数// 交叉联结tableEnv.sqlQuery(select world,wordUpperCase,length from wordTable,lateral table(MyTableFunction(world))).execute().print();// 带 on true 条件的左联结tableEnv.sqlQuery(select world,wordUpperCase,length from wordTable left join lateral table(MyTableFunction(world)) on true).execute().print();// 对侧向表的中字段进行重命名tableEnv.sqlQuery(select world,newWord,newLength from wordTable left join lateral table(MyTableFunction(world)) as T(newWord,newLength) on true).execute().print();}/*** Row包含两个字段word和length* 将表函数的输出类型定义成ROW得到侧向表中的数据类型每行数据转换后也只有一行* 表字段起别名* 调用表函数lateral table(MyTableFunction(world)) as T(a, b)* 使用注解 FunctionHint(output DataTypeHint(ROWword STRING,length INT))*/FunctionHint(output DataTypeHint(ROWwordUpperCase STRING,length INT))public static class MyTableFunction extends TableFunctionRow {public void eval(String str) {Row row new Row(2);row.setField(0, str.toUpperCase());row.setField(1, str.length());collect(row);}}
---------------------------------------------------------------------------------
| op | world | wordUpperCase | length |
---------------------------------------------------------------------------------
| I | java | JAVA | 4 |
| I | python | PYTHON | 6 |
| I | flink | FLINK | 5 |
---------------------------------------------------------------------------------
3 rows in set
---------------------------------------------------------------------------------
| op | world | wordUpperCase | length |
---------------------------------------------------------------------------------
| I | java | JAVA | 4 |
| I | python | PYTHON | 6 |
| I | flink | FLINK | 5 |
---------------------------------------------------------------------------------
3 rows in set
---------------------------------------------------------------------------------
| op | world | newWord | newLength |
---------------------------------------------------------------------------------
| I | java | JAVA | 4 |
| I | python | PYTHON | 6 |
| I | flink | FLINK | 5 |
---------------------------------------------------------------------------------
3 rows in set聚合函数 聚合函数Aggregate Functions是一类用于对数据进行聚合计算的函数。在 Flink SQL 中可以使用内置的聚合函数来计算数据集的汇总结果。 内置聚合函数示例 使用 SUM 函数计算 salary 字段的总和使用 COUNT 函数计算员工总数使用 AVG 函数计算 age 字段的平均值。 SELECT SUM(salary) as total_salary, COUNT(*) as employee_count, AVG(age) as avg_age
FROM employees自定义聚合函数示例
自定义聚合函数会把一行或多行数据聚合成一个标量值。是一个标准的多对一的转换。自定义聚合函数需要继承抽象类AggregateFunction。有两个泛型参数T, ACCT表示聚合输出的结果类型ACC表示聚合的中间状态类型。聚合函数的工作原理
创建一个累加器accumulator用来存储聚合的中间结果。累加器可以看作是一个聚合状态。调用createAccumulator()方法可以创建一个空的累加器对于输入的每一行数据都会调用accumulate()方法来更新累加器这是聚合的核心过程当所有的数据都处理完之后调用getValue()方法来计算并返回最终的结果public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceTuple3String, Integer, Integer dataStreamSource env.fromElements(Tuple3.of(a, 1, 2), Tuple3.of(b, 2, 3), Tuple3.of(b, 3, 4), Tuple3.of(a, 4, 5));StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);Table table tableEnv.fromDataStream(dataStreamSource, $(f0).as(key), $(f1).as(a), $(f2).as(b));tableEnv.createTemporaryView(newTable, table);tableEnv.createTemporaryFunction(MyAggregateFunction, MyAggregateFunction.class);tableEnv.sqlQuery(select key,MyAggregateFunction(a,b) as keyValue from newTable group by key).execute().print();}/*** 自定义聚合函数继承AggregateFunction返回类型累加器类型*/public static class MyAggregateFunction extends AggregateFunctionDouble, Tuple2Integer, Integer {/*** 创建累加器的方法* 没有输入参数返回类型为累加器类型ACC** return*/Overridepublic Tuple2Integer, Integer createAccumulator() {return Tuple2.of(0, 0);}/*** 进行聚合计算的核心方法每来一行数据都会调用* p* 第一个参数: 当前累加器类型为ACC表示当前聚合的中间状态* 后面参数: 聚合函数调用时传入的参数可以有多个类型也可以不同* p* 主要功能更新聚合状态没有返回类型* p* 必须为public方法名必须为accumulate且不能直接override、只能手动实现** param acc 累加器类型* param a 第一个参数* param b 第二个参数*/public void accumulate(Tuple2Integer, Integer acc, Integer a, Integer b) {acc.f0 a;acc.f1 b;}/*** 返回结果的方法* 输入参数是ACC类型的累加器* 输出类型为T* 在遇到复杂类型时Flink类型推导可能会无法得到正确的结果:可以通过 getAccumulatorType()和getResultType()两个方法来指定** return*/Overridepublic Double getValue(Tuple2Integer, Integer acc) {return (acc.f0 acc.f1) * 1D / 2;}}--------------------------------------------------------------------
| op | key | keyValue |
--------------------------------------------------------------------
| I | a | 1.5 |
| I | b | 2.5 |
| -U | b | 2.5 |
| U | b | 6.0 |
| -U | a | 1.5 |
| U | a | 6.0 |
--------------------------------------------------------------------
6 rows in set表聚合函数 表聚合函数Table Aggregate Functions是一类用于对表格数据进行聚合计算的函数。它们可以在聚合操作中处理整个表格或者表格的子集并返回聚合结果。 可以使用表聚合函数来执行更复杂的聚合操作如按组进行计算、按时间窗口进行计算等。 1.GROUP BY 聚合 根据 department 字段对 employees 表格进行分组并计算每个部门的总销售额和平均工资 SELECT department, SUM(sales) as total_sales, AVG(salary) as avg_salary
FROM employees
GROUP BY department2.时间窗口聚合 使用 TUMBLE 函数将 order_time 字段按照 1 小时的窗口进行划分并计算每个窗口内的订单数量和销售总额。 SELECT TUMBLE_START(order_time, INTERVAL 1 HOUR) as window_start,COUNT(*) as order_count,SUM(order_amount) as total_amount
FROM orders
GROUP BY TUMBLE(order_time, INTERVAL 1 HOUR)自定义表聚合函数
表聚合函数UDTAGG可以把一行或多行数据聚合成另一张表结果表中可以有多行多列。是一个多对多的转换。自定义表聚合函数需要继承抽象类TableAggregateFunction。TableAggregateFunction的结构和原理与AggregateFunction非常类似同样有两个泛型参数T, ACC用一个ACC类型的累加器accumulator来存储聚合的中间结果。public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceTuple2String, Integer dataStreamSource env.fromElements(Tuple2.of(a, 1), Tuple2.of(b, 2), Tuple2.of(a, 3), Tuple2.of(b, 4));StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);Table table tableEnv.fromDataStream(dataStreamSource, $(f0).as(key), $(f1).as(value));tableEnv.createTemporaryFunction(MyTableAggregateFunction, MyTableAggregateFunction.class);// 能用Table APItable.flatAggregate(call(MyTableAggregateFunction, $(key), $(value))).select($(f0).as(newKey), $(f1).as(newValue)).execute().print();}/*** 继承 TableAggregateFunction返回类型累加器类型*/public static class MyTableAggregateFunction extends TableAggregateFunctionTuple2String, Integer, MapString, Integer {/*** 创建累加器** return*/Overridepublic MapString, Integer createAccumulator() {MapString, Integer map new HashMap();return map;}/*** 聚合计算的核心 每来一个数据调用一次** param acc 累加器* param key 第一个参数* param value 第二个参数*/public void accumulate(MapString, Integer acc, String key, Integer value) {if (acc.containsKey(key)) {acc.put(key, acc.get(key) value);} else {acc.put(key, value);}}/*** 输出结果* p* 所有输入行处理完成后输出最终计算结果的方法* p* emitValue没有输出类型而输入参数有两个1.ACC类型的累加器2.用于输出数据的收集器out它的类型为CollectT* p* emitValue()在抽象类中没有定义无法override必须手动实现** param acc 累加器* param out 采集器返回类型*/public void emitValue(MapString, Integer acc, CollectorTuple2String, Integer out) {acc.entrySet().stream().forEach(item - {out.collect(Tuple2.of(item.getKey(), item.getValue()));});}}-------------------------------------------------
| op | newKey | newValue |
-------------------------------------------------
| I | a | 1 |
| -D | a | 1 |
| I | a | 1 |
| I | b | 2 |
| -D | a | 1 |
| -D | b | 2 |
| I | a | 4 |
| I | b | 2 |
| -D | a | 4 |
| -D | b | 2 |
| I | a | 4 |
| I | b | 6 |
-------------------------------------------------
12 rows in setAPI方法汇总
基本方法
From 执行一个注册过的表的扫描 Table orders tableEnv.from(Orders);FromValues 基于提供的行生成一张内联表。可以使用row(…)表达式创建复合行 Table table tEnv.fromValues(row(1, ABC),row(2L, ABCDE)
);生成结构:
f0: BIGINT NOT NULL
f1: VARCHAR(5) NOT NULL注意 方法会根据输入的表达式自动获取类型。如果在某一个特定位置的类型不一致该方法会尝试寻找一个所有类型的公共超类型。如果公共超类型不存在则会抛出异常。 明确指定所需的类型:
Table table tEnv.fromValues(DataTypes.ROW(DataTypes.FIELD(id, DataTypes.DECIMAL(10, 2)),DataTypes.FIELD(name, DataTypes.STRING())),row(1, ABC),row(2L, ABCDE)
);生成表结构
id: DECIMAL(10, 2)
name: STRINGSelect 执行一个select操作 Table orders tableEnv.from(Orders);
Table result orders.select($(a), $(c).as(d));
Table result orders.select($(*));As 重命名字段 Table orders tableEnv.from(Orders);
Table result orders.as(x, y, z, t);Where / Filter 过滤掉未验证通过过滤谓词的行 Table orders tableEnv.from(Orders);
Table result orders.where($(b).isEqual(red));
Table result orders.filter($(b).isEqual(red));insertInto 该方法执行对已注册的输出表的插入操作方法会将 INSERT INTO 转换为一个 TablePipeline。 该数据流可以用 TablePipeline.explain() 来解释用 TablePipeline.execute() 来执行。 输出表必须已注册在TableEnvironment中。此外已注册表的 schema 必须与查询中的schema相匹配。 Table orders tableEnv.from(Orders);
orders.insertInto(OutOrders).execute();列操作
1.addColumns 执行字段添加操作。 如果所添加的字段已经存在将抛出异常。 Table result orders.addColumns(concat($(c), sunny));2.addOrReplaceColumns 执行字段添加操作。 如果添加的列名称和已存在的列名称相同则已存在的字段将被替换。 此外如果添加的字段里面有重复的字段名则会使用最后一个字段。 Table result orders.addOrReplaceColumns(concat($(c), sunny).as(desc));3.dropColumns
Table result orders.dropColumns($(b), $(c));4.renameColumns 执行字段重命名操作。 字段表达式应该是别名表达式并且仅当字段已存在时才能被重命名。 Table result orders.renameColumns($(b).as(b2), $(c).as(c2));聚合操作
1.groupBy 使用分组键对行进行分组使用伴随的聚合算子来按照组进行聚合行。 Table orders tableEnv.from(Orders);
Table result orders.groupBy($(a)).select($(a), $(b).sum().as(d));2.GroupBy Window 使用分组窗口结合单个或者多个分组键对表进行分组和聚合。 Table result orders.window(Tumble.over(lit(5).minutes()).on($(rowtime)).as(w)) // 定义窗口.groupBy($(a), $(w)) // 按窗口和键分组// 访问窗口属性并聚合.select($(a),$(w).start(),$(w).end(),$(w).rowtime(),$(b).sum().as(d));3.Over Window 所有的聚合必须定义在同一个窗口上比如同一个分区、排序和范围内。目前只支持PRECEDING 到当前行范围无界或有界的窗口。ORDER BY操作必须指定一个单一的时间属性。 Table result orders// 定义窗口.window(Over.partitionBy($(a)).orderBy($(rowtime)).preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE).as(w))// 滑动聚合.select($(a),$(b).avg().over($(w)),$(b).max().over($(w)),$(b).min().over($(w)));4.Distinct 和SQL DISTINCT聚合子句类似例如COUNT(DISTINCT a)。Distinct聚合声明的聚合函数内置或用户定义的仅应用于互不相同的输入值。 Table orders tableEnv.from(Orders);// 按属性分组后的的互异互不相同、去重聚合
Table groupByDistinctResult orders.groupBy($(a)).select($(a), $(b).sum().distinct().as(d));// 按属性、时间窗口分组后的互异互不相同、去重聚合
Table groupByWindowDistinctResult orders.window(Tumble.over(lit(5).minutes()).on($(rowtime)).as(w)).groupBy($(a), $(w)).select($(a), $(b).sum().distinct().as(d));// over window 上的互异互不相同、去重聚合
Table result orders.window(Over.partitionBy($(a)).orderBy($(rowtime)).preceding(UNBOUNDED_RANGE).as(w)).select($(a), $(b).avg().distinct().over($(w)),$(b).max().over($(w)),$(b).min().over($(w)));// 对 user-defined aggregate functions 使用互异互不相同、去重聚合
tEnv.registerFunction(myUdagg, new MyUdagg());
orders.groupBy(users).select($(users),call(myUdagg, $(points)).distinct().as(myDistinctResult));Table result orders.distinct();Joins
1.Inner Join 关联两张表。两张表必须有不同的字段名并且必须通过 join 算子或者使用 where 或 filter 算子定义至少一个 join 等式连接谓词。 Table left tableEnv.from(MyTable).select($(a), $(b), $(c));
Table right tableEnv.from(MyTable).select($(d), $(e), $(f));
Table result left.join(right).where($(a).isEqual($(d))).select($(a), $(b), $(e));2.Outer Join 和SQL LEFT/RIGHT/FULL OUTER JOIN 子句类似。 关联两张表。 两张表必须有不同的字段名并且必须定义至少一个等式连接谓词。 Table left tableEnv.from(MyTable).select($(a), $(b), $(c));
Table right tableEnv.from(MyTable).select($(d), $(e), $(f));Table leftOuterResult left.leftOuterJoin(right, $(a).isEqual($(d))).select($(a), $(b), $(e));
Table rightOuterResult left.rightOuterJoin(right, $(a).isEqual($(d))).select($(a), $(b), $(e));
Table fullOuterResult left.fullOuterJoin(right, $(a).isEqual($(d))).select($(a), $(b), $(e));3.Interval Join 可以通过流模式处理的常规join的子集至少需要一个equi-join谓词和一个限制双方时间界限的join条件。这种条件可以由两个合适的范围谓词、、、或一个比较两个输入表相同时间属性即处理时间或事件时间的等值谓词来定义。 Table left tableEnv.from(MyTable).select($(a), $(b), $(c), $(ltime));
Table right tableEnv.from(MyTable).select($(d), $(e), $(f), $(rtime));Table result left.join(right).where(and($(a).isEqual($(d)),$(ltime).isGreaterOrEqual($(rtime).minus(lit(5).minutes())),$(ltime).isLess($(rtime).plus(lit(10).minutes())))).select($(a), $(b), $(e), $(ltime));4.Inner Join with Table Function join表和表函数的结果。左外部表的每一行都会join表函数相应调用产生的所有行。 如果表函数调用返回空结果则删除左侧外部表的一行。 // 注册 User-Defined Table Function
TableFunctionTuple3String,String,String split new MySplitUDTF();
tableEnv.registerFunction(split, split);// join
Table orders tableEnv.from(Orders);
Table result orders.joinLateral(call(split, $(c)).as(s, t, v)).select($(a), $(b), $(s), $(t), $(v));5.Left Outer Join with Table Function join表和表函数的结果。左外部表的每一行都会 join 表函数相应调用产生的所有行。如果表函数调用返回空结果则保留相应的 outer外部连接行并用空值填充右侧结果。目前表函数左外连接的谓词只能为空或字面常量真。 // join
Table orders tableEnv.from(Orders);
Table result orders.leftOuterJoinLateral(call(split, $(c)).as(s, t, v)).select($(a), $(b), $(s), $(t), $(v));6.Join with Temporal Table Temporal table是跟踪随时间变化的表提供对特定时间点temporal table状态的访问。表与temporal table函数进行 join 的语法和使用表函数进行 inner join 的语法相同。目前仅支持与 temporal table 的 inner join。 Table ratesHistory tableEnv.from(RatesHistory);// 注册带有时间属性和主键的 temporal table function
TemporalTableFunction rates ratesHistory.createTemporalTableFunction(r_proctime,r_currency);
tableEnv.registerFunction(rates, rates);// 基于时间属性和键与“Orders”表关联
Table orders tableEnv.from(Orders);
Table result orders.joinLateral(call(rates, $(o_proctime)), $(o_currency).isEqual($(r_currency)));合并操作
Union与UnionAll Union两张表会删除重复记录而UnionAll则不会。两张表必须具有相同的字段类型。 Table left tableEnv.from(orders1);
Table right tableEnv.from(orders2);left.union(right);
left.unionAll(right);Intersect与IntersectAll Intersect返回两个表中都存在的记录。如果一条记录在一张或两张表中存在多次则只返回一条记录。 IntersectAll 返回两个表中都存在的记录。如果一条记录在两张表中出现多次那么该记录返回的次数同该记录在两个表中都出现的次数一致。 两张表必须具有相同的字段类型。 left.intersect(right);left.intersectAll(right);Minus与MinusAll Minus返回左表中存在且右表中不存在的记录。左表中的重复记录只返回一次。 MinusAll返回右表中不存在的记录。在左表中出现n次且在右表中出现m次的记录在结果表中出现 (n - m) 次。 两张表必须具有相同的字段类型 left.minus(right);left.minusAll(right);IN 如果表达式的值存在于给定表的子查询中那么 In 子句返回 true。子查询表必须由一列组成。这个列必须与表达式具有相同的数据类型。 Table result left.select($(a), $(b), $(c)).where($(a).in(right));排序
Order By 返回跨所有并行分区的全局有序记录。对于无界表该操作需要对时间属性进行排序或进行后续的 fetch 操作。 Table result tab.orderBy($(a).asc());Offset Fetch Offset 操作根据偏移位置来限定可能是已排序的结果集。Fetch操作将可能已排序的结果集限制为前 n 行。通常这两个操作前面都有一个排序操作。对于无界表offset操作需要 fetch 操作。 // 从已排序的结果集中返回前5条记录
Table result1 in.orderBy($(a).asc()).fetch(5);// 从已排序的结果集中返回跳过3条记录之后的所有记录
Table result2 in.orderBy($(a).asc()).offset(3);// 从已排序的结果集中返回跳过10条记录之后的前5条记录
Table result3 in.orderBy($(a).asc()).offset(10).fetch(5);Group Windows Group window聚合根据时间或行计数间隔将行分为有限组并为每个分组进行一次聚合函数计算。对于批处理表窗口是按时间间隔对记录进行分组的便捷方式。 基本使用示例如下
Table table input.window([GroupWindow w].as(w)) // 定义窗口并指定别名为 w.groupBy($(w)) // 以窗口 w 对表进行分组.select($(b).sum()); // 聚合在流环境中如果窗口聚合除了窗口之外还根据一个或多个属性进行分组则它们只能并行计算
Table table input.window([GroupWindow w].as(w)) // 定义窗口并指定别名为 w.groupBy($(w), $(a)) // 以属性 a 和窗口 w 对表进行分组.select($(a), $(b).sum()); // 聚合时间窗口的开始、结束或行时间戳等窗口属性可以作为窗口别名的属性添加到 select 子句中如 w.start、w.end 和 w.rowtime。窗口开始和行时间戳是包含的上下窗口边界。相反窗口结束时间戳是唯一的上窗口边界。 例如从下午 2 点开始的 30 分钟滚动窗口将 “14:00:00.000” 作为开始时间戳“14:29:59.999” 作为行时间时间戳“14:30:00.000” 作为结束时间戳。
Table table input.window([GroupWindow w].as(w)) // 定义窗口并指定别名为 w.groupBy($(w), $(a)) // 以属性 a 和窗口 w 对表进行分组.select($(a), $(w).start(), $(w).end(), $(w).rowtime(), $(b).count()); // 聚合并添加窗口开始、结束和 rowtime 时间戳Table API提供了一组具有特定语义的预定义 Window 类。下面列出了支持的窗口定义。
1.Tumble (Tumbling Windows) 滚动窗口将行分配给固定长度的非重叠连续窗口。 例如一个 5 分钟的滚动窗口以 5 分钟的间隔对行进行分组。滚动窗口可以定义在事件时间、处理时间或行数上。 方法描述over将窗口的长度定义为时间或行计数间隔。on要对数据进行分组时间间隔或排序行计数的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。as指定窗口的别名。别名用于在 groupBy() 子句中引用窗口并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。
// Tumbling Event-time Window
.window(Tumble.over(lit(10).minutes()).on($(rowtime)).as(w));// Tumbling Processing-time Window (assuming a processing-time attribute proctime)
.window(Tumble.over(lit(10).minutes()).on($(proctime)).as(w));// Tumbling Row-count Window (assuming a processing-time attribute proctime)
.window(Tumble.over(rowInterval(10)).on($(proctime)).as(w));2.Slide (Sliding Windows) 滑动窗口具有固定大小并按指定的滑动间隔滑动。如果滑动间隔小于窗口大小则滑动窗口重叠。因此行可能分配给多个窗口。 例如15 分钟大小和 5 分钟滑动间隔的滑动窗口将每一行分配给 3 个不同的 15 分钟大小的窗口以 5 分钟的间隔进行一次计算。滑动窗口可以定义在事件时间、处理时间或行数上。 方法描述over将窗口的长度定义为时间或行计数间隔。every将窗口的长度定义为时间或行计数间隔。滑动间隔的类型必须与窗口长度的类型相同。on要对数据进行分组时间间隔或排序行计数的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。as指定窗口的别名。别名用于在 groupBy() 子句中引用窗口并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。
// Sliding Event-time Window
.window(Slide.over(lit(10).minutes()).every(lit(5).minutes()).on($(rowtime)).as(w));// Sliding Processing-time window (assuming a processing-time attribute proctime)
.window(Slide.over(lit(10).minutes()).every(lit(5).minutes()).on($(proctime)).as(w));// Sliding Row-count window (assuming a processing-time attribute proctime)
.window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($(proctime)).as(w));3.Session (Session Windows) 会话窗口没有固定的大小其边界是由不活动的间隔定义的例如如果在定义的间隔期内没有事件出现则会话窗口将关闭。 例如定义30 分钟间隔的会话窗口当观察到一行在 30 分钟内不活动否则该行将被添加到现有窗口中且30 分钟内没有添加新行窗口会关闭。会话窗口支持事件时间和处理时间。 方法描述withGap将两个窗口之间的间隙定义为时间间隔。on要对数据进行分组时间间隔或排序行计数的时间属性。批处理查询支持任意 Long 或 Timestampas指定窗口的别名。别名用于在 groupBy() 子句中引用窗口并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。
// Session Event-time Window
.window(Session.withGap(lit(10).minutes()).on($(rowtime)).as(w));// Session Processing-time Window (assuming a processing-time attribute proctime)
.window(Session.withGap(lit(10).minutes()).on($(proctime)).as(w));Over Windows OverWindow 定义了计算聚合的行范围 Table table input.window([OverWindow w].as(w)) // 用别名w定义over窗口.select($(a), $(b).sum().over($(w)), $(c).min().over($(w))); // 对窗口w进行聚合Table API提供了Over 类来配置over window的属性。可以在事件时间或处理时间以及指定为时间间隔或行计数的范围内定义over window
Partition By
在一个或多个属性上定义输入的分区。每个分区单独排序聚合函数分别应用于每个分区在流环境中如果窗口包含 partition by 子句则只能并行计算 over window 聚合。如果没有 partitionBy(…)数据流将由单个非并行任务处理Order By
定义每个分区内行的顺序从而定义聚合函数应用于行的顺序对于流处理查询必须声明事件时间或处理时间属性。目前仅支持单个排序属性Preceding
定义了包含在窗口中并位于当前行之前的行的间隔。间隔可以是时间或行计数间隔有界over window用间隔的大小指定例如时间间隔为10分钟或行计数间隔为10行无界over window通过常量来指定例如用UNBOUNDED_RANGE指定时间间隔或用 UNBOUNDED_ROW 指定行计数间隔。无界 over windows 从分区的第一行开始如果省略前面的子句则使用 UNBOUNDED_RANGE 和 CURRENT_RANGE 作为窗口前后的默认值Following
定义包含在窗口中并在当前行之后的行的窗口间隔。间隔必须以与前一个间隔时间或行计数相同的单位指定目前不支持在当前行之后有行的 over window。相反可以指定两个常量之一CURRENT_ROW 将窗口的上限设置为当前行CURRENT_RANGE 将窗口的上限设置为当前行的排序键例如与当前行具有相同排序键的所有行都包含在窗口中如果省略后面的子句则时间间隔窗口的上限定义为 CURRENT_RANGE行计数间隔窗口的上限定义为CURRENT_ROWAs
为over window 指定别名。别名用于在之后的 select() 子句中引用该 over window目前同一个 select() 调用中的所有聚合函数必须在同一个 over window 上计算无界Over Windows
// 无界的事件时间 over window假定有一个叫“rowtime”的事件时间属性
.window(Over.partitionBy($(a)).orderBy($(rowtime)).preceding(UNBOUNDED_RANGE).as(w));// 无界的处理时间 over window假定有一个叫“proctime”的处理时间属性
.window(Over.partitionBy($(a)).orderBy(proctime).preceding(UNBOUNDED_RANGE).as(w));// 无界的事件时间行数 over window假定有一个叫“rowtime”的事件时间属性
.window(Over.partitionBy($(a)).orderBy($(rowtime)).preceding(UNBOUNDED_ROW).as(w));// 无界的处理时间行数 over window假定有一个叫“proctime”的处理时间属性
.window(Over.partitionBy($(a)).orderBy($(proctime)).preceding(UNBOUNDED_ROW).as(w));有界Over Windows
// 有界的事件时间 over window假定有一个叫“rowtime”的事件时间属性
.window(Over.partitionBy($(a)).orderBy($(rowtime)).preceding(lit(1).minutes()).as(w));// 有界的处理时间 over window假定有一个叫“proctime”的处理时间属性
.window(Over.partitionBy($(a)).orderBy($(proctime)).preceding(lit(1).minutes()).as(w));// 有界的事件时间行数 over window假定有一个叫“rowtime”的事件时间属性
.window(Over.partitionBy($(a)).orderBy($(rowtime)).preceding(rowInterval(10)).as(w));// 有界的处理时间行数 over window假定有一个叫“proctime”的处理时间属性
.window(Over.partitionBy($(a)).orderBy($(proctime)).preceding(rowInterval(10)).as(w));