当前位置: 首页 > news >正文

昆明网站建设那家好做网站赚钱流程

昆明网站建设那家好,做网站赚钱流程,万网服务器网站建设,建湖网站建设详解 Flink 中的 WaterMark 1.基础概念1.1 流处理1.2 乱序1.3 窗口及其生命周期1.4 Keyed vs Non-Keyed1.5 Flink 中的时间 2.Watermark2.1 案例一2.2 案例二2.3 如何设置最大乱序时间2.4 延迟数据重定向 3.在 DDL 中的定义3.1 事件时间3.2 处理时间 1.基础概念 1.1 流处理 流… 详解 Flink 中的 WaterMark 1.基础概念1.1 流处理1.2 乱序1.3 窗口及其生命周期1.4 Keyed vs Non-Keyed1.5 Flink 中的时间 2.Watermark2.1 案例一2.2 案例二2.3 如何设置最大乱序时间2.4 延迟数据重定向 3.在 DDL 中的定义3.1 事件时间3.2 处理时间 1.基础概念 1.1 流处理 流处理最本质的是在处理数据的时候接受一条处理一条数据。 批处理则是累积数据到一定程度在处理。这是他们本质的区别。 在设计上 Flink 认为数据是流式的批处理只是流处理的特例。同时对数据分为有界数据和无界数据。 有界数据对应批处理API 对应 DateSet。无界数据对应流处理API 对应 DataStream。 1.2 乱序 什么是乱序呢 可以理解为数据到达的顺序和其实际产生时间的排序不一致。导致这的原因有很多比如延迟消息积压重试等等。 我们知道流处理从事件产生到流经 source再到 operator中间是有一个过程和时间的。虽然大部分情况下流到 operator 的数据都是按照事件产生的时间顺序来的但是也不排除由于网络、背压等原因导致乱序的产生out-of-order 或者说 late element。 ✅ 某数据源中的某些数据由于某种原因如网络原因外部存储自身原因会有 5 秒的延时也就是在实际时间的第 1 秒产生的数据有可能在第 5 秒中产生的数据之后到来比如到 Window 处理节点。例如有 1 ~ 10 个事件乱序到达的序列是2, 3, 4, 5, 1, 6, 3, 8, 9, 10, 7。 1.3 窗口及其生命周期 对于 Flink如果来一条消息计算一条这样是可以的但是这样计算是非常频繁而且消耗资源如果想做一些统计这是不可能的。所以对于 Spark 和 Flink 都产生了窗口计算。 比如因为我们想看到过去一分钟或过去半小时的访问数据这时候我们就需要窗口。 WindowWindow 是处理无界流的关键Window 将流拆分为一个个有限大小的 buckets可以在每一个 buckets 中进行计算。当 Window 是时间窗口的时候每个 Window 都会有一个开始时间start_time和结束时间end_time左闭右开这个时间是系统时间。 简而言之只要属于此窗口的第一个元素到达就会创建一个窗口当时间事件或处理时间超过其结束时间戳加上用户指定的允许延迟时窗口将被完全删除。 窗口有如下组件 Window Assigner用来决定某个元素被分配到哪个或哪些窗口中去。Trigger触发器。决定了一个窗口何时能够被计算或清除。触发策略可能类似于 “当窗口中的元素数量大于 4” 时或 “当水位线通过窗口结束时”。Evictor驱逐器。Evictor 提供了在使用 WindowFunction 之前或者之后从窗口中删除元素的能力。 窗口还拥有函数比如 ProcessWindowFunctionReduceFunctionAggregateFunction 或 FoldFunction。该函数将包含要应用于窗口内容的计算而触发器指定窗口被认为准备好应用该函数的条件。 1.4 Keyed vs Non-Keyed 在定义窗口之前要指定的第一件事是流是否需要 Keyed使用 keyBy(...) 将无界流分成逻辑的 keyed stream。如果未调用 keyBy(...)则表示流不是 keyed stream。 对于 Keyed 流可以将传入事件的任何属性用作 key。拥有 keyed stream 将允许窗口计算由多个任务并行执行因为每个逻辑 Keyed 流可以独立于其余任务进行处理。相同 Key 的所有元素将被发送到同一个任务。在 Non-Keyed 流的情况下原始流将不会被分成多个逻辑流并且所有窗口逻辑将由单个任务执行即并行性为 1。 1.5 Flink 中的时间 Flink 在流处理程序支持不同的时间概念。分别为是 事件时间Event Time、处理时间Processing Time、提取时间Ingestion Time。 从时间序列角度来说发生的先后顺序是事件时间 ▶ 提取时间 ▶ 处理时间。 Event Time 是事件在现实世界中发生的时间它通常由事件中的时间戳描述。Ingestion Time 是数据进入 Apache Flink 流处理系统的时间也就是 Flink 读取数据源时间。Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是 Flink 程序处理该事件时当前系统时间。 2.Watermark Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制本质上也是一种时间戳。Watermark 是用于处理乱序事件或延迟数据的这通常用 Watermark 机制结合 Window 来实现Watermarks 用来触发 Window 窗口计算。 2.1 案例一 public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarksMyEvent {private final long maxOutOfOrderness 3000; // 3.0 secondsprivate long currentMaxTimestamp;Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {long timestamp element.getCreationTime();currentMaxTimestamp Math.max(timestamp, currentMaxTimestamp);return timestamp;}Overridepublic Watermark getCurrentWatermark() {// return the watermark as current highest timestamp minus the out-of-orderness bound// 生成 watermarkreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);} }上图中是一个 10s 大小的窗口10000 20000 为一个窗口。当 EventTime 为 23000 的数据到来生成的 WaterMark 的时间戳为 20000大于等于 window_end_time会触发窗口计算。 2.2 案例二 public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarksMyEvent {private final long maxTimeLag 3000; // 3 secondsOverridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getCreationTime();}Overridepublic Watermark getCurrentWatermark() {// return the watermark as current time minus the maximum time lagreturn new Watermark(System.currentTimeMillis() - maxTimeLag);} }只是简单的用当前系统时间减去最大延迟时间生成 Watermark 当 WaterMark 为 20000 时大于等于窗口的结束时间会触发 10000 20000 窗口计算。再当 EventTime 为 19500 的数据到来它本应该是属于窗口 10000 20000 窗口的但这个窗口已经触发计算了所以此数据会被丢弃。 2.3 如何设置最大乱序时间 虽说水位线表明着早于它的事件不应该再出现接收到水位线以前的的消息是不可避免的这就是所谓的 迟到事件。实际上迟到事件是乱序事件的特例和一般乱序事件不同的是它们的乱序程度超出了水位线的预计导致窗口在它们到达之前已经关闭。 迟到事件出现时窗口已经关闭并产出了计算结果因此处理的方法有 3 种 重新激活已经关闭的窗口并重新计算以修正结果。将迟到事件收集起来另外处理。将迟到事件视为错误消息并丢弃。Flink 默认的处理方式是直接丢弃其他两种方式分别使用 Side Output 和 Allowed Lateness。Side Output 机制 可以将迟到事件单独放入一个数据流分支这会作为 Window 计算结果的副产品以便用户获取并对其进行特殊处理。Allowed Lateness 机制 允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长这期间迟到的事件不会被丢弃而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算代价比较大所以允许迟到时长不宜设得太长迟到事件也不宜过多否则应该考虑降低水位线提高的速度或者调整算法。 这里总结机制为 窗口 Window 的作用是为了周期性的获取数据。WaterMark 的作用是防止数据出现乱序经常事件时间内获取不到指定的全部数据而做的一种保险方法。allowLateNess 是将窗口关闭时间再延迟一段时间。sideOutPut 是最后兜底操作所有过期延迟数据指定窗口已经彻底关闭了就会把数据放到侧输出流。 public class TumblingEventWindowExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1); // env.getConfig().setAutoWatermarkInterval(100);DataStreamString socketStream env.socketTextStream(localhost, 9999);DataStreamTuple2String, Long resultStream socketStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorString(Time.seconds(3)) {Overridepublic long extractTimestamp(String element) {long eventTime Long.parseLong(element.split( )[0]);System.out.println(eventTime);return eventTime;}}).map(new MapFunctionString, Tuple2String, Long() {Overridepublic Tuple2String, Long map(String value) throws Exception {return Tuple2.of(value.split( )[1], 1L);}}).keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 允许延迟处理2秒.reduce(new ReduceFunctionTuple2String, Long() {Overridepublic Tuple2String, Long reduce(Tuple2String, Long value1, Tuple2String, Long value2) throws Exception {return new Tuple2(value1.f0, value1.f1 value2.f1);}});resultStream.print();env.execute();} }当 watermark 为 21000 时触发了 [10000, 20000) 窗口计算由于设置了 allowedLateness(Time.seconds(2))即允许两秒延迟处理watermark window_end_time lateTime 公式得到满足因此随后 10000 和 12000 进入窗口时依然能触发窗口计算随后 watermark 增加到 22000watermark window_end_time lateTime 不再满足因此 11000 再次进入窗口时窗口不再进行计算。 2.4 延迟数据重定向 流的返回值必须是 SingleOutputStreamOperator其是 DataStream 的子类。通过 getSideOutput 方法获取延迟数据。可以将延迟数据重定向到其他流或者进行输出。 public class TumblingEventWindowExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);DataStreamString socketStream env.socketTextStream(localhost, 9999);//保存被丢弃的数据OutputTagTuple2String, Long outputTag new OutputTagTuple2String, Long(late-data){};//注意由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法所以这里的类型不能使用它的父类dataStream。SingleOutputStreamOperatorTuple2String, Long resultStream socketStream// Time.seconds(3)有序的情况修改为0.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorString(Time.seconds(3)) {Overridepublic long extractTimestamp(String element) {long eventTime Long.parseLong(element.split( )[0]);System.out.println(eventTime);return eventTime;}}).map(new MapFunctionString, Tuple2String, Long() {Overridepublic Tuple2String, Long map(String value) throws Exception {return Tuple2.of(value.split( )[1], 1L);}}).keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10))).sideOutputLateData(outputTag) // 收集延迟大于2s的数据.allowedLateness(Time.seconds(2)) //允许2s延迟.reduce(new ReduceFunctionTuple2String, Long() {Overridepublic Tuple2String, Long reduce(Tuple2String, Long value1, Tuple2String, Long value2) throws Exception {return new Tuple2(value1.f0, value1.f1 value2.f1);}});resultStream.print();//把迟到的数据暂时打印到控制台实际中可以保存到其他存储介质中DataStreamTuple2String, Long sideOutput resultStream.getSideOutput(outputTag);sideOutput.print();env.execute();} }3.在 DDL 中的定义 3.1 事件时间 事件时间属性是通过 CREATE TABLE DDL 语句中的 WATERMARK 语句定义的。水印语句在现有事件时间字段上定义 水印生成表达式将事件时间字段标记为事件时间属性。 Flink SQL 支持在 TIMESTAMP 和 TIMESTAMP_LTZ 列上定义事件时间属性。如果源中的时间戳数据以 年-月-日-时-分-秒 表示通常是不含时区信息的字符串值例如 2020-04-15 20:13:40.564建议将事件-时间属性定义为 TIMESTAMP 列。 CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- Declare the user_action_time column as an event-time attribute-- and use a 5-seconds-delayed watermark strategy.WATERMARK FOR user_action_time AS user_action_time - INTERVAL 5 SECOND ) WITH (... );SELECT TUMBLE_START(user_action_time, INTERVAL 10 MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL 10 MINUTE);如果数据源中的时间戳数据以纪元时间表示通常是一个长值例如 1618989564564建议将事件时间属性定义为 TIMESTAMP_LTZ 列。 CREATE TABLE user_actions (user_name STRING,data STRING,ts BIGINT,time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- Declare the time_ltz column as an event-time attribute-- and use a 5-seconds-delayed watermark strategy.WATERMARK FOR time_ltz AS time_ltz - INTERVAL 5 SECOND ) WITH (... );SELECT TUMBLE_START(time_ltz, INTERVAL 10 MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(time_ltz, INTERVAL 10 MINUTE);3.2 处理时间 处理时间能让表格程序根据本地机器的时间产生结果。这是最简单的时间概念但会产生非确定性结果。处理时间不需要提取时间戳或生成水印。 在 CREATE TABLE DDL 语句中使用系统 PROCTIME() 函数将处理时间属性定义为计算列。函数返回类型为 TIMESTAMP_LTZ。 CREATE TABLE user_actions (user_name STRING,data STRING,-- Declare an additional field as a processing-time attribute.user_action_time AS PROCTIME() ) WITH (... );SELECT TUMBLE_START(user_action_time, INTERVAL 10 MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL 10 MINUTE);
http://www.w-s-a.com/news/522877/

相关文章:

  • 校园网站开发背景吴江网站建设公司
  • 网站开发工程师发展趋势山东省建设工程电子信息网站
  • 适合大学生创业的网站建设类型吉林省舒兰市建设银行网站
  • 呼和浩特网站建设哪家好培训学校加盟费用
  • 网站如何做友情链接有道云笔记WordPress
  • 贵阳企业网站建设制作赤峰浩诚网站建设公司
  • asp官方网站微信模板素材
  • wordpress 留言给站长发邮件做百度推广员赚钱吗
  • 北京建站公司做网站价格专门找人做软件的网站
  • 商务网站的特点ui软件界面设计
  • 广州个性化网站开发网站索引量是什么意思
  • 公司网站制作专业公司python做后台网站的多吗
  • 桂林建站平台哪家好给别人做网站怎么收取费用
  • python做网站显示表格用visual做的网站
  • 彩票网站建设需要什么聊城网站建设首选天成网络
  • 安徽建设工程网站wordpress标签云代码
  • 推荐佛山顺德网站建设手机网站建设域名空间
  • 电子商务网站建设策划书例子企业官网用什么cms系统
  • 网站栏目设计怎么写平面设计接单报价表
  • 做网站美工要学什么网站推广的方法包括
  • 哪个网站可以做笔译兼职wordpress加表单
  • 百度站内搜索 wordpress微餐饮建站费用
  • 用什么做网站的访问量统计制作手工作品
  • 微信公众号搭建网站河南卫生基层系统网站建设
  • steam账号注册网站重庆手机版建站系统哪家好
  • 中新生态城建设局门户网站wordpress云盘视频播放
  • 大型网站开发基本流程wordpress记录用户搜索
  • 云服务器安装win系统做网站wordpress边栏扩大尺寸
  • 网站开发面试自我介绍软件下载网站如何建设
  • 可以做翻译任务的网站陕西省建设厅八大员证