昆明网站优化推广平台,如何用阿里云做网站,做海报找素材网站,谷歌搜索引擎入口2021Flink的三种时间
在谈watermark之前#xff0c;首先需要了解flink的三种时间概念。在flink中#xff0c;有三种时间戳概念#xff1a;Event Time 、Processing Time 和 Ingestion Time。其中watermark只对Event Time类型的时间戳有用。这三种时间概念分别表示#xff1a; …Flink的三种时间
在谈watermark之前首先需要了解flink的三种时间概念。在flink中有三种时间戳概念Event Time 、Processing Time 和 Ingestion Time。其中watermark只对Event Time类型的时间戳有用。这三种时间概念分别表示
Processing time
处理时间指执行算子操作的机器的当前时间。当基于处理时间运行时所有关于时间的操作如时间窗口都将使用执行算子操作的主机的本地时间。例如当时间窗口为一小时如果应用程序在9:15 am开始运行则第一个窗口将包括在9:15 am到10:00 am之间被处理的事件下一个窗口将包含在10:00 am到11:00 am之间被处理的事件依此类推。
处理时间是最简单的时间概念不需要流和机器之间的协调。它提供了最佳的性能和最低的延迟。但是在分布式和异步环境中处理时间不能提供确定性因为它容易受到上流系统例如从消息队列到达Flink的速度、flink内部operators之间交互的速度以及中断调度或其他情况等因素的影响。
Event Time
事件时间是每个event在其生产设备上产生的时间即元素在到达flink之前本身就自带的时间戳。
所以说Event Time的时间戳取决于数据而与其他时间无关。使用Event Time必须在从执行环境中先引入EventTime的时间属性。如 java
复制代码
val env StreamExecutionEnvironment.getExecutionEnvironment // 从调用时刻开始给env创建的每一个stream追加时间特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
然后通过Dstream的assignTimestampsAndWatermarks方法指定event time时间戳具体操作不做赘述。
在理想情况下事件时间是有序的。但实际上由于分布式操作以及网络延迟等原因event可能不是按照event time的顺序到达的。所以flink对处理乱序数据的方案是提供一个允许延迟时间在允许延迟时间内到达的元素将会重新触发一次计算。这个延迟时间时相对event time而不是其他时间的而event time不是由flink决定的那么如何判断当前的event time到底时多少呢flink通过一个watermark来确定与维护当前event time的最大值。这也是本文将会在后面重点解释的。
Ingestion time
Ingestion time是event进入Flink的时间即执行source操作时的时间。
Ingestion time从概念上讲介于Event Time和Processing time之间。
与Processing time相比 它花费的资源会稍微多一些但结果却更可预测。由于 Ingestion time使用稳定的时间戳仅在addSource处分配了一次因此对记录的不同窗口操作将引用相同的时间戳而在Processing time中每个窗口操作都会更新事件的Processing time所以可能一个上游窗口中的记录会分配给不同的下游窗口基于本地系统时钟和任何可能的延误。
与Event Time相比Ingestion time程序无法处理任何乱序事件或迟到的数据但是程序不必指定如何生成watermarks。
下图为三种时间语义的图解 watermark
用我自己的语言总结在flink的窗口计算中的watermark就是触发窗口计算的一种机制。 那么watermark到底是以怎样的一种形式存在的呢实际上watermark就是一种特殊的event它被参杂在Dstream中watermark由flink的某个操作生成后就在整个程序中随event一同流转如下图所示 以下是watermark的代码可以看出watermark的就是一个流元素仅包含一个时间戳属性 java
复制代码
public final class Watermark extends StreamElement { /** The watermark that signifies end-of-event-time. */ public static final Watermark MAX_WATERMARK new Watermark(Long.MAX_VALUE); // ------------------------------------------------------------------------ /** The timestamp of the watermark in milliseconds. */ private final long timestamp; /** * Creates a new watermark with the given timestamp in milliseconds. */ public Watermark(long timestamp) { this.timestamp timestamp; }
watermark的窗口触发机制
watermark会根据数据流中event的时间戳发生变化。通常情况下event都是乱序的不按时间排序的。watermark的计算逻辑为当前最大的 event time - 最大允许延迟时间MaxOutOfOrderness。在同一个分区内部当watermark大于或者等于窗口的结束时间时才能触发该窗口的计算即watermarkwindows endtime。如下图所示 根据上图分析 MaxOutOfOrderness 5s窗口的大小为10s。 watermark分别为12:08、12:15、12:30 计算逻辑为WM(12:08)12:13 - 5sWM(12:15)12:20 - 5sWM(12:30)12:35 - 5s
对于 [12:00,12:10 窗口需要在WM12:15时才能被触发计算参与计算的event为event(12:07)/event(12:01)/event(12:07)/event(12:09)event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算因为还未到窗口时间也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。 注意如果过了这个窗口期再收到 [12:00,12:10] 窗口内的event就算超过了最大允许延迟时间MaxOutOfOrderness不会再参与计算也就是数据被强制丢掉了。对于 [12:10,12:20] 和 [12:20,12:30] 窗口会在WM12:30时被同时触发计算参与**[12:10,12:20]** 窗口计算的event为event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18)参与 [12:20,12:30] 窗口计算的event为event(12:20)/event(12:20)在这个过程中event(12:05)会被丢弃不会参与计算因为已经超了最大允许延迟时间MaxOutOfOrderness
迟到的事件
在介绍watermark时提到了现实中往往处理的是乱序event即当event处于某些原因而延后到达时往往会发生该event time watermark的情况所以flink对处理乱序event的watermark有一个允许延迟的机制这个机制就是最大允许延迟时间MaxOutOfOrderness允许在一定时间内迟到的event仍然视为有效event。
并行流的Watermarks
watermark可以在source处生成也可以在source后通过其他算子生成如map、filter等如果source有多个并行度那么每个并行度会单独生成一个watermark这些watermark定义了各分区的event time。 当并行度发生变化即上游的一个分区可能被下游多个分区使用时每个分区的watermark是会广播至下游的每个分区的如一些聚合多个流的操作如 keyBy(…) 或者partition(…)此类操作的watermark是在所有输入流中取最小的watermark。当带有watermark的流通过此类算子时会根据每个分区的watermark来更新watermark。
举个例子当上游并行度数为4下游的某个分区的窗口中的watermark如下 当已到达的watermark分别为2、4、3、6时窗口中的watermark为2触发watermark为2的对应窗口计算并将watermark2广播至下游。 当第一个窗口的watermark被更新为4时所有分区中已到达最小的watermark是3则将窗口的watermark更新为3触发对应窗口的计算并将watermark3广播至下游。 当第二个分区的watermark被更新为7所有分区中已到达最小的watermark还是3不做处理。 当第三个分区的watermark被更新为6所有分区中已到达最小的watermark是4则将窗口的watermark更新为4触发对应窗口的计算并将watermark4广播至下游。
下图显示了event和watermark在一个并行流的示例以及算子如何跟踪事件时间的 watermark分配器
当watermark完全基于event time时如果没有元素到达则watermark不会被更新这就说明当一段时间没有元素到达则在这个时间间隙内watermark不会增加那么也不会触发窗口计算。显然如果这段时间很长的话那么该窗口中已经到达的元素将会等待很久才会被输出计算结果。
为了避免这种情况可以使用周期性的watermark分配器AssignerWithPeriodicWatermarks 下面马上提到这些分配器不仅仅基于event time进行分配。比如可以使用一个分配器当一段时间没有接收到新的event时则将当前时间作为watermark。
watermark的两种分配器flink生成watermark有两种机制
AssignerWithPeriodicWatermarks 分配时间戳并定期生成watermark可以取决于event time或基于处理时间。AssignerWithPunctuatedWatermarks分配时间戳并根据每一个元素生成watermark每来一个元素都进行一次判断相更消耗性能
通常情况下会使用第一种机制原因除了更节省性能外在上面的分配器中也有提到。
下面分别对两种机制进行介绍。
AssignerWithPeriodicWatermarks
对每个元素都调用extractTimestamp方法获取时间戳并维护一个最大时间戳。通过ExecutionConfig.setAutoWatermarkInterval(...)定义生成watermark的间隔每n毫秒 。根据这个间隔周期性调用分配器的getCurrentWatermark()方法为watermark分配值。
在flink自带的BoundedOutOfOrdernessGenerator分配器中 getCurrentWatermark是定期将当前watermark更新为最大时间戳减去允许延迟时间的值。
以下是两个使用AssignerWithPeriodicWatermarks 生成的时间戳分配器的简单示例 java
复制代码
/** * This generator generates watermarks assuming that elements arrive out of order, * but only to a certain degree. The latest elements for a certain timestamp t will arrive * at most n milliseconds after the earliest elements for timestamp t. */ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { val maxOutOfOrderness 3500L // 3.5 seconds var currentMaxTimestamp: Long _ override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long { val timestamp element.getCreationTime() currentMaxTimestamp max(timestamp, currentMaxTimestamp) timestamp } override def getCurrentWatermark(): Watermark { // return the watermark as current highest timestamp minus the out-of-orderness bound new Watermark(currentMaxTimestamp - maxOutOfOrderness) } } /** * This generator generates watermarks that are lagging behind processing time by a fixed amount. * It assumes that elements arrive in Flink after a bounded delay. */ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { val maxTimeLag 5000L // 5 seconds override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long { element.getCreationTime } override def getCurrentWatermark(): Watermark { // return the watermark as current time minus the maximum time lag new Watermark(System.currentTimeMillis() - maxTimeLag) } }
AssignerWithPunctuatedWatermarks
根据每个元素的event time生成watermark通过extractTimestamp(...)方法为元素分配时间戳通过checkAndGetNextWatermark(...)检查元素的watermark并更新watermark。
checkAndGetNextWatermark(...)方法的第二个参数是extractTimestamp(...) 返回的时间戳根据这个时间戳决定是否要生成watermark。每当checkAndGetNextWatermark(...) 方法返回一个非空watermark并且该watermark大于上一个watermark时就会更新watermark。 java
复制代码
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] { override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long { element.getCreationTime } override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark { if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null } }