当前位置: 首页 > news >正文

柳州网站建设 来宾市网站制作照片网站模板

柳州网站建设 来宾市网站制作,照片网站模板,企业建设网站有哪些,好的网站建设网站Flink基础 1、系统时间与事件时间 系统时间#xff08;处理时间#xff09; 在Sparksreaming的任务计算时#xff0c;使用的是系统时间。 假设所用窗口为滚动窗口#xff0c;大小为5分钟。那么每五分钟#xff0c;都会对接收的数据进行提交任务. 但是#xff0c;这里有…Flink基础 1、系统时间与事件时间 系统时间处理时间 在Sparksreaming的任务计算时使用的是系统时间。 假设所用窗口为滚动窗口大小为5分钟。那么每五分钟都会对接收的数据进行提交任务. 但是这里有个要注意的点有个概念叫时间轴对齐。若我们在1212开始接收数据按道理我们会在1217进行提交任务。事实上我们会在1220进行提交任务因为会进行时间轴对齐将一天按照五分钟进行划分会对应到1220。在此时提交任务后面每个五分钟提交任务都会对应到我们所划分的时间轴。 事件时间 flink支持带有事件时间的窗口Window操作 事件时间区别于系统时间如下举例 flink处理实时数据对数据进行逐条处理。设定事件时间为5分钟1200开始接收数据接收的第一条数据时间为1201接收的第二条数据为1202。假设从此时起没有收到数据那么将不会进行提交任务。**到了1206接收到了第三条数据。第三条数据的接收时间自1200起已经超过了五分钟**那么此时便会进行任务提交。 2、wordcount简单案例的实现 import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class Demo01StreamWordCount {public static void main(String[] args) throws Exception {// 1、构建Flink环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2、通过Socket模拟无界流环境方便FLink处理// 虚拟机启动nc -lk 8888// 从Source构建第一个DataStream// TODO C:\Windows\System32\drivers\etc\hosts文件中配置了master与IP地址的映射所以这里可以使用masterDataStreamString lineDS env.socketTextStream(master, 8888);// 统计每个单词的数量// 第一步将每行数据的每个单词切出来并进行扁平化处理DataStreamString wordsDS lineDS.flatMap(new FlatMapFunctionString, String() {/***FlatMapFunctionString, String: 表示输入、输出数据的类型* param line DS中的一条数据* param out 通过collect方法将数据发送到下游* throws Exception*/Overridepublic void flatMap(String line, CollectorString out) throws Exception {for (String word : line.split(,)) {// 将每个单词发送到下游out.collect(word);}}});// 第二步将每个单词变成 KV格式V置为1;返回的数据是一个二元组Tuple2DataStreamTuple2String, Integer wordKVDS wordsDS.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1);}});/*** 第三步按每一个单词进行分组; 无法再使用其父类DataStream进行定义无法向上转型* KeyedStreamT, K 是 DataStreamT 的一个特殊化版本它添加了与键控操作相关的特定方法如 reduce、aggregate、window 等。* 由于 KeyedStream 提供了额外的功能和方法它不能简单地被视为 DataStream 的一个简单实例* 因为它实现了额外的接口如 KeyedOperationsT, K并可能覆盖了某些方法的行为以支持键控操作。*/KeyedStreamTuple2String, Integer, String keyedDS wordKVDS.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {// 对Key进行分组return tuple2.f0;}});// 第四步对1进行聚合sum,下标是从0开始的DataStreamTuple2String, Integer wordCntDS keyedDS.sum(1);// 3、打印结果将DS中的内容Sink到控制台wordCntDS.print();// 执行任务env.execute();} }3、设置任务执行的并行度 本机为8核可并行16的线程 手动改变任务的并行度若不设置则会显示1-16设置后只会显示1-2 env.setParallelism(2); setBufferTimeout():设置输出缓冲区刷新的最大时间频率(毫秒)。 env.setBufferTimeout(200); import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class Demo01StreamWordCount {public static void main(String[] args) throws Exception {// 1、构建Flink环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 手动改变任务的并行度,默认并行度为最大env.setParallelism(2);// setBufferTimeout():设置输出缓冲区刷新的最大时间频率(毫秒)。env.setBufferTimeout(200);// 2、通过Socket模拟无界流环境方便FLink处理// 虚拟机启动nc -lk 8888// 从Source构建第一个DataStreamDataStreamString lineDS env.socketTextStream(master, 8888);System.out.println(lineDS并行度: lineDS.getParallelism());// 统计每个单词的数量// 第一步将每行数据的每个单词切出来并进行扁平化处理DataStreamString wordsDS lineDS.flatMap(new FlatMapFunctionString, String() {/**** param line DS中的一条数据* param out 通过collect方法将数据发送到下游* throws Exception*/Overridepublic void flatMap(String line, CollectorString out) throws Exception {for (String word : line.split(,)) {// 将每个单词发送到下游out.collect(word);}}});System.out.println(wordsDS并行度: wordsDS.getParallelism());// 第二步将每个单词变成 KV格式V置为1DataStreamTuple2String, Integer wordKVDS wordsDS.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1);}});System.out.println(wordKVDS并行度: wordKVDS.getParallelism());// 第三步按每一个单词进行分组// keyBy之后数据流会进行分组相同的key会进入同一个线程中被处理// 传递数据的规则hash取余线程总数默认CPU的总线程数原理KeyedStreamTuple2String, Integer, String keyedDS wordKVDS.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}});System.out.println(keyedDS并行度: keyedDS.getParallelism());// 第四步对1进行聚合sumDataStreamTuple2String, Integer wordCntDS keyedDS.sum(1);System.out.println(wordCntDS并行度: wordCntDS.getParallelism());// 3、打印结果将DS中的内容Sink到控制台keyedDS.print();env.execute();} }4、设置批/流处理方式使用Lambda表达式使用自定类实现接口中抽象的方法 package com.shujia.flink.core;import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class Demo02BatchWordCount {public static void main(String[] args) throws Exception {// 1、构建环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置Flink程序的处理方式默认是流处理/*** BATCH批处理只能处理有界流底层是MR模型可以进行预聚合* STREAMING流处理可以处理无界流也可以处理有界流底层是持续流模型数据一条一条处理* AUTOMATIC自动判断当所有的Source都是有界流则使用BATCH模式当Source中有一个是无界流则会使用STREAMING模式*/env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 2、获得第一个DS// 通过readTextFile可以基于文件构建有界流DataStreamString wordsFileDS env.readTextFile(flink/data/words.txt);// 3、DS之间的转换// 统计每个单词的数量// 第一步将每行数据的每个单词切出来并进行扁平化处理// Flink处理逻辑传入的方式// new XXXFunction 使用匿名内部类 // DataStreamString wordsDS wordsFileDS.flatMap(new FlatMapFunctionString, String() { // /** // * param line DS中的一条数据 // * param out 通过collect方法将数据发送到下游 // * throws Exception // * Type parameters: // * FlatMapFunctionT, O // * T – Type of the input elements. O – Type of the returned elements. // */ // Override // public void flatMap(String line, CollectorString out) throws Exception { // for (String word : line.split(,)) { // // 将每个单词发送到下游 // out.collect(word); // } // } // });/*** 使用Lambda表达式* 使用时得清楚FlatMapFunction中所要实现的抽象方法flatMap的两个参数的含义* ()-{}* 通过 - 分隔左边是函数的参数右边是函数实现的具体逻辑* 并且需要给出 flatMap函数的输出类型Types.STRING* line: 输入数据类型, out: 输出数据类型*/DataStreamString wordsDS wordsFileDS.flatMap((line, out) - {for (String word : line.split(,)) {out.collect(word);}}, Types.STRING);//TODO 使用自定类实现接口中抽象的方法一般不使用这种方法wordsFileDS.flatMap(new MyFunction()).print();// 第二步将每个单词变成 KV格式V置为1 // DataStreamTuple2String, Integer wordKVDS wordsDS.map(new MapFunctionString, Tuple2String, Integer() { // Override // public Tuple2String, Integer map(String word) throws Exception { // return Tuple2.of(word, 1); // } // });// TODO 此处需要给出 map函数的输出类型Types.TUPLE(Types.STRING, Types.INT)是一个二元组DataStreamTuple2String, Integer wordKVDS wordsDS.map(word - Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));/*** 第三步按每一个单词进行分组* keyBy之后数据流会进行分组相同的key会进入同一个线程中被处理* 传递数据的规则hash取余线程总数默认CPU的总线程数本机为16原理*/ // KeyedStreamTuple2String, Integer, String keyedDS wordKVDS.keyBy(new KeySelectorTuple2String, Integer, String() { // Override // public String getKey(Tuple2String, Integer tuple2) throws Exception { // return tuple2.f0; // } // });// TODO 此处的Types.STRING 并不是直接表示某个方法的输出类型而是用来指定 keyBy 方法中键key的类型。这里可以省略KeyedStreamTuple2String, Integer, String keyedDS wordKVDS.keyBy(kv - kv.f0, Types.STRING);// 第四步对1进行聚合sum无需指定返回值类型DataStreamTuple2String, Integer wordCntDS keyedDS.sum(1);// 4、最终结果的处理保存/输出打印wordCntDS.print();env.execute();} }class MyFunction implements FlatMapFunctionString,String{Overridepublic void flatMap(String line, CollectorString out) throws Exception {for (String word : line.split(,)) {// 将每个单词发送到下游out.collect(word);}} }5、source Flink 在流处理和批处理上的 source 大概有 4 类 基于本地集合的 source、 基于文件的 source、 基于网络套接字的 source、 自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等当然你也可以定义自己的 source。 1、从本地集合source中读取数据 package com.shujia.flink.source;import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class Demo01ListSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 本地集合SourceArrayListString arrList new ArrayList();arrList.add(flink);arrList.add(flink);arrList.add(flink);arrList.add(flink);//TODO 有界流fromCollectionDataStreamString listDS env.fromCollection(arrList);listDS.print();env.execute();} }2、新版本从本地文件中读取数据有界流和无界流两种方式 package com.shujia.flink.source;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.io.File; import java.time.Duration;public class Demo02FileSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//TODO 历史版本读文件的方式有界流DataStreamString oldFileDS env.readTextFile(flink/data/words.txt); // oldFileDS.print();//TODO 读取案例一 新版本加载文件的方式FileSource默认是有界流FileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(flink/data/words.txt)).build();//TODO 从Source加载数据构建DS使用自带source类使用 fromSourceDataStreamString fileSourceDS env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), fileSource);fileSourceDS.print();//TODO 读取案例二 将读取文件变成无界流FileSourceString fileSource2 FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(flink/data/words))//TODO 使成为无界流读取一个文件夹中的数据类似Flume中的spool dir可以监控一个目录下文件的变化// Duration.ofSeconds(5) 以5秒为间隔持续监控.monitorContinuously(Duration.ofSeconds(5)).build();DataStreamString fileSourceDS2 env.fromSource(fileSource2,WatermarkStrategy.noWatermarks(),fileSource2);fileSourceDS2.print();env.execute();} }3、自定义source类区分有界流与无界流 只有在Source启动时会执行一次 run方法如果会结束则Source会得到一个有界流run方法如果不会结束则Source会得到一个无界流package com.shujia.flink.source;import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction;public class Demo03MySource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// TODO 使用自定义source类通过addSource对其进行添加DataStreamString mySourceDS env.addSource(new MySource());mySourceDS.print();env.execute();} }class MySource implements SourceFunctionString{/*** 只有在Source启动时会执行一次* run方法如果会结束则Source会得到一个有界流* run方法如果不会结束则Source会得到一个无界流* 下面的例子Source会得到一个无界流*/Overridepublic void run(SourceContextString ctx) throws Exception {System.out.println(run方法启动了);// ctx 可以通过collect方法向下游发送数据long cnt 0L;while(true){ctx.collect(cnt);cnt ;// 休眠一会Thread.sleep(1000);}}// Source结束时会执行Overridepublic void cancel() {System.out.println(Source结束了);} }4、自定义source类读取MySQL中的数据并进行处理 package com.shujia.flink.source;import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement;public class Demo04MyMySQLSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamStudents studentDS env.addSource(new MyMySQLSource());// 统计班级人数DataStreamTuple2String, Integer clazzCntDS studentDS.map(stu - Tuple2.of(stu.clazz, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(t2 - t2.f0).sum(1);clazzCntDS.print();// 统计性别人数DataStreamTuple2String, Integer genderCntDS studentDS.map(stu - Tuple2.of(stu.gender, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(t2 - t2.f0).sum(1);genderCntDS.print();env.execute();} }// TODO 自定义source类从MySQL中读取数据 class MyMySQLSource implements SourceFunctionStudents {Overridepublic void run(SourceContextStudents ctx) throws Exception {//TODO run方法只会执行一次创建下列对象的操作// 建立连接Connection conn DriverManager.getConnection(jdbc:mysql://master:3306/bigdata_30, root, 123456);// 创建StatementStatement st conn.createStatement();// 执行查询ResultSet rs st.executeQuery(select * from students2);// 遍历rs提取每一条数据while (rs.next()) {long id rs.getLong(id);String name rs.getString(name);int age rs.getInt(age);String gender rs.getString(gender);String clazz rs.getString(clazz);Students stu new Students(id, name, age, gender, clazz);ctx.collect(stu);/*** 16 (文科四班,1)* 15 (女,1)* 15 (女,2)* 2 (男,1)* 7 (文科六班,1)* 15 (女,3)* 2 (男,2)* 17 (理科六班,1)* 17 (理科六班,2)* 13 (理科五班,1)* 20 (理科二班,1)* 13 (理科四班,1)*/}rs.close();st.close();conn.close();}Overridepublic void cancel() {} }// TODO 创建一个类用于存储从MySQL中取出的数据 class Students {Long id;String name;Integer age;String gender;String clazz;public Students(Long id, String name, Integer age, String gender, String clazz) {this.id id;this.name name;this.age age;this.gender gender;this.clazz clazz;} }6、sink Flink 将转换计算后的数据发送的地点 。 Flink 常见的 Sink 大概有如下几类 写入文件、 打印出来、 写入 socket 、 自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等同理你也可以定义自己的 sink。 1、构建FileSink监控一个端口中的数据并将其写入到本地文件夹中 package com.shujia.flink.sink;import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.time.Duration; public class Demo01FileSink {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString lineDS env.socketTextStream(master, 8888);// 构建FileSinkFileSinkString fileSink FileSink.StringforRowFormat(new Path(flink/data/fileSink), new SimpleStringEncoderString(UTF-8)).withRollingPolicy(DefaultRollingPolicy.builder()// 这个设置定义了滚动的时间间隔。.withRolloverInterval(Duration.ofSeconds(10))// 这个设置定义了一个不活动间隔。.withInactivityInterval(Duration.ofSeconds(10))// 这个设置定义了单个日志文件可以增长到的最大大小。在这个例子中每个日志文件在被滚动之前可以增长到最多1MB。.withMaxPartSize(MemorySize.ofMebiBytes(1)).build()).build();lineDS.sinkTo(fileSink);env.execute();} }2、自定义sink类 package com.shujia.flink.sink;import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.util.ArrayList;public class Demo02MySink {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();ArrayListString arrList new ArrayList();arrList.add(flink);arrList.add(flink);arrList.add(flink);arrList.add(flink);DataStreamSourceString ds env.fromCollection(arrList);ds.addSink(new MySinkFunction());env.execute();/*** 进入了invoke方法* flink* 进入了invoke方法* flink* 进入了invoke方法* flink* 进入了invoke方法* flink*/} }class MySinkFunction implements SinkFunctionString{Overridepublic void invoke(String value, Context context) throws Exception {System.out.println(进入了invoke方法);// invoke 每一条数据会执行一次// 最终数据需要sink到哪里就对value进行处理即可System.out.println(value);} }7、Transformation数据转换的常用操作 1、Map package com.shujia.flink.tf;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo01Map {public static void main(String[] args) throws Exception {// 传入一条数据返回一条数据StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString ds env.socketTextStream(master, 8888);// 1、使用匿名内部类DataStreamTuple2String, Integer mapDS ds.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String word) throws Exception {return Tuple2.of(word, 1);}});// mapDS.print();// 2、使用lambda表达式DataStreamTuple2String, Integer mapDS2 ds.map(word - Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));mapDS2.print();env.execute();} }2、FlatMap package com.shujia.flink.tf;import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class Demo02FlatMap {public static void main(String[] args) throws Exception {// 传入一条数据返回多条数据类似UDTF函数StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString ds env.socketTextStream(master, 8888);// 1、使用匿名内部类SingleOutputStreamOperatorTuple2String, Integer flatMapDS01 ds.flatMap(new FlatMapFunctionString, Tuple2String, Integer() {Overridepublic void flatMap(String line, CollectorTuple2String, Integer out) throws Exception {for (String word : line.split(,)) {out.collect(Tuple2.of(word, 1));}}});flatMapDS01.print();// 2、使用lambda表达式SingleOutputStreamOperatorTuple flatMapDS02 ds.flatMap((line, out) - {for (String word : line.split(,)) {out.collect(Tuple2.of(word, 1));}}, Types.TUPLE(Types.STRING, Types.INT));flatMapDS02.print();env.execute();} }3、Filter package com.shujia.flink.tf;import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo03Filter {public static void main(String[] args) throws Exception {// 过滤数据注意返回值必须是布尔类型返回true则保留数据返回false则过滤数据StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString ds env.socketTextStream(master, 8888);/*** Integer.valueOf该方法将字符串参数转换为 Integer 对象。返回的是 Integer 类型即 java.lang.Integer 的一个实例。* Integer.parseInt该方法将字符串参数解析为基本数据类型 int 的值。返回的是 int 类型的值而不是对象。* 无需指定返回值类型*/// 只输出大于10的数字SingleOutputStreamOperatorString filterDS ds.filter(new FilterFunctionString() {Overridepublic boolean filter(String value) throws Exception {return Integer.parseInt(value) 10;}});filterDS.print();ds.filter(value - Integer.parseInt(value) 10).print();env.execute();} }4、KeyBy // 两种不同的简写方式 ds.keyBy(value - value.toLowerCase(), Types.STRING).print(); ds.keyBy(String::toLowerCase, Types.STRING).print(); package com.shujia.flink.tf;import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo04KeyBy {public static void main(String[] args) throws Exception {// 用于就数据流分组让相同的Key进入到同一个任务中进行处理后续可以跟聚合操作StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString ds env.socketTextStream(master, 8888);KeyedStreamString, String keyByDS ds.keyBy(new KeySelectorString, String() {Overridepublic String getKey(String value) throws Exception {return value;}});keyByDS.print();// 两种不同的简写方式ds.keyBy(value - value.toLowerCase(), Types.STRING).print();ds.keyBy(String::toLowerCase, Types.STRING).print();env.execute();} }5、Reduce package com.shujia.flink.tf;import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo05Reduce {public static void main(String[] args) throws Exception {// 用于对KeyBy之后的数据流进行聚合计算StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString ds env.socketTextStream(master, 8888);// 统计班级的平均年龄/** 文科一班,20* 文科一班,22* 文科一班,21* 文科一班,20* 文科一班,22** 理科一班,20* 理科一班,21* 理科一班,20* 理科一班,21* 理科一班,20**/SingleOutputStreamOperatorTuple3String, Integer, Integer kvDS ds.map(line - {String[] split line.split(,);String clazz split[0];int age Integer.parseInt(split[1]);return Tuple3.of(clazz, age, 1);}, Types.TUPLE(Types.STRING, Types.INT, Types.INT));KeyedStreamTuple3String, Integer, Integer, String keyByDS kvDS.keyBy(kv - kv.f0, Types.STRING);keyByDS.reduce(new ReduceFunctionTuple3String, Integer, Integer() {Overridepublic Tuple3String, Integer, Integer reduce(Tuple3String, Integer, Integer value1, Tuple3String, Integer, Integer value2) throws Exception {return Tuple3.of(value1.f0, value1.f1 value2.f1, value1.f2 value2.f2);}}).map(t3 - Tuple2.of(t3.f0, (double) t3.f1 / t3.f2),Types.TUPLE(Types.STRING,Types.DOUBLE)).print();keyByDS.reduce((v1,v2)-Tuple3.of(v1.f0, v1.f1 v2.f1, v1.f2 v2.f2)).map(t3 - Tuple2.of(t3.f0, (double) t3.f1 / t3.f2),Types.TUPLE(Types.STRING,Types.DOUBLE)).print();env.execute();} }6、Window package com.shujia.flink.tf;import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;public class Demo06Window {public static void main(String[] args) throws Exception {// Flink窗口操作时间、计数、会话StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString ds env.socketTextStream(master, 8888);SingleOutputStreamOperatorTuple2String, Integer kvDS ds.map(word - Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));// 每隔5s统计每个单词的数量 --- 滚动窗口实现与spark中的定义相同SingleOutputStreamOperatorTuple2String, Integer outputDS01 kvDS// 按照Tuple2中的第一个元素进行分组.keyBy(kv - kv.f0, Types.STRING)// 设置滚动时间.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 对Tuple2中的第二个元素索引为1的元素即Integer类型进行求和.sum(1);// outputDS01.print();// 每隔5s统计最近10s内的每个单词的数量 --- 滑动窗口实现与spark中的定义相同kvDS.keyBy(kv - kv.f0, Types.STRING)// 设置窗口大小和滑动大小.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1).print();env.execute();} }7、Union package com.shujia.flink.tf;import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo07Union {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString ds01 env.socketTextStream(master, 8888);DataStreamString ds02 env.socketTextStream(master, 9999);DataStreamString unionDS ds01.union(ds02);// union 就是将两个相同结构的DS合并成一个DS上下合并unionDS.print();env.execute();} }8、Process 通过processElement实现Map算子操作、flatMap算子操作实现扁平化、filter算子操作 package com.shujia.flink.tf;import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;public class Demo08Process {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString ds01 env.socketTextStream(master, 8888);ds01.process(new ProcessFunctionString, Object() {/** 每进来一条数据就会执行一次* value 一条数据* ctx可以获取任务执行时的信息* out用于输出数据* ProcessFunctionString, Object.Context ctxflink的上下文对象*/Overridepublic void processElement(String value, ProcessFunctionString, Object.Context ctx, CollectorObject out) throws Exception {// 通过processElement实现Map算子操作out.collect(Tuple2.of(value, 1));// 通过processElement实现flatMap算子操作实现扁平化for (String word : value.split(,)) {out.collect(word);}// 通过processElement实现filter算子操作if(java.equals(value)){out.collect(java ok);}}}).print();env.execute();} }通过processElement实现KeyBy算子操作 package com.shujia.flink.tf;import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;import java.util.HashMap;public class Demo09KeyByProcess {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamString ds01 env.socketTextStream(master, 8888);KeyedStreamTuple2String, Integer, String keyedDS ds01.process(new ProcessFunctionString, Tuple2String, Integer() {Overridepublic void processElement(String value, ProcessFunctionString, Tuple2String, Integer.Context ctx, CollectorTuple2String, Integer out) throws Exception {for (String word : value.split(,)) {out.collect(Tuple2.of(word, 1));}}}).keyBy(t2 - t2.f0, Types.STRING);// 基于分组之后的数据流同样可以调用process方法/*** KeyedProcessFunctionK, I, O* Type parameters:* K – Type of the key. I – Type of the input elements. O – Type of the output elements.*/keyedDS.process(new KeyedProcessFunctionString, Tuple2String, Integer, String() {HashMapString, Integer wordCntMap;// 当KeyedProcessFunction构建时只会执行一次这样就避免了重复创建HashMap对象Overridepublic void open(Configuration parameters) throws Exception {wordCntMap new HashMapString, Integer();}// 每一条数据会执行一次Overridepublic void processElement(Tuple2String, Integer value, KeyedProcessFunctionString, Tuple2String, Integer, String.Context ctx, CollectorString out) throws Exception {// 通过process实现word count// 判断word是不是第一次进入通过HashMap查找word是否有count值String word value.f0;int cnt 1;if (wordCntMap.containsKey(word)) {//get 在集合中通过value来获取对应的值int newCnt wordCntMap.get(word) 1;wordCntMap.put(word, newCnt);cnt newCnt;} else {wordCntMap.put(word, 1);}out.collect(word : cnt);}}).print();env.execute();} }8、Flink并行度 如何设置并行度 1、考虑吞吐量 有聚合操作的任务1w条/s 一个并行度 无聚合操作的任务10w条/s 一个并行度 2、考虑集群本身的资源 注 Task的数量由并行度以及有无Shuffle一起决定可在shuffle之前观察是否有可合并的Task,可以来减少Task数量 Task Slot数量 是由任务中最大的并行度决定 TaskManager的数量由配置文件中每个TaskManager设置的Slot数量及任务所需的Slot数量一起决定 FLink 并行度设置的几种方式: 1、通过env设置不推荐如果需要调整并行度得修改代码重新打包提交任务 2、每个算子可以单独设置并行度视实际情况决定一般不常用 3、还可以在提交任务的时候指定并行度最常用 比较推荐的方式 命令行flink run 可以通过 -p 参数设置全局并行度 4、配置文件flink-conf.yaml中设置 web UI填写parallelism输入框即可设置优先级算子本身的设置 env做的全局设置 提交任务时指定的 配置文件flink-conf.yaml package com.shujia.flink.core;import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo03Parallelism {public static void main(String[] args) throws Exception {/*** 如何设置并行度* 1、考虑吞吐量* 有聚合操作的任务1w条/s 一个并行度* 无聚合操作的任务10w条/s 一个并行度* 2、考虑集群本身的资源** Task的数量由并行度以及有无Shuffle一起决定可在shuffle之前观察是否有可合并的Task,可以来减少Task数量* Task Slot数量 是由任务中最大的并行度决定* TaskManager的数量由配置文件中每个TaskManager设置的Slot数量及任务所需的Slot数量一起决定**/// FLink 并行度设置的几种方式StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 1、通过env设置不推荐如果需要调整并行度得修改代码重新打包提交任务env.setParallelism(3);// socketTextStream的并行度为1无法调整DataStreamSourceString ds env.socketTextStream(master, 8888);// 2、每个算子可以单独设置并行度视实际情况决定一般不常用SingleOutputStreamOperatorTuple2String, Integer kvDS ds.map(word - Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).setParallelism(4);SingleOutputStreamOperatorTuple2String, Integer wordCntDS2P kvDS.keyBy(kv - kv.f0).sum(1).setParallelism(2);// 如果算子不设置并行度则以全局为准wordCntDS2P.print();/*** 3、还可以在提交任务的时候指定并行度最常用 比较推荐的方式* 命令行flink run 可以通过 -p 参数设置全局并行度* * web UI填写parallelism输入框即可设置优先级算子本身的设置 env做的全局设置 提交任务时指定的 配置文件flink-conf.yaml*/env.execute();} }上述代码执行如下 9、事件时间 事件时间指的是数据产生的时间或是数据发生的时间。它是数据本身所携带的时间信息代表了事件真实发生的时间。在Flink中事件时间通过数据元素自身带有的时间戳来表示这个时间戳具有业务含义并与系统时间独立。 1、案例一基于事件事件的滚动窗口的实现 窗口的触发条件: 1、水位线大于等于窗口的结束时间 2、窗口内有数据 水位线某个线程中所接收到的数据中最大的时间戳 水位线设置1 单调递增时间戳策略不考虑数据乱序问题。所传入数据的最大事件时间作为水位线 .Tuple2String, LongforMonotonousTimestamps() 水位线设置2 设置水位线前移容忍5s的数据乱序到达本质上将水位线前移5s缺点导致任务延时变大 .Tuple2String, LongforBoundedOutOfOrderness(Duration.ofSeconds(5)) package com.shujia.flink.core;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;public class Demo04EventTime {public static void main(String[] args) throws Exception {// 事件时间数据本身自带的时间StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置全局并行度env.setParallelism(1);/*数据格式单词,时间戳很大的整数Long类型a,1722233813000a,1722233814000a,1722233815000a,1722233816000a,1722233817000a,1722233818000a,1722233819000a,1722233820000a,1722233822000a,1722233827000*/DataStreamSourceString wordTsDS env.socketTextStream(master, 8888);SingleOutputStreamOperatorTuple2String, Long mapDS wordTsDS.map(line - Tuple2.of(line.split(,)[0], Long.parseLong(line.split(,)[1])), Types.TUPLE(Types.STRING, Types.LONG));// 指定数据的时间戳告诉Flink将其作为事件时间进行处理SingleOutputStreamOperatorTuple2String, Long assDS mapDS.assignTimestampsAndWatermarks(WatermarkStrategy// 水位线某个线程中所接收到的数据中最大的时间戳 // //水位线设置1 单调递增时间戳策略不考虑数据乱序问题。所传入数据的最大事件时间作为水位线 // .Tuple2String, LongforMonotonousTimestamps()//TODO 水位线设置2 设置水位线前移容忍5s的数据乱序到达本质上将水位线前移5s缺点导致任务延时变大.Tuple2String, LongforBoundedOutOfOrderness(Duration.ofSeconds(5))// 指定事件时间可以提取数据的某一部分作为事件时间.withTimestampAssigner(new SerializableTimestampAssignerTuple2String, Long() {Overridepublic long extractTimestamp(Tuple2String, Long t2, long recordTimestamp) {return t2.f1;}}));// 不管是事件时间还是处理时间都需要搭配窗口操作一起使用assDS.map(kv - Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(t2 - t2.f0)/*** 窗口的触发条件* 1、水位线大于等于窗口的结束时间* 2、窗口内有数据*TumblingEventTimeWindows:滚动窗口*/.window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();} }2、案例二自定义水平线策略 多并行度map之后指定水位线生成策略 注必须两个线程中的水位线都超过了窗口的大小才能触发窗口的执行 当窗口满足执行条件 1、所有线程的水位线都超过了窗口的结束时间 依次每两个不同编号的线程为一组该组均超过 2、窗口有数据 触发一次process方法 package tfTest;import com.shujia.flink.event.MyEvent; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;public class Demo05WaterMarkStrategy {public static void main(String[] args) throws Exception {// 自定义水位线策略// 参考链接https://blog.csdn.net/zznanyou/article/details/121666563StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceString eventDS env.socketTextStream(master, 8888);// 将每条数据变成MyEvent类型eventDS.map(new MapFunctionString, MyEvent() {Overridepublic MyEvent map(String value) throws Exception {String[] split value.split(,);return new MyEvent(split[0],Long.parseLong(split[1]));}})// TODO 设置事件时间和自定义水平线策略.assignTimestampsAndWatermarks(new WatermarkStrategyMyEvent() {Overridepublic TimestampAssignerMyEvent createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssignerMyEvent() {Overridepublic long extractTimestamp(MyEvent element, long recordTimestamp) {return element.getTs();}};}Overridepublic WatermarkGeneratorMyEvent createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new MyMapWatermarkGenerator();}}).keyBy(my- my.getWord()).window(TumblingEventTimeWindows.of(Time.seconds(5)))// 当窗口满足执行条件1、所有线程的水位线都超过了窗口的结束时间 2、窗口有数据 触发一次process方法.process(new ProcessWindowFunctionMyEvent, String, String, TimeWindow() {Overridepublic void process(String s, ProcessWindowFunctionMyEvent, String, String, TimeWindow.Context context, IterableMyEvent elements, CollectorString out) throws Exception {System.out.println(窗口触发执行了。);System.out.println(当前水位线为 context.currentWatermark() ,当前窗口的开始时间 context.window().getStart() ,当前窗口的结束时间 context.window().getEnd());// 基于elements做统计 通过out可以将结果发送到下游}}).print();env.execute();} }// 用于map之后指定水位线生成策略 class MyMapWatermarkGenerator implements WatermarkGeneratorMyEvent {private final long maxOutOfOrderness 0;private long currentMaxTimeStamp;//TODO 每来一条数据会处理一次若maxOutOfOrderness为0则为单调递增时间戳策略若不为0则是水位线前移策略Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {currentMaxTimeStamp Math.max(currentMaxTimeStamp, eventTimestamp);System.out.println(当前线程编号为 Thread.currentThread().getId() ,当前水位线为 (currentMaxTimeStamp - maxOutOfOrderness));}// 周期性的执行env.getConfig().getAutoWatermarkInterval(); 默认是200msOverridepublic void onPeriodicEmit(WatermarkOutput output) {// 发送output.emitWatermark(new Watermark(currentMaxTimeStamp - maxOutOfOrderness));} }执行结果 多并行度source之后设置水位线策略 效果通线程并行度为1的情况 package com.shujia.flink.core;import com.shujia.flink.event.MyEvent; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;public class Demo05WaterMarkStrategy {public static void main(String[] args) throws Exception {// 自定义水位线策略// 参考链接https://blog.csdn.net/zznanyou/article/details/121666563StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceString eventDS env.socketTextStream(master, 8888);// 在Source之后就指定水位线策略eventDS.assignTimestampsAndWatermarks(new WatermarkStrategyString() {// 指定时间戳的提取策略Overridepublic TimestampAssignerString createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssignerString() {Overridepublic long extractTimestamp(String element, long recordTimestamp) {return Long.parseLong(element.split(,)[1]);}};// 简写方式 // return (ele,ts)-Long.parseLong(ele.split(,)[1]);}// 指定水位线的策略Overridepublic WatermarkGeneratorString createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new MyWatermarkGenerator();}})// 将数据变成KV格式即单词,1.map(line - Tuple2.of(line.split(,)[0], 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(t2 - t2.f0).window(TumblingEventTimeWindows.of(Time.seconds(5)))// 当窗口满足执行条件1、水位线超过了窗口的结束时间 2、窗口有数据 触发一次process方法.process(new ProcessWindowFunctionTuple2String, Integer, Tuple2String, Integer, String, TimeWindow() {Overridepublic void process(String s, ProcessWindowFunctionTuple2String, Integer, Tuple2String, Integer, String, TimeWindow.Context context, IterableTuple2String, Integer elements, CollectorTuple2String, Integer out) throws Exception {System.out.println(窗口触发执行了。);System.out.println(当前水位线为 context.currentWatermark() ,当前窗口的开始时间 context.window().getStart() ,当前窗口的结束时间 context.window().getEnd());// 基于elements做统计 通过out可以将结果发送到下游}}).print();env.execute();} }// 用于Source之后直接指定水位线生成策略 class MyWatermarkGenerator implements WatermarkGeneratorString {private final long maxOutOfOrderness 0;private long currentMaxTimeStamp;// 每来一条数据会处理一次Overridepublic void onEvent(String event, long eventTimestamp, WatermarkOutput output) {currentMaxTimeStamp Math.max(currentMaxTimeStamp, eventTimestamp);System.out.println(当前线程编号为 Thread.currentThread().getId() ,当前水位线为 (currentMaxTimeStamp - maxOutOfOrderness));}// 周期性的执行env.getConfig().getAutoWatermarkInterval(); 默认是200msOverridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(currentMaxTimeStamp - maxOutOfOrderness));} }10、窗口 1、时间窗口滚动与滑动窗口 时间窗口滚动、滑动 时间类型处理时间、事件时间 package com.shujia.flink.window;import com.shujia.flink.event.MyEvent; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;public class Demo01TimeWindow {public static void main(String[] args) throws Exception {/** 时间窗口滚动、滑动* 时间类型处理时间、事件时间*/StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamMyEvent myDS env.socketTextStream(master, 8888).map(new MapFunctionString, MyEvent() {Overridepublic MyEvent map(String value) throws Exception {String[] split value.split(,);return new MyEvent(split[0], Long.parseLong(split[1]));}});// 基于处理时间的滚动、滑动窗口SingleOutputStreamOperatorTuple2String, Integer processDS myDS.map(e - Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(t2 - t2.f0)// 滚动窗口 每隔5s统计一次 // .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 滑动窗口 每隔5s统计最近10s内的数据.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);// 基于事件时间的滚动、滑动窗口SingleOutputStreamOperatorMyEvent assDS myDS.assignTimestampsAndWatermarks(// 设置水位线策略、指定事件时间WatermarkStrategy// Duration.ofSeconds(5)水位线前移5s.MyEventforBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, ts) - event.getTs()));SingleOutputStreamOperatorTuple2String, Integer eventDS assDS.map(e - Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(t2 - t2.f0)// 滚动窗口由于水位线前移了5s整体有5s的延时 // .window(TumblingEventTimeWindows.of(Time.seconds(5)))// 滑动窗口.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).sum(1);// processDS.print();eventDS.print();env.execute();} }2、会话窗口 基于处理时间的会话窗口当一段时间没有数据那么就认定此次会话结束并触发窗口的执行 基于事件时间的会话窗口连续接收的两条数据的事件时间之差要大于5s(窗口大小)才能触发窗口的执行 package com.shujia.flink.window;import com.shujia.flink.event.MyEvent; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;public class Demo02Session {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamMyEvent myDS env.socketTextStream(master, 8888).map(new MapFunctionString, MyEvent() {Overridepublic MyEvent map(String value) throws Exception {String[] split value.split(,);return new MyEvent(split[0], Long.parseLong(split[1]));}});// 基于处理时间的会话窗口当一段时间没有数据那么就认定此次会话结束并触发窗口的执行SingleOutputStreamOperatorTuple2String, Integer processSessionDS myDS.map(e - Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(t2 - t2.f0)// 10秒内没有数据则认定此次会话结束并触发窗口的执行.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum(1);//TODO 基于事件时间的会话窗口连续接收的两条数据的事件时间之差要大于5s(窗口大小)才能触发窗口的执行// 指定水位线策略并提供数据中的时间戳解析规则SingleOutputStreamOperatorMyEvent assDS myDS.assignTimestampsAndWatermarks(WatermarkStrategy.MyEventforMonotonousTimestamps().withTimestampAssigner((e, ts) - e.getTs()));SingleOutputStreamOperatorTuple2String, Integer eventSessionDS assDS.map(e - Tuple2.of(e.getWord(), 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(t2 - t2.f0).window(EventTimeSessionWindows.withGap(Time.seconds(5))).sum(1);// processSessionDS.print();eventSessionDS.print();env.execute();} }3、计数窗口滚动、滑动 滚动下每同一个key的5条数据会统计一次 滑动下每隔同一个key的5条数据统计最近的同一个key的10条数据 package com.shujia.flink.window;import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo03CountWindow {public static void main(String[] args) throws Exception {// 计数窗口滚动、滑动StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceString ds env.socketTextStream(master, 8888);ds.map(word- Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT)).keyBy(t2-t2.f0) // .countWindow(5) // 每同一个key的5条数据会统计一次.countWindow(10,5) // 每隔同一个key的5条数据统计最近的同一个key的10条数据.sum(1).print();env.execute();/*** 每隔同一个key的5条数据统计最近的同一个key的10条数据* 输入* a* a* a* a* a* b* b* b* a* a* a* a* a* 输出* 13 (a,5)* 13 (a,10)*/} }
http://www.w-s-a.com/news/869096/

