php大型网站开发,中企动力是国企性质吗,免费网站诊断,已备案网站域名网流处理之多流转换算子
实验介绍
前面实验中介绍的算子已经能够满足我们的大部分开发需求了#xff0c;但是在实际工作中有时候还会遇到一些业务场景#xff0c;例如需要摄入多个输入流并将其合并处理#xff0c;或者需要将一条输入流分割为多条子流#xff0c;在不同的子…流处理之多流转换算子
实验介绍
前面实验中介绍的算子已经能够满足我们的大部分开发需求了但是在实际工作中有时候还会遇到一些业务场景例如需要摄入多个输入流并将其合并处理或者需要将一条输入流分割为多条子流在不同的子流中处理不同的业务逻辑。所以本节实验的内容我们将学习 DataSteam API 中的可以将多条输入流合并为一个输入流或者将一个输入流分割为多个子流的算子我们将其统称为“多流转换算子”。
知识点
Unionfilter
算子演示
Union
union 顾名思义就是连接的意思所以 union 算子的作用就是合并两条或者多条相同类型的 DataStream生成一个新的类型相同的 DataStream。如图所示
需要注意的是事件合流的方式为 FIFO 方式。操作符并不会产生一个特定顺序的事件流。union 操作符也不会进行去重。每一个输入事件都被发送到了下一个操作符。
假设某公司分别在淘宝和天猫都开设了自己的直营店公司高层需要实时监控到两个店铺的交易数据并希望通过大屏展示的方式实时滚动。我们可以通过两条 Socket 输入流来模拟这样的场景。
首先在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个 UnionOperator 的 Scala object输入如下代码
package com.vlab.operatorimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object UnionOperator {def main(args: Array[String]): Unit {// 创建执行环境val env StreamExecutionEnvironment.getExecutionEnvironment// 接收京东订单val jdOrder:DataStream[String] env.socketTextStream(192.168.137.81, 9999)// 接收拼刀刀订单val pindaoOrder:DataStream[String] env.socketTextStream(192.168.137.81, 9998)// 将两条输入流合并为一条输入流val unionStream:DataStream[String] jdOrder.union(pindaoOrder)// 设置并行度unionStream.print().setParallelism(1)// 执行env.execute(UnionOperator)}}
我们使用 netcat 监控两个端口来模拟发送淘宝和天猫的订单信息然后使用 Flink 接收。打开终端窗口执行 nc -l -p 9998 命令紧接着打开另一个终端窗口执行 nc -l -p 9999 命令。这样的话我们监控了 9998 和 9999 两个端口接下来在 Flink 中进行接收。
运行刚刚的代码然后在前面打开的两个终端中交替发送订单数据观察 idea 控制台输出。
filter
使用 filter 来根据体温的阈值将流拆分为两个子流一个是正常体温流另一个是发烧体温流。然后我们可以对每个子流进行不同的业务逻辑处理。
疫情期间全国各地的超市、医院、机场等公共场所入口都有温度监控设备当该设备检测到某个人体温异常之后就会报警。假设鉴别正常体温和发烧体温的阈值为 36.0 摄氏度也就是说只要体温大于等于 36.0 摄氏度我们就认为其为发烧状态。我们使用 检测体温是否异常我们可以使用 filter 来将流分为两条子流一个代表 正常体温另一个代表 发烧体温然后可以对这些流进行不同的业务逻辑处理。
在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个名为 SelectOperator 的 Scala object代码如下
package com.shiyanlou.operatorimport org.apache.flink.streaming.api.scala._object SelectOperator {def main(args: Array[String]): Unit {// 设置流环境val env StreamExecutionEnvironment.getExecutionEnvironment// 读取socket文本数据流val inputDS: DataStream[String] env.socketTextStream(192.168.137.81, 9999)val peopleStream inputDS.map(line {val arr line.split( )People(arr(0), arr(1).toFloat)})// 使用 keyBy 按照温度类型high 或 normal进行分组val highTempStream peopleStream.filter(_.temperature 36.5)val normalTempStream peopleStream.filter(_.temperature 36.5)// 打印输出highTempStream.print(发烧)normalTempStream.print(体温正常)env.execute(SelectOperator)}case class People(name: String, temperature: Float)
}上面的代码中我们创建了一个 Socket 输入流监控localhost下的 9999 端口然后将输入的文本使用空格分隔之后转换为People类。紧接着使用 Split 算子将体温大于 36.0 的人群定义为fever将体温小于等于 36.0 的人群定义为normal最后使用select算子选择了fever发烧状态的人群并输出到控制台。
打开终端执行nc -l -p 9999在 idea 运行以上代码并在终端中依次发送下面的信息
张小明 35.6
李鹏程 36.3
赵露 36.7
李阳 35.5
刘明 37.0在 idea 的控制台会看到将体温高于 36.5 的做了打印赵露、刘明。
实验总结
本节实验中我们介绍了 Flink 中的多流转换算子其中 Union 是将两个或者多个类型相同的输入流转换成一个输入流而filter是将一个输入流根据给定的条件切分成多个子输入流。这部分内容在工作中会经常用到大家一定要理解。