响应式网站能用dw做吗,网站运营推广方法总结,wordpress情侣主题,网站移动端和手机端分开做1. 状态的定义
在 Apache Flink 中#xff0c;状态#xff08;State#xff09; 是指在数据流处理过程中需要持久化和追踪的中间数据#xff0c;它允许 Flink 在处理事件时保持上下文信息#xff0c;从而支持复杂的流式计算任务#xff0c;如聚合、窗口计算、联接等。状…1. 状态的定义
在 Apache Flink 中状态State 是指在数据流处理过程中需要持久化和追踪的中间数据它允许 Flink 在处理事件时保持上下文信息从而支持复杂的流式计算任务如聚合、窗口计算、联接等。状态是 Flink 处理有状态操作如窗口、时间戳操作、聚合等的核心组成部分。
2. 状态的类型
Flink 提供了强大的状态管理机制允许应用程序在分布式环境中处理状态保证高可用性和容错性。Flink 的状态分为 Keyed State 和 Operator State并提供了不同的存储和恢复机制。
2.1 Keyed State按键状态
Keyed State 是基于流中每个元素的键进行管理的状态。每个键会有一个独立的状态这对于需要按照每个输入元素的唯一标识符如用户 ID、商品 ID 等维护状态的操作非常有用。Keyed State 主要用于需要对流中的每个“键”进行独立计算的场景如按用户进行会话计算、按时间窗口聚合等。
常见的 Keyed State 类型包括
ValueState存储与每个键相关联的单个值。ListState存储与每个键相关联的多个值通常用来表示一个元素列表。MapState存储与每个键相关联的键值对适用于需要维护多个关联数据的场景。ReducingState支持对每个键的值进行累加或其他聚合操作。AggregatingState可以根据给定的聚合函数对每个键的状态进行聚合。
2.2 Operator State操作符状态
Operator State 是由操作符如 Flink 中的算子管理的状态通常用于保持操作符内部的状态信息不与键相关联。它用于管理一些需要跨整个流处理作业的全局状态如窗口管理、算子内部缓冲区等。Operator State 主要用于在分布式环境中处理 算子 级别的状态尤其在对状态进行恢复时非常重要帮助 Flink 恢复作业。
常见的 Operator State 类型包括
ListState与键无关存储多个值。 UnionListState : 与键无关存储多个值, UnionListState 是 ListState 的扩展主要用于 跨多个并行实例 共享状态。在 Flink 的 流式应用程序 中如果多个并行实例需要访问和修改共享的状态通常使用 UnionListState。 BroadcastState存储和广播信息。
3. Keyed State
Keyed 状态可以看作是一个嵌入式的键值存储。该状态是与由有状态操作符读取的流一起严格地进行分区和分布的。因此只有在 Keyed 流 上才能访问键值状态也就是说只有在进行键控/分区数据交换后才能访问与当前事件的键相关联的值。将流的键与状态对齐确保了所有的状态更新都是本地操作从而在没有事务开销的情况下保证一致性。这个对齐还使得 Flink 能够透明地重新分配状态并调整流的分区。 Keyed 状态进一步组织为所谓的 Key Groups键组。Key Groups 是 Flink 重新分配 Keyed 状态的最小单位其数量与定义的最大并行度相同。在执行过程中每个并行实例的键控操作符都处理一个或多个 Key Groups 中的键。
3.1 使用 Keyed State
keyed state 接口提供不同类型状态的访问接口这些状态都作用于当前输入数据的 key 下。换句话说这些状态仅可在 KeyedStream 上使用在Java/Scala API上可以通过 stream.keyBy(...) 得到 KeyedStream在Python API上可以通过 stream.key_by(...) 得到 KeyedStream。
接下来我们会介绍不同类型的状态然后介绍如何使用他们。所有支持的状态类型如下所示 ValueStateT: 保存一个可以更新和检索的值如上所述每个值都对应到当前的输入数据的 key因此算子接收到的每个 key 都可能对应一个值。 这个值可以通过 update(T) 进行更新通过 T value() 进行检索。 ListStateT: 保存一个元素的列表。可以往这个列表中追加数据并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(ListT) 进行添加元素通过 IterableT get() 获得整个列表。还可以通过 update(ListT) 覆盖当前的列表。 ReducingStateT: 保存一个单值表示添加到状态的所有值的聚合。接口与 ListState 类似但使用 add(T) 增加元素会使用提供的 ReduceFunction 进行聚合。 AggregatingStateIN, OUT: 保留一个单值表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。 MapStateUK, UV: 维护了一个映射列表。 你可以添加键值对到状态中也可以获得反映当前所有映射的迭代器。使用 put(UKUV) 或者 putAll(MapUKUV) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries()keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。
所有类型的状态还有一个clear() 方法清除当前 key 下的状态数据也就是当前输入元素的 key。
请牢记这些状态对象仅用于与状态交互。状态本身不一定存储在内存中还可能在磁盘或其他位置。 另外需要牢记的是从状态中获取的值取决于输入元素所代表的 key。 因此在不同 key 上调用同一个接口可能得到不同的值。
你必须创建一个 StateDescriptor才能得到对应的状态句柄。 这保存了状态名称正如我们稍后将看到的你可以创建多个状态并且它们必须具有唯一的名称以便可以引用它们 状态所持有值的类型并且可能包含用户指定的函数例如ReduceFunction。 根据不同的状态类型可以创建ValueStateDescriptorListStateDescriptor AggregatingStateDescriptor, ReducingStateDescriptor 或 MapStateDescriptor。
状态通过 RuntimeContext 进行访问因此只能在 rich functions 中使用。RichFunction 中 RuntimeContext 提供如下方法
ValueStateT getState(ValueStateDescriptorT)ReducingStateT getReducingState(ReducingStateDescriptorT)ListStateT getListState(ListStateDescriptorT)AggregatingStateIN, OUT getAggregatingState(AggregatingStateDescriptorIN, ACC, OUT)MapStateUK, UV getMapState(MapStateDescriptorUK, UV)
下面是一个 FlatMapFunction 的例子展示了如何将这些部分组合起来
public class CountWindowAverage extends RichFlatMapFunctionTuple2Long, Long, Tuple2Long, Long {/*** The ValueState handle. The first field is the count, the second field a running sum.*/private transient ValueStateTuple2Long, Long sum;Overridepublic void flatMap(Tuple2Long, Long input, CollectorTuple2Long, Long out) throws Exception {// access the state valueTuple2Long, Long currentSum sum.value();// update the countcurrentSum.f0 1;// add the second field of the input valuecurrentSum.f1 input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif (currentSum.f0 2) {out.collect(new Tuple2(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}}Overridepublic void open(OpenContext ctx) {ValueStateDescriptorTuple2Long, Long descriptor new ValueStateDescriptor(average, // the state nameTypeInformation.of(new TypeHintTuple2Long, Long() {}), // type informationTuple2.of(0L, 0L)); // default value of the state, if nothing was setsum getRuntimeContext().getState(descriptor);}
}// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(value - value.f0).flatMap(new CountWindowAverage()).print();// the printed output will be (1,4) and (1,5)
3.2 状态有效期 (TTL)
任何类型的 keyed state 都可以有 有效期 (TTL)。如果配置了 TTL 且状态值已过期则会尽最大可能清除对应的值所有状态类型都支持单元素的 TTL。 这意味着列表元素和映射元素将独立到期。在使用状态 TTL 前需要先构建一个StateTtlConfig 配置对象。 然后把配置传递到 state descriptor 中启用 TTL 功能
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import java.time.Duration;StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Duration.ofSeconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptorString stateDescriptor new ValueStateDescriptor(text state, String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
TTL 配置有以下几个选项 newBuilder 的第一个参数表示数据的有效期是必选项。TTL 的更新策略默认是 OnCreateAndWrite:
StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新
数据在过期但还未被清理时的可见性配置如下默认为 NeverReturnExpired):
StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据
NeverReturnExpired 情况下过期数据就像不存在一样不管是否被物理删除。这对于不能访问过期数据的场景下非常有用比如敏感数据。 ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。
注意:
状态上次的修改时间会和数据一起保存在 state backend 中因此开启该特性会增加状态数据的存储。 Heap state backend 会额外存储一个包括用户状态以及时间戳的 Java 对象RocksDB state backend 会在每个状态值list 或者 map 的每个元素序列化后增加 8 个字节。暂时只支持基于 processing time 的 TTL。尝试从 checkpoint/savepoint 进行恢复时TTL 的状态是否开启必须和之前保持一致否则会遇到 “StateMigrationException”。TTL 的配置并不会保存在 checkpoint/savepoint 中仅对当前 Job 有效。不建议checkpoint恢复前后将state TTL从短调长这可能会产生潜在的数据错误。当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下才支持用户值为 null。如果用户值序列化器不支持 null 可以用 NullableSerializer 包装一层。启用 TTL 配置后StateDescriptor 中的 defaultValue已被标记 deprecated将会失效。这个设计的目的是为了确保语义更加清晰在此基础上用户需要手动管理那些实际值为 null 或已过期的状态默认值。
3.2.1 过期数据的清理
默认情况下过期数据会在读取的时候被删除例如 ValueState#value同时会有后台线程定期清理如果 StateBackend 支持的话。可以通过 StateTtlConfig 配置关闭后台清理
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Duration.ofSeconds(1)).disableCleanupInBackground().build();
可以按照如下所示配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理RocksDBStateBackend 利用压缩过滤器进行后台清理。
3.2.2 全量快照时进行清理
另外你可以启用全量快照时进行清理的策略这可以减少整个快照的大小。当前实现中不会清理本地的状态但从上次快照恢复时不会恢复那些已经删除的过期数据。 该策略可以通过 StateTtlConfig 配置进行配置
import org.apache.flink.api.common.state.StateTtlConfig;
import java.time.Duration;StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupFullSnapshot().build();这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。
注意:这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭比如在从 savepoint 恢复时。
3.2.3 增量数据清理
另外可以选择增量式清理状态数据在状态访问或/和处理时进行。如果某个状态开启了该清理策略则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时从迭代器中选择已经过期的数进行清理。该特性可以通过 StateTtlConfig 进行配置
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupIncrementally(10, true).build();该策略有两个参数。 第一个是每次清理时检查状态的条目数在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。 Heap backend 默认会检查 5 条状态并且关闭在每条记录时触发清理。
注意:
如果没有 state 访问也没有处理数据则不会清理过期数据。增量清理会增加数据处理的耗时。现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。如果 Heap state backend 使用同步快照方式则会保存一份所有 key 的拷贝从而防止并发修改问题因此会增加内存的使用。但异步快照则没有这个问题。对已有的作业这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性比如从 savepoint 重启后.
3.2.4 在 RocksDB 压缩时清理
如果使用 RocksDB state backend则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。该特性可以通过 StateTtlConfig 进行配置
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Duration.ofSeconds(1)).cleanupInRocksdbCompactFilter(1000, Duration.ofHours(1)).build();Flink 处理一定条数的状态数据后会使用当前时间戳来检测 RocksDB 中的状态是否已经过期 你可以通过
StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。 时间戳更新的越频繁状态的清理越及时但由于压缩会有调用 JNI 的开销因此会影响整体的压缩性能。 RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。定期压缩可以加速过期状态条目的清理特别是对于很少访问的状态条目。 比这个值早的文件将被选取进行压缩并重新写入与之前相同的 Level 中。 该功能可以确保文件定期通过压缩过滤器压缩。 您可以通过StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries, Duration periodicCompactionTime) 方法设定定期压缩的时间。 定期压缩的时间的默认值是 30 天。 您可以将其设置为 0 以关闭定期压缩或设置一个较小的值以加速过期状态条目的清理但它将会触发更多压缩。还可以通过配置开启 RocksDB 过滤器的 debug 日志 log4j.logger.org.rocksdb.FlinkCompactionFilterDEBUG
注意:
压缩时调用 TTL 过滤器会降低速度。TTL 过滤器需要解析上次访问的时间戳并对每个将参与压缩的状态进行是否过期检查。 对于集合型状态类型比如 list 和 map会对集合中每个元素进行检查。对于元素序列化后长度不固定的列表状态TTL 过滤器需要在每次 JNI 调用过程中额外调用 Flink 的 java 序列化器 从而确定下一个未过期数据的位置。对已有的作业这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性比如从 savepoint 重启后。定期压缩功能只在 TTL 启用时生效。
4. Operator State
算子状态或者非 keyed 状态是绑定到一个并行算子实例的状态。Kafka Connector 是 Flink 中使用算子状态一个很具有启发性的例子。Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。当并行度改变的时候算子状态支持将状态重新分发给各并行算子实例。处理重分发过程有多种不同的方案。在典型的有状态 Flink 应用中你无需使用算子状态。它大都作为一种特殊类型的状态使用。用于实现 source/sink以及无法对 state 进行分区而没有主键的这类场景中。
4.1 广播状态 (Broadcast State)
广播状态是一种特殊的算子状态。引入它的目的在于支持一个流中的元素需要广播到所有下游任务的使用情形。在这些任务中广播状态用于保持所有子任务状态相同。 该状态接下来可在第二个处理记录的数据流中访问。可以设想包含了一系列用于处理其他流中元素规则的低吞吐量数据流这个例子自然而然地运用了广播状态。 考虑到上述这类使用情形广播状态和其他算子状态的不同之处在于
它具有 map 格式它仅在一些特殊的算子中可用。这些算子的输入为一个广播数据流和非广播数据流这类算子可以拥有不同命名的多个广播状态 。
4.2 使用 Operator State
用户可以通过实现 CheckpointedFunction 接口来使用 operator stateCheckpointedFunction 接口提供了访问 non-keyed state 的方法需要实现如下两个方法
void snapshotState(FunctionSnapshotContext context) throws Exception;void initializeState(FunctionInitializationContext context) throws Exception;进行 checkpoint 时会调用 snapshotState()。 用户自定义函数初始化时会调用 initializeState()初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复。 因此 initializeState() 不仅是定义不同状态类型初始化的地方也需要包括状态恢复的逻辑。当前 operator state 以 list 的形式存在。这些状态是一个 可序列化 对象的集合 List彼此独立方便在改变并发后进行状态的重新分派。 换句话说这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式有如下几种重新分配的模式 Even-split redistribution: 每个算子都保存一个列表形式的状态集合整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候整个状态会按照算子的并发度进行均匀分配。 比如说算子 A 的并发读为 1包含两个元素 element1 和 element2当并发读增加为 2 时element1 会被分到并发 0 上element2 则会被分到并发 1 上。 Union redistribution: 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时每个算子都将获得所有的状态数据。如果你的列表可能具有高基数请不要使用此功能。检查点元数据将存储指向每个列表项的偏移量这可能导致 RPC 帧大小或内存溢出错误。
下面的例子中的 SinkFunction 在 CheckpointedFunction 中进行数据缓存然后统一发送到下游这个例子演示了列表状态数据的 event-split redistribution。
public class BufferingSinkimplements SinkFunctionTuple2String, Integer,CheckpointedFunction {private final int threshold;private transient ListStateTuple2String, Integer checkpointedState;private ListTuple2String, Integer bufferedElements;public BufferingSink(int threshold) {this.threshold threshold;this.bufferedElements new ArrayList();}Overridepublic void invoke(Tuple2String, Integer value, Context contex) throws Exception {bufferedElements.add(value);if (bufferedElements.size() threshold) {for (Tuple2String, Integer element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.update(bufferedElements);}Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptorTuple2String, Integer descriptor new ListStateDescriptor(buffered-elements,TypeInformation.of(new TypeHintTuple2String, Integer() {}));checkpointedState context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Tuple2String, Integer element : checkpointedState.get()) {bufferedElements.add(element);}}}
}initializeState 方法接收一个 FunctionInitializationContext 参数会用来初始化 non-keyed state 的 “容器”。这些容器是一个 ListState 用于在 checkpoint 时保存 non-keyed state 对象。注意这些状态是如何初始化的和 keyed state 类似StateDescriptor 会包括状态名字、以及状态类型相关信息。
ListStateDescriptorTuple2String, Integer descriptor new ListStateDescriptor(buffered-elements,TypeInformation.of(new TypeHintTuple2String, Integer() {}));checkpointedState context.getOperatorStateStore().getListState(descriptor);调用不同的获取状态对象的接口会使用不同的状态分配算法。比如
getUnionListState(descriptor) 会使用 union redistribution 算法 而 getListState(descriptor) 则简单的使用 even-split redistribution 算法。
当初始化好状态对象后我们通过 isRestored() 方法判断是否从之前的故障中恢复回来如果该方法返回 true 则表示从故障中进行恢复会执行接下来的恢复逻辑。
正如代码所示BufferingSink 中初始化时恢复回来的 ListState 的所有元素会添加到一个局部变量中供下次 snapshotState() 时使用。 然后清空 ListState再把当前局部变量中的所有元素写入到 checkpoint 中。另外我们同样可以在 initializeState() 方法中使用 FunctionInitializationContext 初始化 keyed state。
4.3 带状态的 Source Function
带状态的数据源比其他的算子需要注意更多东西。为了保证更新状态以及输出的原子性用于支持 exactly-once 语义用户需要在发送数据前获取数据源的全局锁。
public static class CounterSourceextends RichParallelSourceFunctionLongimplements CheckpointedFunction {/** current offset for exactly once semantics */private Long offset 0L;/** flag for job cancellation */private volatile boolean isRunning true;/** 存储 state 的变量. */private ListStateLong state;Overridepublic void run(SourceContextLong ctx) {final Object lock ctx.getCheckpointLock();while (isRunning) {// output and state update are atomicsynchronized (lock) {ctx.collect(offset);offset 1;}}}Overridepublic void cancel() {isRunning false;}Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {state context.getOperatorStateStore().getListState(new ListStateDescriptor(state,LongSerializer.INSTANCE));// 从我们已保存的状态中恢复 offset 到内存中在进行任务恢复的时候也会调用此初始化状态的方法for (Long l : state.get()) {offset l;}}Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {state.update(Collections.singletonList(offset));}
}希望订阅 checkpoint 成功消息的算子可以参考
org.apache.flink.api.common.state.CheckpointListener 接口。
5. State Backends
键/值索引存储的具体数据结构取决于所选择的状态后端。一种状态后端将数据存储在内存中的哈希映射中另一种状态后端则使用 RocksDB 作为键/值存储。除了定义存储状态的数据结构外状态后端还实现了在某个时间点对键/值状态进行快照并将该快照作为检查点的一部分存储的逻辑。状态后端可以在不更改应用程序逻辑的情况下进行配置。 Flink 提供了多种 state backends它用于指定状态的存储方式和位置。状态可以位于 Java 的堆或堆外内存。取决于你的 state backendFlink 也可以自己管理应用程序的状态。 为了让应用程序可以维护非常大的状态Flink 可以自己管理内存如果有必要可以溢写到磁盘。 默认情况下所有 Flink Job 会使用 Flink 配置文件 中指定的 state backend。但是配置文件中指定的默认 state backend 会被 Job 中指定的 state backend 覆盖如下所示。
Configuration config new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, rocksdb);
env.configure(config);