建设网站公司哪好,wordpress时间邀请码,做网站有哪些注意事项,客户关系crm管理系统背景
GenericWriteAheadSink原理是把接收记录按照检查点进行分段#xff0c;每个到来的记录都放到对应的分段中#xff0c;这些分段内的记录是作为算子状态的形式存储和故障恢复的#xff0c;对于每个分段内的记录列表#xff0c;flink会在收到检查点完成的通知时把他们都…背景
GenericWriteAheadSink原理是把接收记录按照检查点进行分段每个到来的记录都放到对应的分段中这些分段内的记录是作为算子状态的形式存储和故障恢复的对于每个分段内的记录列表flink会在收到检查点完成的通知时把他们都写到外部存储中本文对其中的检查点完成后是否对应的事务必须成功这个点进行讲解
源码解析GenericWriteAheadSink
首先开始进行checkpoint时代码如下
public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);// 把检查点id先放入本地变量中saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp());this.checkpointedState.clear();for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) {// 把本地变量中的检查点存放到算子列表状态中this.checkpointedState.add(pendingCheckpoint);}}private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception {PendingCheckpoint pendingCheckpoint new PendingCheckpoint(checkpointId, subtaskIdx, timestamp, handle);// 把检查点id先放到 pendingCheckpoints本地变量中 pendingCheckpoints.add(pendingCheckpoint);}其实接收检查点完成的通知
public void notifyCheckpointComplete(long checkpointId) throws Exception {super.notifyCheckpointComplete(checkpointId);synchronized (pendingCheckpoints) {IteratorPendingCheckpoint pendingCheckpointIt pendingCheckpoints.iterator();while (pendingCheckpointIt.hasNext()) {PendingCheckpoint pendingCheckpoint pendingCheckpointIt.next();long pastCheckpointId pendingCheckpoint.checkpointId;int subtaskId pendingCheckpoint.subtaskId;long timestamp pendingCheckpoint.timestamp;StreamStateHandle streamHandle pendingCheckpoint.stateHandle;//把历史的当前的还没有成功提交的检查点id对应的事务重新调用sendValue方法并提交对应检查点的事务if (pastCheckpointId checkpointId) {try {// 历史的或者当前的事务未提交if (!committer.isCheckpointCommitted(subtaskId, pastCheckpointId)) {try (FSDataInputStream in streamHandle.openInputStream()) {// 调用sendValue方法写数据boolean success sendValues(new ReusingMutableToRegularIteratorWrapper(new InputViewIterator(new DataInputViewStreamWrapper(in),serializer),serializer),pastCheckpointId,timestamp);if (success) {//提交对应检查点对应的事务committer.commitCheckpoint(subtaskId, pastCheckpointId);streamHandle.discardState();pendingCheckpointIt.remove();}}} else {streamHandle.discardState();pendingCheckpointIt.remove();}} catch (Exception e) {// we have to break here to prevent a new (later) checkpoint// from being committed before this oneLOG.error(Could not commit checkpoint., e);break;}}}}}注意这里需要注意的是flink的检查点成功创建后才会使用notify方法进行通知flink没有保证一定通知此外通知之后不论这个notify方法中发生了什么异常都不影响flink已经创建了检查点的事实。 对应到我们这个例子你就会发现在notify方法中有需要把历史检查点已经创建成功但是对应的事务没有提交的事务重新调用一次sendValue方法和提交对应检查点的事务也就是说不是每一次检查点都能成功的提交事务如果事务没有提交成功等待下一次检查点的通知即可下一个检查点的通知会把历史的检查点重新检测一次.