电子商务网站建设 以为例,新手做亚马逊要逛哪些网站,北京网站设计提供商,手机网站链接微信文章目录 一. 时间属性介绍二. Table api指定时间属性三. 处理时间的指定1. 在创建表的 DDL 中定义2. 在 DataStream 到 Table 转换时定义3. 使用 TableSource 定义 四. 事件时间的指定1. 在 DDL 中定义2. 在 DataStream 到 Table 转换时定义3. 使用 TableSource 定义 五. 小结… 文章目录 一. 时间属性介绍二. Table api指定时间属性三. 处理时间的指定1. 在创建表的 DDL 中定义2. 在 DataStream 到 Table 转换时定义3. 使用 TableSource 定义 四. 事件时间的指定1. 在 DDL 中定义2. 在 DataStream 到 Table 转换时定义3. 使用 TableSource 定义 五. 小结 Flink 可以基于几种不同的 时间 概念来处理数据。 处理时间 指的是执行具体操作时的机器时间大家熟知的绝对时间, 例如 Java的 System.currentTimeMillis()) 事件时间 指的是数据本身携带的时间。这个时间是在事件产生时的时间。摄入时间 指的是数据进入 Flink 的时间在系统内部会把它当做事件时间来处理。 本页面说明了如何在 Flink Table API SQL 里面定义时间以及相关的操作。
一. 时间属性介绍
像窗口在 Table API 和 SQL 这种基于时间的操作需要有时间信息。
时间属性声明 在CREATE TABLE DDL创建表的时候指定在 DataStream 中指定在定义 TableSource 时指定 一旦时间属性定义好就可以像普通列一样使用也可以在时间相关的操作中使用。 时间属性的传递和物化 只要时间属性没有被修改而是简单地从一个表传递到另一个表它就仍然是一个有效的时间属性。时间属性可以像普通的时间戳的列一样被使用和计算。一旦时间属性被用在了计算中它就会被物化进而变成一个普通的时间戳。 注意 普通的时间戳是无法跟 Flink 的时间以及watermark等一起使用的所以普通的时间戳就无法用在时间相关的操作中。 二. Table api指定时间属性
Table API 程序需要在 streaming environment 中指定时间属性
final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default// 或者:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);三. 处理时间的指定
处理时间是基于机器的本地时间来处理数据它是最简单的一种时间概念但是它不能提供确定性。它既不需要从数据里获取时间也不需要生成 watermark。
共有三种方法可以定义处理时间。
1. 在创建表的 DDL 中定义
处理时间属性可以在创建表的 DDL 中用计算列的方式定义用 PROCTIME() 就可以定义处理时间函数 PROCTIME() 的返回类型是 TIMESTAMP_LTZ 。
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL 10 MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL 10 MINUTE);2. 在 DataStream 到 Table 转换时定义
ing
3. 使用 TableSource 定义
ing
四. 事件时间的指定 事件时间允许程序按照数据中包含的时间来处理这样可以在数据乱序或者晚到情况下产生一致的处理结果。 它可以保证从外部存储读取数据后产生可以复现replayable的结果。 为了能够处理乱序的事件并且区分正常到达和晚到的事件Flink 需要从事件中获取事件时间并且产生 watermarkwatermarks。 同样事件时间的指定也有三种方式
1. 在 DDL 中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。 WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式同时标记这个已有字段为时间属性字段。 Flink 支持和在 TIMESTAMP不带时区 列和 TIMESTAMP_LTZ带有本地时区 列上定义事件时间。 如果源数据中的时间戳数据表示为年-月-日-时-分-秒则通常为不带时区信息的字符串值例如 2020-04-15 20:13:40.564建议将事件时间属性定义在 TIMESTAMP(不带时区) 列上: CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 声明 user_action_time 是事件时间属性并且用 延迟 5 秒的策略来生成 watermarkWATERMARK FOR user_action_time AS user_action_time - INTERVAL 5 SECOND
) WITH (...
);SELECT TUMBLE_START(user_action_time, INTERVAL 10 MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL 10 MINUTE);当源数据中的时间戳数据表示为一个纪元 (epoch) 时间通常是一个 long 值例如 1618989564564此时建议将事件时间属性定义在 TIMESTAMP_LTZ 列上
CREATE TABLE user_actions (user_name STRING,data STRING,ts BIGINT,time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategyWATERMARK FOR time_ltz AS time_ltz - INTERVAL 5 SECOND
) WITH (...
);SELECT TUMBLE_START(time_ltz, INTERVAL 10 MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL 10 MINUTE);Epoch Time 是一种计算机系统中常用的时间表示方法它以秒为单位从一个特定时间点通常是1970年1月1日午夜UTC开始计算时间用于在计算机系统中跟踪和比较时间戳。 2. 在 DataStream 到 Table 转换时定义
ing
3. 使用 TableSource 定义
ing
五. 小结
本文讨论了flink sql中时间属性的指定方法其中有几点细节 普通的时间戳无法用在时间相关的操作中需要进行时间属性的定义通过PROCTIME()或WATERMARK关键字可以在create语句中分别定义处理时间和事件时间类型的时间属性时间属性定义好后就可以像普通列一样使用也可以在时间相关的操作中使用一旦时间属性被用在了计算中它就会被物化进而变成一个普通的时间戳。也就无法进行时间相关操作。 参考 https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/concepts/time_attributes/#%E5%9C%A8-ddl-%E4%B8%AD%E5%AE%9A%E4%B9%89