微信引流主动被加软件,网站tkd优化,淘宝网商城,网站建设 需求分析报告flink 最后一个窗口一直没有新数据#xff0c;窗口不关闭问题 自定义实现 WatermarkStrategy接口 自定义实现 WatermarkStrategy接口
窗口类型#xff1a;滚动窗口 代码#xff1a; public static class WatermarkDemoFunction implements WatermarkStrategyJSONObject… flink 最后一个窗口一直没有新数据窗口不关闭问题 自定义实现 WatermarkStrategy接口 自定义实现 WatermarkStrategy接口
窗口类型滚动窗口 代码 public static class WatermarkDemoFunction implements WatermarkStrategyJSONObject{private Tuple2Long,Boolean state Tuple2.of(0L,true);Overridepublic WatermarkGeneratorJSONObject createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGeneratorJSONObject() {private long maxWatermark;Overridepublic void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {maxWatermark Math.max(maxWatermark,waterSensor.getLong(ts));state.f0 System.currentTimeMillis();System.out.println(maxWatermark is maxWatermark);state.f1 false;}Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {//乱序时间long outOfTime 3000L;if (maxWatermark - outOfTime 0){} else {// 10s内没有数据则关闭当前窗口System.out.println(System.currentTimeMillis() - state.f0: (System.currentTimeMillis() - state.f0));System.out.println(state.f1: state.f1);if (System.currentTimeMillis() - state.f0 9000L !state.f1){watermarkOutput.emitWatermark(new Watermark(maxWatermark 6000L));state.f1 true;System.out.println(触发窗口,maxWatermark 6000L: (maxWatermark 6000L));} else {System.out.println(正常发送水印);watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));}}}};}}代码部分逻辑说明 若设置了自动生成watermark 参数根据打印日志设置对应的时间多久没新数据写入触发窗口计算 env.getConfig().setAutoWatermarkInterval(5000);
使用自定义的watermark watermark 周期生成的疑问 1、默认200ms会连续生成4次后不会继续生成了 2、设置了周期生成间隔env.getConfig().setAutoWatermarkInterval(1000L); 只会周期生成一次
参考https://blog.csdn.net/lr131425/article/details/127422833