当前位置: 首页 > news >正文

网站生成手机版深圳网架制作

网站生成手机版,深圳网架制作,百度知道提问首页,团工作网站建设意见基本处理函数#xff08;ProcessFunction#xff09; stream.process(new MyProcessFunction())方法需要传入一个 ProcessFunction 作为参数#xff0c;ProcessFunction 不是接口 #xff0c; 而是一个抽象类 #xff0c;继承了AbstractRichFunction#xff0c;所有的处…基本处理函数ProcessFunction stream.process(new MyProcessFunction())方法需要传入一个 ProcessFunction 作为参数ProcessFunction 不是接口 而是一个抽象类 继承了AbstractRichFunction所有的处理函数都是富函数RichFunction拥有富函数所有功能。 // 泛型 // Type parameters:I – Type of the input elements. 输入类型 // O – Type of the output elements. 输出类型 public abstract class ProcessFunctionI, O extends AbstractRichFunction {public abstract void processElement(I value, Context ctx, CollectorO out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, CollectorO out) throws Exception {} } 1抽象方法.processElement() “处理元素”定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次参数包括三个输入数据值 value上下文 ctx以及“收集器”Collectorout。 value当前流中的输入元素ctx类型是 ProcessFunction 中定义的内部抽象类 Context表示当前运行的上下文可以获取到当前的时间戳并提供了用于查询时间和注册定时器的“定时服务”TimerService以及可以将数据发送到“侧输出流”side output的方法.output()。out“收集器”类型为 Collector用于返回输出数据。调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用也可以不调用。 ProcessFunction 可以轻松实现flatMap、map、filter 这样的基本转换功能而通过富函数提供的获取上下文方法.getRuntimeContext()也可以自定义状态state进行处理。 2非抽象方法.onTimer() 只有在注册好的定时器触发的时候才会调用而定时器是通过“定时服务”TimerService 来注册的。 三个参数时间戳timestamp上下文ctx以及收集器out。 timestamp指设定好的触发时间事件时间语义下是水位线 ctx同样可以调用定时服务TimerService 采集器任意输出处理之后的数据 .onTimer()方法定时触发因此ProcessFunction可以自定义数据按照时间分组 、 定时触发计算输出结果这 就实现了**窗口window **的功能。所以说ProcessFunction 可以实现一切功能。 注意在 Flink 中只有**“按键分区流”KeyedStream 才支持设置定时器的操作**。 处理函数的分类8大处理函数 1ProcessFunction 最基本的处理函数基于 DataStream 直接调用.process()时作为参数传入。 2KeyedProcessFunction 对流按键分区后的处理函数基于 KeyedStream 调用.process()时作为参数传入。要想使用定时器必须基于 KeyedStream。 3ProcessWindowFunction 开窗之后的处理函数也是全窗口函数的代表。基于 WindowedStream调用.process()时作为参数传入。 4ProcessAllWindowFunction 同样是开窗之后的处理函数基于 AllWindowedStream 调用.process()时作为参数传入 5CoProcessFunction 合并connect两条流之后的处理函数基于 ConnectedStreams 调用.process()时作为参数传入 6ProcessJoinFunction 间隔连接interval join两条流之后的处理函数基于 IntervalJoined 调用.process()时作为参数传入。 7BroadcastProcessFunction 广播连接流处理函数基于 BroadcastConnectedStream 调用.process()时作为参数传入。 “广播连接流”BroadcastConnectedStream是一个未 keyBy 的普通DataStream与一个广播流BroadcastStream做连接conncet之后的产物。 8KeyedBroadcastProcessFunction 按键分区的广播连接流处理函数同样是基于 BroadcastConnectedStream调用.process()时作为参数传 入 。 一个KeyedStream 与广播流BroadcastStream做连接之后的产物。 按键分区处理函数KeyedProcessFunction 定时器Timer和定时服务TimerService ProcessFunction 的上下文Context中提供了.timerService()方法可以直接返回一个 TimerService 对象。 TimerService包含以下六个方法 // 获取当前的处理时间 long currentProcessingTime(); // 获取当前的水位线事件时间 long currentWatermark(); // 注册处理时间定时器当处理时间超过 time 时触发 void registerProcessingTimeTimer(long time); // 注册事件时间定时器当水位线超过 time 时触发 void registerEventTimeTimer(long time); // 删除触发时间为 time 的处理时间定时器 void deleteProcessingTimeTimer(long time); // 删除触发时间为 time 的处理时间定时器 void deleteEventTimeTimer(long time);六个方法可以分成两大类基于处理时间和基于事件时间。 TimerService 会以键key和时间戳为标准对定时器进行去重每个key和时间戳最多只有一个定时器如果注册了多次onTimer()方法也将只被调用一次。 案例 public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(124.222.253.33, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));// 传感器Id keyByKeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(WaterSensor::getId);sensorKS.process(new KeyedProcessFunctionString, WaterSensor, String() {/*** 来一条数据调用一次*/Overridepublic void processElement(WaterSensor value, KeyedProcessFunctionString, WaterSensor, String.Context ctx, CollectorString out) throws Exception {// 获取当前数据的 keyString currentKey ctx.getCurrentKey();// TODO 1.定时器注册TimerService timerService ctx.timerService();// 1、事件时间的案例Long currentEventTime ctx.timestamp();//数据中提取出来的事件时间timerService.registerEventTimeTimer(5000L);System.out.println( 当前key currentKey ,当前时间 currentEventTime ,注册了一个5s 的定时器);// 2、处理时间的案例// long currentTs timerService.currentProcessingTime();// timerService.registerProcessingTimeTimer(currentTs 5000L);// System.out.println( 当前keycurrentKey ,当前时间 currentTs ,注册了一个5s 后的定时器);// 3、获取 process 的 当前watermark// long currentWatermark timerService.currentWatermark();// System.out.println(当前数据 value,当前 watermark currentWatermark);// 注册定时器 处理时间、事件时间// timerService.registerProcessingTimeTimer();// timerService.registerEventTimeTimer();// 删除定时器 处理时间、事件时间// timerService.deleteEventTimeTimer();// timerService.deleteProcessingTimeTimer();// 获取当前时间进展 处理时间-当前系统时间事件时间-当前 watermark// long currentTs timerService.currentProcessingTime();}/*** .时间进展到定时器注册的时间调用该方法* param timestamp 当前时间进展就是定时器被触发时的时间*/Overridepublic void onTimer(long timestamp, KeyedProcessFunctionString, WaterSensor, String.OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey ctx.getCurrentKey();System.out.println(key currentKey 现在时间是 timestamp 定时器触发);}}).print();env.execute();} }测试结果 注册多个定时器但是时间到了只触发一次。 窗口处理函数 ProcessWindowFunction 和 ProcessAllWindowFunctionProcessAllWindowFunction没有 keyBy 的数据流直接开窗并调用.process()方法 stream.keyBy( t - t.f0 ) .window( TumblingEventTimeWindows.of(Time.seconds(10))).process(new MyProcessWindowFunction())/* 泛型* Type parameters:* IN – The type of the input value. 输入类型* OUT – The type of the output value. 输出类型* KEY – The type of the key. key类型* W – The type of Window that this window function can be applied on. 窗口类型*/ public abstract class ProcessWindowFunctionIN, OUT, KEY, W extends Windowextends AbstractRichFunction {public abstract void process(KEY key, Context context, IterableIN elements, CollectorOUT out) throws Exception;public void clear(Context context) throws Exception {} }抽象方法process key窗口做统计计算基于的键也就是之前 keyBy 用来分区的字段。context当前窗口进行计算的上下文它的类型就是ProcessWindowFunction内部定义的抽象类 Context。elements窗口收集到用来计算的所有数据这是一个可迭代的集合类型。out收集器 上下文调用函数 public abstract class Context implements java.io.Serializable {public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();// 窗口状态public abstract KeyedStateStore windowState();// 全局状态public abstract KeyedStateStore globalState();// 定义侧输出流public abstract X void output(OutputTagX outputTag, X value);}TopN 需求实时统计一段时间内的出现次数最多的水位。例如统计最近10 秒钟内出现次数最多的两个水位并且每 5 秒钟更新一次。 创建实体类 public class WaterSensor {/*** 传感器Id*/public String id;/*** 时间戳*/public Long ts;/*** 水位*/public Integer vc; }方法一使用 ProcessAllWindowFunction public class ProcessAllWindowTopNDemo {public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(124.222.253.33, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));// 滑动窗口sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new MyTopNPAWF()).print();} }// 抽取窗口函数 public class MyTopNPAWF extends ProcessAllWindowFunctionWaterSensor, String, TimeWindow {Overridepublic void process(ProcessAllWindowFunctionWaterSensor, String, TimeWindow.Context context, IterableWaterSensor elements, CollectorString out) throws Exception {MapInteger, Integer vcCountMap new HashMap();for (WaterSensor element : elements) {// 统计不同水位出现次数vcCountMap.put(element.getVc(), vcCountMap.getOrDefault(element.getVc(), 0) 1);}// 对 count 值进行排序: 利用 List 来实现排序ListTuple2Integer, Integer datas new ArrayList();for (Integer vc : vcCountMap.keySet()) {datas.add(Tuple2.of(vc, vcCountMap.get(vc)));}// 对 List 进行排序根据 count 值 降序datas.sort(new ComparatorTuple2Integer, Integer() {Overridepublic int compare(Tuple2Integer, Integer o1, Tuple2Integer, Integer o2) {// 降序 后 减 前return o2.f1 - o1.f1;}});StringBuilder outStr new StringBuilder();outStr.append(\n);// 遍历 排序后的 List取出前 2 个 考虑可能List 不够2个的情况》 List 中元素的个数 和 2 取最小值for (int i 0; i Math.min(2, datas.size()); i) {Tuple2Integer, Integer vcCount datas.get(i);outStr.append(Top).append(i 1).append(\n);outStr.append(vc).append(vcCount.f0).append(\n);outStr.append(count).append(vcCount.f1).append(\n);outStr.append( 窗 口 结束时间).append(DateFormatUtils.format(context.window().getEnd(), yyyy-MM-ddHH:mm:ss.SSS)).append(\n);outStr.append(\n);}out.collect(outStr.toString());} }无论并行度如何设置并行度只为1。效率不高 方法二使用 KeyedProcessFunction ☆ 从两个方面去做优化一是对数据进行按键分区分别统计vc 的出现次数二是进行增量聚合得到结果最后再做排序输出。 public class KeyedProcessFunctionTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(124.222.253.33, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));// 【水位分组】KeyedStreamWaterSensor, Integer keyedStream sensorDS.keyBy(WaterSensor::getVc);/*思路二 使用 KeyedProcessFunction 实现1、按照 vc 做 keyby开窗分别 count》 增量聚合计算 count》 全窗口对计算结果 count 值封装带上窗口结束时间的标签》 为了让同一个窗口时间范围的计算结果到一起去2、对同一个窗口范围的 count 值进行处理排序、取前N 个》 按照 windowEnd 做 keyby》 使用 process 来一条调用一次需要先存分开存用HashMap,keywindowEnd,valueList》 使用定时器对 存起来的结果 进行排序、取前N个*/// 1. 按照 vc 分组、开窗、聚合增量计算全量打标签// 开窗聚合后就是普通的流没有了窗口信息需要自己打上窗口的标记windowEndSingleOutputStreamOperatorTuple3Integer, Integer, Long windowAgg keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(new VcCountAgg(),new WindowResult());// 2. 按照窗口标签窗口结束时间keyby保证同一个窗口时间范围的结果到一起去。排序、取 TopNwindowAgg.keyBy(r - r.f2).process(new TopN(2)).print();env.execute();}// 【同水位累加】public static class VcCountAgg implements AggregateFunctionWaterSensor, Integer, Integer{Overridepublic Integer createAccumulator() {return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {return accumulator 1;}Overridepublic Integer getResult(Integer accumulator) {return accumulator;}Overridepublic Integer merge(Integer a, Integer b) {return null;}}/*** 【打时间标签】* 泛型如下* 第一个输入类型 增量函数的输出 count 值Integer* 第二个输出类型 Tuple3(vccountwindowEnd) ,带上窗口结束时间的标签* 第三个key 类型 vcInteger* 第四个窗口类型*/public static class WindowResult extends ProcessWindowFunctionInteger, Tuple3Integer, Integer, Long, Integer, TimeWindow {Overridepublic void process(Integer key, Context context, IterableInteger elements, CollectorTuple3Integer, Integer, Long out) throws Exception {// 迭代器里面只有一条数据next 一次即可Integer count elements.iterator().next();long windowEnd context.window().getEnd();out.collect(Tuple3.of(key, count, windowEnd));}}public static class TopN extends KeyedProcessFunctionLong, Tuple3Integer, Integer, Long, String {// 存不同窗口的 统计结果keywindowEndvaluelist 数据private MapLong, ListTuple3Integer, Integer, Long dataListMap;// 要取的 Top 数量private int threshold;public TopN(int threshold) {this.threshold threshold;dataListMap new HashMap();}Overridepublic void processElement(Tuple3Integer, Integer, Long value, Context ctx, CollectorString out) throws Exception {// 进入这个方法只是一条数据要排序得到齐才行》存起来不同窗口分开存// 1. 存到 HashMap 中Long windowEnd value.f2;if (dataListMap.containsKey(windowEnd)) {// 1.1 包含 vc不是该 vc 的第一条直接添加到List中ListTuple3Integer, Integer, Long dataList dataListMap.get(windowEnd);dataList.add(value);} else {// 1.1 不包含 vc是该 vc 的第一条需要初始化listListTuple3Integer, Integer, Long dataList new ArrayList();dataList.add(value);dataListMap.put(windowEnd, dataList);}// 2. 注册一个定时器 windowEnd1ms 即可 延迟1ms 触发即可及时性ctx.timerService().registerEventTimeTimer(windowEnd 1);}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);// 定时器触发同一个窗口范围的计算结果攒齐了开始排序、取TopNLong windowEnd ctx.getCurrentKey();// 1. 排序ListTuple3Integer, Integer, Long dataList dataListMap.get(windowEnd);dataList.sort(new ComparatorTuple3Integer, Integer, Long() {Overridepublic int compare(Tuple3Integer, Integer, Long o1, Tuple3Integer, Integer, Long o2) {return o2.f1 - o1.f1;}});// 2. 取 TopNStringBuilder outStr new StringBuilder();outStr.append(\n);for (int i 0; i Math.min(threshold, dataList.size()); i) {Tuple3Integer, Integer, Long vcCount dataList.get(i);outStr.append(Top).append(i 1).append(\n);outStr.append(vc).append(vcCount.f0).append(\n);outStr.append(count).append(vcCount.f1).append(\n);outStr.append(窗口结束时间).append(vcCount.f2).append(\n);outStr.append(\n);}// 用完的 List及时清理节省资源dataList.clear();out.collect(outStr.toString());}} }增量聚合、开窗处理 水位线分组增量聚合相同水位线数量1窗口函数打时间标签按上述打的时间标签分组排序获取topNprocess 侧输出流 process函数带侧输出流 案例对每个传感器水位超过 10 的输出告警信息 public class SideOutputDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(124.222.253.33, 7777).map(new WaterSensorMapFunction());OutputTagString warnTag new OutputTag(warn, Types.STRING);// 传感器分组SingleOutputStreamOperatorWaterSensor process sensorDS.keyBy(WaterSensor::getId).process(new KeyedProcessFunctionString, WaterSensor, WaterSensor() {Overridepublic void processElement(WaterSensor value, Context ctx, CollectorWaterSensor out) throws Exception {// 使用侧输出流告警String currentKey ctx.getCurrentKey();if (value.getVc() 10) {ctx.output(warnTag, 当前传感器 currentKey ,当前水位 value.getVc() ,大于阈值 10);}// 主流正常 发送数据out.collect(value);}});process.print(主流);process.getSideOutput(warnTag).printToErr(warn);env.execute();} }测流输出的同时不影响主流
http://www.w-s-a.com/news/353624/

