当前位置: 首页 > news >正文

怎么做网站卖货免费网站申请注册步骤

怎么做网站卖货,免费网站申请注册步骤,wordpress register_setting,电商网站设计思路广播流是什么#xff1f; 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景#xff1f; 一般用于动态加载配置项。比如lol#xff0c;每天不断有人再投诉举报#xff0c;客服根本忙不过来#xff0c;腾讯内部做了一个判断#xff0c;只有vip3…广播流是什么 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景 一般用于动态加载配置项。比如lol每天不断有人再投诉举报客服根本忙不过来腾讯内部做了一个判断只有vip3以上的客户的投诉才会有人工一对一回复过了一段时间大家都发现vip3才有人工都开始充钱到vip3此时人还是很多于是只有vip4上的客户才能人工回复 vip3-vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条需要多个节点都收到这个变化的数据。 广播流怎么用 一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流b是配置变化流 不多说直接上demo开箱即用 package com.chenchi.broadcast;import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector;import java.util.HashMap; import java.util.Random;public class BroadCastStreamDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamPattern patternDataStream env.addSource(new ChangeSource());DataStreamUser userDataStream env.addSource(new CustomerSource());userDataStream.print(user);patternDataStream.print(pattern);//test1 直接合流 不广播。只会在一个节点更新。 用于特殊需求 // userDataStream // .keyBy(user - user.userId) // .connect(patternDataStream) // .process(new CustomerSimpleProcess()) // .print();//test2// 定义广播状态的描述器创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据 // userDataStream // .keyBy(user - user.userId) // .connect(patternDataStream.broadcast()) // .process(new CustomerSimpleProcess()) // .print();//test3MapStateDescriptorVoid, Pattern bcStateDescriptor new MapStateDescriptor(patterns, Types.VOID, Types.POJO(Pattern.class));//通过描述器 更新BroadcastStreamPattern broadcast patternDataStream.broadcast(bcStateDescriptor);userDataStream.keyBy(user - user.userId).connect(broadcast).process(new CustomerBroadCastProcess()).print();env.execute();}private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunctionInteger, User, Pattern, String {Overridepublic void processElement(User user, KeyedBroadcastProcessFunctionInteger, User, Pattern, String.ReadOnlyContext readOnlyContext, CollectorString collector) throws Exception {Integer userVip user.getVip();//获取广播流的数据 不是通过map保存Pattern pattern readOnlyContext.getBroadcastState(new MapStateDescriptor(patterns, Types.VOID, Types.POJO(Pattern.class))).get(null);if (pattern!null){Integer patternVip pattern.vip;String result 当前系统需要的vip等级 patternVip ,用户id user.userId ,vip userVip;if (userVip patternVip){resultresult符合要求;}else {resultresult不符合要求;}collector.collect(result);}else {System.out.println(pattern is null );}}Overridepublic void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunctionInteger,User, Pattern, String.Context context, CollectorString collector) throws Exception {BroadcastStateVoid, Pattern bcState context.getBroadcastState(new MapStateDescriptor(patterns, Types.VOID, Types.POJO(Pattern.class)));// 将广播状态更新为当前的patternbcState.put(null, pattern);}}public static class CustomerSimpleProcess extends CoProcessFunctionUser, Pattern, String {ValueStateInteger vip; //这个是保留主流的state的。 不是保留广播流的stateHashMapString,Integer vipMap;Overridepublic void open(Configuration parameters) throws Exception {vip getRuntimeContext().getState(new ValueStateDescriptor(vip, Integer.class));vipMapnew HashMapString,Integer();super.open(parameters);}Overridepublic void processElement1(User user, CoProcessFunctionUser, Pattern, String.Context context, CollectorString collector) throws Exception {Integer userVip user.getVip();Integer patternVip vipMap.getOrDefault(vip, 0);String result 当前系统需要的vip等级 patternVip ,用户id user.userId ,vip userVip;if (userVippatternVip){resultresult符合要求;}else {resultresult不符合要求;}collector.collect(result);}Overridepublic void processElement2(Pattern pattern, CoProcessFunctionUser, Pattern, String.Context context, CollectorString collector) throws Exception {vipMap.put(vip,pattern.vip);}}public static class User {public Integer userId;public Integer vip;public User() {}public User(Integer userId, Integer vip) {this.userId userId;this.vip vip;}public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId userId;}public Integer getVip() {return vip;}public void setVip(Integer vip) {this.vip vip;}Overridepublic String toString() {return Action{ userId userId , vip vip \ };}}// 定义行为模式POJO类包含先后发生的两个行为public static class Pattern {public Integer vip;public Pattern() {}public Pattern(Integer vip) {this.vip vip;}Overridepublic String toString() {return Pattern{ vip vip \ };}}private static class CustomerSource implements SourceFunctionUser {boolean run true;Overridepublic void run(SourceContextUser sourceContext) throws Exception {while (true) {Integer userId new Random().nextInt(1000);Integer vip new Random().nextInt(10);sourceContext.collect(new User(userId, vip));Thread.sleep(1000);}}Overridepublic void cancel() {run false;}}private static class ChangeSource implements SourceFunctionPattern {boolean run true;Overridepublic void run(SourceContextPattern sourceContext) throws Exception {int i 1;while (true) {sourceContext.collect(new Pattern(i));Thread.sleep(5000);}}Overridepublic void cancel() {run false;}}}demo思想以上述vip做例子获取用户不断投诉的id和vip等级 数据库保存可以享受人工服务的vip等级该等级可以自行调整(我是随着时间变化主键增大)。 test1 不广播 注意看pattern:4 print vip2的消息但是不代表是task4收到的消息我们看到1输出了vip2 但是task10 task9都还是vip0 说明流没有广播除非此处并行度设置为1 test2 map保存变化数据 test3通过描述器获取数据 和test2 一样不过要注意因为两个流的数据有先后可能还没有pattern就来了user信息所以建议先初始化或者先添加pattern流。
http://www.w-s-a.com/news/599365/

