替朋友做网站,收不到wordpress的邮件,易县做网站,外链平台#x1f44f;作者简介#xff1a;大家好#xff0c;我是爱敲代码的小黄#xff0c;阿里巴巴淘天Java开发工程师#xff0c;CSDN博客专家#x1f4d5;系列专栏#xff1a;Spring源码、Netty源码、Kafka源码、JUC源码、dubbo源码系列#x1f525;如果感觉博主的文章还不错… 作者简介大家好我是爱敲代码的小黄阿里巴巴淘天Java开发工程师CSDN博客专家系列专栏Spring源码、Netty源码、Kafka源码、JUC源码、dubbo源码系列如果感觉博主的文章还不错的话请三连支持一下博主哦博主正在努力完成2023计划中以梦为马扬帆起航2023追梦人联系方式hls1793929520加我进群大家一起学习一起进步一起对抗互联网寒冬 文章目录 Flink-算子一、Map二、FlatMap三、Filter四、Union真合并五、Connect假合并六、CoMap, CoFlatMap七、Split select已废弃八、side output九、Iterate十、keyBy十一、Reduce十二、Aggregations十三、总结 Flink-算子
Transformations 算子可以将一个或者多个算子转换成一个新的数据流
使用 Transformations 算子组合可以进行复杂的业务处理
一、Map
DataStream → DataStream
Map 比较简单遍历我们数据流的每一个元素产生一个新的元素
作用字符串的转换、去除空格等操作
注意只能一对一
示例如下
/*** 去除当前字符串的前后空格*/
public class MyMapFunction implements MapFunctionString, String {Overridepublic String map(String value) throws Exception {return value.trim();}
}二、FlatMap
DataStream → DataStream
遍历当前数据流中的每一个元素产生 N N 0,1,2,3个元素
**作用**与 Map 有点像主要可以输出多个
**注意**一对一、一对多
示例如下
/*** 将当前字符串按照逗号进行分割*/
public class MyFlatMapFunction implements FlatMapFunctionString, String {Overridepublic void flatMap(String value, CollectorString collector) throws Exception {if (value null || value.isEmpty()) {return;}for (String word : value.split(,)) {collector.collect(word);}}
}三、Filter
DataStream → DataStream
过滤算子根据数据流的元素的业务逻辑返回 true 或者 false
true保留当前元素
false丢弃当前元素
**作用**过滤某些不符合预期的数据流数据
示例如下
/*** 过滤掉处于黑名单的数据流数据*/
public class MyFilterFunction implements FilterFunctionString {private final static SetString blackSet new HashSet();static {blackSet.add(num1);blackSet.add(num2);blackSet.add(num3);}Overridepublic boolean filter(String value) throws Exception {return !blackSet.contains(value);}
}四、Union真合并
DataStream → DataStream
合并两个或者更多的数据流产生一个新的数据流
新的数据流包括所合并的数据流的元素
注意需要保证数据流中元素类型一致
/*** 聚合多条流数据*/
public class UnionFunction {private final static String hostName ;private final static int port 8088;public static void main(String[] args) throws Exception {// 1. 创建流环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建多条输入源DataStreamSourceString dataStream1 env.socketTextStream(hostName, port);DataStreamSourceString dataStream2 env.socketTextStream(hostName, port);// 3. 合并数据源DataStreamString unionDataStream dataStream1.union(dataStream2);// 4. 输出unionDataStream.print();// 5. 执行env.execute();}
}五、Connect假合并
DataStream,DataStream → ConnectedStreams
合并两个数据流并且保留两个数据流的数据类型能够共享两个流的状态
代码示例
public class ConnectFunction {private final static String hostName ;private final static int port 8088;public static void main(String[] args) throws Exception {// 1. 创建流环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建多条输入源DataStreamSourceString dataStream1 env.socketTextStream(hostName, port);DataStreamSourceString dataStream2 env.socketTextStream(hostName, port);ConnectedStreamsString, String connect dataStream1.connect(dataStream2);}
}六、CoMap, CoFlatMap
ConnectedStreams → DataStream
CoMap 和 CoFlatMap 并不是具体算子名称而是一类操作名称
CoMap基于 ConnectedStreams数据流做 map 遍历
SingleOutputStreamOperatorObject map connect.map(new CoMapFunctionString, String, Object() {Override// 第一个数据流转换public String map1(String value) throws Exception {return value;}Override// 第二个数据流转换public String map2(String value) throws Exception {return value;}
});CoFlatMap基于 ConnectedStreams 数据流做 flatMap 遍历
connect.flatMap(new CoFlatMapFunctionString, String, String() {Overridepublic void flatMap1(String value, CollectorString collector) throws Exception {if (value null || value.isEmpty()) {return;}for (String word : value.split(,)) {collector.collect(word);}}Overridepublic void flatMap2(String value, CollectorString collector) throws Exception {if (value null || value.isEmpty()) {return;}for (String word : value.split(,)) {collector.collect(word);}}
});七、Split select已废弃
DataStream → SplitStream
根据条件将一个流分成两个或者更多的流
注意
Split...Select... 中 Split 只是对流中的数据打上标记,并没有将流真正拆分。通过 Select 算子将流真正拆分出来。Split...Select... 已经过时
public static void main(String[] args) throws Exception {// 1. 创建流环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建多条输入源DataStreamSourceString dataStream env.socketTextStream(hostName, port);// 3. 定义拆分逻辑SplitStreamString splitStream dataStream.split(new OutputSelectorString() {Overridepublic IterableString select(String value) {ListString output new ArrayList();if (value.equals(AAA)) {output.add(A);} else {output.add(B);}return output;}});// 4. 将数据流真正拆分splitStream.select(A).print(输出A);splitStream.select(B).print(输出B);}八、side output
流计算过程可能遇到根据不同的条件来分隔数据流
filter 分割造成不必要的数据复制
OutputTagString rtTag new OutputTag(rt);OutputTagString qpsTag new OutputTag(qps);SingleOutputStreamOperatorObject process dataStream.process(new ProcessFunctionString, Object() {Overridepublic void processElement(String value, Context ctx, CollectorObject out) throws Exception {if (value.equals(RT)) {ctx.output(rtTag, value);} else if (value.equals(qps)) {ctx.output(qpsTag, value);} else {out.collect(value);}}});// 主流process.print();// rtDataStreamString rtOutput process.getSideOutput(rtTag);// qpsDataStreamString qpsOutput process.getSideOutput(qpsTag);九、Iterate
DataStream → IterativeStream → DataStream
Iterate 算子提供了对数据流迭代的支持
迭代有两部分组成迭代体、终止迭代条件
不满足终止迭代条件的数据流会返回到stream流中进行下一次迭代
满足终止迭代条件的数据流继续往下游发送
// 获取迭代数据源
IterativeStreamString iterate dataStreamSource.iterate();// 迭代体
// 每次数据累加
DataStreamString minusOne iterate.map(new MapFunctionString, String() {Overridepublic String map(String value) throws Exception {return value value;}
}).setParallelism(1);; // 设置 map 操作的并行度为1// 终止迭代条件(当数值小于等于10时均再次进行迭代)
DataStreamString stillGreaterThanZero minusOne.filter(new FilterFunctionString() {Overridepublic boolean filter(String value) throws Exception {return value.length() 10;}
}).setParallelism(1); // 设置 filter 操作的并行度为1iterate.closeWith(stillGreaterThanZero);十、keyBy
DataStream → KeyedStream
根据数据流中指定的字段来分区相同指定字段值的数据一定是在同一个分区中
按照某 key 进行分组
dataStream.keyBy(word)
public class WordCount {public String word;public int count;public WordCount(String word, int count) {this.word word;this.count count;}public WordCount() {}
}
// 或者使用KeySelector
KeyedStreamWordCount, String wordCountObjectKeyedStream dataStreamSource.keyBy(new KeySelectorWordCount, String() {Overridepublic String getKey(WordCount wordCount) throws Exception {return wordCount.word;}
});这里一定要注意如果你采用的是 POJO 类那么一定要加 Public 修饰符因为 Flink 通过反射机制访问和操作这些字段实现分组和聚合等操作
十一、Reduce
KeyedStream根据key分组 → DataStream
对于分组完的数据流进行聚合处理
如果只是简单的累加操作和 sum 区别不大
SingleOutputStreamOperatorWordCount dataStream wordCountObjectKeyedStream.reduce(new ReduceFunctionWordCount() {Overridepublic WordCount reduce(WordCount wordCount1, WordCount wordCount2) throws Exception {return new WordCount(wordCount1.word, wordCount1.count wordCount2.count);}
});十二、Aggregations
KeyedStream → DataStream
Aggregations代表的是一类聚合算子具体算子如下
// 根据键对流数据中的指定位置索引为0的值进行求和。
keyedStream.sum(0)
// 根据键对流数据中的名为key的字段的值进行求和。
keyedStream.sum(key)
// 根据键对流数据中的指定位置索引为0的值进行取最小值。
keyedStream.min(0)
// 根据键对流数据中的名为key的字段的值进行取最小值。
keyedStream.min(key)
// 根据键对流数据中的指定位置索引为0的值进行取最大值。
keyedStream.max(0)
// 根据键对流数据中的名为key的字段的值进行取最大值。
keyedStream.max(key)
//根据键对流数据中的指定位置索引为0的值进行最小值比较并返回具有最小值的元素。
keyedStream.minBy(0)
//根据键对流数据中的名为key的字段的值进行最小值比较并返回具有最小值的元素。
keyedStream.minBy(key)
// 根据键对流数据中的指定位置索引为0的值进行最大值比较并返回具有最大值的元素
keyedStream.maxBy(0)
// 根据键对流数据中的名为key的字段的值进行最大值比较并返回具有最大值的元素。
keyedStream.maxBy(key)十三、总结
鲁迅先生曾说独行难众行易和志同道合的人一起进步。彼此毫无保留的分享经验才是对抗互联网寒冬的最佳选择。 其实很多时候并不是我们不够努力很可能就是自己努力的方向不对如果有一个人能稍微指点你一下你真的可能会少走几年弯路。
如果你也对 后端架构 和 中间件源码 有兴趣欢迎添加博主微信hls1793929520一起学习一起成长
我是爱敲代码的小黄阿里巴巴淘天集团Java开发工程师双非二本培训班出身
通过两年努力成功拿下阿里、百度、美团、滴滴等大厂想通过自己的事迹告诉大家努力是会有收获的
双非本两年经验我是如何拿下阿里、百度、美团、滴滴、快手、拼多多等大厂offer的
我们下期再见。
从清晨走过也拥抱夜晚的星辰人生没有捷径你我皆平凡你好陌生人一起共勉。