概述
Join：内连接
CoGroup：内连接、左连接、右连接
Interval Join：点对面（外流中的每个元素与内流在一个时间区间内的元素关联）
Join
1、Join 将两条流中 Key 相同且位于同一窗口中的元素进行关联（内连接）。
2、Join 支持处理时间（processing time）和事件时间（event time）两种时间语义。
3、Join 的通用用法如下：stream.join(otherStream).where(KeySelector).equalTo(KeySelector).window(WindowAssigner).apply(JoinFunction)
滚动窗口
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 内连接* 可以通过两个socket流将数据合并为一个三元组key,value1,value2*/
public class _01_双流join_join_内连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSourceString source env.socketTextStream(localhost, 7777);SingleOutputStreamOperatorTuple3String, Integer, String greenSource source.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] split line.split(,);return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String tuple3, long recordTimestamp) {String timeStr tuple3.f2;try {Date date DateUtils.parseDate(timeStr, yyyy-MM-dd hh-mm-ss);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSourceString source2 env.socketTextStream(localhost, 7778);SingleOutputStreamOperatorTuple3String, Integer, String redSource source2.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] split line.split(,);return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String tuple3, long recordTimestamp) {String timeStr tuple3.f2;try {Date date DateUtils.parseDate(timeStr, yyyy-MM-dd hh-mm-ss);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 双流joinDataStreamTuple3String, Integer, Integer 
rsSource greenSource.join(redSource).where(new KeySelectorTuple3String, Integer, String, String() {Overridepublic String getKey(Tuple3String, Integer, String tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelectorTuple3String, Integer, String, String() {Overridepublic String getKey(Tuple3String, Integer, String tuple3) throws Exception {return tuple3.f0;}// 滚动窗口}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunctionTuple3String, Integer, String, Tuple3String, Integer, String, Tuple3String, Integer, Integer() {Overridepublic Tuple3String, Integer, Integer join(Tuple3String, Integer, String first, Tuple3String, Integer, String second) throws Exception {return Tuple3.of(first.f0, first.f1, second.f1);}});redSource.print(红色的流);greenSource.print(绿色的流);rsSource.print(合并后的流);env.execute();}
}
滑动窗口
package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;/*** 基本功能: 演示join的滑动窗口* program:FlinkDemo* author: 闫哥* create:2024-05-20 09:11:13**/
public class Demo02Join {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 将并行度设置为1,否则很难看到现象env.setParallelism(1);// 创建一个绿色的流DataStreamSourceString greenSource env.socketTextStream(localhost, 8899);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperatorTuple3String, Integer, String greenDataStream greenSource.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String value) throws Exception {String[] arr value.split(,);return new Tuple3(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长原因是以前获取的数据都是long类型并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳并且时间戳必须是long类型必须是毫秒为单位的。String time element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);try {Date date sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 创建一个橘色的流DataStreamSourceString orangeSource env.socketTextStream(localhost, 9988);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperatorTuple3String, Integer, String orangeDataStream orangeSource.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String value) throws Exception {String[] arr value.split(,);return new Tuple3(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长原因是以前获取的数据都是long类型并且都是毫秒值.withTimestampAssigner(new 
SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳并且时间戳必须是long类型必须是毫秒为单位的。String time element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);try {Date date sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));//2. source-加载数据//3. transformation-数据处理转换DataStreamTuple3String, Integer, Integer resultStream greenDataStream.join(orangeDataStream).where(tuple3 - tuple3.f0).equalTo(tuple3 - tuple3.f0)// 滑动窗口.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1))).apply(new JoinFunctionTuple3String, Integer, String, Tuple3String, Integer, String, Tuple3String, Integer, Integer() {Overridepublic Tuple3String, Integer, Integer join(Tuple3String, Integer, String first, Tuple3String, Integer, String second) throws Exception {return Tuple3.of(first.f0, first.f1, second.f1);}});//4. sink-数据输出greenDataStream.print(绿色的流);orangeDataStream.print(橘色的流);resultStream.print(最终的结果);//5. execute-执行env.execute();}
}
CoGroup
1、优势：可以实现内连接、左连接、右连接。
2、劣势：内存压力大。
3、与上面 Join 写法的区别：将 join 换成 coGroup；apply 中传入的是 CoGroupFunction，需要实现 coGroup 方法，同一窗口内同一 Key 的两条流数据分别以两个 Iterable 传入，由代码自行决定如何输出。
4、用法：
stream.coGroup(otherStream).where(KeySelector).equalTo(KeySelector).window(WindowAssigner).apply(CoGroupFunction);
内连接
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 内连接*/
public class _02_双流join_CoGroup_内连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSourceString source env.socketTextStream(localhost, 7777);SingleOutputStreamOperatorTuple3String, Integer, String greenSource source.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] split line.split(,);return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String tuple3, long recordTimestamp) {String timeStr tuple3.f2;try {Date date DateUtils.parseDate(timeStr, yyyy-MM-dd hh-mm-ss);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSourceString source2 env.socketTextStream(localhost, 7778);SingleOutputStreamOperatorTuple3String, Integer, String redSource source2.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] split line.split(,);return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String tuple3, long recordTimestamp) {String timeStr tuple3.f2;try {Date date DateUtils.parseDate(timeStr, yyyy-MM-dd hh-mm-ss);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 连接DataStreamTuple3String, String, String 
rsSource greenSource.coGroup(redSource).where(new KeySelectorTuple3String, Integer, String, String() {Overridepublic String getKey(Tuple3String, Integer, String tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelectorTuple3String, Integer, String, String() {Overridepublic String getKey(Tuple3String, Integer, String tuple3) throws Exception {return tuple3.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunctionTuple3String, Integer, String, Tuple3String, Integer, String, Tuple3String, String, String() {Overridepublic void coGroup(IterableTuple3String, Integer, String first, IterableTuple3String, Integer, String second, CollectorTuple3String, String, String out) throws Exception {for (Tuple3String, Integer, String firesTuple3 : first) {for (Tuple3String, Integer, String secondTuple3 : second) {out.collect(Tuple3.of(firesTuple3.f0,greenfiresTuple3.f1,redsecondTuple3.f1));}}}});redSource.print(红色的流);greenSource.print(绿色的流);rsSource.print(合并后的流);env.execute();}
}
外连接（本例实现的是左外连接：以绿色流为左侧）
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;/*** 外连接*/
public class _03_双流join_CoGroup_外连接 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 绿色的流DataStreamSourceString source env.socketTextStream(localhost, 7777);SingleOutputStreamOperatorTuple3String, Integer, String greenSource source.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] split line.split(,);return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String tuple3, long recordTimestamp) {String timeStr tuple3.f2;try {Date date DateUtils.parseDate(timeStr, yyyy-MM-dd hh-mm-ss);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 红色的流DataStreamSourceString source2 env.socketTextStream(localhost, 7778);SingleOutputStreamOperatorTuple3String, Integer, String redSource source2.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] split line.split(,);return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String tuple3, long recordTimestamp) {String timeStr tuple3.f2;try {Date date DateUtils.parseDate(timeStr, yyyy-MM-dd hh-mm-ss);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));DataStreamTuple3String, String, String rsSource 
greenSource.coGroup(redSource).where(new KeySelectorTuple3String, Integer, String, String() {Overridepublic String getKey(Tuple3String, Integer, String tuple3) throws Exception {return tuple3.f0;}}).equalTo(new KeySelectorTuple3String, Integer, String, String() {Overridepublic String getKey(Tuple3String, Integer, String tuple3) throws Exception {return tuple3.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunctionTuple3String, Integer, String, Tuple3String, Integer, String, Tuple3String, String, String() {Overridepublic void coGroup(IterableTuple3String, Integer, String first, IterableTuple3String, Integer, String second, CollectorTuple3String, String, String out) throws Exception {// 内连接左连接右连接的区别只在这里面存在两层循环for (Tuple3String, Integer, String firesTuple3 : first) {boolean isExist false;for (Tuple3String, Integer, String secondTuple3 : second) {isExist true;out.collect(Tuple3.of(firesTuple3.f0,greenfiresTuple3.f1,redsecondTuple3.f1));}if (!isExist){out.collect(Tuple3.of(firesTuple3.f0,greenfiresTuple3.f1,red null));}}}});redSource.print(红色的流);greenSource.print(绿色的流);rsSource.print(合并后的流);env.execute();}
}
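Interval Join
1、Join 和 CoGroup 都是窗口 Join，必须指定窗口。
2、Interval Join 不需要指定窗口，但必须先分组（keyBy）才能使用。
3、因此要先对两个数据源分别进行 keyBy。
4、用法：外流.intervalJoin(内流).between(Time.seconds(-2), Time.seconds(2)).process(ProcessJoinFunction)。between 的上下界默认都是闭区间（两端都包含）；如需排除边界，可调用 lowerBoundExclusive() / upperBoundExclusive()。
外流中的每个元素（单个时间点）会与内流（作为参数传入的流）中时间戳落在 [该元素时间 + 下界, 该元素时间 + 上界] 区间内的元素关联，即“点对面”。
代码实现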
package com.bigdata.day07;import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.time.Duration;
import java.util.Date;public class _04_双流join_Interval_Join {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//绿色的流DataStreamSourceString source env.socketTextStream(localhost, 7777);KeyedStreamTuple3String, Integer, String, String greenSource source.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] split line.split(,);return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}// 水印}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String tuple3, long recordTimestamp) {String timeStr tuple3.f2;try {Date date DateUtils.parseDate(timeStr, yyyy-MM-dd hh-mm-ss);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}// keyBy})).keyBy(new KeySelectorTuple3String, Integer, String, String() {Overridepublic String getKey(Tuple3String, Integer, String tuple3) throws Exception {return tuple3.f0;}});// 红色的流DataStreamSourceString source2 env.socketTextStream(localhost, 7778);KeyedStreamTuple3String, Integer, String, String redSource source2.map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] split line.split(,);return Tuple3.of(split[0], Integer.valueOf(split[1]), split[2]);}// 水印}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String tuple3, long recordTimestamp) {String timeStr tuple3.f2;try 
{Date date DateUtils.parseDate(timeStr, yyyy-MM-dd hh-mm-ss);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}// 分组})).keyBy(new KeySelectorTuple3String, Integer, String, String() {Overridepublic String getKey(Tuple3String, Integer, String tuple3) throws Exception {return tuple3.f0;}});// 实现SingleOutputStreamOperatorString rsSource greenSource.intervalJoin(redSource).between(Time.seconds(-2), Time.seconds(2)).process(new ProcessJoinFunctionTuple3String, Integer, String, Tuple3String, Integer, String, String() {Overridepublic void processElement(Tuple3String, Integer, String left, Tuple3String, Integer, String right, ProcessJoinFunctionTuple3String, Integer, String, Tuple3String, Integer, String, String.Context ctx, CollectorString out) throws Exception {out.collect(left中的key:left.f0,valueleft.f1,timeleft.f2,right中的key:right.f0,valueright.f1,timeright.f2);}});redSource.print(红色的流);greenSource.print(绿色的流);rsSource.print(合并后的流);env.execute();
/*** 红色的为下面的流* 范围* 假如现在是10* 9 10 11 12*/}
}