11. Watermarks

11.1 Watermark concept

In most real-time streaming scenarios, event time stays roughly in step with processing time, possibly with a slight delay.

The marker Flink uses to measure the progress of event time is the watermark (WaterMark). A watermark can be viewed as a special record inserted into the data stream: a marker whose main content is a timestamp indicating the current event time. The timestamp of some record in the stream is normally used as the watermark's timestamp.

Watermark characteristics:

- A watermark is a marker inserted into the data stream.
- Its main content is a timestamp that represents the progress of event time.
- Watermarks are generated from the timestamps of the data.
- Watermark timestamps increase monotonically.
- A watermark can be given a delay so that out-of-order data is handled correctly.
- A watermark WaterMark(t) means that event time in the stream has reached t, i.e. all data up to t has arrived, and no record with a timestamp less than or equal to t will appear afterwards.

Take a watermark that waits 2 s as an example.

**Note:** Flink windows are not prepared statically in advance; they are created dynamically, only when data falling into a window's range arrives. When the window's end time is reached, the window fires its computation and is closed; firing the computation and closing the window are two separate actions.

11.2 Generating watermarks

11.2.1 Principles

If performance matters most, use a small watermark delay, or no watermark at all and processing-time semantics instead; this gives the lowest latency but may miss data.

If all data must be collected, use a larger watermark delay; this hurts performance and delays the computation.

11.2.2 Built-in watermarks

1. Built-in watermark for ordered streams

Call it directly:

```java
package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WatermarkMonoDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // specify the WaterMark strategy
                        WatermarkStrategy
                                // ascending timestamps, no waiting time
                                .<WaterSensor>forMonotonousTimestamps()
                                // specify the timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        // the returned timestamp must be in milliseconds
                                        System.out.println("data=" + element + ", recordTS=" + recordTimestamp);
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // an event-time window (TumblingEventTimeWindows) is required for the WaterMark to take effect;
                // TumblingProcessingTimeWindows cannot be used here
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains " + l + " records: " + elements.toString());
                    }
                });

        process.print();
        env.execute();
    }
}
```
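The console output of this demo was shown as a screenshot in the original post and is not preserved here. As a rough usage sketch (assuming the WaterSensor fields are id, ts in seconds and vc, and that the socket source is fed with `nc -lk 7777` on 192.168.132.101): sending `s1,1,1` and `s1,9,9` only prints the extracted timestamps; sending `s1,10,10` raises the watermark to 9999 ms (10 s − 1 ms), so the event-time window [0 s, 10 s) fires with the first two records, while `s1,10,10` itself lands in the [10 s, 20 s) window.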
2. Built-in watermark for out-of-order streams

Set the wait time to 2 seconds; the window is then closed when event time reaches 12 s.

```java
package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Title:
 * Author lizhe
 * Package sink
 * Date 2024/6/5 21:57
 * description:
 */
public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // specify the WaterMark strategy
                        WatermarkStrategy
                                // out-of-order WaterMark with a waiting time
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // specify the timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        // the returned timestamp must be in milliseconds
                                        System.out.println("data=" + element + ", recordTS=" + recordTimestamp);
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // an event-time window (TumblingEventTimeWindows) is required for the WaterMark to take effect;
                // TumblingProcessingTimeWindows cannot be used here
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains " + l + " records: " + elements.toString());
                    }
                });

        process.print();
        env.execute();
    }
}
```

Result: when WaterSensor{id='s1', ts=12, vc=12} (recordTS=-9223372036854775808) is sent, it causes the [0, 10) window to close, but WaterSensor{id='s1', ts=12, vc=12} itself does not go into the [0, 10) window; it belongs to the [10, 20) window.

11.2.3 How the built-in watermarks are generated

- Both are generated periodically; the default period is 200 ms.
- Ordered stream: WaterMark = current maximum event time − 1 ms.
- Out-of-order stream: WaterMark = current maximum event time − delay − 1 ms.
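These formulas can also be read off a custom generator. Below is a minimal sketch (not the actual Flink source, just an illustration of the formula above) of a periodic generator equivalent to forBoundedOutOfOrderness(Duration.ofSeconds(2)), assuming the same bean.WaterSensor POJO as the demos; the emit period is the auto-watermark interval, 200 ms by default and adjustable via `env.getConfig().setAutoWatermarkInterval(...)`.

```java
import java.time.Duration;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Sketch only: reproduces "max event time - delay - 1 ms" for a 2 s delay.
public class BoundedOutOfOrdernessSketch implements WatermarkGenerator<WaterSensor> {
    private final long outOfOrdernessMillis = Duration.ofSeconds(2).toMillis();
    private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;

    @Override
    public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) {
        // track the largest event time seen so far
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // called once per auto-watermark interval (200 ms by default)
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}
```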
11.3 Watermark propagation

11.3.1 Watermark propagation across parallel subtasks

When watermarks are propagated, a task takes the smallest WaterMark among its upstream subtasks; otherwise windows would be triggered and closed too early and data would be lost.

Demo of WaterMark propagation with parallelism greater than 1. The code is the same WatermarkOutOfOrdernessDemo as above; the only change is the parallelism:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// demonstrate WaterMark propagation across parallel subtasks
env.setParallelism(2);
```

Result: with multiple parallel subtasks there is an extra WaterMark update step. When WaterSensor{id='s1', ts=12, vc=12} arrives, one subtask's WaterMark is 5 − 2 = 3 and the other's is 12 − 2 = 10; because the smaller WaterMark wins, the task's WaterMark stays at 3 and is not yet updated to 10. When WaterSensor{id='s1', ts=13, vc=13} arrives, the WaterMark is updated to 10, which in turn triggers the window to close.

Conclusion: with multiple parallel subtasks, the window is only closed when the record after the one that raised the WaterMark arrives.
11.3.2 Watermark idleness timeout

When there are several upstream parallel tasks, if one of them produces no data, the current task cannot advance its watermark, because it uses the smallest upstream watermark as its event-time clock; as a result the windows never fire. In that case an idleness timeout can be configured.

```java
package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Title:
 * Author lizhe
 * Package
 * Date 2024/6/5 21:57
 * description:
 */
public class WatermarkIdlenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        SingleOutputStreamOperator<Integer> streamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                // custom partitioner: data % number of partitions; if only odd numbers are entered,
                // they all go to a single subtask
                .partitionCustom(new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        return Integer.parseInt(key) % numPartitions;
                    }
                }, value -> value)
                .map(value -> Integer.parseInt(value))
                .assignTimestampsAndWatermarks(
                        // specify the WaterMark strategy
                        WatermarkStrategy
                                .<Integer>forMonotonousTimestamps()
                                .withTimestampAssigner((r, ts) -> r * 1000L)
                                // idleness timeout of 5 s
                                .withIdleness(Duration.ofSeconds(5)));

        // split into two groups (odd and even) and open an event-time tumbling window
        streamOperator.keyBy(value -> value % 2)
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + integer + " window [" + winStart + "," + winEnd + ") contains " + l + " records: " + elements.toString());
                    }
                })
                .print();

        env.execute();
    }
}
```
11.4 Handling late data

11.4.1 Delay the advance of the WaterMark

When the WaterMark is generated, configure an out-of-orderness tolerance; this holds back the event-time clock so that the window computation is postponed and out-of-order data gets more time to enter the window.

`forBoundedOutOfOrderness(Duration.ofSeconds(2))`

11.4.2 Delay window closing

Flink windows can accept late data. When the window computation is triggered, the current result is computed first, but the window is not closed yet. After that, each late record triggers another, incremental computation of the window. Only when the WaterMark passes window end time + allowed lateness is the window closed.

`.allowedLateness(Time.seconds(2))`

The job (class WatermarkAllowLatenessDemo) is the same pipeline as WatermarkOutOfOrdernessDemo above, with the allowed lateness added to the window:

```java
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// keep the window open for 2 more seconds after it first fires
.allowedLateness(Time.seconds(2))
.process(...)   // same window function as above
```

11.4.3 Receiving late data with a side output

Use .sideOutputLateData() to send data that arrives after the window has closed to a side output stream.

```java
package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.lang.reflect.Type;
import java.time.Duration;

/**
 * Title:
 * Author lizhe
 * Package sink
 * Date 2024/6/5 21:57
 * description:
 */
public class WatermarkAllowLatenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // specify the WaterMark strategy
                        WatermarkStrategy
                                // out-of-order WaterMark with a waiting time
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // specify the timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        // the returned timestamp must be in milliseconds
                                        System.out.println("data=" + element + ", recordTS=" + recordTimestamp);
                                        return element.getTs() * 1000L;
                                    }
                                }));

        OutputTag<WaterSensor> outputTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // an event-time window (TumblingEventTimeWindows) is required for the WaterMark to take effect;
                // TumblingProcessingTimeWindows cannot be used here
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // keep the window open for 2 more seconds after it first fires
                .allowedLateness(Time.seconds(2))
                // data arriving after the window has closed goes to the side output
                .sideOutputLateData(outputTag)
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains " + l + " records: " + elements.toString());
                    }
                });

        process.print();
        process.getSideOutput(outputTag).print("side-output");
        env.execute();
    }
}
```

11.4.4 Summary

Out-of-order vs. late:

- **Out-of-order**: the order of the records is wrong; a record with an earlier timestamp arrives after one with a later timestamp.
- **Late**: the record's timestamp is smaller than the current WaterMark.

Handling out-of-order and late data:

- Specify an out-of-orderness wait time when generating the WaterMark.
- If windows are used, set an allowed lateness on the window.
- Send data that arrives after the window closes to a side output stream.

The WaterMark wait time and the window's allowed lateness are not equivalent and cannot replace each other:

- The WaterMark wait time determines when the window computes for the first time; a long wait time increases the latency of the computation.
- The window's allowed lateness only makes the result more accurate and should not affect the computation latency.
- Therefore the two cannot be substituted for one another.

Practical guidance for choosing the two values:

- Do not set the WaterMark wait time too large; it is usually on the order of seconds.
- Set the window's allowed lateness to cover most of the late data.
- In extreme cases, handle the small remainder of late data through the side output stream.
12. Time-based stream joining

The connect operation described earlier covers most co-stream needs. But to match the data of two streams within a fixed time range, connect requires custom logic, while the same thing can be expressed more simply with windows; Flink has built-in APIs for this.

12.1 Window Join

Only records that fall into the same time window can be matched; the match is done on the keyBy key; only the matched pairs are returned — similar to an inner join restricted to a fixed time range.

```java
package waterMark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Title: WindowJoinDemo
 * Author lizhe
 * Package Window Join
 * Date 2024/6/8 21:11
 * description:
 */
public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        DataStream<String> join = ds1.join(ds2)
                .where(r1 -> r1.f0)
                .equalTo(r2 -> r2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * Called for every matched pair
                     * @param first  record from ds1
                     * @param second record from ds2
                     */
                    @Override
                    public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "----" + second;
                    }
                });

        join.print();
        env.execute();
    }
}
```
12.2 Interval Join

Sometimes the time range to consider is not fixed. The records that should match may sit just on the two sides of a window boundary and fail to match, so a window join is not enough.

The idea of the interval join: for each record of one stream, open a time interval around its timestamp, given by a lower and an upper bound offset (a negative offset looks back in time, a positive one looks forward), and check whether any record from the other stream falls into that interval. Only event-time semantics are supported.

```java
package waterMark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Title:
 * Author lizhe
 * Package
 * Date 2024/6/8 21:11
 * description:
 */
public class IntervalJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1, 1),
                        Tuple3.of("a", 11, 1),
                        Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1),
                        Tuple3.of("c", 14, 1),
                        Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                }));

        KeyedStream<Tuple2<String, Integer>, String> stream1 = ds1.keyBy(value -> value.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> stream2 = ds2.keyBy(value -> value.f0);

        stream1.intervalJoin(stream2)
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * Called only when records from the two streams match
                     * @param left  record from stream1
                     * @param right record from stream2
                     * @param ctx   context
                     * @param out   collector
                     */
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
                        // reaching this method means the two records matched
                        out.collect(left + "----" + right);
                    }
                })
                .print();

        env.execute();
    }
}
```

Since version 1.17, late records of an interval join can be emitted to side output streams: if a record's event time is smaller than the current WaterMark, it is late and the main process function does not handle it; instead, after between(), call sideOutputLeftLateData() and sideOutputRightLateData() to send the late records of each input to a side output stream.
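A minimal sketch of that 1.17 feature, reusing stream1 and stream2 from IntervalJoinDemo above (the tag names and the extra imports of OutputTag and Types are assumptions, not from the original post):

```java
// Route late records of the interval join to side outputs (Flink 1.17+).
OutputTag<Tuple2<String, Integer>> leftLateTag =
        new OutputTag<>("left-late", Types.TUPLE(Types.STRING, Types.INT));
OutputTag<Tuple3<String, Integer, Integer>> rightLateTag =
        new OutputTag<>("right-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));

SingleOutputStreamOperator<String> joined = stream1
        .intervalJoin(stream2)
        .between(Time.seconds(-2), Time.seconds(2))
        .sideOutputLeftLateData(leftLateTag)   // stream1 records whose event time < current watermark
        .sideOutputRightLateData(rightLateTag) // stream2 records whose event time < current watermark
        .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
            @Override
            public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) {
                out.collect(left + "----" + right);
            }
        });

joined.print("matched");
joined.getSideOutput(leftLateTag).printToErr("left-late");
joined.getSideOutput(rightLateTag).printToErr("right-late");
```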
13. Process functions

The APIs at the layer below DataStream are collectively called process operators; the corresponding interface is the process function.

13.1 Basic process functions

- A process function provides a timer service (TimerService), through which the record's timestamp and the watermark can be accessed and timers can even be registered.
- Process functions extend AbstractRichFunction, so they have all the features of rich functions and can access state and other runtime information.
- Process functions can emit data directly to side outputs.
- Process functions are the most flexible processing method and can implement arbitrary custom logic.

Classification (a minimal sketch of the plain ProcessFunction follows after this list):

- ProcessFunction
- KeyedProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
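The later demos only use KeyedProcessFunction, ProcessWindowFunction, ProcessJoinFunction and BroadcastProcessFunction, so here is a minimal sketch of the plain (non-keyed) ProcessFunction, assuming a WaterSensor stream built from the socket source the same way as in the other demos (referred to here as sensorDS):

```java
// Minimal non-keyed ProcessFunction sketch: forwards every record with its timestamp.
// Requires: import org.apache.flink.streaming.api.functions.ProcessFunction;
SingleOutputStreamOperator<String> tagged = sensorDS
        .process(new ProcessFunction<WaterSensor, String>() {
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                // ctx exposes the record timestamp and the TimerService, but on a
                // non-keyed stream timers cannot be registered (see 13.2 below)
                out.collect("ts=" + ctx.timestamp() + " -> " + value);
            }
        });
tagged.print();
```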
13.2 KeyedProcessFunction

Only on a KeyedStream can the TimerService be used to register timers.

13.2.1 Timers and the timer service

```java
keyedStream.process(new KeyedProcessFunction<String, WaterSensor, String>() {
    /**
     * Called once per record
     */
    @Override
    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
        // event time extracted from the record; null if there is none
        Long timestamp = ctx.timestamp();
        // timer service
        TimerService timerService = ctx.timerService();
        // register an event-time timer
        timerService.registerEventTimeTimer(10L);
        // register a processing-time timer
        timerService.registerProcessingTimeTimer(10L);
        // delete an event-time timer
        timerService.deleteEventTimeTimer(10L);
        // delete a processing-time timer
        timerService.deleteProcessingTimeTimer(10L);
        // get the current processing time, i.e. the system time
        timerService.currentProcessingTime();
        // get the current WaterMark
        timerService.currentWatermark();
    }
});
```

Event-time timer:

```java
package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Title: KeyedProcessTimerDemo
 * Author lizhe
 * Package process
 * Date 2024/6/9 12:29
 * description:
 */
public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTime) -> {
                    return element.getTs() * 1000L;
                });

        SingleOutputStreamOperator<WaterSensor> operator =
                singleOutputStreamOperator.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

        KeyedStream<WaterSensor, String> keyedStream = operator.keyBy(value -> value.getId());

        keyedStream.process(new KeyedProcessFunction<String, WaterSensor, String>() {
            /**
             * Called once per record
             */
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                // event time extracted from the record; null if there is none
                Long timestamp = ctx.timestamp();
                // timer service
                TimerService timerService = ctx.timerService();
                // register an event-time timer at 5 s
                timerService.registerEventTimeTimer(5000L);
                System.out.println("current time=" + timestamp + ", registered a timer for 5 s");
            }

            /**
             * Called when time reaches the registered timer
             * @param timestamp the current time progress
             * @param ctx       context
             * @param out       collector
             */
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                System.out.println("now=" + timestamp + ", the timer fired, key=" + ctx.getCurrentKey());
            }
        }).print();

        env.execute();
    }
}
```

The TimerService deduplicates timers by key and timestamp: for each key and each timestamp there is at most one timer; even if it is registered several times, onTimer() is called only once.

Processing-time timer: the rest of KeyedProcessTimerDemo is unchanged; only processElement differs:

```java
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
    // event time extracted from the record; null if there is none
    Long timestamp = ctx.timestamp();
    // timer service
    TimerService timerService = ctx.timerService();
    long currentProcessingTime = timerService.currentProcessingTime();
    // register a processing-time timer 5 s from now
    timerService.registerProcessingTimeTimer(currentProcessingTime + 5000L);
    System.out.println("current time=" + currentProcessingTime + ", registered a timer for 5 s later, key=" + ctx.getCurrentKey());
}
```

Summary:

- Event-time timers are triggered by the WaterMark: the timer fires when the WaterMark reaches the registered time.
- Note: WaterMark = current maximum event time − wait time − 1 ms; the extra −1 ms effectively postpones the trigger by one record. For example, for a 5 s timer with a 3 s wait, a WaterMark of 8 s − 3 s − 1 ms = 4999 ms does not fire the timer; only a WaterMark of 9 s − 3 s − 1 ms = 5999 ms fires the 5 s timer.
- The WaterMark read inside processElement is still the previous one, because the process function has not yet received the new WaterMark generated for the current record.
13.3 Case study

Find the water levels that occur most often within a period of time: every 5 s, output the two water levels that occurred most often in the last 10 s.

This can be implemented with a sliding window, counting per water level. (The author's note: read the code below carefully, it may still have issues.)

```java
package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.*;

/**
 * Title:
 * Author lizhe
 * Package process
 * Date 2024/6/9 12:29
 * description:
 */
public class TopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> operator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> {
                                    return element.getTs() * 1000L;
                                }));

        // 1. key by vc, open a window and aggregate (incremental function + full-window function).
        //    After the window aggregation the result is an ordinary stream and the window information
        //    is lost, so the window end (windowEnd) is attached as a label by the full-window function.
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> aggregate = operator
                .keyBy(value -> value.getVc())
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new VcCountAgg(), new WindowResult());

        // 2. key by the window label so that all results of the same window range come together,
        //    then sort and take the top N
        aggregate.keyBy(value -> value.f2)
                .process(new TopN(2))
                .print();

        env.execute();
    }

    public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> {
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            // count one more occurrence of this vc
            return accumulator + 1;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            return null;
        }
    }

    /**
     * Generics:
     * first  - input type: the output of the incremental function (the count)
     * second - output type: Tuple3(vc, count, windowEnd), i.e. the count with the window-end label
     * third  - key type: vc (Integer)
     * fourth - window type
     */
    public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {
        @Override
        public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
            Integer count = elements.iterator().next();
            long windowsEnd = context.window().getEnd();
            out.collect(Tuple3.of(key, count, windowsEnd));
        }
    }

    public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {
        // buffers the results of the different windows: key = windowEnd, value = list of results
        private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;
        // how many top entries to emit
        private int threshold;

        public TopN(int threshold) {
            dataListMap = new HashMap<>();
            this.threshold = threshold;
        }

        @Override
        public void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {
            // each call only carries one record; to sort, the records are buffered,
            // with the different windows stored separately
            // 1. store in the HashMap
            Long windowEnd = value.f2;
            if (dataListMap.containsKey(windowEnd)) {
                // 1.1 not the first record for this window end: just append to the list
                List<Tuple3<Integer, Integer, Long>> tuple3List = dataListMap.get(windowEnd);
                tuple3List.add(value);
            } else {
                // 1.2 first record for this window end: initialize the list
                List<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();
                dataList.add(value);
                dataListMap.put(windowEnd, dataList);
            }
            // 2. register a timer at windowEnd + 1 ms: the results of the same window range should be
            //    emitted together, they just arrive one by one through processElement, so 1 ms is enough
            ctx.timerService().registerProcessingTimeTimer(windowEnd + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            // the timer fired: all results of this window range have been collected; sort and take the top N
            Long windowEnd = ctx.getCurrentKey();
            // 1. sort
            List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);
            dataList.sort(new Comparator<Tuple3<Integer, Integer, Long>>() {
                @Override
                public int compare(Tuple3<Integer, Integer, Long> o1, Tuple3<Integer, Integer, Long> o2) {
                    return o2.f1 - o1.f1;
                }
            });
            // 2. take the top N
            StringBuilder outStr = new StringBuilder();
            outStr.append("\n");
            // iterate over the sorted list and take the first threshold entries;
            // if the list has fewer than threshold entries, take dataList.size()
            for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {
                Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
                outStr.append("Top" + (i + 1) + "\n");
                outStr.append("vc=" + vcCount.f0 + "\n");
                outStr.append("count=" + vcCount.f1 + "\n");
                outStr.append("window end=" + vcCount.f2 + "\n");
            }
            // clear the list once it has been used
            dataList.clear();
            out.collect(outStr.toString());
        }
    }
}
```
13.4 Side outputs

Use a side output stream to implement a water-level alarm.

```java
package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * Title:
 * Author lizhe
 * Package process
 * Date 2024/6/9 12:29
 * description:
 */
public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTime) -> {
                    return element.getTs() * 1000L;
                });

        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

        final OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING);

        SingleOutputStreamOperator<WaterSensor> process = singleOutputStreamOperator
                .keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                        // use the side output for the alarm
                        if (value.getVc() > 10) {
                            ctx.output(warnTag, "current water level " + value.getVc() + " is above the threshold 10");
                        }
                        out.collect(value);
                    }
                });

        process.print();
        process.getSideOutput(warnTag).printToErr("warn");
        env.execute();
    }
}
```
14. State management

14.1 State in Flink

Operators are either stateless or stateful.

A stateless operator task only looks at each individual event and produces its output directly from the current input, e.g. map, filter, flatMap.

A stateful operator task needs other data, besides the current record, to compute its result; that "other data" is the state, e.g. aggregation operators and window operators.

Classification of state:

Managed state vs. raw state:

- Managed state is managed uniformly by Flink; you only call the corresponding interfaces.
- Raw state is user-defined: it is effectively a chunk of memory you open up and manage yourself, including serialization and failure recovery.
- Normally, Flink-managed state is used (this is the important case).

Operator state vs. keyed state:

- State used after keyBy() is keyed (per-key partitioned) state; everything else is operator state.

14.2 Operator state

Within one parallel subtask, all the data it processes can access the same state; the state is shared within that task. Operator state can be used with any operator, similar to a local variable.

14.3 Keyed state

The state is maintained and accessed according to the key defined on the input stream, i.e. it can only be used after keyBy.

14.3.1 Value state

The state holds a single value.

Alert when two consecutive water levels differ by more than 10:

```java
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.time.Duration;

/**
 * Title: KeyedValueStateDemo
 * Author lizhe
 * Package state
 * Date 2024/6/11 20:54
 * description: alert when two consecutive water levels differ by more than 10
 */
public class KeyedValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> {
                                    return element.getTs() * 1000L;
                                }));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    ValueState<Integer> lastVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Integer> lastVc = new ValueStateDescriptor<>("lastVc", Integer.class);
                        lastVcState = getRuntimeContext().getState(lastVc);
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
                        if (Math.abs(value.getVc() - lastVc) > 10) {
                            out.collect("sensor id=" + value.getId() + ", current vc=" + value.getVc()
                                    + ", previous vc=" + lastVc + ", difference is more than 10");
                        }
                        lastVcState.update(value.getVc());
                    }
                })
                .print();

        env.execute();
    }
}
```
14.3.2 List state

The data to keep is stored as a list.

For each sensor, output its three highest water levels:

```java
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;

/**
 * Title:
 * Author lizhe
 * Package state
 * Date 2024/6/11 20:54
 * description: for each sensor, output its three highest water levels
 */
public class KeyedListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> {
                                    return element.getTs() * 1000L;
                                }));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    ListState<Integer> vcListState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcListState = getRuntimeContext().getListState(
                                new ListStateDescriptor<>("vcListState", Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        vcListState.add(value.getVc());
                        ArrayList<Integer> arrayList = new ArrayList<>();
                        for (Integer vc : vcListState.get()) {
                            arrayList.add(vc);
                        }
                        arrayList.sort((o1, o2) -> {
                            return o2 - o1;
                        });
                        if (arrayList.size() > 3) {
                            arrayList.remove(3);
                        }
                        out.collect("sensor id=" + value.getId() + ", top three water levels=" + arrayList.toString());
                        vcListState.update(arrayList);
                    }
                })
                .print();

        env.execute();
    }
}
```

14.3.3 Map state

Key-value pairs are kept as the state.

Count how many times each water level occurs per sensor:

```java
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;

/**
 * Title:
 * Author lizhe
 * Package state
 * Date 2024/6/11 20:54
 * description: count how many times each water level occurs per sensor
 */
public class KeyedMapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> {
                                    return element.getTs() * 1000L;
                                }));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    MapState<Integer, Integer> vcCountMapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcCountMapState = getRuntimeContext().getMapState(
                                new MapStateDescriptor<>("vcCountMapState", Integer.class, Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        Integer vc = value.getVc();
                        if (vcCountMapState.contains(vc)) {
                            // seen before: increment the count for this water level
                            int count = vcCountMapState.get(vc) + 1;
                            vcCountMapState.put(vc, count);
                        } else {
                            vcCountMapState.put(vc, 1);
                        }
                        StringBuilder outStr = new StringBuilder();
                        outStr.append("sensor id=" + value.getId() + "\n");
                        for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
                            outStr.append(vcCount.toString() + "\n");
                        }
                        out.collect(outStr.toString());
                    }
                })
                .print();

        env.execute();
    }
}
```
14.3.4 Reducing state

The added data is reduced, and the reduced (aggregated) value is kept as the state.

Compute the sum of water levels per sensor:

```java
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Title:
 * Author lizhe
 * Package state
 * Date 2024/6/11 20:54
 * description: compute the sum of water levels per sensor
 */
public class KeyedReduceStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> {
                                    return element.getTs() * 1000L;
                                }));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    ReducingState<Integer> vcSum;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcSum = getRuntimeContext().getReducingState(
                                new ReducingStateDescriptor<>("vcSum", new ReduceFunction<Integer>() {
                                    @Override
                                    public Integer reduce(Integer value1, Integer value2) throws Exception {
                                        return value1 + value2;
                                    }
                                }, Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        vcSum.add(value.getVc());
                        out.collect("sensor id=" + value.getId() + ", sum of water levels=" + vcSum.get());
                    }
                })
                .print();

        env.execute();
    }
}
```

14.3.5 Aggregating state

Similar to reducing state; compared with it, the aggregating state uses an accumulator to represent the state, and the type of the aggregated state can differ from the input type.

Compute the average water level:

```java
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Title:
 * Author lizhe
 * Package state
 * Date 2024/6/11 20:54
 * description: compute the average water level
 */
public class KeyedAggregateStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> {
                                    return element.getTs() * 1000L;
                                }));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    AggregatingState<Integer, Double> vcAggregatingState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcAggregatingState = getRuntimeContext().getAggregatingState(
                                new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
                                        "vcAggregatingState",
                                        new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                                            @Override
                                            public Tuple2<Integer, Integer> createAccumulator() {
                                                return Tuple2.of(0, 0);
                                            }

                                            @Override
                                            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                                return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                                            }

                                            @Override
                                            public Double getResult(Tuple2<Integer, Integer> accumulator) {
                                                return accumulator.f0 * 1D / accumulator.f1;
                                            }

                                            @Override
                                            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                                                return null;
                                            }
                                        },
                                        Types.TUPLE(Types.INT, Types.INT)));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        vcAggregatingState.add(value.getVc());
                        Double vcAvg = vcAggregatingState.get();
                        out.collect("sensor id=" + value.getId() + ", average water level=" + vcAvg);
                    }
                })
                .print();

        env.execute();
    }
}
```
14.3.6 State time-to-live (TTL)

When a state entry is created, its expiration time is current time + TTL; the expiration time can be refreshed. To use it, build a configuration object and enable TTL on the state descriptor:

```java
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Title: KeyedValueStateDemo
 * Author lizhe
 * Package state
 * Date 2024/6/11 20:54
 * description:
 */
public class KeyedStateTTLDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((element, recordTime) -> {
                                    return element.getTs() * 1000L;
                                }));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    ValueState<Integer> lastVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        StateTtlConfig stateTtlConfig = StateTtlConfig
                                // expires after 5 s
                                .newBuilder(Time.seconds(5))
                                // creating and writing the state refreshes the expiration time
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                // never return expired values
                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                .build();
                        ValueStateDescriptor<Integer> lastVc = new ValueStateDescriptor<>("lastVc", Integer.class);
                        lastVc.enableTimeToLive(stateTtlConfig);
                        lastVcState = getRuntimeContext().getState(lastVc);
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        Integer value1 = lastVcState.value();
                        out.collect("key=" + value.getId() + ", state value=" + value1);
                        lastVcState.update(value.getVc());
                    }
                })
                .print();

        env.execute();
    }
}
```
14.4 Operator state

Operator state comes in three kinds: list state (ListState), union list state (UnionListState), and broadcast state (BroadcastState).

It is state defined on a parallel operator instance, and its scope is limited to the current operator task.

```java
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;

/**
 * Title: OperatorListDemo
 * Author lizhe
 * Package state
 * Date 2024/6/13 21:50
 * description: count the number of records inside a map operator
 */
public class OperatorListDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.socketTextStream("192.168.132.101", 7777)
                .map(new MyCountMapFunction())
                .print();

        env.execute();
    }

    public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {

        private long count = 0L;
        private ListState<Long> state;

        @Override
        public Long map(String value) throws Exception {
            // increment and return the running count
            return ++count;
        }

        /**
         * Persist the local variable: copy it into the operator state
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshotState...");
            state.clear();
            state.add(count);
        }

        /**
         * Initialize the local variable: on recovery, read the state back into the local variable;
         * called once per subtask
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initializeState...");
            state = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("state", Long.class));
            if (context.isRestored()) {
                for (Long aLong : state.get()) {
                    count += aLong;
                }
            }
        }
    }
}
```

Difference between list state and union list state for operator state:

- List state: on rescaling, the state entries are redistributed to the new subtasks in round-robin fashion.
- Union list state: the state of the original subtasks is merged into one complete list, and every new parallel subtask receives a full copy of it.

Broadcast state: every parallel subtask of the operator keeps the same global state.

Alert when the water level exceeds a threshold, where the threshold can be changed dynamically:

```java
package state;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Title: OperatorListDemo
 * Author lizhe
 * Package state
 * Date 2024/6/13 21:50
 * description: alert when the water level exceeds a threshold that can be changed dynamically
 */
public class OperatorBroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });

        // configuration stream, used to broadcast the threshold
        DataStreamSource<String> configDS = env.socketTextStream("192.168.132.101", 8888);

        final MapStateDescriptor<String, Integer> descriptor =
                new MapStateDescriptor<>("configDS", String.class, Integer.class);
        BroadcastStream<String> broadcastStream = configDS.broadcast(descriptor);

        BroadcastConnectedStream<WaterSensor, String> connect = sensorDS.connect(broadcastStream);

        connect.process(new BroadcastProcessFunction<WaterSensor, String, String>() {
            /**
             * Handles the data stream
             */
            @Override
            public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(descriptor);
                Integer integer = broadcastState.get("threshold");
                // if the data stream arrives before the broadcast stream, the broadcast state is still empty, so guard against null
                integer = integer == null ? 0 : integer;
                if (value.getVc() > integer) {
                    out.collect("water level exceeds the threshold, threshold=" + integer);
                }
            }

            /**
             * Handles the broadcast configuration stream
             */
            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                BroadcastState<String, Integer> state = ctx.getBroadcastState(descriptor);
                state.put("threshold", Integer.valueOf(value));
            }
        }).print();

        env.execute();
    }
}
```

14.5 State backends

The storage, access, and maintenance of state are handled by a pluggable component called the state backend, which is mainly responsible for how and where local state is stored.

14.5.1 Types of state backends

State backends work out of the box and can be configured independently, without changing the program logic. There are two kinds: the hash-map state backend (the default) and the embedded RocksDB state backend.

- Hash-map state backend: the state lives in memory; the state objects are stored directly on the TaskManager's JVM heap as key-value pairs.
- RocksDB state backend: RocksDB is a key-value store; the data is written to disk.
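As a small sketch of how the backend can be selected per job in code (the class name and the flink-conf.yaml hint are illustrative assumptions, not from the original post):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Default backend: keyed state kept as objects on the TaskManager JVM heap.
        env.setStateBackend(new HashMapStateBackend());

        // Alternative: embedded RocksDB, state serialized and written to local disk
        // (requires the flink-statebackend-rocksdb dependency on the classpath).
        // env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // The same choice can be made cluster-wide in flink-conf.yaml, e.g. "state.backend: hashmap".
    }
}
```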