当前位置: 首页 > news >正文

网站导航栏下拉菜单一个网站用多个域名

网站导航栏下拉菜单,一个网站用多个域名,网页在线发短信平台,天河区住房和建设水务局官方网站文章目录起源与发展flink在github上的现状实时计算VS离线计算实时计算离线计算实时计算常用的场景框架流处理流程flink电商场景下的业务图示例flink中一些重要特性有界数据和无界数据时间语义、水位线事件时间处理时间水位线flink窗口概念理想中的数据处理含有延迟数据的数据处… 文章目录起源与发展flink在github上的现状实时计算VS离线计算实时计算离线计算实时计算常用的场景框架流处理流程flink电商场景下的业务图示例flink中一些重要特性有界数据和无界数据时间语义、水位线事件时间处理时间水位线flink窗口概念理想中的数据处理含有延迟数据的数据处理Flink存储桶概念窗口类型滚动窗口滑动窗口会话窗口全局窗口flink状态管理检查点Checkpoint检查点恢复数据过程下载安装入门Demo示例pom配置Demo代码打包到集群流运行时执行环境任务槽Slot扩展Demo时间窗口DemoTable Api Demo对迟到数据处理Demo起源与发展 ​         Flink 起源于一个叫作 Stratosphere 的项目它是由 3 所地处柏林的大学和欧洲其他一些大学在 2010~2014 年共同进行的研究项目由柏林理工大学的教授沃克尔·马尔科Volker Markl领衔开发。2014 年 4 月Stratosphere 的代码被复制并捐赠给了 Apache 软件基金会Flink 就是在此基础上被重新设计出来的。 ​        在德语中“flink”一词表示“快速、灵巧”。项目的 logo 是一只彩色的松鼠当然了这不仅是因为 Apache 大数据项目对动物的喜好是否联想到了 Hadoop、Hive更是因为松鼠这种小动物完美地体现了“快速、灵巧”的特点。关于 logo 的颜色还一个有趣的缘由柏林当地的松鼠非常漂亮颜色是迷人的红棕色而 Apache 软件基金会的 logo刚好也是一根以红棕色为主的渐变色羽毛。于是Flink 的松鼠 Logo 就设计成了红棕色而且拥有一个漂亮的渐变色尾巴尾巴的配色与 Apache 软件基金会的 logo 一致。这只松鼠色彩炫目既呼应了 Apache 的风格似乎也预示着 Flink 未来将要大放异彩。 从命名上我们也可以看出 Flink 项目对于自身特点的定位那就是对于大数据处理要做到快速和灵活。 2014 年 8 月Flink 第一个版本 0.6 正式发布至于 0.5 之前的版本那就是在Stratosphere 名下的了。与此同时 Fink 的几位核心开发者创办了 Data Artisans 公司主要做 Fink 的商业应用帮助企业部署大规模数据处理解决方案。2014 年 12 月Flink 项目完成了孵化一跃成为 Apache 软件基金会的顶级项目。2015 年 4 月Flink 发布了里程碑式的重要版本 0.9.0很多国内外大公司也正是从这时开始关注、并参与到 Flink 社区建设的。2019 年 1 月长期对 Flink 投入研发的阿里巴巴以 9000 万欧元的价格收购了 Data Artisans 公司之后又将自己的内部版本 Blink 开源继而与 8 月份发布的 Flink 1.9.0版本进行了合并。自此之后Flink 被越来越多的人所熟知成为当前最火的新一代大数据处理框架。2020年Flink Forward Asia 和 Flink Forward Europe 两个 Flink 社区活动成功举办。2021年Flink 与 ClickHouse 成为 Apache 流式处理项目的顶级项目。2022年Flink 1.15 和 1.16 版本相继发布引入了许多新特性和改进。 flink在github上的现状 目前已经到了1.16.1的release版本 实时计算VS离线计算 实时计算 数据实时到达数据到达次序独立不受应用系统所控制数据规模大且无法预知容量原始数据一经处理除非特意保存否则不能被再次取出处理或者再次提取数据代价昂贵 离线计算 数据量大且时间周期长一天、一星期、一个月、半年、一年在大量数据上进行复杂的批量计算操作数据在计算之前已经固定不再会发生变化能够方便的查询批量计算的结果 实时计算常用的场景 实时数据存储实时数据存储的时候做一些微聚合、过滤某些字段、数据脱敏组建数据仓库实时 ETL等实时数据分析实时数据接入机器学习框架TensorFlow或者一些算法进行数据建模、分析然后动态的给出商品推荐、广告推荐等实时监控告警金融相关涉及交易、实时风控、行车流量预警、服务器监控告警、应用日志告警等实时数据报表活动营销时销售额/销售量大屏TopN 商品等 框架流处理流程 flink电商场景下的业务图示例 flink中一些重要特性 Apache Flink 是一个功能强大的流处理引擎如果你要在 Flink 上进行研发下面是一些你必须了解的重要特性 数据流处理和批处理统一Flink 支持将批处理和流处理统一到一个编程模型中这使得 Flink 应用程序可以同时处理无界和有界数据集。事件时间处理Flink 引入了基于事件时间的处理模式这使得 Flink 应用程序可以处理无序事件数据并且可以处理延迟事件。窗口操作Flink 可以对无界数据流进行窗口操作例如滑动窗口、会话窗口、全局窗口等等。状态管理Flink 提供了高效的状态管理可以轻松地维护和访问应用程序的状态。高可用性Flink 提供了可靠的容错机制以保证 Flink 应用程序的高可用性和数据完整性。数据源集成Flink 提供了对多种数据源的支持例如 Mysql HBase、Elasticsearch、Hive、Kafka、Redis 等等。Flink SQLFlink SQL 允许用户使用 SQL 语言来进行 Flink 应用程序的编写这使得 Flink 应用程序的编写变得更加简单和直观。深度学习支持Flink 提供了对深度学习的支持可以将 TensorFlow、PyTorch 等深度学习框架与 Flink 结合使用以便在流处理中进行模型训练和推理。可扩展性Flink 可以轻松地水平扩展以处理大规模数据集同时还能保持低延迟。 有界数据和无界数据 ​        Flink 则认为流处理才是最基本的操作批处理也可以统一为流处理。在 Flink 的世界观中万物皆流实时数据是标准的、没有界限的流而离线数据则是有界限的流。 无界数据流Unbounded Data Stream ​ 所谓无界数据流就是有头没尾数据的生成和传递会开始但永远不会结束如上图所示。我们无法等待所有数据都到达因为输入是无界的永无止境数据没有“都到达”的时候。所以对于无界数据流必须连续处理也就是说必须在获取数据后立即处理。在处理无界流时为了保证结果的正确性我们必须能够做到按照顺序处理数据。 有界数据流Bounded Data Stream ​ 有界数据流有明确定义的开始和结束如上图所示所以我们可以通过获取所有数据来处理有界流。处理有界流就不需要严格保证数据的顺序了因为总可以对有界数据集进行排序。有界流的处理也就是批处理。 时间语义、水位线 ​        在讲解flink的窗口之前需要先了解一下flink的时间语义中的处理时间、事件时间以及水位线 事件时间 ​         事件时间是指每个事件在对应的设备上发生的时间也就是数据生成的时间。 处理时间 ​         处理时间的概念非常简单就是指执行处理操作的机器的系统时间。 ​        当然处理时间和时间时间可能在处理事件的时候不一样因网络波动或者其他因素影响到数据的传输导致在事件处理时不一致如8点产生的数据在8点01s到达迟到了一秒当前系统时间为8:01而事件时间却是8:00 水位线 ​        水位线可以看作一条特殊的数据记录它是插入到数据流中的一个标记点主要内容就是一个时间戳用来指示当前的事件时间。而它插入流中的位置就应该是在某个数据到来之后这样就可以从这个数据中提取时间戳作为当前水位线的时间戳了。 flink窗口 概念 ​        Flink 是一种流式计算引擎主要是来处理无界数据流的数据源源不断、无穷无尽。想要更加方便高效地处理无界流一种方式就是将无限数据切割成有限的“数据块”进行处理这就是所谓的“窗口”Window。 ​        在 Flink 中, 窗口就是用来处理无界流的核心。我们很容易把窗口想象成一个固定位置的“框”数据源源不断地流过来到某个时间点窗口该关闭了就停止收集数据、触发计算并输出结果。 理想中的数据处理 如下图所示到到达间隔时间时触发这个窗口里所有数据的计算 ​ 含有延迟数据的数据处理 ​         如下图所示显示情况中一定会出现因为网络问题数据迟到的问题下面定义了一个延迟时间为2s的窗口这样 0~10 秒的窗口会在时间戳为 12 秒的数据到来之后才真正关闭计算输出结果这样就可以正常包含迟到的 9 秒数据了。但是这样一来0~10 秒的窗口不光包含了迟到的 9 秒数据连 11 秒和 12 秒的数据也包含进去了。我们为了正确处理迟到数据结果把早到的数据划分到了错误的窗口——最终结果都是错误的。 Flink存储桶概念 ​         在 Flink 中窗口其实并不是一个“框”流进来的数据被框住了就只能进这一个窗口。相比之下我们应该把窗口理解成一个“桶”如下图所示。在 Flink 中窗口可以把流切割成有限大小的多个“存储桶”bucket)每个数据都会分发到对应的桶中当到达窗口结束时间时就对每个桶中收集的数据进行计算处理。 窗口类型 滚动窗口 ​         滚动窗口有固定的大小是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠也不会有间隔是“首尾相接”的状态。如果我们把多个窗口的创建看作一个窗口的运动那就好像它在不停地向前“翻滚”一样。 滑动窗口 ​         与滚动窗口类似滑动窗口的大小也是固定的。区别在于窗口之间并不是首尾相接的而是可以“错开”一定的位置。如果看作一个窗口的运动那么就像是向前小步“滑动”一样。 会话窗口 ​         会话窗口顾名思义是基于“会话”session来来对数据进行分组的。这里的会话类似Web 应用中 session 的概念不过并不表示两端的通讯过程而是借用会话超时失效的机制来描述窗口。简单来说就是数据来了之后就开启一个会话窗口如果接下来还有数据陆续到来那么就一直保持会话如果一段时间一直没收到数据那就认为会话超时失效窗口自动关闭。 全局窗口 ​         这种窗口全局有效会把相同 key 的所有数据都分配到同一个窗口中说直白一点就跟没分窗口一样。无界流的数据永无止尽所以这种窗口也没有结束的时候默认是不会做触发计算的。 flink状态管理 ​         状态的存储和管理是由 Flink Runtime 负责的Flink 提供了多种状态后端来存储和管理状态数据例如内存状态后端、文件系统状态后端、RocksDB 状态后端等等。Flink 还提供了可插拔的状态后端接口可以自定义状态后端来满足特定的需求。Flink 还提供了高效的状态快照机制可以在应用程序故障时对状态进行快速恢复以保证应用程序的正确性和数据的完整性。 检查点Checkpoint ​         有状态流应用中的检查点checkpoint其实就是所有任务的状态在某个时间点的一个快照一份拷贝。简单来讲就是一次“存盘”让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时Flink 会定期保存检查点在检查点中会记录每个算子的 id 和状态如果发生故障Flink 就会用最近一次成功保存的检查点来恢复应用的状态重新启动处理流程就如同“读档”一样。 检查点恢复数据过程 ​        这里 Source 任务已经处理完毕所以偏移量为 5Map 任务也处理完成了。而 Sum 任务在处理中发生了故障此时状态并未保存。 1、在算子计算时发生故障 2、应用重启 3、读取检查点重置状态 4、读取检查点重制偏移量 5、继续处理数据 下载安装 flink下载页:点击跳转 flink官方文档地址:点击跳转 安装后启动bin目录下./start-cluster.sh文件(⚠️:需要配置java环境变量)访问地址:localhost:8081 入门Demo示例 pom配置 propertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingflink.version1.13.0/flink.versionjava.version1.8/java.versionscala.binary.version2.12/scala.binary.versionslf4j.version1.7.30/slf4j.version/propertiesdependencies!-- 引入 Flink 相关依赖--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_${scala.binary.version}/artifactIdversion${flink.version}/version/dependency!-- 引入日志管理相关依赖--dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-to-slf4j/artifactIdversion2.14.0/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.22/versionscopecompile/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.58/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-common/artifactIdversion${flink.version}/version/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.0.0/versionconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/pluginplugingroupIdnet.alchim31.maven/groupIdartifactIdscala-maven-plugin/artifactIdversion3.2.2/versionexecutionsexecutionidscala-compile-first/idphaseprocess-resources/phasegoalsgoaladd-source/goalgoalcompile/goal/goals/executionexecutionidscala-test-compile/idphaseprocess-test-resources/phasegoalsgoaltestCompile/goal/goals/execution/executions/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdconfigurationsource1.8/sourcetarget1.8/target/configuration/plugin/plugins/buildDemo代码 import com.alibaba.fastjson.JSON; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;public class FlinkExampleWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment StreamExecutionEnvironment.getExecutionEnvironment();executionEnvironment.setParallelism(4);//使用nc -lk 9999,进行数据流输入如hello word hello flinkDataStreamString dataStream executionEnvironment.socketTextStream(localhost, 9999);dataStream.flatMap(new FlatMapFunctionString, Tuple2String, Integer() {Overridepublic void flatMap(String data, CollectorTuple2String, Integer collector) throws Exception {String[] split data.split(\\s);System.out.println(JSON.toJSON(split));for (String word : split) {collector.collect(Tuple2.of(word, 1));}}}).keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}}).sum(1).print();executionEnvironment.execute();} }打包到集群 使用maven对项目package打包将打好的jar包上传到集群中可以通过界面上传或者拷贝到机器上直接用指令进行运行 界面上传 点击Submit之后可以取相应位置看输出日志 日志查看 流运行时执行环境 每个 Flink 应用程序都需要一个执行环境env。流媒体应用程序需要使用StreamExecutionEnvironment.应用程序中进行的 DataStream API 调用构建了一个附加到 StreamExecutionEnvironment. 当env.execute()被调用时这个图被打包并发送到 JobManagerJobManager 将作业并行化并将它的切片分配给任务管理器以供执行。您的作业的每个并行切片都将在 任务槽中执行。 任务槽Slot 每个workerTaskManager都是一个JVM进程可以在不同的线程中执行一个或多个子任务。为了控制 TaskManager 接受多少任务它有所谓的任务槽至少一个。每个 TaskManager 有一个插槽意味着每个任务组都在单独的 JVM 中运行例如可以在单独的容器中启动。拥有多个槽意味着更多的子任务共享同一个 JVM。同一个 JVM 中的任务共享 TCP 连接通过多路复用和心跳消息。它们还可以共享数据集和数据结构从而减少每个任务的开销。 扩展Demo 时间窗口Demo import lombok.*; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.ArrayList; import java.util.Calendar; import java.util.Comparator; import java.util.Random;import java.sql.Timestamp;public class FlinkWindowTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setAutoWatermarkInterval(100);SingleOutputStreamOperatorUserEvent eventStream env.addSource(new CustomSouce()).assignTimestampsAndWatermarks(WatermarkStrategy.UserEventforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerUserEvent() {Overridepublic long extractTimestamp(UserEvent element, longrecordTimestamp) {return element.getTimestemp();}}));//按照访问url分区求出每个 url 的访问量SingleOutputStreamOperatorUserViewCount urlCountStream eventStream.keyBy(data - data.getUrl())//滚动窗口:5s间隔.window(TumblingEventTimeWindows.of(Time.seconds(5)))//自定义聚合函数.aggregate(new NewUrlViewCountAgg(), new NewUrlViewCountResult());// 对结果中同一个窗口的统计数据进行排序处理SingleOutputStreamOperatorString result urlCountStream.keyBy(data - data.windowEnd).process(new TopN(3));result.print(result);env.execute();}//自定义增量聚合public static class NewUrlViewCountAgg implements AggregateFunctionUserEvent, Long, Long {Overridepublic Long createAccumulator() {return 0L;}Overridepublic Long add(UserEvent value, Long accumulator) {return accumulator 1L;}Overridepublic Long getResult(Long accumulator) {return accumulator;}Overridepublic Long merge(Long a, Long b) {return null;}}//自定义全窗口函数只需要包装窗口信息public static class NewUrlViewCountResult extends ProcessWindowFunctionLong, UserViewCount, String, TimeWindow {Overridepublic void process(String url, ProcessWindowFunctionLong, UserViewCount, String, TimeWindow.Context context, IterableLong elements, CollectorUserViewCount out) throws Exception {// 结合窗口信息包装输出内容long currentWindowStartTimeStemp context.window().getStart();long currentWindowEndTimeStemp context.window().getEnd();out.collect(new UserViewCount(url, elements.iterator().next(), currentWindowStartTimeStemp, currentWindowEndTimeStemp));}}// 自定义处理函数排序取 top npublic static class TopN extends KeyedProcessFunctionLong, UserViewCount, String {// 将 n 作为属性private Integer n;// 定义一个列表状态private ListStateUserViewCount urlViewCountListState;public TopN(Integer n) {this.n n;}public TopN() {super();}Overridepublic void open(Configuration parameters) throws Exception {// 从环境中获取列表状态句柄urlViewCountListState getRuntimeContext().getListState(new ListStateDescriptorUserViewCount(url-view-count-list,Types.POJO(UserViewCount.class)));}Overridepublic void processElement(UserViewCount value, KeyedProcessFunctionLong, UserViewCount, String.Context ctx, CollectorString out) throws Exception {// 将 count 数据添加到列表状态中保存起来urlViewCountListState.add(value);// 注册 window end 1ms 后的定时器等待所有数据到齐开始排序ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() 1);}Overridepublic void onTimer(long timestamp, KeyedProcessFunctionLong, UserViewCount, String.OnTimerContext ctx, CollectorString out) throws Exception {// 将数据从列表状态变量中取出放入 ArrayList方便排序ArrayListUserViewCount urlViewCountArrayList new ArrayList();for (UserViewCount urlViewCount : urlViewCountListState.get()) {urlViewCountArrayList.add(urlViewCount);}// 清空状态释放资源urlViewCountListState.clear();// 排序urlViewCountArrayList.sort(new ComparatorUserViewCount() {Overridepublic int compare(UserViewCount o1, UserViewCount o2) {return o2.count.intValue() - o1.count.intValue();}});// 提取前n名(由函数入口提供)构建输出结果StringBuilder result new StringBuilder();result.append(\n);result.append(窗口结束时间 new Timestamp(timestamp - 1) \n);for (int i 0; i this.n; i) {UserViewCount userViewCount urlViewCountArrayList.get(i);if (null userViewCount) {break;}result.append(处理数据所在窗口: new Timestamp(userViewCount.getWindowStart()) ~ new Timestamp(userViewCount.getWindowEnd()));UserViewCount needUserView userViewCount;String info No. (i 1) url userViewCount.url 浏览量 userViewCount.count \n;result.append(info);}result.append(\n);out.collect(result.toString());}}DataNoArgsConstructorToStringAllArgsConstructorpublic static class UserEvent {private String userName;private String url;private Long timestemp;}DataNoArgsConstructorAllArgsConstructorBuilderpublic static class UserViewCount {public String url;public Long count;public Long windowStart;public Long windowEnd;Overridepublic String toString() {return UserViewCount{ url url \ , count count , windowStart new Timestamp(windowStart) , windowEnd new Timestamp(windowEnd) };}}public static class CustomSouce implements SourceFunctionUserEvent {// 声明一个布尔变量作为控制数据生成的标识位private Boolean running true;Overridepublic void run(SourceContextUserEvent ctx) throws Exception {Random random new Random(); // 在指定的数据集中随机选取数据String[] users {Mary, Alice, Bob, Cary};String[] urls {./home, ./cart, ./fav, ./prod?id1,./prod?id2};while (running) {ctx.collect(new UserEvent(users[random.nextInt(users.length)],urls[random.nextInt(urls.length)],Calendar .getInstance().getTimeInMillis()));// 隔 200 ms生成一个点击事件方便观测Thread.sleep(200);}}Overridepublic void cancel() {running false;}}}运行结果 Table Api Demo import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class TableApiDemo {public static void main(String[] args) throws Exception {// 获取流执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源SingleOutputStreamOperatorUserEvent eventStream env.fromElements(new UserEvent(Alice, ./home, 1000L),new UserEvent(Bob, ./cart, 1000L),new UserEvent(Alice, ./prod?id1, 5 * 1000L),new UserEvent(Cary, ./home, 60 * 1000L),new UserEvent(Bob, ./prod?id3, 90 * 1000L),new UserEvent(Alice, ./prod?id7, 105 * 1000L));// 获取表环境StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 将数据流转换成表Table eventTable tableEnv.fromDataStream(eventStream);// 用执行 SQL 的方式提取数据Table visitTable tableEnv.sqlQuery(select url, userName from eventTable);// 将表转换成数据流打印输出tableEnv.toDataStream(visitTable).print();// 执行程序env.execute();}DataNoArgsConstructorToStringAllArgsConstructorpublic static class UserEvent {private String userName;private String url;private Long timestemp;}} 运行结果 对迟到数据处理Demo import com.alibaba.fastjson.JSON; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import org.example.entity.Event;import java.sql.Timestamp; import java.time.Duration;public class LateDataProcessExampleDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取 socket 文本流/** 样例数据* Alice ./home 1000* Alice ./home 2000* Alice ./home 10000* Alice ./home 9000* Alice ./cart 12000* Alice ./prod?id100 15000* Alice ./home 9000* Alice ./home 8000* Alice ./prod?id200 70000* Alice ./home 8000* Alice ./prod?id300 72000* Alice ./home 8000*/SingleOutputStreamOperatorEvent stream env.socketTextStream(localhost, 9999).map(new MapFunctionString, Event() {Overridepublic Event map(String value) throws Exception {String[] fields value.split(\\s);return new Event(fields[0].trim(), fields[1].trim(),Long.valueOf(fields[2].trim()));}})// 方式一设置 watermark 延迟时间2 秒钟.assignTimestampsAndWatermarks(WatermarkStrategy.EventforBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event element, longrecordTimestamp) {return element.getTimestamp();}}));// 定义侧输出流标签OutputTagEvent outputTag new OutputTagEvent(late) {};SingleOutputStreamOperatorUrlViewCount result stream.keyBy(data - data.getUrl())//设置10s的时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(10)))// 方式二允许窗口处理迟到数据设置 1 分钟的等待时间.allowedLateness(Time.minutes(1))// 方式三将最后的迟到数据输出到侧输出流.sideOutputLateData(outputTag).aggregate(new UrlViewCountAgg(), new UrlViewCountResult());result.print(result);/*** 输出数据* origin data input Event(userAlice, url./prod?id300, timestamp72000)* origin data input Event(userAlice, url./home, timestamp8000)* late data result Event(userAlice, url./home, timestamp8000)* * 我们设置窗口等待的时间为 1 分钟所以当时间推进到 10000 60 * 1000 70000 时窗* 口就会真正被销毁。此前的所有迟到数据可以直接更新窗口的计算结果而之后的迟到数据已* 经无法整合进窗口就只能用侧输出流来捕获了。需要注意的是这里的“时间”依然是由水* 位线来指示的所以时间戳为 70000 的数据到来并不会触发窗口的销毁当时间戳为 72000* 的数据到来水位线推进到了 72000 – 2 * 1000 70000此时窗口真正销毁关闭之后再来的* 迟到数据就会输出到侧输出流了*/result.getSideOutput(outputTag).print(late data result);// 为方便观察可以将原始数据也输出stream.print(origin data input);env.execute();}public static class UrlViewCountAgg implements AggregateFunctionEvent, Long,Long {Overridepublic Long createAccumulator() {return 0L;}Overridepublic Long add(Event value, Long accumulator) {return accumulator 1;}Overridepublic Long getResult(Long accumulator) {return accumulator;}Overridepublic Long merge(Long a, Long b) {return null;}}public static class UrlViewCountResult extends ProcessWindowFunctionLong, UrlViewCount, String, TimeWindow {Overridepublic void process(String url, Context context, IterableLong elements,CollectorUrlViewCount out) throws Exception {// 结合窗口信息包装输出内容Long start context.window().getStart();Long end context.window().getEnd();out.collect(new UrlViewCount(url, elements.iterator().next(), start,end));}}DataNoArgsConstructorAllArgsConstructorBuilderpublic static class UrlViewCount {public String url;public Long count;public Long windowStart;public Long windowEnd;Overridepublic String toString() {return UrlViewCount{ url url \ , count count , windowStart new Timestamp(windowStart) , windowEnd new Timestamp(windowEnd) };}} }
http://www.w-s-a.com/news/119452/

