永久免费建站系统,浏览器主页网址,软件开发网站开发副业,哎呀哎呀视频在线观看offset配置:
flinkKafkaConsumer.setStartFromEarliest():从topic的最早offset位置开始处理数据#xff0c;如果kafka中保存有消费者组的消费位置将被忽略。 flinkKafkaConsumer.setStartFromLatest():从topic的最新offset位置开始处理数据#xff0c;如果kafka中保存有消费…offset配置:
flinkKafkaConsumer.setStartFromEarliest():从topic的最早offset位置开始处理数据如果kafka中保存有消费者组的消费位置将被忽略。 flinkKafkaConsumer.setStartFromLatest():从topic的最新offset位置开始处理数据如果kafka中保存有消费者组的消费位置将被忽略。 flinkKafkaConsumer.setStartFromTimestamp(…):从指定的时间戳毫秒开始消费数据Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消费的位置。如果kafka中保存有消费者组的消费位置将被忽略。 flinkKafkaConsumer.setStartFromGroupOffsets():默认的设置。根据代码中设置的group.id设置的消费者组去kafka中或者zookeeper中找到对应的消费者offset位置消费数据。如果没有找到对应的消费者组的位置那么将按照auto.offset.reset设置的策略读取offset。
消费者offset提交配置
配置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)来设置开启checkpoint。5000单位毫秒,代表每5秒进行依次checkpoint
关闭checkpoint如何禁用了checkpoint那么offset位置的提交取决于Flink读取kafka客户端的配置enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。 开启checkpoint如果开启了checkpoint那么当checkpoint保存状态完成后将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中默认是true。如果使用这种方式那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。 总结:Flink提供了消费kafka数据的offset如何提交给Kafka或者zookeeper(kafka0.8之前,因为0.8之前offset是维护在zookeeper中的)的配置 ;关闭checkpoint的话,flink消费kafka数据 offset取决于kafka客户端的配置;开启checkpoint的话,flink消费kafka offset由jobmanager中的checkpoint维护,并同步到kafka中保持一置,注意Flink并不依赖提交给Kafka或者zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消费的情况。
使用checkpoint 两阶段提交来保证仅消费一次kafka中的数据 Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。
当谈及“exactly-once semantics”仅一次处理数据时指的是每条数据只会影响最终结果一次。Flink可以保证当机器出现故障或者程序出现错误时也没有重复的数据或者未被处理的数据出现实现仅一次处理的语义。
checkpoint中包含
当前应用的状态; 当前消费流数据的位置; 注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;
使用两阶段提交协议保证flink连接外部系统数据仅一次处理
Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。
当谈及“exactly-once semantics”仅一次处理数据时指的是每条数据只会影响最终结果一次。Flink可以保证当机器出现故障或者程序出现错误时也没有重复的数据或者未被处理的数据出现实现仅一次处理的语义。
checkpoint中包含
当前应用的状态; 当前消费流数据的位置; 注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;
使用两阶段提交协议保证flink连接外部系统数据仅一次处理; 当Flink处理完的数据需要写入外部系统时不保证仅一次处理数据。为了提供端到端的仅一次处理数据在将数据写入外部系统时也要保证仅一次处理数据这些外部系统必须提供一种手段来允许程序提交或者回滚写入操作同时还要保证与Flink的checkpoint机制协调使用,在分布式系统中协调提交和回滚的常见方法就是两阶段提交协议。下面给出一个实例了解Flink如何使用两阶段提交协议来实现数据仅一次处理语义。
该实例是从kafka中读取数据经过处理数据之后将结果再写回kafka。kafka0.11版本之后支持事务这也是Flink与kafka交互时仅一次处理的必要条件。【注意当Flink处理完的数据写入kafka时即当sink为kafka时自动封装了两阶段提交协议】。Flink支持仅一次处理数据不仅仅限于和Kafka的结合只要sink提供了必要的两阶段协调实现可以对任何sink都能实现仅一次处理数据语义。
其原理如下
上图Flink程序包含以下组件
一个从kafka中读取数据的source 一个窗口聚合操作 一个将结果写往kafka的sink。 要使sink支持仅一次处理数据语义必须以事务的方式将数据写往kafka,将两次checkpoint之间的操作当做一个事务提交确保出现故障时操作能够被回滚。假设出现故障在分布式多并发执行sink的应用程序中仅仅执行单次提交或回滚事务是不够的因为分布式中的各个sink程序都必须对这些提交或者回滚达成共识这样才能保证两次checkpoint之间的数据得到一个一致性的结果。Flink使用两阶段提交协议(pre-commitcommit)来实现这个问题。
Filnk checkpointing开始时就进入到pre-commit阶段具体来说一旦checkpoint开始Flink的JobManager向输入流中写入一个checkpoint barrier将流中所有消息分隔成属于本次checkpoint的消息以及属于下次checkpoint的消息barrier也会在操作算子间流转对于每个operator来说该barrier会触发operator的State Backend来为当前的operator来打快照。如下图示 Flink DataSource中存储着Kafka消费的offset当完成快照保存后将chechkpoint barrier传递给下一个operator。这种方式只有在Flink内部状态的场景是可行的内部状态指的是由Flink的State Backend管理状态例如上面的window的状态就是内部状态管理。只有当内部状态时pre-commit阶段无需执行额外的操作仅仅是写入一些定义好的状态变量即可checkpoint成功时Flink负责提交这些状态写入否则就不写入当前状态。
但是一旦operator操作包含外部状态事情就不一样了。我们不能像处理内部状态一样处理外部状态因为外部状态涉及到与外部系统的交互。这种情况下外部系统必须要支持可以与两阶段提交协议绑定的事务才能保证仅一次处理数据。
本例中的data sink是将数据写往kafka因为写往kafka是有外部状态的这种情况下pre-commit阶段下data sink 在保存状态到State Backend的同时还必须pre-commit外部的事务。如下图
当checkpoint barrier在所有的operator都传递一遍切对应的快照都成功完成之后pre-commit阶段才算完成。这个过程中所有创建的快照都被视为checkpoint的一部分checkpoint中保存着整个应用的全局状态当然也包含pre-commit阶段提交的外部状态。当程序出现崩溃时我们可以回滚状态到最新已经完成快照的时间点。
下一步就是通知所有的operator告诉它们checkpoint已经完成这便是两阶段提交的第二个阶段commit阶段。这个阶段中JobManager会为应用中的每个operator发起checkpoint已经完成的回调逻辑。本例中DataSource和Winow操作都没有外部状态因此在该阶段这两个operator无需执行任何逻辑但是Data Sink是有外部状态的因此此时我们需要提交外部事务。如下图示
汇总以上信息总结得出
一旦所有的operator完成各自的pre-commit,他们会发起一个commit操作。 如果一个operator的pre-commit失败所有其他的operator 的pre-commit必须被终止并且Flink会回滚到最近成功完成的checkpoint位置。 一旦pre-commit完成必须要确保commit也要成功内部的operator和外部的系统都要对此进行保证。假设commit失败【网络故障原因】Flink程序就会崩溃然后根据用户重启策略执行重启逻辑重启之后会再次commit。 因此所有的operator必须对checkpoint最终结果达成共识即所有的operator都必须认定数据提交要么成功执行要么被终止然后回滚。
参考https://blog.51cto.com/u_16213620/7923389