php网站开发怎么样,长沙seo推广营销,dz地方门户网站制作,网站建设的工作职责是什么Flink系列知识之#xff1a;Checkpoint原理
在介绍checkpoint的执行流程之前#xff0c;需要先明白Flink中状态的存储机制#xff0c;因为状态对于检查点的持续备份至关重要。
State Backends分类
下图显示了Flink中三个内置的状态存储种类。MemoryStateBackend和FsState…Flink系列知识之Checkpoint原理
在介绍checkpoint的执行流程之前需要先明白Flink中状态的存储机制因为状态对于检查点的持续备份至关重要。
State Backends分类
下图显示了Flink中三个内置的状态存储种类。MemoryStateBackend和FsStateBackend在运行时存储在Java堆中。FsStateBackend仅在执行检查点时才以文件的形式持久地将数据保存到远程存储。RocksDBStateBackend使用RocksDB(一种LSM数据库结合了内存和磁盘)来存储状态。
当使用基于堆的 state backend 保存状态时访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象访问和更新涉及序列化和反序列化所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意只有 RocksDBStateBackend 能够进行增量快照这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。
下面是执行HeapKeyedStateBackend的方法:
支持异步检查点(默认):存储格式为CopyOnWriteStateMap。仅支持同步检查点:存储格式为NestedStateMap。 当在MemoryStateBackend中使用HeapKeyedStateBackend时默认情况下基于检查点的数据序列化的最大数据量为5mb。 对于RocksDBKeyedStateBackend每个状态都存储在一个单独的列族中。keyGroup、Key和Namespace被序列化并以键的形式存储在数据库中。
对于RocksDBKeyedStateBackend每个状态都存储在一个单独的列族中。keyGroup、Key和Namespace被序列化并以键的形式存储在数据库中。
checkpoint执行流程
Flink容错机制的核心部分是绘制分布式数据流和算子状态的一致快照。这些快照充当一致的检查点系统可以在发生故障时退回到这些检查点。它受到用于分布式快照的标准Chandy-Lamport算法的启发并专门针对Flink的执行模型进行了定制。
自Flink 1.11以来检查点可以在对齐或不对齐的情况下进行。在本节中我们首先描述对齐的检查点。
Checkpoint barrier
Flink分布式快照的一个核心元素是stream barrier。这些barrier会被注入到数据流中并作为数据流的一部分与记录一起流动。当 Flink 作业设置了检查点时Flink 会在数据流中插入这些特殊记录以确保在特定点上所有算子的状态都被一致地保存。barrier永远不会超过记录它们严格地按顺序流动。barrier将数据流中的记录分隔为进入当前快照的记录集和进入下一个快照的记录集相当于将连续的数据流切分为多个有限序列对应多个 Checkpoint 周期。每个barrier都携带着包含了在它前面的记录的快照的ID。barrier不会中断数据流因此非常轻巧。来自不同快照的多个barrier可以同时在数据流中这意味着多种快照可能并发发生。整个过程是由 Flink 的执行引擎在运行时负责处理的通过协调不同操作符之间的信号和状态来实现数据流中的 checkpoint barrier 插入。
Stream barrier首先会被注入到source流的并行数据流中。快照n的barrier被注入的点(我们称之为Sn)是source源流中快照所能覆盖的数据的位置。例如在Apache Kafka中这个位置将是分区中拉取数据的偏移量。这个插入点Sn会被报告给检查点协调器(Flink的JobManager)。
当中间操作符从其所有输入流接收到快照n的barrier时它会开始执行快照并将状态写入到State backend中然后会将快照n的barrier继续向下游流动发送到其所有传出流中。一旦sink操作符(流DAG的末端)从其所有输入流接收到barrier n它就向检查点协调器确认快照n。在所有sink算子都确认了快照之后就认为快照已经完成。
一旦快照n完成作业就不会再向source算子请求Sn之前的记录因为此时这些记录已经完整地流过了整个DAG数据拓扑。
checkpoint alignment
Checkpoint alignment 机制是 Apache Flink 中用于确保分布式检查点一致性的一种机制。对于接收多个输入流的算子需要在快照barrier上对齐输入流。如下图所示:
一旦算子从某个输入流通道中接收到快照barrier n它就不能处理来自该流的任何一条记录阻塞直到它从其他所有输入流通道中都接收到barrier n。因为如果不阻塞的话算子状态将会混合属于快照n的记录和属于快照n1的记录。在对齐的过程中算子只会继续处理来自未出现 Barrier Channel 的数据而其余 Channel 的数据会被写入输入队列直至在队列满后被阻塞。当从最后一个输入流通道中接收到barrier n时算子开始执行快照异步地将状态写入到State Backend中然后将barrier n继续向下游所有输出通道流动。
比起其他分布式快照该算法的优势在于辅以 Copy-On-Write 技术的情况下不需要 “Stop The World” 影响应用吞吐量同时基本不用持久化处理中的数据只用保存进程的状态信息大大减小了快照的大小。
需要注意的是对于具有多个输入流的操作符算子以及在shuffle后接收多个上游子任务输出流的操作符算子都需要对齐。
checkpoint执行流程
上面介绍完checkpoint的相关原理后本节尝试逐步解释执行检查点的过程。如下图所示左侧为checkpoint coordinator中间为Flink job(由两个源节点和一个汇聚节点组成)右侧为persistent storage(大多数场景下由HDFS提供)。
Step 1) Checkpoint coordinator触发checkpoint执行信号到所有输入流操作符算子中。
Step 2) 源节点向下游广播一个checkpoint barrier。该checkpoint barrier是Chandy-Lamport分布式快照算法的核心。下游任务只有在接收到所有输入流通道的barrier后才执行checkpoint操作。
Step 3) 源操作符算子完成state状态备份后向checkpoint coordinator检查点协调器发送备份数据地址即状态句柄。同时barrier继续流向下游。 这里分为同步和异步如果开启的话两个阶段
同步阶段task执行状态快照并写入外部存储系统根据状态后端的选择不同有所区别执行快照的过程
对 state 做深拷贝将写操作封装在异步的 FutureTask 中FutureTask 的作用包括1打开输入流2写入状态的元数据信息3写入状态4关闭输入流
⠀异步阶段
执行同步阶段创建的 FutureTask向 Checkpoint Coordinator 发送 ACK 响应
Step 4) 当下游sink节点接收到上游两个输入通道的barrier后开始执行本地快照。下图演示了执行RocksDB增量检查点的过程。RocksDB将全部数据刷新到磁盘如红色三角形所示。然后Flink为未上传的文件实现持久备份如紫色三角形所示。
Step 5) 在执行完sink操作符算子的检查点之后sink操作符算子将状态句柄state handle返回给checkpoint coordinator检查点协调器。
Step 6) 当接收到所有任务算子的状态句柄state handle后checkpoint coordinator确认全局的checkpoint已经完成然后将checkpoint元文件备份到持久化存储中。
Unaligned Checkpoint
上述对齐的chekcpoint基于Chandy-Lamport算法实现了分布式系统下的数据一致性快照。通过上面的原理可以看出该方案在操作符算子具有多个输入流通道时需要阻塞地等待所有输入通道的barrier都到达后才会开始执行快照。这在大多数情况下是没有问题的但当某个输入流通道比其他输入流通道的数据流动更慢时比如出现了反压、数据倾斜问题。会导致快照的完成时间变长甚至超时。其次这种方案来说Barrier对齐的过程本身就可能成为一个反压的源头影响上游算法的效率而这在某些情况下是不必要的。
为了解决这个问题Flink在1.11版本中引入了Unaligned Checkpoint的特性。其基本思想是只要输入通道中的的数据能成为操作符算子状态的一部分那么checkpoint barrier就可以超越所有输入/输出通道中的数据。 Checkpointing can also be performed unaligned. The basic idea is that checkpoints can overtake all in-flight data as long as the in-flight data becomes part of the operator state. 如何来理解呢
在上面对齐的checkpoint的原理介绍中可以发现快照只包含了操作符算子的状态而不关心输入/输出通道的数据记录。这是因为barrier对齐的checkpoint将本地快照延迟至所有barrier到达也就是说当执行快照时属于当前checkpoint周期内的数据记录都已经对该算子状态产生了影响因而不必关心输入队列的剩余数据同时输出队列又携带着barrier继续流向下一个算子的输入队列因而输出队列的数据也不必关心从而巧妙地避免了对算子输入/输出队列的状态进行快照。
但实际上这和Chandy-Lamport算法是有一定出入的。举个例子假设我们对两个数据流进行 equal-join输出匹配上的元素。按照 Flink Aligned Checkpoint 的方式系统的状态变化如下图中不同颜色的元素代表属于不同的 Checkpoint 周期:
图 a: 输入 Channel 1 存在 3 个元素其中 2 在 Barrier 前面Channel 2 存在 4 个元素其中 2、9、7 在 Barrier 前面。图 b: 算子分别读取 Channel 一个元素输出 2。随后接收到 Channel 1 的 Barrier停止处理 Channel 1 后续的数据只处理 Channel 2 的数据。图 c: 算子再消费 2 个自 Channel 2 的元素接收到 Barrier开始本地快照并输出 Barrier。
对于相同的情况Chandy-Lamport 算法的状态变化如下:
图 a: 同上。图 b: 算子分别处理两个 Channel 一个元素输出结果 2。此后接收到 Channel 1 的 Barrier算子开始本地快照记录自己的状态并输出 Barrier。图 c: 算子继续正常处理两个 Channel 的输入输出 9。特别的地方是 Channel 2 后续元素会被保存下来直到 Channel 2 的 Barrier 出现即 Channel 2 的 9 和 7。保存的数据会作为 Channel 的状态成为快照的一部分。
两者的差异主要可以总结为两点:
快照的触发是在接收到第一个 Barrier 时还是在接收到最后一个 Barrier 时。是否需要阻塞已经接收到 Barrier 的 Channel 的计算。
从这两点来看新的 Unaligned Checkpoint 将快照的触发改为第一个 Barrier 且取消阻塞 Channel 的计算算法上与 Chandy-Lamport 基本一致同时在实现细节方面结合 Flink 的定位做了几个改进。
首先不同于 Chandy-Lamport 模型的只需要考虑算子输入 Channel 的状态Flink 的算子有输入和输出两种 Channel在快照时两者的状态都需要被考虑。
其次无论在 Chandy-Lamport 还是 Flink Aligned Checkpoint 算法中Barrier 都必须遵循其在数据流中的位置算子需要等待 Barrier 被实际处理才开始快照。而 Unaligned Checkpoint 改变了这个设定允许算子优先摄入并优先输出 Barrier。 如此一来第一个到达 Barrier 会在算子的缓存数据队列包括输入 Channel 和输出 Channel中往前跳跃一段距离而被”插队”的数据和其他输入 Channel 在其 Barrier 之前的数据会被写入快照中图中绿色部分。 上图描述了算子是如何处理非对齐的checkpoint barriers的
当输入队列中接收到第一个chekcpoint barrier时算子即开始执行相应处理。它会立即将该barrier跳过前面的输入队列并将其插入到输出队列的尾部。算子在执行快照时会把所有标记了跳过的数据记录图中绿色部分并将其一并写入到算子状态中。
此时算子只需短暂停止处理输入队列以标记缓冲区、转发barrier并创建其状态的快照。
这样的主要好处是如果本身算子的处理就是瓶颈Chandy-Lamport 的 Barrier 仍会被阻塞因为Chandy-Lamport仍然要等到第一个barrier到达算子时才开始触发快照执行如果算子的处理本身比较慢数据的流动仍然会很慢但 Unaligned Checkpoint 则可以在 Barrier 进入输入 Channel 就马上开始快照。 这可以从很大程度上加快 Barrier 流经整个 DAG 的速度从而降低 Checkpoint 整体时长。
回到之前的例子用 Unaligned Checkpoint 来实现状态变化如下:
图 a: 输入 Channel 1 存在 3 个元素其中 2 在 Barrier 前面Channel 2 存在 4 个元素其中 2、9、7 在 Barrier 前面。输出 Channel 已存在结果数据 1。图 b: 算子优先处理输入 Channel 1 的 Barrier开始本地快照记录自己的状态并将 Barrier 插到输出 Channel 末端。图 c: 算子继续正常处理两个 Channel 的输入输出 2、9。同时算子会将 Barrier 越过的数据即输入 Channel 1 的 2 和输出 Channel 的 1写入 Checkpoint并将输入 Channel 2 后续早于 Barrier 的数据即 2、9、7持续写入 Checkpoint。
比起Aligned Checkpoint中不同Checkpoint周期的数据以算子快照为界限分隔得很清晰Unaligned Checkpoint进行快照和输出Barrier时部分本属于当前Checkpoint的输入数据还未计算因此未反映到当前算子状态中而部分属于当前Checkpoint的输出数据却落到Barrier之后因此未反映到下游算子的状态中。这也正是Unaligned的含义不同Checkpoint周期的数据没有对齐包括不同输入Channel之间的不对齐以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来以在恢复状态时重放。换句话说从Checkpoint恢复时不对齐的数据并不能由Source端重放的数据计算得出同时也没有反应到算子状态中但因为它们会被Checkpoint恢复到对应Channel中所以依然能够提供只计算一次的准确结果。
Unaligned Checkpoint方案确保barrier可以尽可能快地在数据流中移动。它特别适合至少有一个缓慢移动的数据输入队列的应用其对齐时间可能达到几个小时。但是由于它增加了额外的I/O压力所以当应用写入State Backend的I/O本身就是瓶颈时非对齐Checkpoint方案并不会有明显帮助。
Exactly Once vs. At Least Once
为了实现EXACTLY ONCE的语义Flink使用了输入缓存队列来缓存在对齐过程中队列中传入的数据。同时我们经过上面Checkpoint原理介绍也能清晰地知道使用对齐的方式来执行快照是能够实现EXACTLY ONCE的语义的。 需要注意的是这里的EXACTLY ONCE语义并不意味着每个事件将被精确地处理一次而是意味着每个事件只会影响Flink算子状态一次。同时EXACTLY ONCE语义并不能实现端到端的数据EXACTLY ONCE如果需要实现端到端的EXACTLY ONCE语义需要sink算子能够实现写入的幂等和事务性。
通常在checkpoint过程中额外的对齐时间延迟大约是几毫秒但也可能会有一些异常值的延迟明显增加的情况。对于所有记录都需要超低延迟(几毫秒)的应用程序Flink有一个开关可以在检查点期间跳过对齐步骤。此时当算子接收到每个输入队列的checkpoint barrier时不会阻塞会继续处理barrier之后的数据记录。这就可能会导致本属于下一个checkpoint周期的数据记录影响了当前checkpoint周期的算子状态从而导致恢复时数据重复消费的情况因此这种模式下只能保证At Least Once语义。
Checkpoint Exactly Once和At Least Once语义配置
// 启用 Checkpoint 每 5 秒 一次模式为 EXACTLY_ONCE
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);// 启用 Checkpoint 每 5 秒 一次模式为 At Least Once
env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);另外Aligned过程只发生在具有多个输入队列(连接)的算子以及具有多个输出队列的算子比如在重新分区/shuffle之后)。正因为如此只有单并行度的操作算子(map() flatMap() filter()…)的数据流实际上及时被设置为At Least Once语义也能实现Exactly once语义实际上就是单输入流的算子不需要barrier对齐。
参考
Flink 1.11 Unaligned Checkpoint 解析 Stateful Stream Processing Flink Checkpoints Principles and Practices: Flink Advanced Tutorials