做电脑端网站手机端能搜到吗,wordpress博客主题 m1,医学类app制作公司,吉林网站开发公司背景
使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换#xff0c;而且这种用法非常常见#xff0c;根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换#xff0c;本文就来简单举个例子说明下RichFlatMapFunction的使用方法
RichFl…背景
使用RichFlatMapFunction可以带状态来决定如何对数据流进行转换而且这种用法非常常见根据之前遇到过的某个key的状态来决定再次遇到同样的key时要如何进行数据转换本文就来简单举个例子说明下RichFlatMapFunction的使用方法
RichFlatMapFunction使用示例
下面的例子的输入是不用name下的count数量值当本次name的数量和前一次name的数量相差超过配置的阈值100时打印出来一条告警日志详细代码如下
package wikiedits.func.state;import java.util.Objects;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;/*** Tuple2String, Integer 是输入的数据类型 String 是监控到异常值后的输出数据类型*/
public class MyRichFlatMapFunction extends RichFlatMapFunctionTuple2String, Integer, String {// 键值分区状态,对应每个name一个值ValueStateStateEntity nameState;Overridepublic void open(Configuration parameters) throws Exception {// 创建一个键值分区状态ValueStateDescriptorStateEntity state new ValueStateDescriptor(nameState, StateEntity.class);nameState getRuntimeContext().getState(state);}Overridepublic void flatMap(Tuple2String, Integer input, CollectorString collector) throws Exception {// 判断状态值是否为空状态默认值是空if (Objects.isNull(nameState.value())) {StateEntity sFalg new StateEntity(input.f0, input.f1);nameState.update(sFalg);return;}// 和上一次的状态值比较StateEntity value nameState.value();if (Math.abs(value.count - input.f1) 100) {collector.collect(new String(监控到异常值,名称: input.f0 上次的值: value 本次的值 input));}value.setName(input.f0);value.setCount(input.f1);// 更新状态值nameState.update(value);}}package wikiedits.func.state;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class RichFlatMapFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置数据源一共三个元素DataStreamTuple2String, Integer dataStream env.addSource(new SourceFunctionTuple2String, Integer() {Overridepublic void run(SourceContextTuple2String, Integer ctx) throws Exception {for (int i 1; i Integer.MAX_VALUE; i) {// 只有XXXYYYZZZ三种nameString name (0 i % 3) ? XXX : ((i % 3 1) ? YYY : ZZZ);int count RandomUtils.nextInt(0, 1000);// 使用当前时间作为时间戳long timeStamp System.currentTimeMillis();// 发射一个元素并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2String, Integer(name, count), timeStamp);// 每发射一次就延时1秒Thread.sleep(5000);}}Overridepublic void cancel() {}});dataStream.keyBy((f) - {return f.f0;}).flatMap(new MyRichFlatMapFunction()).print();env.execute();}public static String time(long timeStamp) {return new SimpleDateFormat(yyyy-MM-dd hh:mm:ss).format(new Date(timeStamp));}}
结果