龙溪网站制作,wordpress回复邮箱,发稿计划怎么写,站酷设计官网前言
学习总结Flink SQL Checkpoint的使用#xff0c;主要目的是为了验证Flink SQL流式任务挂掉后#xff0c;重启时还可以继续从上次的运行状态恢复。
验证方式
Flink SQL流式增量读取Hudi表然后sink MySQL表#xff0c;任务启动后处于running状态#xff0c;先查看sin…前言
学习总结Flink SQL Checkpoint的使用主要目的是为了验证Flink SQL流式任务挂掉后重启时还可以继续从上次的运行状态恢复。
验证方式
Flink SQL流式增量读取Hudi表然后sink MySQL表任务启动后处于running状态先查看sink表有数据然后将对应的yarn kill掉再通过设置的checkpoint重启任务任务重启后验证sink表的数据量。Flink SQL流式增量读取Hudi表可以参考Flink SQL增量查询Hudi表
版本
Flink 1.14.3Hudi 0.13.0
Checkpoint 参数
一般需要设置的常用参数
-- checkpoint间隔时间单位毫秒没有默认值如果想开启checkpoint需要将该参数设置一个大于0的数值
-- 如果想提升sink性能比如写hudi需要将该值设置大一点因为间隔时间决定了批次大小
-- checkpoint间隔时间不能设置太短也不能设置太长太短影响写入性能太长影响数据及时性。
set execution.checkpointing.interval1000;
-- 保存checkpoint文件的目录
set state.checkpoints.dirhdfs:///flink/checkpoints/hudi2mysql;
-- 任务取消后保留checkpoint,默认值NO_EXTERNALIZED_CHECKPOINTS
-- 可选值NO_EXTERNALIZED_CHECKPOINTS、DELETE_ON_CANCELLATION、RETAIN_ON_CANCELLATION
set execution.checkpointing.externalized-checkpoint-retentionRETAIN_ON_CANCELLATION;从checkpoint恢复
set execution.savepoint.pathhdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-1314;其他参数
-- checkpoint模式默认值EXACTLY_ONCE可选值EXACTLY_ONCE、AT_LEAST_ONCE
-- 要想支持EXACTLY_ONCE需要sink端支持事务
set execution.checkpointing.modeEXACTLY_ONCE;
-- checkpoint超时时间默认10分钟
set execution.checkpointing.timeout600000;
-- checkpoint文件保留数默认1
set state.checkpoints.num-retained3;Checkpoint 目录结构
/user-defined-checkpoint-dir/{job-id}| --shared/ --taskowned/ --chk-1/ --chk-2/ --chk-3/... 验证
创建Hudi和MySQL物理表
Hudi表
CREATE TABLE hudi_source (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts bigint,dt string
)
WITH (connector hudi,path /tmp/hudi_source
);MySQL表
CREATE TABLE sink_mysql (id int(11) NOT NULL,name text,price double DEFAULT NULL,ts int(11) DEFAULT NULL,dt text,insert_time timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 创建时间,update_time timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 更新时间,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;造数
insert into hudi_source values(1,hudi1,11.1,1000,20230301);
insert into hudi_source values(2,hudi2,22.2,1000,20230301);
......流读Hudi写MySQL
hudi2mysql.sql
set yarn.application.namehudi2mysql;
set execution.checkpointing.interval1000;
set state.checkpoints.dirhdfs:///flink/checkpoints/hudi2mysql;
set execution.checkpointing.externalized-checkpoint-retentionRETAIN_ON_CANCELLATION;CREATE TABLE hudi_source_incr (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts bigint,dt string
)
WITH (connector hudi,path /tmp/hudi_source,read.streaming.enabled true, read.start-commit 202302, read.streaming.check-interval 4
);create table sink_mysql (id int PRIMARY KEY NOT ENFORCED,name string,price double,ts bigint,dt string
) with (connector jdbc,url jdbc:mysql://192.168.44.128:3306/cdc?useSSLfalseuseUnicodetruecharacterEncodingUTF-8characterSetResultsUTF-8,username root,password password,table-name sink_mysql
);insert into sink_mysql select * from hudi_source_incr;执行上面的SQL
bin/sql-client.sh -f sql/hudi2mysql.sql这样我们启动了一个常任务在Flink界面上可以看到checkpoint的相关信息如下图显示了checkpoint具体文件地址
可以用hdfs命令看一下checkpoint路径下有哪些文件
drwxr-xr-x - hive hdfs 0 2023-03-01 14:47 hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-589
drwxr-xr-x - hive hdfs 0 2023-03-01 14:36 hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/shared
drwxr-xr-x - hive hdfs 0 2023-03-01 14:36 hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/taskowned其中255bdd01cee7486113feb1cbe8b45ee0为flink的jobid 将yarn任务kill
yarn app -kill application_1676855463066_0177再看一下发现checkpoint文件还在
hdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-1314重启任务验证checkpoint效果
需要先在hudi2mysql.sql添加下面的配置
-- 从该checkpoint文件对应的状态恢复
set execution.savepoint.pathhdfs:///flink/checkpoints/hudi2mysql/255bdd01cee7486113feb1cbe8b45ee0/chk-1314;重启flink sql任务
bin/sql-client.sh -f sql/hudi2mysql.sql我们可以在新启动的yarn界面上看到最新的恢复点,和我们设置的一样这样代表我们设置恢复点生效
最后再造几条新的增量数据在MySQL里看验证以下数据量是否一致
insert into hudi_source values(3,hudi3,33.3,1000,20230301);MySQL数据量一致且更新时间和插入时间一致代表id1、2的数据重启时没有重复消费达到了预期效果。也可以对MySQL表不设置主键直接通过验证数据量验证效果
这样我们通过一个简单的示例了解了checkpoint的具体使用。大致过程 1、设置开启checkpoint和保存的路径 2、任务运行时会根据设置的时间间隔不断生成新的ckp文件 3、等任务挂掉后重启任务时先设置execution.savepoint.path为我们最后一次保存的ckp文件 这样就达到了任务重启时继续从上次的运行状态恢复。
Checkpoint和Hudi
流任务写hudi时必须设置checkpoint不然不会生成commit感觉像是卡住一样具体表现为只生成.commit.requested和.inflight,然后不写文件、不生成.commit也不报错对于新手来说很费劲很难找到解决方法。 大概原因是因为写文件、生成commit的动作是在coordinator里面只有当checkpoint完成后才会调用coordinator所以不设置checkpoint就不会生成commit这里的逻辑是在Hudi源码里具体没看也就是说checkpoint和生成hudi commit是绑定一起的这样才能保证流写Hudi的事务性从而保证checkpoint的EXACTLY_ONCE。
StateBackend 在启动 CheckPoint 机制时状态会随着 CheckPoint 而持久化以防止数据丢失、保障恢复时的一致性。 状态内部的存储格式、状态在 CheckPoint 时如何持久化以及持久化在哪里均取决于选择的 State Backend。 在学习Flink SQL Checkpoint时发现网上的资料有下面的这个配置本来以为这样设置后就会将checkpoint文件保存到文件系统中后来发现并不是这样。并且官网文档和源码描述的也不是很清楚所以专门研究了一下这一块
set state.backendfilesystem;从 Flink 1.13 版本开始社区改进了 state backend 的公开类进而帮助用户更好理解本地状态存储和 checkpoint 存储的区分。 这个变化并不会影响 state backend 和 checkpointing 过程的运行时实现和机制仅仅是为了更好地传达设计意图。 用户可以将现有作业迁移到新的 API同时不会损失原有 state。 旧版本的 MemoryStateBackend 等价于使用 HashMapStateBackend 和 JobManagerCheckpointStorage。 新版本的有两个参数state.backend和state.checkpoint-storage
state.backend可选参数hashmap、roksdb另外也支持filesystem弃用和jobmanager弃用官方文档并没有说明filesystem和jobmanager已经弃用
只设置state.backend
state.backendCheckpoint StorageState Backend默认JobManagerCheckpointStorageHashMapStateBackendhashmapJobManagerCheckpointStorageHashMapStateBackendfilesystem弃用JobManagerCheckpointStorageHashMapStateBackendroksdbJobManagerCheckpointStorageEmbeddedRocksDBStateBackendjobmanager 弃用MemoryStateBackend弃用MemoryStateBackend 弃用
总结对于State Backend只有HashMapStateBackend和EmbeddedRocksDBStateBackend另外还有一个弃用的MemoryStateBackend
state.checkpoint-storage可选参数jobmanager、filesystem当设置了state.checkpoints.dirflink会自动使用filesystem对应的FileSystemCheckpointStorage
只设置state.checkpoint-storage
state.checkpoint-storageCheckpoint StorageState Backend默认JobManagerCheckpointStorageHashMapStateBackendjobmanagerJobManagerCheckpointStorageHashMapStateBackendfilesystemFileSystemCheckpointStorageHashMapStateBackend设置state.checkpoints.dirFileSystemCheckpointStorageHashMapStateBackend总结对于Checkpoint Storage只有JobManagerCheckpointStorage和FileSystemCheckpointStorage另外当设置state.checkpoint-storagefilesystem时必须同时设置state.checkpoints.dir否则会有异常
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot create the file system state backend: The configuration does not specify the checkpoint directory state.checkpoints.dir其实可以不设置state.checkpoint-storage当设置了state.checkpoints.dir时Checkpoint Storage 自动使用FileSystemCheckpointStorage不设置的话就使用默认的JobManagerCheckpointStorage
一开始对于默认的JobManagerCheckpointStorage、HashMapStateBackend不是很理解不明白这样的checkpoint有啥用因为是保存到内存中不是保存到文件系统中所以任务挂掉后就没办法恢复。 后来发现这种默认保存在内存中的checkpoint可以用于flink作业失败时自动恢复而不是任务挂掉后手动恢复另外默认情况下程序取消时也不保存checkpoint 其他总结 对于flink sql读取mysql设置checkpoint恢复不生效不是flink cdccheckpoint 一个时间间隔内只有一个批次这样才能保证eos,时间间隔大小影响写入性能对于kafka2hudi的场景checkpoint时间间隔如果比较小1s会因为时间不够导致第一个批次卡住等超时默认10分钟后才会报错所以需要间隔时间设置大一点10s以上即可默认情况只有全部任务running才会生成checkpoint可以通过参数修改execution.checkpointing.checkpoints-after-tasks-finish.enabledtrue pipeline.operator-chaining
set pipeline.operator-chainingfalse;将该参数设置为false实现将多个算子拆分利于观察每个任务的运行情况。对于上面说的kafka2hudi的场景本来只是为了观察任务卡住的原因但是发现设置了该参数后任务不卡了 原因是虽然官方文档说的是将该参数设置为false后会影响性能但是我测试的kafka2hudi的场景反而提升了性能~所以不卡了不增加checkpoint时间间隔的情况 下面是我测试的结果总数据量1000万checkpoint间隔10s
pipeline.operator-chaining第一个批次的数据量第二个批次的数据量第三个批次的数据量…总用时false77427014090251552195…66strue8386108964591245142…79s