浙江省火电建设公司网站,小火箭服务器节点购买,华为云服务器购买,工信部门备案网站Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup.
数据源➜ ~ nc -lk 1111
101,A
102,B
103,C
10…Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup.
数据源➜ ~ nc -lk 1111
101,A
102,B
103,C
104,D
105,E
106,F➜ ~ nc -lk 2222
101,A,男,程序员
102,B,男,程序员
103,C,男,会计
104,D,男,安全工程师
106,K,男,程序员
108,女,本科,人事代码import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** Author: J* Version: 1.0* CreateTime: 2023/8/10* Description: 多流操作-join**/
public class FlinkJoin {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 数据源1,以socket作为数据源DataStreamSourceString socketStream1 env.socketTextStream(localhost, 1111);SingleOutputStreamOperatorString[] mapStream1 socketStream1.map(str - str.split(,)).returns(new TypeHintString[]() {});// 数据源2,以socket作为数据源DataStreamSourceString socketStream2 env.socketTextStream(localhost, 2222);SingleOutputStreamOperatorString[] mapStream2 socketStream2.map(str - str.split(,)).returns(new TypeHintString[]() {});// 关联数据流DataStreamString joinedStream mapStream1.join(mapStream2).where(arr - arr[0]) // mapStream1以数组中的第一个字段作为关联字段.equalTo(arr - arr[0]) // mapStream2以数组中的第一个字段作为关联字段.window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 以20秒作为一个窗口.apply(new JoinFunctionString[], String[], String() {// 这里是写关联后的具体逻辑Overridepublic String join(String[] first, String[] second) throws Exception {String result first[0] , second[1] , second[2] , second[3];return result;}});// 打印结果数据joinedStream.print();env.execute(Flink join);}
}结果3 103,C,男,会计
2 106,K,男,程序员
2 101,A,男,程序员
3 104,D,男,安全工程师
3 102,B,男,程序员这个API使用起来还是比较简单的,如果想实现left join或者right join的功能就需要通过coGroup来实现了.