当前位置: 首页 > news >正文

如何建议一个网站网站设计培训班

如何建议一个网站,网站设计培训班,国外seo做的好的网站,嘉兴做网站优化哪家好1.map 特性#xff1a;接收一个数据#xff0c;经过处理之后#xff0c;就返回一个数据 1.1. 源码分析 我们来看看map的源码 map需要接收一个MapFunctionT,R的对象#xff0c;其中泛型T表示传入的数据类型#xff0c;R表示经过处理之后输出的数据类型我们继续往…1.map 特性接收一个数据经过处理之后就返回一个数据 1.1. 源码分析 我们来看看map的源码 map需要接收一个MapFunctionT,R的对象其中泛型T表示传入的数据类型R表示经过处理之后输出的数据类型我们继续往下点看看MapFunctionT,R的源码 这是一个接口那么在代码中我们就需要实现这个接口 1.2. 案例 那么我们现在要实现一个功能就是从给一个文件中读取数据返回每一行的字符串长度。 我们要读取的文件内容如下 代码贴在这里为了让打击不看迷糊导包什么的我就省略了 public class TransformTest1_Base {public static void main(String[] args) throws Exception {// 1. 获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 将并行度设为1env.setParallelism(1);// 3. 读取文件夹DataStreamSourceString inputDataStream env.readTextFile(C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor);// 4. 将文件夹每一行的数据都返回它的长度// 在这里我们用匿名内部类的方式创建了一个MapFunction对象SingleOutputStreamOperatorInteger dataStream inputDataStream.map(new MapFunctionString, Integer() {// 5. 重写map方法参数s是接收到的一个数据我们只需要返回它的长度就行了。Overridepublic Integer map(String s) throws Exception {return s.length();}});// 6. 打印输出dataStream.print();// 7. 启动执行环境env.execute();} } 显示 1.3. 总结 map的使用范围就是需要对的那个数据进行处理并且每次返回一个数据的时候map就比较方便了。 2. flatMap 接收一个数据可以返回多条数据 2.1. 源码分析 我们发现它需要传入一个FlatMapFunction的一个对象 我们继续点进去看看FlatMapFunction的源码可以发现FlatMapFunctionT,R也是一个接口并且接口里面的方法的返回值是一个Collector也就是多个值的集合。 2.2. 案例 我们还是读取那个文件这次我们要做的处理是将文件的每一行数据按照逗号隔开给出代码 public class TransformTest2_Base {public static void main(String[] args) throws Exception {// 1. 获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 设置并行度env.setParallelism(1);// 3. 读取文件夹DataStreamSourceString dataStream env.readTextFile(C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor);// 4. 用匿名内部类的方式重写FlatMapFuncction将每行字符按,隔开SingleOutputStreamOperatorString flatMapStream dataStream.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String s, CollectorString collector) throws Exception {// 5. 分割一行字符获得对应的字符串数组String[] split s.split(,);for (String slt : split) {// 6. 将这些数据返回collector.collect(slt);}}});// 7. 打印输出处理后的数据flatMapStream.print();// 8. 启动执行环境env.execute();} } 可以看到执行的结果 3. filter 听这个名字就知道是个过滤器用来过滤数据。 3.1. 源码分析 我们看看filer的源码继承子FilterFunction可以看到这次泛型就只有一个值了因为filter只允许返回的数据原来的数据所以只做过滤并不能改变数据蕾西没必要设置返回的类型 我们继续点进去看看FilterFunction的源码 果不其然也是一个接口而里面的filter方法只有一个参数并且返回的是一个boolean类型若返回true则var1原样返回若返回false则var1会被过滤掉。 3.2. 案例 我们还是读取以上文件这一次我们返回以sensor_1开头的字符串其余的一律不返回给出代码 public class TransformTest3_Base {public static void main(String[] args) throws Exception {// 1. 获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 设置并行度env.setParallelism(1);// 3. 读取文件DataStreamSourceString dataStream env.readTextFile(C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor);// 4. 用匿名内部类的方式重写FilterFunctionSingleOutputStreamOperatorString filterDataStream dataStream.filter(new FilterFunctionString() {Overridepublic boolean filter(String s) throws Exception {// 5. 若s以sensor_1开头则返回truereturn s.startsWith(\sensor_1\);}});// 6. 打印处理后的数据filterDataStream.print();// 7. 启动执行环境env.execute();} } 4. 分组聚合 注意任何的聚合操作都有默认的分组聚合是在分组的基础上进行的。比如对整体进行求和那么分组就是整体。所以在做聚合操作之前一定要明确是在哪个分组上进行聚合操作注意聚合操作本质上是一个多对一一对一是多对一的特殊情况的操作。特别注意的是这个’一‘可以是一个值mean sum等,同样也可以是一个对象list set等对象 4.1. 分组keyBy DataStream → KeyedStream逻辑地将一个流拆分成不相交的分区每个分区包含具有相同 key 的元素在内部以 hash 的形式实现的。 分组就是为了聚合操作做准备的keyBy方法会将数据流按照hash实现分别放在不同的分区每个分区都可以进行聚合操作。我们可以用这个性质计算每一个sensor温度的最大值我们为此将文件修改 分组之后的图就是所有sensor_1在一个分区里sensor_6sensor_7sensor_10在不同的三个分区也就是有四个分区而后三个分区中只有一条数据所以最大值和最小值都只有一个在flink中分组操作是由keyBy方法来完成的我们来看看keyBy的源码 可以发现keyBy可以对对象和元组进行聚合。 4.2. 聚合 这些算子可以针对 KeyedStream 的每一个支流做聚合。 ⚫ sum()对每个支流求和 ⚫ min()对每个支流求最小值 ⚫ max()对每个支流求最大值 ⚫ minBy() ⚫ maxBy() 我们来看看max()的源码 这也是传一个属性名也就是求对应的属性名的最大值。 4.3. 实例演示 public class TransformTest1_RollingAggreation {public static void main(String[] args) throws Exception {// 1. 获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 设置并行度env.setParallelism(1);// 3. 读取文件DataStreamSourceString stringDataStreamSource env.readTextFile(C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor);// 4. 用map将每行数据变成一个对象SingleOutputStreamOperatorSensorReading map stringDataStreamSource.map(new MapFunctionString, SensorReading() {Overridepublic SensorReading map(String s) throws Exception {String[] split s.split(,);return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 5. 分组操作以id属性分组KeyedStreamSensorReading, Tuple keyedstream map.keyBy(id);// 6. 聚合操作求每个分组的温度最大值SingleOutputStreamOperatorSensorReading resultStream keyedstream.max(temperature);// 7. 打印输出resultStream.print();// 8. 启动执行环境env.execute();} } 运行结果 诶这有人就要问了不是求每一个分组的温度最大值么为什么sensor_1的这个分组所有的数据都有 答flink是一个流处理分布式框架这是一条数据流每来一个数据就得处理一次所以输出的都是当前状态下的最大值。 4.4. reduce自定义聚合 在实际生产中不可能让我们完成这么简单的操作就行了所以我们需要更复杂的操作而reduce就是满足这个条件它可以让我们自定义聚合的方式。 我们来看看reduce的源码 reduce需要传入的是一个ReduceFunction的对象我们再来看看ReduceFunction是个什么东西 var1是当前这个分组的状态var2是新加入的值而reduce函数体就是我们要进行的操作返回一个新的状态。 到这我就明白了要是我们向实时获取最大温度的话var1是之前的最大温度通过var1和var2的比较就能实现。 4.5. reduce实例 我们这一次要实现一个实时的温度最大值也就是返回的数据中的时间戳是当前的。 public class TransformTest1_Reduce {public static void main(String[] args) throws Exception {// 1. 获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 设置并行度env.setParallelism(1);// 3. 读取文件DataStreamSourceString dataStream env.readTextFile(C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor);// 4. 通过map将每行数据转换为一个对象SingleOutputStreamOperatorSensorReading map dataStream.map(new MapFunctionString, SensorReading() {Overridepublic SensorReading map(String s) throws Exception {String[] split s.split(,);return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 5. 按对象的id分组KeyedStreamSensorReading, Tuple keyStream map.keyBy(id);// 6. reduce自定义聚合SingleOutputStreamOperatorSensorReading reduce keyStream.reduce(new ReduceFunctionSensorReading() {Overridepublic SensorReading reduce(SensorReading sensorReading, SensorReading t1) throws Exception {// 7. 获取当前时间为止接收到的最大温度return new SensorReading(sensorReading.getId(), System.currentTimeMillis(), Math.max(sensorReading.getTemperature(),t1.getTemperature()));}});// 8. 打印输出reduce.print();// 9. 启动运行环境env.execute();} } 这一次的输出我们就得你好好研究一下了。 从这块可以发现我们获取的都是当前的时间戳而且时间戳也在改变这一点很好理解但是下面这个数据就很诡异了。 这两块的时间戳为什么没有改变呢这需要我们再来看看reduce方法了reduce方法是传入两个参数第一个是当前的状态第二个是新读取的值通过方法体的操作返回一个最新的状态。仔细理解一下这句话若我刚开始没有数据的时候那么哪来的状态呢所以reduce把接收到的第一个参数作为状态其中sensor_6,7,8这三个分区只有一个数据所以直接拿来当作状态。 5. 多流转换算子 5.1. 分流操作Split 和 Select Split能将流中的数据按条件贴上标签比如我把温度大于30度的对象贴上一个high标签把温度低于30度的贴上一个low标签标签可以贴多个。那么就把流中的数据按照标签分类了这里并没有分流Select是按照标签来分流 split源码 可以发现返回的是一个SplitStream需要传入一个选择器我们看看OutputSeclector的源码 传入value返回这个value对应的标签实现对这个value进行类似分类的操作。select源码 只需要接收一个或者多个标签就能返回包含那个标签对象的数据流。 5.2. 实例演示 我们这一次要把读取到的数据分成三条流一条是high高于30度一条是low低于30度一条是all所有的数据。代码 public class TransformTest4_MultipleStreams {public static void main(String[] args) throws Exception {// 1. 获取执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 设置并行度env.setParallelism(1);// 3. 读取文件DataStreamSourceString dataStream env.readTextFile(C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor);// 4. 通过map将每行数据转换为一个对象SingleOutputStreamOperatorSensorReading map dataStream.map(new MapFunctionString, SensorReading() {Overridepublic SensorReading map(String s) throws Exception {String[] split s.split(,);return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 5. 按条件贴标签SplitStreamSensorReading split map.split(new OutputSelectorSensorReading() {Overridepublic IterableString select(SensorReading value) {return value.getTemperature() 30 ? Collections.singletonList(high) : Collections.singletonList(low);}});// 6. 按标签选择生成不同的数据流DataStreamSensorReading high split.select(high);DataStreamSensorReading low split.select(low);DataStreamSensorReading all split.select(high, low);high.print(high);low.print(low);all.print(all);env.execute();} } 5.3. 合流操作Connect 和 CoMap DataStream,DataStream → ConnectedStreams连接两个保持他们类型的数 据流两个数据流被 Connect 之后只是被放在了一个同一个流中内部依然保持各自的数据和形式不发生任何变化两个流相互独立。 ConnectedStreams → DataStream作用于 ConnectedStreams 上功能与 map和 flatMap 一样对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。类似于一国两制看似两条流合并在了一起其实内部依旧是按照自己的约定运行类型并没有改变。 connect源码 将当前调用者的流和参数中的流合并返回一个ConnectedStreamsT,R类型 我们再来看看ConnectionStreamsT,R中的map方法其中要传的是一个CoMapFunctionIN1,IN2,R的对象最重要的就是这个类我们来看看这个类 这个CoMapFunctionIN1,IN2,R和之前的MapFunction不太一样这里要重写的方法有两个map1和map2一个是针对IN1的一个是针对IN2的R就是返回类型。 这下全明白了在这个方法内部对这两条流分别操作合成一条流。 5.4. 实例演示 public class TransformTest5_MultipleStreams {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1. 读取文件DataStreamSourceString dataStreamSource env.readTextFile(C:\\Users\\Administrator\\IdeaProjects\\FlinkTutorial\\src\\main\\resources\\sensor);// 2. 将每行数据变成一个对象SingleOutputStreamOperatorSensorReading map dataStreamSource.map(new MapFunctionString, SensorReading() {Overridepublic SensorReading map(String s) throws Exception {String[] split s.split(,);return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));}});// 3. 将数据打上标签SplitStreamSensorReading split map.split(new OutputSelectorSensorReading() {Overridepublic IterableString select(SensorReading value) {return value.getTemperature() 30 ? Collections.singletonList(high) : Collections.singletonList(low);}});// 4. 按照高温和低温的标签分成两条流DataStreamSensorReading high split.select(high);DataStreamSensorReading low split.select(low);// 5. 将high流的数据转换为二元组SingleOutputStreamOperatorTuple2String, Double tuple2SingleOutputStreamOperator high.map(new MapFunctionSensorReading, Tuple2String, Double() {Overridepublic Tuple2String, Double map(SensorReading sensorReading) throws Exception {return new Tuple2(sensorReading.getId(), sensorReading.getTemperature());}});// 6. 将tuple2SingleOutputStreamOperator和low连接ConnectedStreamsTuple2String, Double, SensorReading connect tuple2SingleOutputStreamOperator.connect(low);// 7. 调用map传参CoMapFunction将两条流合并成一条流objectSingleOutputStreamOperatorSingleOutputStreamOperatorObject objectSingleOutputStreamOperator connect.map(new CoMapFunctionTuple2String, Double, SensorReading, Object() {// 这是处理high流的方法Overridepublic Object map1(Tuple2String, Double value) throws Exception {return new Tuple3(value.getField(0), value.getField(1), temp is too high);}// 这是处理low流的方法Overridepublic Object map2(SensorReading value) throws Exception {return new Tuple2(value.getTemperature(), normal);}});objectSingleOutputStreamOperator.print();env.execute();} } 5.5. 多条流合并union 之前我们只能合并两条流那我们要合并多条流呢这里我们就需要用到union方法。 Connect 与 Union 区别 Union 之前两个流的类型必须是一样Connect 可以不一样在之后的 coMap中再去调整成为一样的。Connect 只能操作两个流Union 可以操作多个。 若我们给出以下代码 high.union(low,all); 那么highlowall三条流都会合并在一起。
http://www.w-s-a.com/news/611062/

