当前位置: 首页 > news >正文

海口专业网站制作策划wordpress转发

海口专业网站制作策划,wordpress转发,招聘平台,企业网站模板用哪个背景 在处理窗口函数时#xff0c;ProcessWindowFunction处理函数可以定义三个状态#xff1a; 富函数getRuntimeContext.getState, 每个key每个窗口的状态context.windowState(),每个key的状态context.globalState#xff0c;那么这几个状态之间有什么关系呢#xff1f; …背景 在处理窗口函数时ProcessWindowFunction处理函数可以定义三个状态 富函数getRuntimeContext.getState, 每个key每个窗口的状态context.windowState(),每个key的状态context.globalState那么这几个状态之间有什么关系呢 ProcessWindowFunction处理函数三种状态之间的关系 1.getRuntimeContext.getState这个定义的状态是每个key维度的也就是可以跨时间窗口并维持状态的 2.context.windowState()这个定义的状态是和每个key以及窗口相关的也就是虽然key相同但是时间窗口不同他们的值也不一样. 3.context.globalState这个定义的状态是和每个key相关的也就是和getRuntimeContext.getState的定义一样可以跨窗口维护状态 验证代码如下所示 package wikiedits.func;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector; import wikiedits.func.model.KeyCount;import java.text.SimpleDateFormat;import java.util.Date;public class ProcessWindowFunctionDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 并行度为1env.setParallelism(1);// 设置数据源一共三个元素DataStreamTuple2String, Integer dataStream env.addSource(new SourceFunctionTuple2String, Integer() {Overridepublic void run(SourceContextTuple2String, Integer ctx) throws Exception {int xxxNum 0;int yyyNum 0;for (int i 1; i Integer.MAX_VALUE; i) {// 只有XXX和YYY两种nameString name (0 i % 2) ? XXX : YYY;//更新aaa和bbb元素的总数if (0 i % 2) {xxxNum;} else {yyyNum;}// 使用当前时间作为时间戳long timeStamp System.currentTimeMillis();// 将数据和时间戳打印出来用来验证数据System.out.println(String.format(source%s, %s, XXX total : %d, YYY total : %d\n,name,time(timeStamp),xxxNum,yyyNum));// 发射一个元素并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2String, Integer(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1000);}}Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分再用ProcessWindowFunctionSingleOutputStreamOperatorString mainDataStream dataStream// 以Tuple2的f0字段作为key本例中实际上key只有aaa和bbb两种.keyBy(value - value.f0)// 5秒一次的滚动窗口.timeWindow(Time.seconds(5))// 统计每个key当前窗口内的元素数量然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunctionTuple2String, Integer, String, String, TimeWindow() {// 自定义状态private ValueStateKeyCount state;Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态name是myStatestate getRuntimeContext().getState(new ValueStateDescriptor(myState, KeyCount.class));}public void clear(Context context){ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));contextWindowValueState.clear();}Overridepublic void process(String s, Context context, IterableTuple2String, Integer iterable,CollectorString collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current state.value();// 如果myState还从未没有赋值过就在此初始化if (current null) {current new KeyCount();current.key s;current.count 0;}int count 0;// iterable可以访问该key当前窗口内的所有数据// 这里简单处理只统计了元素数量for (Tuple2String, Integer tuple2 : iterable) {count;}// 更新当前key的元素总数current.count count;// 更新状态到backendstate.update(current);System.out.println(getRuntimeContext() context : (getRuntimeContext() context));ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));ValueStateKeyCount contextGlobalValueState context.globalState().getState(new ValueStateDescriptor(myGlobalState, KeyCount.class));KeyCount windowValue contextWindowValueState.value();if (windowValue null) {windowValue new KeyCount();windowValue.key s;windowValue.count 0;}windowValue.count count;contextWindowValueState.update(windowValue);KeyCount globalValue contextGlobalValueState.value();if (globalValue null) {globalValue new KeyCount();globalValue.key s;globalValue.count 0;}globalValue.count count;contextGlobalValueState.update(globalValue);ValueStateKeyCount contextWindowSameNameState context.windowState().getState(new ValueStateDescriptor(myState, KeyCount.class));ValueStateKeyCount contextGlobalSameNameState context.globalState().getState(new ValueStateDescriptor(myState, KeyCount.class));System.out.println(contextWindowSameNameState contextGlobalSameNameState : (contextWindowSameNameState contextGlobalSameNameState));System.out.println(state contextGlobalSameNameState : (state contextGlobalSameNameState));// 将当前key及其窗口的元素数量还有窗口的起止时间整理成字符串String value String.format(window, %s, %s - %s, %d, total : %d, windowStateCount :%s, globalStateCount :%s\n,// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key出现的总数current.count,contextWindowValueState.value(),contextGlobalValueState.value());// 发射到下游算子collector.collect(value);}});// 打印结果通过分析打印信息检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute(processfunction demo : processwindowfunction);}public static String time(long timeStamp) {return new SimpleDateFormat(hh:mm:ss).format(new Date(timeStamp));}} 输出结果 window, XXX, 08:34:45 - 08:34:50, 3, total : 22, windowStateCount :KeyCount{keyXXX, count3}, globalStateCount :KeyCount{keyXXX, count22} window, YYY, 08:34:45 - 08:34:50, 2, total : 22, windowStateCount :KeyCount{keyYYY, count2}, globalStateCount :KeyCount{keyYYY, count22}从结果可以验证以上的结论此外需要特别注意的一点是context.windowState()的状态需要在clear方法中清理掉因为一旦时间窗口结束就再也没有机会清理了 从这个例子中还发现一个比较有趣的现象 ValueStateKeyCount state getRuntimeContext().getState(new ValueStateDescriptor(myState, KeyCount.class)); ValueStateKeyCount contextWindowSameNameState context.windowState().getState(new ValueStateDescriptor(myState, KeyCount.class)); ValueStateKeyCount contextGlobalSameNameState context.globalState().getState(new ValueStateDescriptor(myState, KeyCount.class));在open中通过getRuntimeContext().getState定义的状态竟然可以通过 context.windowState()/ context.globalState()访问到并且他们指向的都是同一个变量可以参见代码的输出 System.out.println(contextWindowSameNameState contextGlobalSameNameState : (contextWindowSameNameState contextGlobalSameNameState)); System.out.println(state contextGlobalSameNameState : (state contextGlobalSameNameState));结果如下 contextWindowSameNameState contextGlobalSameNameState :true state contextGlobalSameNameState :true参考文献 https://cloud.tencent.com/developer/article/1815079
http://www.w-s-a.com/news/599555/

