建设网站主题,360竞价推广怎么做,泰安人才网招聘信息网电焊工,网站众筹该怎么做文章目录多流转换分流基本合流操作联合#xff08;Union#xff09;连接#xff08;Connect#xff09;基于时间的合流——双流联结#xff08;Join#xff09;窗口联结#xff08;Window Join#xff09;间隔联结#xff08;Interval Join#xff09;窗口同组联结Union连接Connect基于时间的合流——双流联结Join窗口联结Window Join间隔联结Interval Join窗口同组联结Window CoGroup多流转换
无论是基本的简单转换和聚合还是基于窗口的计算我们都是针对一条流上的数据进行处理的。而在实际应用中可能需要将不同来源的数据连接合并在一起处理也有可能需要将一条流拆分开所以经常会有对多条流进行处理的场景。
简单划分的话多流转换可以分为“分流”和“合流”两大类
分流的操作一般是通过侧输出流side output来实现而合流的算子比较丰富根据不同的需求可以调用 union、connect、join 以及 coGroup 等接口进行连接合并操作。
分流
所谓“分流”就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream得到完全平等的多个子 DataStream。一般来说我们会定义一些筛选条件将符合条件的数据拣选出来放到对应的流里。 其实根据条件筛选数据的需求本身非常容易实现只要针对同一条流多次独立调用.filter()方法进行筛选就可以得到拆分之后的流了。
例如我们可以将电商网站收集到的用户行为数据进行一个拆分根据类型type的不同分为“Mary”的浏览数据、“Bob”的浏览数据等等。那么代码就可以这样实现
public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorEvent stream env.addSource(new ClickSource());// 筛选Mary的浏览行为放入MaryStream流中DataStreamEvent MaryStream stream.filter(new FilterFunctionEvent() {Overridepublic boolean filter(Event value) throws Exception {return value.user.equals(Mary);}});// 筛选Bob的购买行为放入BobStream流中DataStreamEvent BobStream stream.filter(new FilterFunctionEvent() {Overridepublic boolean filter(Event value) throws Exception {return value.user.equals(Bob);}});// 筛选其他人的浏览行为放入elseStream流中DataStreamEvent elseStream stream.filter(new FilterFunctionEvent() {Overridepublic boolean filter(Event value) throws Exception {return !value.user.equals(Mary) !value.user.equals(Bob) ;}});MaryStream.print(Mary pv);BobStream.print(Bob pv);elseStream.print(else pv);env.execute();}
}这种实现非常简单但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的却重复写了三次。而且这段代码背后的含义是将原始数据流 stream 复制三份然后对每一份分别做筛选这明显是不够高效的。我们自然想到能不能不用复制流直接用一个算子就把它们都拆分开呢
在早期的版本中DataStream API 中提供了一个**.split()**方法专门用来将一条流“切分”成多个。它的基本思路其实就是按照给定的筛选条件给数据分类“盖戳”然后基于这条盖戳之后的流分别拣选想要的“戳”就可以得到拆分后的流。这样我们就不必再对流进行复制了。不过这种方法有一个缺陷因为只是“盖戳”拣选所以无法对数据进行转换分流后的数据类型必须跟原始流保持一致。这就极大地限制了分流操作的应用场景。
在 Flink 1.13 版本中已经弃用了.split()方法取而代之的是直接用处理函数process function的侧输出流side output。
我们知道**处理函数本身可以认为是一个转换算子它的输出类型是单一的处理之后得到的仍然是一个 DataStream而侧输出流则不受限制可以任意自定义输出数据它们就像从主流”上分叉出的“支流”。**尽管看起来主流和支流有所区别不过实际上它们都是某种类型的 DataStream所以本质上还是平等的。利用侧输出流就可以很方便地实现分流操作而且得到的多条 DataStream 类型可以不同这就给我们的应用带来了极大的便利。
简单来说只需要调用上下文 ctx 的.output()方法就可以输出任意类型的数据了。而侧输出流的标记和提取都离不开一个“输出标签”OutputTag它就相当于 split()分流时的“戳”指定了侧输出流的id 和类型。
public class SplitStreamByOutputTag {// 定义输出标签侧输出流的数据类型为三元组(user, url, timestamp)private static OutputTagTuple3String, String, Long MaryTag new OutputTagTuple3String, String, Long(Mary-pv){};private static OutputTagTuple3String, String, Long BobTag new OutputTagTuple3String, String, Long(Bob-pv){};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorEvent stream env.addSource(new ClickSource());SingleOutputStreamOperatorEvent processedStream stream.process(new ProcessFunctionEvent, Event() {Overridepublic void processElement(Event value, Context ctx, CollectorEvent out) throws Exception {if (value.user.equals(Mary)){ctx.output(MaryTag, new Tuple3(value.user, value.url, value.timestamp));} else if (value.user.equals(Bob)){ctx.output(BobTag, new Tuple3(value.user, value.url, value.timestamp));} else {out.collect(value);}}});processedStream.getSideOutput(MaryTag).print(Mary pv);processedStream.getSideOutput(BobTag).print(Bob pv);processedStream.print(else);env.execute();}
}基本合流操作
联合Union
最简单的合流操作就是直接将多条流合在一起叫作流的“联合”union。联合操作要求必须流中的数据类型必须相同合并之后的新流会包括所有流中的元素数据类型不变。 在代码中我们只要基于 DataStream 直接调用.union()方法传入其他 DataStream 作为参数可以多个就可以实现流的联合了得到的依然是一个 DataStream
stream1.union(stream2, stream3, ...)这里需要考虑一个问题。在事件时间语义下水位线是时间的进度标志不同的流中可能水位线的进展快慢完全不同如果它们合并在一起水位线又该以哪个为准呢
还以要考虑水位线的本质含义是“之前的所有数据已经到齐了”所以对于合流之后的水位线也是要以最小的那个为准这样才可以保证所有流都不会再传来之前的数据。这与之前介绍的并行任务水位线传递的规则是完全一致的多条流的合并某种意义上也可以看作是多个并行任务向同一个下游任务汇合的过程。
public class UnionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorEvent stream1 env.socketTextStream(hadoop102, 7777).map(data - {String[] field data.split(,);return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.EventforBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream1.print(stream1);SingleOutputStreamOperatorEvent stream2 env.socketTextStream(hadoop103, 7777).map(data - {String[] field data.split(,);return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.EventforBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream2.print(stream2);// 合并两条流stream1.union(stream2).process(new ProcessFunctionEvent, String() {Overridepublic void processElement(Event value, Context ctx, CollectorString out) throws Exception {out.collect(水位线 ctx.timerService().currentWatermark());}}).print();env.execute();}
}连接Connect
流的联合虽然简单不过受限于数据类型不能改变灵活性大打折扣所以实际应用较少出现。除了联合unionFlink 还提供了另外一种方便的合流操作——连接connect。顾名思义这种操作就是直接把两条流像接线一样对接起来。
1连接流ConnectedStreams
为了处理更加灵活连接操作允许流的数据类型不同。但我们知道一个 DataStream 中的数据只能有唯一的类型所以连接得到的并不是 DataStream而是一个“连接流”ConnectedStreams。**连接流可以看成是两条流形式上的“统一”被放在了一个同一个流中事实上内部仍保持各自的数据形式不变彼此之间是相互独立的。**要想得到新的 DataStream还需要进一步定义一个“同处理”co-process转换操作用来说明对于不同来源、不同类型的数据怎样分别进行处理转换、得到统一的输出类型。所以整体上来两条流的连接就像是“一国两制”两条流可以保持各自的数据类型、处理方式也可以不同不过最终还是会统一到同一个 DataStream 中。
两条流的连接connect与联合union操作相比最大的优势就是可以处理不同类型的流的合并使用更灵活、应用更广泛。当然它也有限制就是合并流的数量只能是 2而 union可以同时进行多条流的合并。 在代码实现上需要分为两步首先基于一条 DataStream 调用.connect()方法传入另外一条 DataStream 作为参数将两条流连接起来得到一个 ConnectedStreams然后再调用同处理方法得到 DataStream。这里可以的调用的同处理方法有.map()/.flatMap()以及.process()方法。
public class ConnectTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamInteger stream1 env.fromElements(1,2,3);DataStreamLong stream2 env.fromElements(1L,2L,3L);ConnectedStreamsInteger, Long connectedStreams stream1.connect(stream2);SingleOutputStreamOperatorString result connectedStreams.map(new CoMapFunctionInteger, Long, String() {Overridepublic String map1(Integer value) {return Integer: value;}Overridepublic String map2(Long value) {return Long: value;}});result.print();env.execute();}
}调用.map()方法时传入的不再是一个简单的 MapFunction而是一个 CoMapFunction表示分别对两条流中的数据执行 map 操作。这个接口有三个类型参数依次表示第一条流、第二条流以及合并后的流中的数据类型。需要实现的方法也非常直白.map1()就是对第一条流中数据的 map 操作.map2()则是针对第二条流。
值得一提的是ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作得到的还是一个 ConnectedStreams
connectedStreams.keyBy(keySelector1, keySelector2);这里传入两个参数 keySelector1 和 keySelector2是两条流中各自的键选择器当然也可以直接传入键的位置值keyPosition或者键的字段名field这与普通的 keyBy 用法完全一致。ConnectedStreams 进行 keyBy 操作其实就是把两条流中 key 相同的数据放到了一起然后针对来源的流再做各自处理这在一些场景下非常有用。
另外我们也可以在合并之前就将两条流分别进行 keyBy,得到的 KeyedStream 再进行连接connect操作效果是一样的。要注意两条流定义的键的类型必须相同否则会抛出异常。
2CoProcessFunction
对于连接流 ConnectedStreams 的处理操作需要分别定义对两条流的处理转换因此接口中就会有两个相同的方法需要实现用数字“1”“2”区分在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”co-process function。与 CoMapFunction 类似如果是调用.flatMap()就需要传入一个 CoFlatMapFunction需要实现 flatMap1()、flatMap2()两个方法而调用.process()时传入的则是一个 CoProcessFunction。
public abstract class CoProcessFunctionIN1, IN2, OUT extends AbstractRichFunction {...public abstract void processElement1(IN1 value, Context ctx, CollectorOUT out) throws Exception;public abstract void processElement2(IN2 value, Context ctx, CollectorOUT out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, CollectorOUT out) throws Exception {}public abstract class Context {...}...
}我们可以看到很明显 CoProcessFunction 也是“处理函数”家族中的一员用法非常相似。它需要实现的就是 processElement1()、processElement2()两个方法在每个数据到来时会根据来源的流调用其中的一个方法进行处理。CoProcessFunction 同样可以通过上下文 ctx 来访问 timestamp、水位线并通过 TimerService 注册定时器另外也提供了.onTimer()方法用于定义定时触发的处理操作。
具体示例我们可以实现一个实时对账的需求也就是app 的支付操作和第三方的支付操作的一个双流 Join。App 的支付事件和第三方的支付事件将会互相等待 5 秒钟如果等不来对应的支付事件那么就输出报警信息。程序如下
public class BillCheckExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 来自app的支付日志SingleOutputStreamOperatorTuple3String, String, Long appStream env.fromElements(Tuple3.of(order-1, app, 1000L),Tuple3.of(order-2, app, 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, String, LongforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerTuple3String, String, Long() {Overridepublic long extractTimestamp(Tuple3String, String, Long element, long recordTimestamp) {return element.f2;}}));// 来自第三方支付平台的支付日志SingleOutputStreamOperatorTuple4String, String, String, Long thirdpartStream env.fromElements(Tuple4.of(order-1, third-party, success, 3000L),Tuple4.of(order-3, third-party, success, 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple4String, String, String, LongforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerTuple4String, String, String, Long() {Overridepublic long extractTimestamp(Tuple4String, String, String, Long element, long recordTimestamp) {return element.f3;}}));// 检测同一支付单在两条流中是否匹配不匹配就报警appStream.connect(thirdpartStream).keyBy(data - data.f0, data - data.f0).process(new OrderMatchResult()).print();env.execute();}// 自定义实现CoProcessFunctionpublic static class OrderMatchResult extends CoProcessFunctionTuple3String, String, Long, Tuple4String, String, String, Long, String{// 定义状态变量用来保存已经到达的事件private ValueStateTuple3String, String, Long appEventState;private ValueStateTuple4String, String, String, Long thirdPartyEventState;Overridepublic void open(Configuration parameters) throws Exception {appEventState getRuntimeContext().getState(new ValueStateDescriptorTuple3String, String, Long(app-event, Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));thirdPartyEventState getRuntimeContext().getState(new ValueStateDescriptorTuple4String, String, String, Long(thirdparty-event, Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG)));}Overridepublic void processElement1(Tuple3String, String, Long value, Context ctx, CollectorString out) throws Exception {// 看另一条流中事件是否来过if (thirdPartyEventState.value() ! null){out.collect(对账成功 value thirdPartyEventState.value());// 清空状态thirdPartyEventState.clear();} else {// 更新状态appEventState.update(value);// 注册一个5秒后的定时器开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f2 5000L);}}Overridepublic void processElement2(Tuple4String, String, String, Long value, Context ctx, CollectorString out) throws Exception {if (appEventState.value() ! null){out.collect(对账成功 appEventState.value() value);// 清空状态appEventState.clear();} else {// 更新状态thirdPartyEventState.update(value);// 注册一个5秒后的定时器开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f3 5000L);}}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {// 定时器触发判断状态如果某个状态不为空说明另一条流中事件没来if (appEventState.value() ! null) {out.collect(对账失败 appEventState.value() 第三方支付平台信息未到);}if (thirdPartyEventState.value() ! null) {out.collect(对账失败 thirdPartyEventState.value() app信息未到);}appEventState.clear();thirdPartyEventState.clear();}}
}3广播连接流BroadcastConnectedStream
关于两条流的连接还有一种比较特殊的用法DataStream 调用.connect()方法时传入的参数也可以不是一个 DataStream而是一个“广播流”BroadcastStream这时合并两条流得到的就变成了一个“广播连接流”BroadcastConnectedStream。
这种连接方式往往用在需要动态定义某些规则或配置的场景。因为规则是实时变动的所以我们可以用一个单独的流来获取规则数据而这些规则或配置是对整个应用全局有效的所以不能只把这数据传递给一个下游并行子任务处理而是要“广播”broadcast给所有的并行子任务。而下游子任务收到广播出来的规则会把它保存成一个状态这就是所谓的“广播状态”broadcast state。
广播状态底层是用一个“映射”map结构来保存的。在代码实现上可以直接调用DataStream 的.broadcast()方法传入一个“映射状态描述器”MapStateDescriptor说明状态的名称和类型就可以得到规则数据的“广播流”BroadcastStream
MapStateDescriptorString, Rule ruleStateDescriptor new MapStateDescriptor(...);
BroadcastStreamRule ruleBroadcastStream ruleStream.broadcast(ruleStateDescriptor);接下来我们就可以将要处理的数据流与这条广播流进行连接connect得到的就是所谓的“广播连接流”BroadcastConnectedStream。基于 BroadcastConnectedStream 调用.process()方法就可以同时获取规则和数据进行动态处理了。
这里既然调用了.process()方法当然传入的参数也应该是处理函数大家族中一员——如果对数据流调用过 keyBy 进行了按键分区那么要传入的就是 KeyedBroadcastProcessFunction如果没有按键分区就传入 BroadcastProcessFunction。
DataStreamString output stream.connect(ruleBroadcastStream).process(new BroadcastProcessFunction() {...} );BroadcastProcessFunction 与 CoProcessFunction 类似同样是一个抽象类需要实现两个方法针对合并的两条流中元素分别定义处理操作。区别在于这里一条流是正常处理数据而另一条流则是要用新规则来更新广播状态所以对应的两个方法叫作.processElement()和.processBroadcastElement()。源码中定义如下
public abstract class BroadcastProcessFunctionIN1, IN2, OUT extends BaseBroadcastProcessFunction {...public abstract void processElement(IN1 value, ReadOnlyContext ctx, CollectorOUT out) throws Exception;public abstract void processBroadcastElement(IN2 value, Context ctx, CollectorOUT out) throws Exception;...
}基于时间的合流——双流联结Join
窗口联结Window Join
如果我们希望将两条流的数据进行合并、且同样针对某段时间进行处理和统计可以使用窗口联结window join算子可以定义时间窗口并将两条流中共享一个公共键key的数据放在窗口中进行配对处理。侧重对2个输入里的 数据对 进行处理join方法的入参是单个数据。
1窗口联结的调用
窗口联结在代码中的实现首先需要调用 DataStream 的.join()方法来合并两条流得到一个 JoinedStreams接着通过.where()和.equalTo()方法指定两条流中联结的 key然后通过.window()开窗口并调用.apply()传入联结窗口函数进行处理计算。通用调用形式如下
stream1.join(stream2).where(KeySelector).equalTo(KeySelector).window(WindowAssigner).apply(JoinFunction)上面代码中.where()的参数是键选择器KeySelector用来指定第一条流中的 key 而.equalTo()传入的 KeySelector 则指定了第二条流中的 key。两者相同的元素如果在同一窗口中就可以匹配起来并通过一个“联结函数”JoinFunction进行处理了。
这里.window()传入的就是窗口分配器之前讲到的三种时间窗口都可以用在这里滚动窗口tumbling window、滑动窗口sliding window和会话窗口session window。
而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply()没有其他替代的方法。
传入的 JoinFunction 也是一个函数类接口使用时需要实现内部的.join()方法。这个方法有两个参数分别表示两条流中成对匹配的数据。JoinFunction 在源码中的定义如下
public interface JoinFunctionIN1, IN2, OUT extends Function, Serializable {OUT join(IN1 first, IN2 second) throws Exception;
}这里需要注意JoinFunciton 并不是真正的“窗口函数”它只是定义了窗口函数在调用时对匹配数据的具体处理逻辑。
当然既然是窗口计算在.window()和.apply()之间也可以调用可选 API 去做一些自定义比如用.trigger()定义触发器用.allowedLateness()定义允许延迟时间等等。
2窗口联结的处理流程
JoinFunction 中的两个参数分别代表了两条流中的匹配的数据。这里就会有一个问题什么时候就会匹配好数据调用.join()方法呢接下来我们就来介绍一下窗口 join 的具体处理流程
两条流的数据到来之后首先会按照 key 分组、进入对应的窗口中存储当到达窗口结束时间时算子会先统计出窗口内两条流的数据的所有组合也就是对两条流中的数据做一个笛卡尔积相当于表的交叉连接cross join然后进行遍历把每一对匹配的数据作为参数(firstsecond)传入 JoinFunction 的.join()方法进行计算处理得到的结果直接输出如图所示。
所以窗口中每有一对数据成功联结匹配JoinFunction 的.join()方法就会被调用一次并输出一个结果。 除了 JoinFunction在.apply()方法中还可以传入 FlatJoinFunction用法非常类似只是内部需要实现的.join()方法没有返回值。结果的输出是通过收集器Collector来实现的所以对于一对匹配数据可以输出任意条结果。
其实仔细观察可以发现窗口 join 的调用语法和我们熟悉的 SQL 中表的 join 非常相似
SELECT * FROM table1 t1, table2 t2 WHERE t1.id t2.id;3窗口联结实例
在电商网站中往往需要统计用户不同行为之间的转化这就需要对不同的行为数据流按照用户 ID 进行分组后再合并以分析它们之间的关联。如果这些是以固定时间周期比如1 小时来统计的那我们就可以使用窗口 join 来实现这样的需求。
public class WindowJoinTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamTuple2String, Long stream1 env.fromElements(Tuple2.of(a, 1000L),Tuple2.of(b, 1000L),Tuple2.of(a, 2000L),Tuple2.of(b, 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, LongforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerTuple2String, Long() {Overridepublic long extractTimestamp(Tuple2String, Long stringLongTuple2, long l) {return stringLongTuple2.f1;}}));DataStreamTuple2String, Long stream2 env.fromElements(Tuple2.of(a, 3000L),Tuple2.of(b, 3000L),Tuple2.of(a, 4000L),Tuple2.of(b, 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, LongforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerTuple2String, Long() {Overridepublic long extractTimestamp(Tuple2String, Long stringLongTuple2, long l) {return stringLongTuple2.f1;}}));stream1.join(stream2).where(r - r.f0).equalTo(r - r.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunctionTuple2String, Long, Tuple2String, Long, String() {Overridepublic String join(Tuple2String, Long left, Tuple2String, Long right) throws Exception {return left right;}}).print();env.execute();}
}输出
(a,1000)(a,3000)
(a,1000)(a,4000)
(a,2000)(a,3000)
(a,2000)(a,4000)
(b,1000)(b,3000)
(b,1000)(b,4000)
(b,2000)(b,3000)
(b,2000)(b,4000)间隔联结Interval Join
在有些场景下我们要处理的时间间隔可能并不是固定的。比如在交易系统中需要实时地对每一笔交易进行核验保证两个账户转入转出数额相等也就是所谓的“实时对账”。两次转账的数据可能写入了不同的日志流它们的时间戳应该相差不大所以我们可以考虑只统计一段时间内是否有出账入账的数据匹配。这时显然不应该用滚动窗口或滑动窗口来处理— —因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧于是窗口内就都没有匹配了会话窗口虽然时间不固定但也明显不适合这个场景。 基于时间的窗口联结已经无能为力了。
为了应对这样的需求Flink 提供了一种叫作“间隔联结”interval join的合流操作。间隔联结的思路就是针对一条流的每个数据开辟出其时间戳前后的一段时间间隔看这期间是否有来自另一条流的数据匹配。
1间隔联结的原理
间隔联结具体的定义方式是我们给定两个时间点分别叫作间隔的“上界”upperBound和“下界”lowerBound于是对于一条流不妨叫作 A中的任意一个数据元素 a就可以开辟一段时间间隔[a.timestamp lowerBound, a.timestamp upperBound],即以 a 的时间戳为中心下至下界点、上至上界点的一个闭区间我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流不妨叫 B中的数据元素 b如果它的时间戳落在了这个区间范围内a 和 b 就可以成功配对进而进行计算输出结果。所以匹配的条件为
a.timestamp lowerBound b.timestamp a.timestamp upperBound这里需要注意做间隔联结的两条流 A 和 B也必须基于相同的 key下界 lowerBound应该小于等于上界 upperBound两者都可正可负间隔联结目前只支持事件时间语义。 下方的流 A 去间隔联结上方的流 B所以基于 A 的每个数据元素都可以开辟一个间隔区间。我们这里设置下界为-2 毫秒上界为 1 毫秒。于是对于时间戳为 2 的 A 中元素它的可匹配区间就是[0, 3],流 B 中有时间戳为 0、1 的两个元素落在这个范围内所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地A 中时间戳为 3 的元素可匹配区间为[1, 4]B 中只有时间戳为 1 的一个数据可以匹配于是得到匹配数据对(3, 1)。
2 间隔联结的调用
间隔联结在代码中是基于 KeyedStream 的联结join操作。DataStream 在 keyBy 得到KeyedStream 之后可以调用.intervalJoin()来合并两条流传入的参数同样是一个 KeyedStream两者的 key 类型应该一致得到的是一个 IntervalJoin 类型。后续的操作同样是完全固定的先通过.between()方法指定间隔的上下界再调用.process()方法定义对匹配数据对的处理操作。调用.process()需要传入一个处理函数这是处理函数家族的最后一员“处理联结函数”ProcessJoinFunction。
stream1.keyBy(KeySelector).intervalJoin(stream2.keyBy(KeySelector)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunctionInteger, Integer, String(){Overridepublic void processElement(Integer left, Integer right, Context ctx, CollectorString out) {out.collect(left , right);}});可以看到抽象类 ProcessJoinFunction 就像是 ProcessFunction 和 JoinFunction 的结合内部同样有一个抽象法.processElement()。与其他处理函数不同的是它多了一个参数这自然是因为有来自两条流的数据。
3间隔联结实例
在电商网站中某些用户行为往往会有短时间内的强关联。我们这里举一个例子我们有两条流一条是下订单的流一条是浏览数据的流。我们可以针对同一个用户来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。
public class IntervalJoinTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorTuple3String, String, Long orderStream env.fromElements(Tuple3.of(Mary, order-1, 5000L),Tuple3.of(Alice, order-2, 5000L),Tuple3.of(Bob, order-3, 20000L),Tuple3.of(Alice, order-4, 20000L),Tuple3.of(Cary, order-5, 51000L)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, String, LongforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerTuple3String, String, Long() {Overridepublic long extractTimestamp(Tuple3String, String, Long element, long recordTimestamp) {return element.f2;}}));SingleOutputStreamOperatorEvent clickStream env.fromElements(new Event(Bob, ./cart, 2000L),new Event(Alice, ./prod?id100, 3000L),new Event(Alice, ./prod?id200, 3500L),new Event(Bob, ./prod?id2, 2500L),new Event(Alice, ./prod?id300, 36000L),new Event(Bob, ./home, 30000L),new Event(Bob, ./prod?id1, 23000L),new Event(Bob, ./prod?id3, 33000L)).assignTimestampsAndWatermarks(WatermarkStrategy.EventforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));orderStream.keyBy(data - data.f0).intervalJoin(clickStream.keyBy(data - data.user)).between(Time.seconds(-5), Time.seconds(10)).process(new ProcessJoinFunctionTuple3String, String, Long, Event, String() {Overridepublic void processElement(Tuple3String, String, Long left, Event right, Context ctx, CollectorString out) throws Exception {out.collect(right left);}}).print();env.execute();}}输出
Event{userAlice, url./prod?id100, timestamp1970-01-01 08:00:03.0} (Alice,order-2,5000)
Event{userAlice, url./prod?id200, timestamp1970-01-01 08:00:03.5} (Alice,order-2,5000)
Event{userBob, url./home, timestamp1970-01-01 08:00:30.0} (Bob,order-3,20000)
Event{userBob, url./prod?id1, timestamp1970-01-01 08:00:23.0} (Bob,order-3,20000)窗口同组联结Window CoGroup
除窗口联结和间隔联结之外Flink 还提供了一个“窗口同组联结”window coGroup操作。它的用法跟 window join 非常类似也是将两条流合并之后开窗处理匹配的元素调用时只需要将.join()换为.coGroup()就可以了。
stream1.coGroup(stream2).where(KeySelector).equalTo(KeySelector).window(TumblingEventTimeWindows.of(Time.hours(1))).apply(CoGroupFunction)与 window join 的区别在于调用.apply()方法定义具体操作时传入的是一个CoGroupFunction。这也是一个函数类接口源码中定义如下
public interface CoGroupFunctionIN1, IN2, O extends Function, Serializable {void coGroup(IterableIN1 first, IterableIN2 second, CollectorO out) throws Exception;
}内部的.coGroup()方法有些类似于 FlatJoinFunction 中.join()的形式同样有三个参数分别代表两条流中的数据以及用于输出的收集器Collector。不同的是这里的前两个参数不再是单独的每一组“配对”数据了而是传入了可遍历的数据集合。也就是说现在不会再去计算窗口中两条流数据集的笛卡尔积而是直接把收集到的所有数据一次性传入至于要怎样配对完全是自定义的。这样.coGroup()方法只会被调用一次而且即使一条流的数据没有任何另一条流的数据匹配也可以出现在集合中、当然也可以定义输出结果了。
所以能够看出coGroup 操作比窗口的 join 更加通用不仅可以实现类似 SQL 中的“内连接”inner join也可以实现左外连接left outer join、右外连接right outer join和全外连接full outer join。事实上窗口 join 的底层也是通过 coGroup 来实现的。
public class CoGroupTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamTuple2String, Long stream1 env.fromElements(Tuple2.of(a, 1000L),Tuple2.of(b, 1000L),Tuple2.of(a, 2000L),Tuple2.of(b, 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, LongforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerTuple2String, Long() {Overridepublic long extractTimestamp(Tuple2String, Long stringLongTuple2, long l) {return stringLongTuple2.f1;}}));DataStreamTuple2String, Long stream2 env.fromElements(Tuple2.of(a, 3000L),Tuple2.of(b, 3000L),Tuple2.of(a, 4000L),Tuple2.of(b, 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, LongforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerTuple2String, Long() {Overridepublic long extractTimestamp(Tuple2String, Long stringLongTuple2, long l) {return stringLongTuple2.f1;}}));stream1.coGroup(stream2).where(r - r.f0).equalTo(r - r.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunctionTuple2String, Long, Tuple2String, Long, String() {Overridepublic void coGroup(IterableTuple2String, Long iter1, IterableTuple2String, Long iter2, CollectorString collector) throws Exception {collector.collect(iter1 iter2);}}).print();env.execute();}
}输出结果是
[(a,1000), (a,2000)][(a,3000), (a,4000)]
[(b,1000), (b,2000)][(b,3000), (b,4000)]