当前位置: 首页 > news >正文

昆明网站建设公司排名有创意的婚纱网站模板下载

昆明网站建设公司排名,有创意的婚纱网站模板下载,成都网站建设福州,免费企业网站系统源码下载背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线#xff0c;比如这条特殊的记录代表一个完整记录的结束等#xff0c;本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public…背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线比如这条特殊的记录代表一个完整记录的结束等本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecordsbyte[], byte[] records handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionStateT, TopicPartition partition :subscribedPartitionStates()) {ListConsumerRecordbyte[], byte[] partitionRecords records.records(partition.getKafkaPartitionHandle()); // 算子任务消费的每个分区都调用这个方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}2.查看partitionConsumerRecordsHandler方法处理当前算子任务对应的每个分区的水位线 protected void emitRecordsWithTimestamps(QueueT records,KafkaTopicPartitionStateT, KPH partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record records.poll()) ! null) {long timestamp partitionState.extractTimestamp(record, kafkaEventTimestamp);// 发送kafka记录到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 处理分区的水位线记录这个分区的水位线并在满足条件时更新整个算子任务的水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}3.处理每个分区的水位线javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next wms.checkAndGetNextWatermark(event, eventTimestamp);if (next ! null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));对应方法如下public void emitWatermark(Watermark watermark) {long timestamp watermark.getTimestamp();// 更新每个分区对应的水位线并且更新boolean wasUpdated state.setWatermark(timestamp);// if its higher than the max watermark so far we might have to update the// combined watermark 这个表明这个算子任务的最低水位线也就是算子任务级别的水位线而不是分区级别的了if (wasUpdated timestamp combinedWatermark) {updateCombinedWatermark();}}//每个分区水位线的更新如下public boolean setWatermark(long watermark) {this.idle false;final boolean updated watermark this.watermark;this.watermark Math.max(watermark, this.watermark);return updated;} 4.最后是发送算子任务级别的水位线的方法 private void updateCombinedWatermark() {long minimumOverAllOutputs Long.MAX_VALUE;boolean hasOutputs false;boolean allIdle true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle false;}hasOutputs true;}// if we dont have any outputs minimumOverAllOutputs is not valid, its still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs combinedWatermark) {combinedWatermark minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}你可以看这个流程是不是意味着如果使用Punctuated的方式是不支持Idle空闲时间的–答案是的
http://www.w-s-a.com/news/306629/

相关文章:

  • 网站seo相关设置优化网站建设的好处
  • 上海市建设工程安全生产协会网站郴州网站设计公司
  • 网站大型网页游戏自己搭建服务器做视频网站
  • 建立网站企业wordpress用户名密码破解
  • 网站管理助手建站教程国外网站做acm题目比较好
  • 网站开发框架排行专业网页制作服务商
  • 企业网站建设入账政务网站建设信息
  • 网络平台建设是什么江门排名优化怎么做
  • 响应式旅游网站模板下载网址做
  • 个人做网站名称可以随意更改吗惠州网站推广排名
  • 自己建设一个网站步骤网站认证怎么认证
  • 深圳建站公司开发费用沧州手机建站哪家好
  • 兰州网站设计公司排名百度怎么发布短视频
  • 大连模板开发建站泰州网站建设策划方案
  • 厦门好的网站设计局域网内建网站
  • 关键词那种网站正版网页游戏平台排行榜
  • 网站自助建设平台创建网址快捷方式
  • 坑梓网站建设包括哪些成都网站建设优创
  • 重庆网站seo公司哪家好超级优化大师
  • 成都网站建设推广详情邵阳市住房和城乡建设局网站
  • 淄博网站推广猎头公司有哪些
  • 局域网内建立网站90设计网怎么样
  • 域名备案和网站备案有什么不同工程项目建设网站
  • 做网站难吗?wordpress评论qq
  • 权威网站优化价格电子商务静态网站建设实验报告
  • 公司如何办网站北京网站建设公司内江
  • 六安建设网站企业营业执照查询系统入口
  • a5网站建设如果建设淘宝导购网站
  • html5响应式网站开发教程在国内做跨境电商怎么上外国网站
  • win7配置不能运行wordpress关键词快速优化排名软件