怎么做网站卖货,免费网站申请注册步骤,wordpress register_setting,电商网站设计思路广播流是什么#xff1f;
将一条数据广播到所有的节点。使用 dataStream.broadCast()
广播流使用场景#xff1f;
一般用于动态加载配置项。比如lol#xff0c;每天不断有人再投诉举报#xff0c;客服根本忙不过来#xff0c;腾讯内部做了一个判断#xff0c;只有vip3…广播流是什么
将一条数据广播到所有的节点。使用 dataStream.broadCast()
广播流使用场景
一般用于动态加载配置项。比如lol每天不断有人再投诉举报客服根本忙不过来腾讯内部做了一个判断只有vip3以上的客户的投诉才会有人工一对一回复过了一段时间大家都发现vip3才有人工都开始充钱到vip3此时人还是很多于是只有vip4上的客户才能人工回复
vip3-vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条需要多个节点都收到这个变化的数据。
广播流怎么用
一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流b是配置变化流
不多说直接上demo开箱即用
package com.chenchi.broadcast;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Random;public class BroadCastStreamDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamPattern patternDataStream env.addSource(new ChangeSource());DataStreamUser userDataStream env.addSource(new CustomerSource());userDataStream.print(user);patternDataStream.print(pattern);//test1 直接合流 不广播。只会在一个节点更新。 用于特殊需求
// userDataStream
// .keyBy(user - user.userId)
// .connect(patternDataStream)
// .process(new CustomerSimpleProcess())
// .print();//test2// 定义广播状态的描述器创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据
// userDataStream
// .keyBy(user - user.userId)
// .connect(patternDataStream.broadcast())
// .process(new CustomerSimpleProcess())
// .print();//test3MapStateDescriptorVoid, Pattern bcStateDescriptor new MapStateDescriptor(patterns, Types.VOID, Types.POJO(Pattern.class));//通过描述器 更新BroadcastStreamPattern broadcast patternDataStream.broadcast(bcStateDescriptor);userDataStream.keyBy(user - user.userId).connect(broadcast).process(new CustomerBroadCastProcess()).print();env.execute();}private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunctionInteger, User, Pattern, String {Overridepublic void processElement(User user, KeyedBroadcastProcessFunctionInteger, User, Pattern, String.ReadOnlyContext readOnlyContext, CollectorString collector) throws Exception {Integer userVip user.getVip();//获取广播流的数据 不是通过map保存Pattern pattern readOnlyContext.getBroadcastState(new MapStateDescriptor(patterns, Types.VOID, Types.POJO(Pattern.class))).get(null);if (pattern!null){Integer patternVip pattern.vip;String result 当前系统需要的vip等级 patternVip ,用户id user.userId ,vip userVip;if (userVip patternVip){resultresult符合要求;}else {resultresult不符合要求;}collector.collect(result);}else {System.out.println(pattern is null );}}Overridepublic void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunctionInteger,User, Pattern, String.Context context, CollectorString collector) throws Exception {BroadcastStateVoid, Pattern bcState context.getBroadcastState(new MapStateDescriptor(patterns, Types.VOID, Types.POJO(Pattern.class)));// 将广播状态更新为当前的patternbcState.put(null, pattern);}}public static class CustomerSimpleProcess extends CoProcessFunctionUser, Pattern, String {ValueStateInteger vip; //这个是保留主流的state的。 不是保留广播流的stateHashMapString,Integer vipMap;Overridepublic void open(Configuration parameters) throws Exception {vip getRuntimeContext().getState(new ValueStateDescriptor(vip, Integer.class));vipMapnew HashMapString,Integer();super.open(parameters);}Overridepublic void processElement1(User user, CoProcessFunctionUser, Pattern, String.Context context, CollectorString collector) throws Exception {Integer userVip user.getVip();Integer patternVip vipMap.getOrDefault(vip, 0);String result 当前系统需要的vip等级 patternVip ,用户id user.userId ,vip userVip;if (userVippatternVip){resultresult符合要求;}else {resultresult不符合要求;}collector.collect(result);}Overridepublic void processElement2(Pattern pattern, CoProcessFunctionUser, Pattern, String.Context context, CollectorString collector) throws Exception {vipMap.put(vip,pattern.vip);}}public static class User {public Integer userId;public Integer vip;public User() {}public User(Integer userId, Integer vip) {this.userId userId;this.vip vip;}public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId userId;}public Integer getVip() {return vip;}public void setVip(Integer vip) {this.vip vip;}Overridepublic String toString() {return Action{ userId userId , vip vip \ };}}// 定义行为模式POJO类包含先后发生的两个行为public static class Pattern {public Integer vip;public Pattern() {}public Pattern(Integer vip) {this.vip vip;}Overridepublic String toString() {return Pattern{ vip vip \ };}}private static class CustomerSource implements SourceFunctionUser {boolean run true;Overridepublic void run(SourceContextUser sourceContext) throws Exception {while (true) {Integer userId new Random().nextInt(1000);Integer vip new Random().nextInt(10);sourceContext.collect(new User(userId, vip));Thread.sleep(1000);}}Overridepublic void cancel() {run false;}}private static class ChangeSource implements SourceFunctionPattern {boolean run true;Overridepublic void run(SourceContextPattern sourceContext) throws Exception {int i 1;while (true) {sourceContext.collect(new Pattern(i));Thread.sleep(5000);}}Overridepublic void cancel() {run false;}}}demo思想以上述vip做例子获取用户不断投诉的id和vip等级 数据库保存可以享受人工服务的vip等级该等级可以自行调整(我是随着时间变化主键增大)。
test1 不广播 注意看pattern:4 print vip2的消息但是不代表是task4收到的消息我们看到1输出了vip2
但是task10 task9都还是vip0 说明流没有广播除非此处并行度设置为1
test2 map保存变化数据 test3通过描述器获取数据
和test2 一样不过要注意因为两个流的数据有先后可能还没有pattern就来了user信息所以建议先初始化或者先添加pattern流。