网址搜索器,seo职位是什么意思,用软件做网站,网站原图怎么做按键分区处理函数#xff08;KeyedProcessFunction#xff09;#xff1a;先进行分区#xff0c;然后定义处理操作
1.定时器#xff08;Timer#xff09;和定时服务#xff08;TimerService#xff09;
定时器#xff08;timers#xff09;是处理函数中进行时间相关… 按键分区处理函数KeyedProcessFunction先进行分区然后定义处理操作
1.定时器Timer和定时服务TimerService
定时器timers是处理函数中进行时间相关操作的主要机制定时服务TimerService提供了注册定时器的功能
TimerService 是 Flink 关于时间和定时器的基础服务接口
// 获取当前的处理时间
long currentProcessingTime();
// 获取当前的水位线事件时间
long currentWatermark();
// 注册处理时间定时器当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);
// 注册事件时间定时器当水位线超过 time 时触发
void registerEventTimeTimer(long time);
// 删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);
// 删除触发时间为 time 的事件时间定时器
void deleteEventTimeTimer(long time);
六个方法可以分成两大类基于处理时间和基于事件时间。而对应的操作主要有三个获取当前时间注册定时器以及删除定时器 尽管处理函数中都可以直接访问TimerService不过只有基于 KeyedStream 的处理函数才能去调用注册和删除定时器的方法未作按键分区的 DataStream 不支持定时器操作只能获取当前时间 对于处理时间和事件时间这两种类型的定时器TimerService 内部会用一个优先队列将它们的时间戳保存起来排队等待执行可以认为定时器其实是 KeyedStream上处理算子的一个状态它以时间戳作为区分。所以 TimerService 会以键key和时间戳为标准对定时器进行去重也就是说对于每个 key 和时间戳最多只有一个定时器如果注册了多次onTimer()方法也将只被调用一次
基于 KeyedStream 注册定时器时会传入一个定时器触发的时间戳这个时间戳的定时器对于每个 key 都是有效的利用这个特性有时我们可以故意降低时间戳的精度来减少定时器的数量从而提高处理性能。比如我们可以在设置定时器时只保留整秒数那么定时器的触发频率就是最多 1 秒一次
long coalescedTime time / 1000 * 1000; //时间戳定时器默认的区分精度是毫秒
ctx.timerService().registerProcessingTimeTimer(coalescedTime); //注册定时器
2.KeyedProcessFunction 的使用
基础用法
stream.keyBy( t - t.f0 ).process(new MyKeyedProcessFunction())
这里的MyKeyedProcessFunction即是KeyedProcessFunction的一个实现类
源码解析
KeyedProcessFunction源码如下
public abstract class KeyedProcessFunctionK, I, O extends AbstractRichFunction {private static final long serialVersionUID 1L;/*** Process one element from the input stream.** pThis function can output zero or more elements using the {link Collector} parameter and* also update internal state or set timers using the {link Context} parameter.** param value The input value.* param ctx A {link Context} that allows querying the timestamp of the element and getting a* {link TimerService} for registering timers and querying the time. The context is only* valid during the invocation of this method, do not store it.* param out The collector for returning result values.* throws Exception This method may throw exceptions. Throwing an exception will cause the* operation to fail and may trigger recovery.*/public abstract void processElement(I value, Context ctx, CollectorO out) throws Exception;/*** Called when a timer set using {link TimerService} fires.** param timestamp The timestamp of the firing timer.* param ctx An {link OnTimerContext} that allows querying the timestamp, the {link* TimeDomain}, and the key of the firing timer and getting a {link TimerService} for* registering timers and querying the time. The context is only valid during the invocation* of this method, do not store it.* param out The collector for returning result values.* throws Exception This method may throw exceptions. Throwing an exception will cause the* operation to fail and may trigger recovery.*/public void onTimer(long timestamp, OnTimerContext ctx, CollectorO out) throws Exception {}/*** Information available in an invocation of {link #processElement(Object, Context, Collector)}* or {link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class Context {/*** Timestamp of the element currently being processed or timestamp of a firing timer.** pThis might be {code null}, for example if the time characteristic of your program is* set to {link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.*/public abstract Long timestamp();/** A {link TimerService} for querying time and registering timers. */public abstract TimerService timerService();/*** Emits a record to the side output identified by the {link OutputTag}.** param outputTag the {code OutputTag} that identifies the side output to emit to.* param value The record to emit.*/public abstract X void output(OutputTagX outputTag, X value);/** Get key of the element being processed. */public abstract K getCurrentKey();}/*** Information available in an invocation of {link #onTimer(long, OnTimerContext, Collector)}.*/public abstract class OnTimerContext extends Context {/** The {link TimeDomain} of the firing timer. */public abstract TimeDomain timeDomain();/** Get key of the firing timer. */Overridepublic abstract K getCurrentKey();}
}可以看到和ProcessFunction类似都有一个processElement()和onTimer()方法并且定义了一个Context抽象类不同点在于类型参数多了一个K也就是key的类型
代码示例
①处理时间语义
public class ProcessingTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 处理时间语义不需要分配时间戳和watermarkSingleOutputStreamOperatorEvent stream env.addSource(new ClickSource());// 要用定时器必须基于KeyedStreamstream.keyBy(data - true).process(new KeyedProcessFunctionBoolean, Event, String() {Overridepublic void processElement(Event value, Context ctx, CollectorString out) throws Exception {Long currTs ctx.timerService().currentProcessingTime();out.collect(数据到达到达时间 new Timestamp(currTs));// 注册一个10秒后的定时器ctx.timerService().registerProcessingTimeTimer(currTs 10 * 1000L);}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {out.collect(定时器触发触发时间 new Timestamp(timestamp));}}).print();env.execute();}
}
通过ctx.timerService().currentProcessingTime()获取当前处理时间
通过ctx.timerService().registerProcessingTimeTimer来设置一个定时器
运行结果如下 由于定时器是处理时间的定时器不用考虑水位线延时问题因此10s后能够准时触发定时操作 ②事件时间语义
public class EventTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorEvent stream env.addSource(new CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.EventforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 基于KeyedStream定义事件时间定时器stream.keyBy(data - true).process(new KeyedProcessFunctionBoolean, Event, String() {Overridepublic void processElement(Event value, Context ctx, CollectorString out) throws Exception {out.collect(数据到达时间戳为 ctx.timestamp());out.collect(数据到达水位线为 ctx.timerService().currentWatermark() \n -------分割线-------);// 注册一个10秒后的定时器ctx.timerService().registerEventTimeTimer(ctx.timestamp() 10 * 1000L);}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {out.collect(定时器触发触发时间 timestamp);}}).print();env.execute();}// 自定义测试数据源public static class CustomSource implements SourceFunctionEvent {Overridepublic void run(SourceContextEvent ctx) throws Exception {// 直接发出测试数据ctx.collect(new Event(Mary, ./home, 1000L));// 为了更加明显中间停顿5秒钟Thread.sleep(5000L);// 发出10秒后的数据ctx.collect(new Event(Mary, ./home, 11000L));Thread.sleep(5000L);// 发出10秒1ms后的数据ctx.collect(new Event(Alice, ./cart, 11001L));Thread.sleep(5000L);}Overridepublic void cancel() { }}
}运行结果如下 运行结果解释
①第一条数据到来时时间戳为1000但由于水位线的生成是周期性的默认200ms因此水位线不会立即发送改变仍然是Long.MIN_VALUE之后只要到了水位线生成的时间周期就会依据当前最大的时间戳来生成水位线默认减1
②第二条数据到来时显然水位线已经推进到了999但仍然不会立即改变
③在事件时间语义下定时器触发的条件就是水位线推进到设定的时间第一条数据到来之后设定的定时器时间为11000而当时间戳为11000的数据到来时水位线还停留在999的位置因此不会立即触发定时器之后水位线会推进到1099911000-1同样无法触发定时器
④第三条数据到来时时间戳为11001此时水位线推进到了10999等到水位线周期性更新后推进到1100011001-1这样第一个定时器就会触发
⑤然后等待5s后没有新的数据到来整个程序结束将要退出此时会将水位线推进到Long.MAX_VALUE所以所有没有触发的定时器统一触发 学习课程链接【尚硅谷】Flink1.13实战教程涵盖所有flink-Java知识点_哔哩哔哩_bilibili