Background
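Flink's file system source can provide exactly-once consistency. This article walks through the source code to see how TextInputFormat supports exactly-once state consistency.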
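TextInputFormat source code walkthrough
Flink first cuts the input files into multiple blocks (input splits), and each source operator task is assigned some of those splits to read. Not every file can be split, however. The check that decides whether a file can be split looks like this: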
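
    protected boolean testForUnsplittable(FileStatus pathFile) {
        // a file that needs a decompression stream cannot be read from an arbitrary offset
        if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {
            unsplittable = true;
            return true;
        }
        return false;
    }

    private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {
        // look up a decompression factory by file extension; null means "not compressed"
        String fileExtension = extractFileExtension(path.getName());
        if (fileExtension != null) {
            return getInflaterInputStreamFactory(fileExtension);
        } else {
            return null;
        }
    }

Files with a compression extension such as .gz or .bz2 cannot be split and are read as a whole. A splittable file is described by splits that each carry a path, a start offset and a length. The loop below appears to be the sampling loop from DelimitedInputFormat.getStatistics(), which reads sample lines to estimate record width, rather than the split-creation code itself, but it shows how a FileInputSplit is built from an offset: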
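
As a simplified illustration of both ideas (the extension check and cutting a file by offset), here is a minimal stand-alone sketch. It is not Flink's implementation: `SplitPlanner`, `ByteRange`, the extension set and the fixed `splitSize` are all hypothetical names and assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/** Hypothetical stand-in for a file split: the byte range [start, start + length). */
final class ByteRange {
    final long start;
    final long length;

    ByteRange(long start, long length) {
        this.start = start;
        this.length = length;
    }
}

final class SplitPlanner {
    // Assumed list of compressed extensions, for illustration only.
    private static final Set<String> COMPRESSED =
            new HashSet<>(Arrays.asList("gz", "gzip", "bz2", "deflate", "xz"));

    /** A file is treated as splittable unless its extension marks it as compressed. */
    static boolean isSplittable(String fileName) {
        int dot = fileName.lastIndexOf('.');
        if (dot < 0 || dot == fileName.length() - 1) {
            return true; // no extension at all
        }
        String ext = fileName.substring(dot + 1).toLowerCase(Locale.ROOT);
        return !COMPRESSED.contains(ext);
    }

    /** Cuts a file into consecutive ranges of at most splitSize bytes. */
    static List<ByteRange> plan(String fileName, long fileLen, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<ByteRange> ranges = new ArrayList<>();
        if (!isSplittable(fileName)) {
            // compressed file: one range covering the whole file
            ranges.add(new ByteRange(0, fileLen));
            return ranges;
        }
        for (long offset = 0; offset < fileLen; offset += splitSize) {
            ranges.add(new ByteRange(offset, Math.min(splitSize, fileLen - offset)));
        }
        return ranges;
    }
}
```

For example, `SplitPlanner.plan("logs.txt", 250, 100)` yields three ranges starting at 0, 100 and 200, while `SplitPlanner.plan("logs.gz", 250, 100)` yields a single range. The excerpt from Flink's own code continues below.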

    while (samplesTaken < numSamples && fileNum < allFiles.size()) {
        // make a split for the sample and use it to read a record
        FileStatus file = allFiles.get(fileNum);
        // build a split that starts at the current offset
        FileInputSplit split =
                new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);

        // we open the split, read one line, and take its length
        try {
            open(split);
            if (readLine()) {
                totalNumBytes += this.currLen + this.delimiter.length;
                samplesTaken++;
            }
        } finally {
            // close the file stream, do not release the buffers
            super.close();
        }

        // advance the offset
        offset += stepSize;

        // skip to the next file, if necessary
        while (fileNum < allFiles.size()
                && offset >= (file = allFiles.get(fileNum)).getLen()) {
            offset -= file.getLen();
            fileNum++;
        }
    }

Next, look at how a checkpoint saves the file offsets. Note that this snapshotState method belongs to the reader operator that drives the format (ContinuousFileReaderOperator in Flink's source), not to TextInputFormat itself. The format only reports its current read position:

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);

        checkState(
                checkpointedState != null,
                "The operator state has not been properly initialized.");

        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

        // operator list state: drop what the previous checkpoint stored
        checkpointedState.clear();

        // collect the splits together with the current read offset
        List<T> readerState = getReaderState();

        try {
            for (T split : readerState) {
                // store each split in the checkpointed state
                checkpointedState.add(split);
            }
        } catch (Exception e) {
            checkpointedState.clear();
            throw new Exception(
                    "Could not add timestamped file input splits to to operator "
                            + "state backend of operator " + getOperatorName() + ".",
                    e);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "{} (taskIdx={}) checkpointed {} splits: {}.",
                    getClass().getSimpleName(),
                    subtaskIdx,
                    readerState.size(),
                    readerState);
        }
    }

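
The listing above relies on getReaderState() to produce the list that is checkpointed. A minimal stand-alone sketch of that idea follows: the split currently being read is stored together with its read position, followed by the splits that have not been started. `TrackedSplit` and `ReaderStateSketch` are hypothetical names and this is not Flink's code. In Flink itself the position appears to come from the format through CheckpointableInputFormat.getCurrentState().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical split descriptor; resumeOffset < 0 means "not started yet". */
final class TrackedSplit implements Comparable<TrackedSplit> {
    final String path;
    final long start;
    final long length;
    final long resumeOffset;

    TrackedSplit(String path, long start, long length, long resumeOffset) {
        this.path = path;
        this.start = start;
        this.length = length;
        this.resumeOffset = resumeOffset;
    }

    TrackedSplit withResumeOffset(long offset) {
        return new TrackedSplit(path, start, length, offset);
    }

    @Override
    public int compareTo(TrackedSplit other) {
        int byPath = path.compareTo(other.path);
        return byPath != 0 ? byPath : Long.compare(start, other.start);
    }
}

final class ReaderStateSketch {
    private final PriorityQueue<TrackedSplit> pending = new PriorityQueue<>();
    private TrackedSplit current;
    private long position;

    void enqueue(TrackedSplit split) {
        pending.add(split);
    }

    /** Starts reading the next pending split, if there is one. */
    void openNext() {
        current = pending.poll();
        position = (current == null) ? 0 : current.start;
    }

    /** Simulates consuming some bytes of the current split. */
    void advance(long bytes) {
        position += bytes;
    }

    /** What a checkpoint would store: in-progress split with its offset, then pending splits. */
    List<TrackedSplit> snapshot() {
        List<TrackedSplit> state = new ArrayList<>();
        if (current != null) {
            state.add(current.withResumeOffset(position));
        }
        // drain a copy so the pending splits come out in sorted order
        PriorityQueue<TrackedSplit> copy = new PriorityQueue<>(pending);
        while (!copy.isEmpty()) {
            state.add(copy.poll());
        }
        return state;
    }
}
```

Storing the position inside the split itself means the checkpoint needs only one list state, which matches the single checkpointedState list used in the operator.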
The code that restores the state from a checkpoint is as follows:

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        checkState(checkpointedState == null, "The reader state has already been initialized.");

        // initialize the operator list state
        checkpointedState =
                context.getOperatorStateStore()
                        .getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));

        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        LOG.info(
                "Restoring state for the {} (taskIdx={}).",
                getClass().getSimpleName(),
                subtaskIdx);

        splits = splits == null ? new PriorityQueue<>() : splits;
        for (T split : checkpointedState.get()) {
            // put every split recovered from the checkpoint back into the queue
            splits.add(split);
        }
    }
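
To see why saving the offset gives exactly-once state, consider a minimal stand-alone simulation (not Flink code; `OffsetLineReader` and `RestoreSketch` are hypothetical names). Lines read after the checkpoint but before the failure are discarded together with the state they produced, and reading resumes from the saved offset, so every line ends up in the state exactly once.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Reads '\n'-delimited lines from a byte array, starting at a given offset. */
final class OffsetLineReader {
    private final byte[] data;
    private int offset;

    OffsetLineReader(byte[] data, int startOffset) {
        this.data = data;
        this.offset = startOffset;
    }

    /** Returns the next line without its delimiter, or null at end of data. */
    String readLine() {
        if (offset >= data.length) {
            return null;
        }
        int end = offset;
        while (end < data.length && data[end] != '\n') {
            end++;
        }
        String line = new String(data, offset, end - offset, StandardCharsets.UTF_8);
        offset = Math.min(end + 1, data.length); // skip the delimiter
        return line;
    }

    int currentOffset() {
        return offset;
    }
}

final class RestoreSketch {
    /** Reads some lines, checkpoints, reads more, "fails", then restores and finishes. */
    static List<String> runWithFailure(String content, int beforeCheckpoint, int afterCheckpoint) {
        byte[] data = content.getBytes(StandardCharsets.UTF_8);
        List<String> seen = new ArrayList<>();
        OffsetLineReader reader = new OffsetLineReader(data, 0);
        readInto(reader, seen, beforeCheckpoint);

        // checkpoint: the read offset and the state are saved together
        int savedOffset = reader.currentOffset();
        List<String> savedState = new ArrayList<>(seen);

        // progress made after the checkpoint is lost when the task fails
        readInto(reader, seen, afterCheckpoint);

        // recovery: roll back the state and reopen the data at the saved offset
        seen = new ArrayList<>(savedState);
        reader = new OffsetLineReader(data, savedOffset);
        readInto(reader, seen, Integer.MAX_VALUE);
        return seen;
    }

    private static void readInto(OffsetLineReader reader, List<String> sink, int maxLines) {
        String line;
        for (int i = 0; i < maxLines && (line = reader.readLine()) != null; i++) {
            sink.add(line);
        }
    }
}
```

Calling `RestoreSketch.runWithFailure("a\nbb\nccc\ndddd\n", 2, 1)` returns the four lines in order with no duplicate, even though `ccc` was read twice in total. The guarantee covers the checkpointed state; whether downstream output is also free of duplicates depends on the sink.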