当前位置: 首页 > news >正文

在线做头像网站wordpress 前台投稿插件

在线做头像网站,wordpress 前台投稿插件,asp.net建立网站,代发软文背景#xff1a; kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分#xff0c;分成多个数据块的形式#xff0c;每个数据源算子任务会被分配以读取…背景 kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分分成多个数据块的形式每个数据源算子任务会被分配以读取其中的数据块,但是不是所有的文件都能进行分块判断文件是否可以进行分块的代码如下 protected boolean testForUnsplittable(FileStatus pathFile) {if (getInflaterInputStreamFactory(pathFile.getPath()) ! null) {unsplittable true;return true;}return false; }private InflaterInputStreamFactory? getInflaterInputStreamFactory(Path path) {String fileExtension extractFileExtension(path.getName());if (fileExtension ! null) {return getInflaterInputStreamFactory(fileExtension);} else {return null;} }后缀名称是.gz,.bzip2等的文件都没法切分,如果可以切分切分的具体代码如下所示 while (samplesTaken numSamples fileNum allFiles.size()) {// make a split for the sample and use it to read a recordFileStatus file allFiles.get(fileNum); // 根据偏移量进行切分FileInputSplit split new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);// we open the split, read one line, and take its lengthtry {open(split);if (readLine()) {totalNumBytes this.currLen this.delimiter.length;samplesTaken;}} finally {// close the file stream, do not release the bufferssuper.close();} // 偏移量迁移offset stepSize;// skip to the next file, if necessarywhile (fileNum allFiles.size() offset (file allFiles.get(fileNum)).getLen()) {offset - file.getLen();fileNum;} }再来看一下TextInputFormat如何支持checkpoint操作保存文件的偏移量的代码 Override public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);checkState(checkpointedState ! null, The operator state has not been properly initialized.);int subtaskIdx getRuntimeContext().getIndexOfThisSubtask();// 算子列表状态checkpointedState.clear();// 获取文件的当前读取的偏移ListT readerState getReaderState();try {for (T split : readerState) {//保存到检查点路径中checkpointedState.add(split);}} catch (Exception e) {checkpointedState.clear();throw new Exception(Could not add timestamped file input splits to to operator state backend of operator getOperatorName() .,e);}if (LOG.isDebugEnabled()) {LOG.debug({} (taskIdx{}) checkpointed {} splits: {}.,getClass().getSimpleName(),subtaskIdx,readerState.size(),readerState);} } 从检查点中恢复状态的代码如下 public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);checkState(checkpointedState null, The reader state has already been initialized.);// 初始化算子操作状态checkpointedState context.getOperatorStateStore().getListState(new ListStateDescriptor(splits, new JavaSerializer()));int subtaskIdx getRuntimeContext().getIndexOfThisSubtask();LOG.info(Restoring state for the {} (taskIdx{})., getClass().getSimpleName(), subtaskIdx);splits splits null ? new PriorityQueue() : splits;for (T split : checkpointedState.get()) {//从检查点状态中恢复各个切分的分块splits.add(split);} }
http://www.w-s-a.com/news/1619/

相关文章:

  • WordPress多站點支付插件内江市网站建设培训
  • 做做网站已更新动漫制作专业需要学什么
  • dfv印花图案设计网站网站建设应该应聘什么岗位
  • 网站后台管理系统模板下载专业网站推广的公司哪家好
  • 克拉玛依市建设局网站网页设计板式重构
  • 网站新闻专题怎么做湖南营销型网站建设 要上磐石网络
  • 阿里云发布网站成都轨迹公布
  • php网站源码架构谷歌站群系统
  • 潮州网站seowordpress 调用置顶文章
  • 做带会员后台的网站用什么软件旅游网站建设资金请示
  • 商品网站怎么做wordpress 表情拉长
  • 商城网站设计费用网络公司怎样推广网站
  • 视频公司的网站设计工图网
  • 免费快速网站十八个免费的舆情网站