jsp网站开发小程序,企业网站建设运营的灵魂是,网站建设实践鉴定,响应式网站建设对企业营销背景#xff1a;
flink中常见的需求如下#xff1a;统计某个页面一天内的点击率,每10秒输出一次#xff0c;我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢#xff1f;如果这样实现问题是什么呢#xff1f;
ProcessWindowFunction 结合自定义触发器实现…背景
flink中常见的需求如下统计某个页面一天内的点击率,每10秒输出一次我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢如果这样实现问题是什么呢
ProcessWindowFunction 结合自定义触发器实现统计点击率
关键代码 完整代码参见
package wikiedits.func;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import wikiedits.func.model.KeyCount;public class ProcessWindowFunctionAndTiggerDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);env.setStateBackend(new FsStateBackend(file:///D:/tmp/flink/checkpoint/windowtrigger));// 并行度为1env.setParallelism(1);// 设置数据源一共三个元素DataStreamTuple2String, Integer dataStream env.addSource(new SourceFunctionTuple2String, Integer() {Overridepublic void run(SourceContextTuple2String, Integer ctx) throws Exception {int xxxNum 0;int yyyNum 0;for (int i 1; i Integer.MAX_VALUE; i) {// 只有XXX和YYY两种nameString name (0 i % 2) ? XXX : YYY;// 更新aaa和bbb元素的总数if (0 i % 2) {xxxNum;} else {yyyNum;}// 使用当前时间作为时间戳long timeStamp System.currentTimeMillis();// 将数据和时间戳打印出来用来验证数据if(xxxNum % 20000){System.out.println(String.format(source%s, %s, XXX total : %d, YYY total : %d\n, name,time(timeStamp), xxxNum, yyyNum));}// 发射一个元素并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2String, Integer(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1);}}Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分再用ProcessWindowFunctionSingleOutputStreamOperatorString mainDataStream dataStream// 以Tuple2的f0字段作为key本例中实际上key只有aaa和bbb两种.keyBy(value - value.f0)// 5秒一次的滚动窗口.timeWindow(Time.minutes(5))// 10s触发一次计算更新统计结果.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))// 统计每个key当前窗口内的元素数量然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunctionTuple2String, Integer, String, String, TimeWindow() {// 自定义状态private ValueStateKeyCount state;Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态name是myStatestate getRuntimeContext().getState(new ValueStateDescriptor(myState, KeyCount.class));}public void clear(Context context) {ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));contextWindowValueState.clear();}Overridepublic void process(String s, Context context, IterableTuple2String, Integer iterable,CollectorString collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current state.value();// 如果myState还从未没有赋值过就在此初始化if (current null) {current new KeyCount();current.key s;current.count 0;}int count 0;// iterable可以访问该key当前窗口内的所有数据// 这里简单处理只统计了元素数量for (Tuple2String, Integer tuple2 : iterable) {count;}// 更新当前key的元素总数current.count count;// 更新状态到backendstate.update(current);ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));KeyCount windowValue contextWindowValueState.value();if (windowValue null) {windowValue new KeyCount();windowValue.key s;windowValue.count 0;}windowValue.count count;contextWindowValueState.update(windowValue);// 将当前key及其窗口的元素数量还有窗口的起止时间整理成字符串String value String.format(window, %s, %s - %s, %d, windowStateCount :%d, total : %d,// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key所在窗口的总数contextWindowValueState.value().count,// 当前key出现的总数current.count);// 发射到下游算子collector.collect(value);}});// 打印结果通过分析打印信息检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute(processfunction demo : processwindowfunction);}public static String time(long timeStamp) {return new SimpleDateFormat(yyyy-MM-dd hh:mm:ss).format(new Date(timeStamp));}}
这里采用ProcessWindowFunction 结合ContinuousProcessingTimeTrigger的方式确实可以实现统计至今为止某个页面点击率的目的不过这其中需要注意点的点是 每隔10s触发public void process(String s, Context context, IterableTuple2String, Integer iterable, CollectorString collector)方法时iterable对象是包含了一天的窗口内收到的所有消息也就是当前触发时iterable集合是前10s触发时iterable集合的超集,包含前10s触发时的所有的消息集合。 到这里所引起的问题也自然而然的出来了对于ProcessWindowFunction 实现而言flink内部是通过ListState的形式保存窗口内收到的所有消息的注意这里flink内部会使用ListState保存每一条分配到以天为单位的窗口内的消息这会导致状态膨胀想一下一天内所有的消息都会当成状态保存起来这对于状态后端的压力是有多大这些保存在ListState中的消息只有在窗口结束后才会清理具体参见WindowOperator.clearAllState,那有解决方案吗使用Agg/Reduce处理函数替ProcessWindowFunction作为处理函数可以实现吗请看下一篇文章
参考文章 https://www.cnblogs.com/Springmoon-venn/p/13667023.html