当前位置: 首页 > news >正文

jsp网站开发小程序企业网站建设运营的灵魂是

jsp网站开发小程序,企业网站建设运营的灵魂是,网站建设实践鉴定,响应式网站建设对企业营销背景#xff1a; flink中常见的需求如下#xff1a;统计某个页面一天内的点击率,每10秒输出一次#xff0c;我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢#xff1f;如果这样实现问题是什么呢#xff1f; ProcessWindowFunction 结合自定义触发器实现…背景 flink中常见的需求如下统计某个页面一天内的点击率,每10秒输出一次我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢如果这样实现问题是什么呢 ProcessWindowFunction 结合自定义触发器实现统计点击率 关键代码 完整代码参见 package wikiedits.func;import java.text.SimpleDateFormat; import java.util.Date;import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;import wikiedits.func.model.KeyCount;public class ProcessWindowFunctionAndTiggerDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);env.setStateBackend(new FsStateBackend(file:///D:/tmp/flink/checkpoint/windowtrigger));// 并行度为1env.setParallelism(1);// 设置数据源一共三个元素DataStreamTuple2String, Integer dataStream env.addSource(new SourceFunctionTuple2String, Integer() {Overridepublic void run(SourceContextTuple2String, Integer ctx) throws Exception {int xxxNum 0;int yyyNum 0;for (int i 1; i Integer.MAX_VALUE; i) {// 只有XXX和YYY两种nameString name (0 i % 2) ? XXX : YYY;// 更新aaa和bbb元素的总数if (0 i % 2) {xxxNum;} else {yyyNum;}// 使用当前时间作为时间戳long timeStamp System.currentTimeMillis();// 将数据和时间戳打印出来用来验证数据if(xxxNum % 20000){System.out.println(String.format(source%s, %s, XXX total : %d, YYY total : %d\n, name,time(timeStamp), xxxNum, yyyNum));}// 发射一个元素并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2String, Integer(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1);}}Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分再用ProcessWindowFunctionSingleOutputStreamOperatorString mainDataStream dataStream// 以Tuple2的f0字段作为key本例中实际上key只有aaa和bbb两种.keyBy(value - value.f0)// 5秒一次的滚动窗口.timeWindow(Time.minutes(5))// 10s触发一次计算更新统计结果.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))// 统计每个key当前窗口内的元素数量然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunctionTuple2String, Integer, String, String, TimeWindow() {// 自定义状态private ValueStateKeyCount state;Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态name是myStatestate getRuntimeContext().getState(new ValueStateDescriptor(myState, KeyCount.class));}public void clear(Context context) {ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));contextWindowValueState.clear();}Overridepublic void process(String s, Context context, IterableTuple2String, Integer iterable,CollectorString collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current state.value();// 如果myState还从未没有赋值过就在此初始化if (current null) {current new KeyCount();current.key s;current.count 0;}int count 0;// iterable可以访问该key当前窗口内的所有数据// 这里简单处理只统计了元素数量for (Tuple2String, Integer tuple2 : iterable) {count;}// 更新当前key的元素总数current.count count;// 更新状态到backendstate.update(current);ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));KeyCount windowValue contextWindowValueState.value();if (windowValue null) {windowValue new KeyCount();windowValue.key s;windowValue.count 0;}windowValue.count count;contextWindowValueState.update(windowValue);// 将当前key及其窗口的元素数量还有窗口的起止时间整理成字符串String value String.format(window, %s, %s - %s, %d, windowStateCount :%d, total : %d,// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key所在窗口的总数contextWindowValueState.value().count,// 当前key出现的总数current.count);// 发射到下游算子collector.collect(value);}});// 打印结果通过分析打印信息检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute(processfunction demo : processwindowfunction);}public static String time(long timeStamp) {return new SimpleDateFormat(yyyy-MM-dd hh:mm:ss).format(new Date(timeStamp));}} 这里采用ProcessWindowFunction 结合ContinuousProcessingTimeTrigger的方式确实可以实现统计至今为止某个页面点击率的目的不过这其中需要注意点的点是 每隔10s触发public void process(String s, Context context, IterableTuple2String, Integer iterable, CollectorString collector)方法时iterable对象是包含了一天的窗口内收到的所有消息也就是当前触发时iterable集合是前10s触发时iterable集合的超集,包含前10s触发时的所有的消息集合。 到这里所引起的问题也自然而然的出来了对于ProcessWindowFunction 实现而言flink内部是通过ListState的形式保存窗口内收到的所有消息的注意这里flink内部会使用ListState保存每一条分配到以天为单位的窗口内的消息这会导致状态膨胀想一下一天内所有的消息都会当成状态保存起来这对于状态后端的压力是有多大这些保存在ListState中的消息只有在窗口结束后才会清理具体参见WindowOperator.clearAllState,那有解决方案吗使用Agg/Reduce处理函数替ProcessWindowFunction作为处理函数可以实现吗请看下一篇文章 参考文章 https://www.cnblogs.com/Springmoon-venn/p/13667023.html
http://www.w-s-a.com/news/244279/

相关文章:

  • 网站验证码目录wordpress内嵌播放器
  • 文明网网站建设南昌市建设规费标准网站
  • 安康有建网站的公司吗做网站用什么网名好
  • 济南网站制作哪家专业西安市城乡建设网官方网站
  • 网站建设有趣小游戏怎样让网站优化的方式
  • 昭通做网站儿童编程教学入门教程
  • eclipse静态网站开发软文广告投放平台
  • 网站建设教学视频济南做网站需要多少钱
  • 网站免费做软件市工商联官方网站建设方案
  • 网站建设大体包含英铭长沙网站建设
  • 网站建设培训学校北京如何搜索网站
  • discuz论坛模板哪些网站容易做seo优化
  • 渭南公司做网站网站建设互联网推广
  • 公司网站app怎么做杭州建设局网站
  • 网站开发需要自己写代码吗12306网站多少钱做的
  • 策勒网站建设四川建设网有限责任公司招聘
  • 网站建设哪里有学网页界面设计论文
  • 怎么做外贸网站推广劳务公司网站怎么做
  • 滴答手表网站中铁建设集团有限公司招聘信息2021
  • 重庆富通科技有限公司网站新闻头条最新消息国家大事
  • 四字母net做网站怎么样企业代运营公司
  • 纪检网站建设方案wordpress首页静态页面
  • 网站右下角浮动效果如何做网站logo设计在线生成
  • 西宁哪里做网站婚纱摄影网站设计思路
  • 凡科用模板做网站网站导入页欣赏
  • 北京响应式网站建设公司十大小程序开发公司
  • dw网站开发删除wordpress主题底部
  • 织梦网站怎样做子域名高德导航怎么看街景地图
  • 宿州专业网站建设株洲网站建设优化
  • 自动生成海报的网站常州建网站公司