相关文章:

  • 推广一个网站需要什么官网首页设计
  • 淘宝建设网站的理由企业官网建设哪家好
  • 青岛网站推wordpress主题切换
  • 天元建设集团有限公司资质郑州网站seo推广
  • 免费网站后台管理系统模板下载百度网盘app下载安装
  • 开封网站建设培训郑州高端网站建设哪家好
  • 东莞哪家做网站很有名的公司即墨专业医院网站制作公司
  • 做面食网站china cd wordpress
  • 门户网站 营销优秀建筑模型案例作品
  • 训做网站的心得体会范文中山市 有限公司网站建设
  • 服装电子商务网站建设过程与实现两学一做学习教育网站
  • 住房和城建设网站怎么用源码建站
  • 监理工程师证查询网站百度关键词优化软件网站
  • 关于建筑建设的网站asp网站建设报告书
  • 服务二级公司网站建设平台销售模式有哪些
  • 南昌县建设局网站微信分销小程序开发
  • 网站设计师需要什么知识与技能wordpress个性
  • 做茶叶网站的目的和规划有什么做照片书的网站
  • 开福区城乡建设局门户网站关键词挖掘查询工具爱站网
  • 网站建设全国排名沈阳seo按天计费
  • 成都公司网站设计无锡seo网站推广费用
  • 建网站平台要多少钱购物网站界面设计策划
  • 学完js了可以做哪些网站长沙建站官网
  • 怎么样做问卷网站多少钱英语
  • 房产网站建设方案建筑公司是干什么的
  • wordpress建的大型网站柳州市网站建设
  • 石家庄做网站的公司有哪些微信自媒体网站建设
  • 池州哪里有做网站注册公司有哪些风险
  • 做古代风格头像的网站对网站政务建设的建议
  • 网站搜索栏怎么做设计个网站要多少钱