相关文章:

  • 工信部网站备案管理系统网站备案负责人 更换
  • 我要做个网站该怎么做怎么做电商平台网站
  • wordpress教程 网站标题莱芜大众网
  • 网站建设业务终止合作范本主机公园wordpress
  • 口碑好企业网站建设网站建设与什么专业有关
  • 助贷获客系统快速优化排名公司推荐
  • 重庆做网站优化推广的公司企业网站如何进行定位
  • 高密市赏旋网站设计有限公司山东广饶县建设局网站
  • 成都哪里有网站开发公司网业分离是什么
  • 购物导购网站开发女孩学建筑学好找工作吗
  • 做网站沈阳掌握夏邑进入公众号
  • 怎么做自动提卡网站谷歌推广怎么做
  • 大同网站建设熊掌号wordpress 首页单页
  • 青岛网站美工成都优秀网站建设
  • 聊城大型门户网站建设多版本wordpress
  • 建网站的公司 快云wordpress的搜索
  • 贷款网站模版东莞网站建设哪家专业
  • 做做网站已更新878网站正在建设中
  • dz旅游网站模板网站上做百度广告赚钱么
  • 青岛外贸假发网站建设seo优化名词解释
  • 四川建设厅网站施工员证查询网站建设行业政策
  • 网站全站出售dw怎么设计网页
  • 合肥网站建设方案服务网站建设推荐郑国华
  • 襄阳网站建设需要多少钱台州网站设计公司网站
  • 东莞专业拍摄做网站照片如何在百度上发布自己的广告
  • 网站建设费 科目做网站建设最好学什么
  • php商城网站建设多少钱深圳市建设
  • 有什么做糕点的视频网站黄岛做网站
  • 做视频课程网站建设一个普通网站需要多少钱
  • 专做化妆品的网站合肥做网站建设公司