相关文章:

  • 平台营销型网站建设小城镇建设的网站文献
  • 燕郊个人做网站小企业网站模板
  • 网站ip需要备案新开河街做网站公司
  • 网站定制设计方案wordpress批量传图片
  • 做外贸兼职的网站设计福州网站开发私人
  • 金华建站模板目前国内有哪些网站做家具回收
  • 个人做网站还是公众号赚钱好部门网站建设和维护
  • 系列图标设计网站推荐建商城网站
  • 中牟建设工程信息网站黑龙江 哈尔滨
  • 网站设计基本结构wap自助建论坛网站
  • 专业番禺网站建设爱做网站外国
  • 深圳罗湖网站设计公司价格制作网站的公司办什么营业执照
  • 长清网站建设价格群辉NAS搭建wordpress
  • 变更股东怎样在工商网站做公示网站建设和网站优化哪个更重要
  • 西安手机网站python网站开发效率
  • 深圳建站的公司羽毛球赛事2022直播
  • j2ee网站开发搜索推广的流程
  • 网站目录结构图虚拟主机如何安装WordPress
  • 信产部网站备案保定软件开发网站制作
  • 东莞网站设计定做东莞网站建设最牛
  • 网站开发的软件天猫的网站导航怎么做的
  • 做链接哪个网站好网站建设平台方案设计
  • 资质升级业绩备案在哪个网站做网站建设方案费用预算
  • 做网站找哪个平台好wordpress 3.9 性能
  • 大兴模版网站建设公司企业网站备案案例
  • h5建站是什么wordpress客户端 接口
  • 济南自适应网站建设制作软件下载
  • 望都网站建设抖音广告投放收费标准
  • 网站制作软件排行榜上海市网站建设公司58
  • 什么是网站风格中国工商网企业查询官网