相关文章:

  • 企业网站首页学生做的网站成品
  • 网站开发 架构设计企业信息管理系统的组成不包括
  • 网站维护模式网页传奇游戏平台排行
  • 企业网站改自适应蛋糕方案网站建设
  • 网站开发技术职责网站升级中html
  • 天网网站建设百度权重高的网站
  • 明年做哪些网站致富网站站长 感受
  • 东莞营销网站建设优化怎么做微信网站推广
  • 网站建设一个多少钱php网站服务器怎么来
  • 引流用的电影网站怎么做2012服务器如何做网站
  • 什么网站可以做推广广州安全信息教育平台
  • 网站开发具备的相关知识wordpress简约文字主题
  • asp网站伪静态文件下载seo外包公司哪家好
  • 淘宝客网站根目录怎么建个废品网站
  • 网站备案更改需要多久百度免费网站空间
  • 外发加工是否有专门的网站wordpress主页 摘要
  • 企业网站优化系统浙江建设信息港证书查询
  • 很多年前的51网站如何做跨境电商需要哪些条件
  • 网站建设中 请稍后访问互联网营销设计
  • 软文网站名称用户浏览网站的方式
  • 大兴模版网站搭建哪家好网站建设与管理管理课程
  • 四川成都网站制作微信广告平台推广
  • 网站价格网页制作网站开发实训步骤
  • cms 导航网站鹤壁做网站价格
  • 微信营销软件免费版郑州关键词优化费用
  • 邢台专业做网站哪家好临沂网站建设中企动力
  • 建设网站是主营成本吗wordpress 后台
  • 猎头可以做单的网站企业网站建设
  • 建小程序需要网站吗在putty上怎样安装wordpress
  • 天津智能网站建设找哪家WordPress相册插件pro