相关文章:

  • wordpress教程 网站标题莱芜大众网
  • 网站建设业务终止合作范本主机公园wordpress
  • 口碑好企业网站建设网站建设与什么专业有关
  • 助贷获客系统快速优化排名公司推荐
  • 重庆做网站优化推广的公司企业网站如何进行定位
  • 高密市赏旋网站设计有限公司山东广饶县建设局网站
  • 成都哪里有网站开发公司网业分离是什么
  • 购物导购网站开发女孩学建筑学好找工作吗
  • 做网站沈阳掌握夏邑进入公众号
  • 怎么做自动提卡网站谷歌推广怎么做
  • 大同网站建设熊掌号wordpress 首页单页
  • 青岛网站美工成都优秀网站建设
  • 聊城大型门户网站建设多版本wordpress
  • 建网站的公司 快云wordpress的搜索
  • 贷款网站模版东莞网站建设哪家专业
  • 做做网站已更新878网站正在建设中
  • dz旅游网站模板网站上做百度广告赚钱么
  • 青岛外贸假发网站建设seo优化名词解释
  • 四川建设厅网站施工员证查询网站建设行业政策
  • 网站全站出售dw怎么设计网页
  • 合肥网站建设方案服务网站建设推荐郑国华
  • 襄阳网站建设需要多少钱台州网站设计公司网站
  • 东莞专业拍摄做网站照片如何在百度上发布自己的广告
  • 网站建设费 科目做网站建设最好学什么
  • php商城网站建设多少钱深圳市建设
  • 有什么做糕点的视频网站黄岛做网站
  • 做视频课程网站建设一个普通网站需要多少钱
  • 专做化妆品的网站合肥做网站建设公司
  • 唐山企业网站网站建设费计入那个科目
  • 企业网站制作运营彩虹云主机官网