奉贤区网站建设,云主机做网站永久保留网站,专业的网络整合营销推广,把国外的网站翻译过来做自媒体背景
算子的列表状态是平时比较常见的一种状态#xff0c;本文通过官方的例子来看一下怎么使用算子列表状态
算子列表状态
算子列表状态支持应用的并行度扩缩容#xff0c;如下所示: 使用方法参见官方示例#xff0c;我加了几个注解#xff1a;
public class Bufferin…背景
算子的列表状态是平时比较常见的一种状态本文通过官方的例子来看一下怎么使用算子列表状态
算子列表状态
算子列表状态支持应用的并行度扩缩容如下所示: 使用方法参见官方示例我加了几个注解
public class BufferingSinkimplements SinkFunctionTuple2String, Integer,CheckpointedFunction {//要实现CheckpointedFunction接口private final int threshold;//算子操作状态对象--算子级别的private transient ListStateTuple2String, Integer checkpointedState;//本地变量保存这个算子任务的本地变量--任务级别的 private ListTuple2String, Integer bufferedElements;public BufferingSink(int threshold) {this.threshold threshold;this.bufferedElements new ArrayList();}//invoke方法中一般都是操作本地变量bufferedElements不会直接操作算子列表状态Overridepublic void invoke(Tuple2String, Integer value, Context contex) throws Exception {bufferedElements.add(value);if (bufferedElements.size() threshold) {for (Tuple2String, Integer element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();for (Tuple2String, Integer element : bufferedElements) {// 把本地变量的值设置到算子列表状态中,算子列表状态会自动会被持久化checkpointedState.add(element);}}Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptorTuple2String, Integer descriptor new ListStateDescriptor(buffered-elements,TypeInformation.of(new TypeHintTuple2String, Integer() {}));// 定义算子列表状态checkpointedState context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {// 算子列表状态的值设置到本地变量中for (Tuple2String, Integer element : checkpointedState.get()) {bufferedElements.add(element);}}}
}