设计软件免费下载网站,鄂尔多斯公司网站建设,阿里云 个人网站备案,网站恶意镜像1.背景
做数据分析处理#xff0c;平时主要使用DataWorks进行数据的离线处理#xff0c;通常处理的是昨天及以前的数据#xff0c;但业务上有些数据实时的数据价值更大#xff0c;需要进行数据流的实时获取、处理和展示#xff0c;那这时候使用Flink进行实时数据流处理是…1.背景
做数据分析处理平时主要使用DataWorks进行数据的离线处理通常处理的是昨天及以前的数据但业务上有些数据实时的数据价值更大需要进行数据流的实时获取、处理和展示那这时候使用Flink进行实时数据流处理是个很好的技术方案
2.Fink介绍
Flink是Apache Flink的简称。Flink是一款开源的流处理框架专注于处理无界和有界数据流具有高吞吐、低延迟、精准状态一致性等核心特性。它支持事件时间处理、精确一次状态保证并适用于实时分析、事件驱动应用、数据管道构建等场景是大数据生态中的重要工具。
主要特性有
1.高性能与低延迟每秒可处理数百万事件毫秒级延迟响应满足实时计算需求。
2.事件事件处理与状态一致性 支持事件时间Event-Time语义能正确处理乱序数据流确保计算结果准确性
3.高可用性与扩展性原生支持与 Kubernetes、YARN 等资源管理工具集成故障恢复快支持动态扩缩容保障 7×24 小时稳定运行
主要应用的场景 事件驱动型应用通过实时处理事件流触发计算逻辑如实时推荐系统等 流批一体化分析支持连续查询(无界流)与传统批处理(有界数据流)如实时日志分析历史数据统计 数据管道与ETL在不同存储系统间高效转换和迁移数据替代部分传统批量ETL工具
3.项目搭建
Apache Flink官网
Apache Flink开发文档
可以通过Flink的开发文档中的介绍方式进行导入到本地进行开发目前我这边都使用的方式是通过Maven导入Flink Jar包的方式在IDEA中进行开发。
开发环境: Java 8 Flink 1.17.0
Maven文件
propertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingflink.version1.17.0/flink.version
/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependencydependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.4.1/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-base/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion3.1.1-1.17/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.33/version/dependency!-- JSON解析 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-json/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge/artifactIdversion${flink.version}/version/dependencydependencygroupIdcom.aliyun.openservices/groupIdartifactIdflink-log-connector/artifactIdversion0.1.38/version/dependencydependencygroupIdcom.google.protobuf/groupIdartifactIdprotobuf-java/artifactIdversion2.5.0/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.28/version/dependencydependencygroupIdcom.alibaba.fastjson2/groupIdartifactIdfastjson2/artifactIdversion2.0.39/version/dependency/dependencies我这里除了依赖了Flink还依赖了Flink CDC和一些Flink Connect用于与数据库进行连接便于数据流的输入和输出
4.项目开发
根据Flink的开发文档中介绍Flink的项目主要为这几步骤
public static void main(String[] args) throws Exception {// 1.获取Flink环境//StreamExecutionEnvironment 是所有 Flink 程序的基础。//如何使用IDEA进行运行那么它将创建一个本地环境该环境将在你的本地机器上执行你的程序。StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.指定数据源FileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(file:///path/to/file)).build();// 3.将数据源进行一定的业务操作fileSource.process();// 4.(可选)将数据源进行输出stream.sinkTo(FileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder()).build());// 5.本地执行环境 env.execute(Window WordCount);}接下来将对于重点的2、3、4步骤点进行分别介绍省流版可以直接跳到5.完整项目举例进行查看完整项目代码
指定输入数据源
这里就是输入原始数据流的连接数据库将需要处理的数据输入至项目中。这里读取的是数据库的binglog所以需要将数据库的binglog模式进行打开。这里以连接本地Mysql数据库进行举例说明
1.打开Mysql binglog模式
在 [mysqld] 部分添加以下配置
[mysqld]
# 启用 binlog
server-id 1
log_bin /opt/homebrew/var/mysql/mysql-bin.log
binlog_format ROW
binlog_row_image FULL
expire_logs_days 10
max_binlog_size 100M# 启用 GTID (推荐)
gtid_mode ON
enforce_gtid_consistency ON# 必须设置的事务隔离级别
transaction_isolation READ-COMMITTED2.验证Mysql binglog是否打开
SHOW VARIABLES LIKE log_bin; -- 应为 ON
SHOW VARIABLES LIKE binlog_format; -- 应为 ROW
SHOW VARIABLES LIKE binlog_row_image; -- 应为 FULL
SHOW VARIABLES LIKE gtid_mode; -- 应为 ON3.Flink CDC链接本地Mysql数据库
这里链接的是我本地Mysql数据库需要读取的Scheme是local读取的库是flink_input然后读取的数据是更新的数据StartupOptions.latest()如果需要读取全部数据则设置为StartupOptions.initial(),然后也添加了JDBC的一些配置。
Flink CDC中自带许多系统的Source如KafKaSource、MysqlSource还可以通过一些读取本地文件的方式进行Source输入。
此外有时候需要两个输入源进行数据比较或者数据合并的时候就可以设置两个Source进行分别输入。
MySqlSourceDataChangeInfo inputSource MySqlSource.DataChangeInfobuilder().hostname(localhost).port(3306).databaseList(local).tableList(local.flink_input).username(root).password(********).deserializer(new MysqlDeserialization()).startupOptions(StartupOptions.latest()).jdbcProperties(createJdbcProperties()).build();private static Properties createJdbcProperties() {Properties props new Properties();props.setProperty(useUnicode, true);props.setProperty(characterEncoding, UTF-8);props.setProperty(zeroDateTimeBehavior, convertToNull);props.setProperty(serverTimezone, Asia/Shanghai);props.setProperty(autoReconnect, true);props.setProperty(allowMultiQueries, true);return props;
}将数据源进行业务处理
Flink最主要的功能就是将数据进行一定的处理首先在Flink中分为流处理和批处理然后Flink处理数据的时间分为以系统时间戳时间进行处理和以事件时间进行处理并且设定窗口将窗口中数据最后交给算子进行处理。下面我将依次介绍
1.流处理/批处理
流处理还是批处理这主要是依据数据流是有界的还是无界的这两者具体的区别如下 流处理 (Stream Processing) : 核心概念: 处理持续不断、理论上永无止境的数据流。数据特性: 处理的是无界数据流 (Unbounded Data Stream) 。数据没有明确的终点会持续不断地产生和到达例如来自 Kafka 的实时交易数据、传感器读数、用户活动日志。执行模型: 增量计算。数据到达一条处理一条或微批次计算结果持续产生和更新。需要管理状态State、处理时间Time和乱序数据Watermark。API: 主要使用 DataStream API。 批处理 (Batch Processing) : 核心概念: 处理有限、完整的数据集。数据特性: 处理的是有界数据流 (Bounded Data Stream) 或有界数据集 (Bounded Dataset) 。数据是完整的、已知的、不会改变的例如存储在 HDFS 上的昨天的日志文件、关系数据库中的特定表快照。执行模型: 有界计算。整个数据集在开始处理前是已知的或可视为已知的。Flink 会一次性加载所有或大部分数据执行计算产生最终结果后作业结束。状态管理相对简单有序数据通常不需要处理乱序时间。API: 主要使用 DataSet API旧版逐步被 Table API / SQL 和 DataStream 有界流取代或 DataStream API 配合有界数据源推荐方式实现流批一体。
2.时间标定以系统时间处理数据/以事件时间处理数据
这个是Flink中标定是以哪个时间为准进行处理相对应的数据流
如果以系统时间进行处理那么基本上就是处理有序数据默认在这批数据是按照系统时间来进行
如果以事件时间处理那么需要指定数据的具体的字段作为处理时间的标定Flink为此也提出了一个Watermark水位线的概念这个水位线是专门给流处理数据进行处理的通常处理的是乱序数据
3.窗口
通常就是设定一个时间段在这个时间段内的数据将进行数据处理时间的标定是以上面系统时间/事件事件为准的
举例说明事件标定和窗口
//系统时间
dataStream.map(new MapFunctionDataChangeInfo, OrderInfo() {Overridepublic OrderInfo map(DataChangeInfo dataChangeInfo) {//初步处理}})// 设置了以系统时间的20秒窗口期.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))//事件时间
//水位线这里是以数据的create_time这个字段作为水位线并设置了10秒的允许迟到时间
WatermarkStrategyOrderInfo strategy WatermarkStrategy.OrderInfoforBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((o, t) - o.getCreate_time());//设置了20秒的窗口期
dataStream.map(new MapFunctionDataChangeInfo, OrderInfo() {Overridepublic OrderInfo map(DataChangeInfo dataChangeInfo) {//初步处理}})// 水位线.assignTimestampsAndWatermarks(strategy)// 窗口期.window(TumblingEventTimeWindows.of(Time.seconds(20)))重点讲解水位线和延迟时间 以20秒窗口、10秒延迟时间举例
流入数据{create_time:2025-06-14 08:50:01,id:29,status:1} 流水线 01
流入数据{create_time:2025-06-14 08:50:10,id:30,status:1} 流水线 10
流入数据{create_time:2025-06-14 08:50:20,id:31,status:1} 流水线 20
流入数据{create_time:2025-06-14 08:50:15,id:32,status:1} 流水线 20
流入数据{create_time:2025-06-14 08:50:25,id:33,status:1} 流水线 25
流入数据{create_time:2025-06-14 08:50:30,id:34,status:1} 流水线 30触发窗口
窗口2025-06-14 16:50:00---2025-06-14 16:50:20
窗口流出数据OrderInfo(id29, status1, create_time1749891000000)
窗口流出数据OrderInfo(id30, status1, create_time1749891010000)
窗口流出数据OrderInfo(id32, status1, create_time1749891015000)流入数据{create_time:2025-06-14T08:50:50Z,id:35,status:1} 流水线 50触发窗口
窗口2025-06-14 16:50:20---2025-06-14 16:50:40
窗口流出数据OrderInfo(id31, status1, create_time1749891020000)
窗口流出数据OrderInfo(id33, status1, create_time1749891025000)
窗口流出数据OrderInfo(id34, status1, create_time1749891030000)1.前两条数据就是正常输入流水线正常往下推进
2.到第三条数据流水线其实已经来到了20秒但是未触发窗口是因为还有10秒的允许迟到时间需要等到流水线推进到30时才会触发窗口关闭
3.第四条数据时间其实为15s,但是由于流水线不会回退所以仍为20这条数据也就是为迟到数据仍会归到窗口内数据
4.直到第六条数据流水线来到了30所以触发[00,20)的窗口关闭处理数据所以打印出29、30、32这三条数据
5.然后输入第七条数据流水线到了50触发了第二个窗口的关闭40窗口结束时间10允许延迟时间所以第二个窗口关闭输入第二个窗口的数据31、33、34
根据多次数据打印得出窗口期通常会以整数形式出现和结束无论第一个事件时间是否为整数窗口都为整数进行开始和结束
4.算子
算子就为flink的自带的各种数据流的处理包括流的合并流的连接如join、connect、cogroup、process等等这部分内容比较多后续单独再写一篇文章主要讲解常用的算子
5.将数据源进行输出
就是将处理好的最终数据以sink的方式输出到目标库中。这里我继承了官方的sink然后进行自定义重写方法主要目的为了将写入数据库失败可以以异常的形式日志打印出来目前官方的sink使用下来无法知道写入数据库的异常日志。
自定义MysqlSink
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class MysqlSink extends RichSinkFunctionOrderInfo {private transient Connection connection;private transient PreparedStatement ps;Overridepublic void open(Configuration parameters) throws Exception {connection DriverManager.getConnection(jdbc:mysql://localhost:3306/local?zeroDateTimeBehaviorconvertToNulluseUnicodetruecharacterEncodingUTF-8serverTimezoneAsia/ShanghaiautoReconnecttrueallowMultiQueriestrue,root, *******);ps connection.prepareStatement(insert into flink_output (id, record_time) VALUES (?, ?););}Overridepublic void invoke(OrderInfo value, Context context) throws Exception {try {System.out.println(窗口流出数据 value);ps.setString(1, value.getId().toString());ps.setString(2, DateFormatUtils.format(value.getCreate_time(), yyyy-MM-dd HH:mm:ss));ps.execute();} catch (SQLException e) {System.out.println(写入数据库失败 e.getMessage());e.printStackTrace();}}Overridepublic void close() throws Exception {if (ps ! null) {ps.close();}if (connection ! null) {connection.close();}}
}主函数调用sink
MysqlSink mysqlSink new MysqlSink();process.addSink(mysqlSink).name(Mysql Sink).setParallelism(1);5.完整项目举例
主工程
package com.flink;import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Properties;public class Main {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);MySqlSourceDataChangeInfo inputSource MySqlSource.DataChangeInfobuilder().hostname(localhost).port(3306).databaseList(local).tableList(local.flink_input).username(root).password(62983335).deserializer(new MysqlDeserialization()).startupOptions(StartupOptions.latest()).jdbcProperties(createJdbcProperties()).build();MysqlSink mysqlSink new MysqlSink();DataStreamDataChangeInfo dataStream env.fromSource(inputSource, WatermarkStrategy.noWatermarks(), MySQL Source).setParallelism(1);WatermarkStrategyOrderInfo strategy WatermarkStrategy.OrderInfoforBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((o, t) - o.getCreate_time());SingleOutputStreamOperatorOrderInfo process dataStream.filter(d - StringUtils.isNoneBlank(d.getAfterData())).map(new MapFunctionDataChangeInfo, OrderInfo() {Overridepublic OrderInfo map(DataChangeInfo dataChangeInfo) {try {System.out.println(原始数据 dataChangeInfo.getAfterData());return JSONObject.parseObject(dataChangeInfo.getAfterData(), OrderInfo.class);} catch (Exception e) {System.err.println(Failed to parse JSON: dataChangeInfo.getAfterData());System.err.println(Exception message: e.getMessage());e.printStackTrace();return null;}}}).assignTimestampsAndWatermarks(strategy).keyBy(OrderInfo::getStatus).window(TumblingEventTimeWindows.of(Time.seconds(20)))
// .window(TumblingEventTimeWindows.of(Time.seconds(20))).process(new ProcessWindowFunctionOrderInfo, OrderInfo, Integer, TimeWindow() {Overridepublic void process(Integer id, ProcessWindowFunctionOrderInfo, OrderInfo, Integer, TimeWindow.Context context, IterableOrderInfo iterable, CollectorOrderInfo output) throws Exception {long startTimeStamp context.window().getStart();long endTimeStamp context.window().getEnd();String startTime DateFormatUtils.format(startTimeStamp, yyyy-MM-dd HH:mm:ss);String endTime DateFormatUtils.format(endTimeStamp, yyyy-MM-dd HH:mm:ss);System.out.println(窗口 startTime --- endTime);iterable.forEach(orderInfo - {if (orderInfo.getStatus().equals(1)) {output.collect(orderInfo);
// output.collect(orderInfo.getId() ----- orderInfo.getCreate_time());}});}});process.print();process.addSink(mysqlSink).name(Mysql Sink).setParallelism(1);env.execute(start job);}private static Properties createJdbcProperties() {Properties props new Properties();props.setProperty(useUnicode, true);props.setProperty(characterEncoding, UTF-8);props.setProperty(zeroDateTimeBehavior, convertToNull);props.setProperty(serverTimezone, Asia/Shanghai);props.setProperty(autoReconnect, true);props.setProperty(allowMultiQueries, true);return props;}
}MysqlDeserialization类用于Mysql数据库binglog数据解析
package com.flink;import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableMap;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;
import java.util.Map;
import java.util.Optional;
Slf4j
public class MysqlDeserialization implements DebeziumDeserializationSchemaDataChangeInfo {public static final String TS_MS ts_ms;public static final String BIN_FILE file;public static final String POS pos;public static final String BEFORE before;public static final String AFTER after;public static final String SOURCE source;/*** 获取操作类型 READ CREATE UPDATE DELETE TRUNCATE;* 变更类型 0 初始化 1新增 2修改 3删除 4导致源中的现有表被截断的操作*/private static final MapString, Integer OPERATION_MAP ImmutableMap.of(READ, 0,CREATE, 1,UPDATE, 2,DELETE, 3,TRUNCATE, 4);Overridepublic void deserialize(SourceRecord sourceRecord, CollectorDataChangeInfo collector) throws Exception {String topic sourceRecord.topic();String[] fields topic.split(\.);String database fields[1];String tableName fields[2];Struct struct (Struct) sourceRecord.value();final Struct source struct.getStruct(SOURCE);DataChangeInfo dataChangeInfo new DataChangeInfo();// 获取操作类型 READ CREATE UPDATE DELETE TRUNCATE;Envelope.Operation operation Envelope.operationFor(sourceRecord);String type operation.toString().toUpperCase();int eventType OPERATION_MAP.get(type);// fixme 一般情况是无需关心其之前之后数据的,直接获取最新的数据即可,但这里为了演示,都进行输出dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());if (eventType 3) {dataChangeInfo.setData(getJsonObject(struct, BEFORE).toJSONString());} else {dataChangeInfo.setData(getJsonObject(struct, AFTER).toJSONString());}dataChangeInfo.setOperatorType(eventType);dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse());dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x - Integer.parseInt(x.toString())).orElse(0));dataChangeInfo.setDatabase(database);dataChangeInfo.setTableName(tableName);dataChangeInfo.setOperatorTime(Optional.ofNullable(struct.get(TS_MS)).map(x - Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));// 输出数据collector.collect(dataChangeInfo);}/*** 从元素数据获取出变更之前或之后的数据** param value value* param fieldElement fieldElement* return JSONObject*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element value.getStruct(fieldElement);JSONObject jsonObject new JSONObject();if (element ! null) {Schema afterSchema element.schema();ListField fieldList afterSchema.fields();for (Field field : fieldList) {Object afterValue element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}Overridepublic TypeInformationDataChangeInfo getProducedType() {return TypeInformation.of(DataChangeInfo.class);}}DataChangeInfo类
import lombok.Data;Data
public class DataChangeInfo {/*** 变更类型 0 初始化 1新增 2修改 3删除 4导致源中的现有表被截断的操作*/private Integer operatorType;/*** 变更前数据*/private String beforeData;/*** 变更后数据*/private String afterData;/*** 操作的数据*/private String data;/*** binlog文件名*/private String fileName;/*** binlog当前读取点位*/private Integer filePos;/*** 数据库名*/private String database;/*** 表名*/private String tableName;/*** 变更时间*/private Long operatorTime;
}