相关文章:

  • 证券投资网站建设视频直播怎么赚钱的
  • 建设酒店网站ppt模板下载郑州小程序设计外包
  • 网站建设自我总结google推广公司
  • 安全网站建设情况wordpress 评论表单
  • 网站建设发言材料个人网站推广软件
  • php建站软件哪个好南京哪家做网站好
  • 排名好的手机网站建设番禺网站建设专家
  • 番禺怎么读百度有专做优化的没
  • 网站开发中应注意哪些问题网络营销的主要特点
  • 网站定制案例北京网站制作招聘网
  • 网站建设与推广实训小结网站建设专业英文
  • 郑州网站建设动态凡科网站建设是免费的吗
  • 湖北手机网站建设wordpress转emlog博客
  • 北京东站设计网名的花样符号
  • 安徽建设厅网站首页网站开发aichengkeji
  • 自贡网站制作荣茂网站建设
  • 什么做的网站吗正规的机械外包加工订单网
  • 网络工程公司的业务邵阳seo快速排名
  • 博主怎么赚钱网站seo找准隐迅推
  • 营销号经典废话北京网站建设公司网站优化资讯
  • 一六八互联网站建设怎么做套版网站
  • wordpress 书站建筑公司简介范文大全
  • 建设官方网站多少鲜花网站建设的主要工作流程
  • 卖主机网站轻量wordpress主题
  • 网站建设规划书结构制作一个自己的网站
  • 外贸网站商城建设做网站和推广
  • 网站建设微信群免费简约ppt模板
  • 哈尔滨网站设计公司哪家更好shopify和wordpress
  • 岚县网站建设网站建设中效果
  • 网站建设软文推广网站建设分金手指排名十四