网站图片自动下载,wordpress 微信 论坛,wordpress打字烟花,北京网站建设龙鹏文章目录 1、基本介绍2、代码实战2.1、数据源准备2.2、代码实战2.3、数据格式 1、基本介绍
Flink CDC 是 Apache Flink 提供的一个功能强大的组件，用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库（如MySQL、PostgreSQL、Oracle、MongoDB… 文章目录 1、基本介绍2、代码实战2.1、数据源准备2.2、代码实战2.3、数据格式 1、基本介绍
Flink CDC 是 Apache Flink 提供的一个功能强大的组件，用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库（如 MySQL、PostgreSQL、Oracle、MongoDB 等）中捕获数据变更，并将其转换为流式数据。Flink CDC 同步数据有两种方式：
- FlinkSQL
- Flink DataStream 和 Table API（本文使用该方式）

对比其他的 CDC 开源方案，发现 Flink CDC 是绝大多数场景最好的选择方式，别再傻傻地只关注 Canal 了，如下图所示：
2、代码实战
2.1、数据源准备
本次我使用的是 MySQL 8.0 版本，并且已创建好数据库，库名为 quick_chat。本次演示的表结构如下：
-- Demo table of chat messages; this is the table the Flink CDC example captures.
CREATE TABLE quick_chat_msg (
    id bigint NOT NULL COMMENT '主键id',
    from_id varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id发送人',
    to_id varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id接收人',
    relation_id varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '发送关联',
    content varchar(500) DEFAULT NULL COMMENT '消息内容',
    msg_type tinyint(1) DEFAULT NULL COMMENT '消息类型1文字2语音3表情包4文件5语音通话6视频通话',
    extra_info varchar(500) DEFAULT NULL COMMENT '额外信息',
    create_time datetime DEFAULT NULL COMMENT '创建时间',
    deleted tinyint(1) DEFAULT NULL COMMENT '删除标识',
    PRIMARY KEY (id) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

需要保证 MySQL 的 Binlog 格式是 ROW，不过 MySQL 8.0 版本格式默认就是 ROW。最后要把数据库时区配置好，否则会出现问题，命令如下：

-- Set the global time zone to UTC+8 and persist it across server restarts.
SET PERSIST time_zone = '+8:00';
-- Apply the same offset to the current session.
SET time_zone = '+8:00';
-- Verify the resulting time-zone variables.
SHOW VARIABLES LIKE '%time_zone%';

2.2、代码实战
首先引入 Flink CDC 相关依赖，内容如下：
<dependencies>
    <!-- Flink connector base package -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink CDC MySQL source -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- Flink DataStream API -->
    <!-- NOTE(review): pinned to 2.2.0 while flink-sql-connector-mysql-cdc above is 2.3.0; confirm the mismatch is intentional. -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink client -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink Web UI (port 8081); not enabled by default -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink Table API / SQL programs can connect to external systems to read and write batch and streaming tables. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>

第二步，开发 Sink 监听类，用于监听 MySQL 数据变化：
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class MySinkHandler extends RichSinkFunctionString {Overridepublic void invoke(String value, Context context) throws Exception {System.out.println(value);}Overridepublic void open(Configuration parameters) throws Exception {}Overridepublic void close() throws Exception {}
}最后配置好 Flink CDC 监听进程随着项目启动运行
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;Component
public class MySqlSourceExample {PostConstructpublic void init() throws Exception {// 配置监听数据源MySqlSourceString source MySqlSource.Stringbuilder().hostname(8.141.28.132).port(3306)// 数据库集合可以配置多个.databaseList(quick_chat)// 表集合可以配置多个.tableList(quick_chat.quick_chat_msg).username(root).password(root).deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 检查点间隔时间// checkpoint的侧重点是“容错”即Flink作业意外失败并重启之后能够直接从早先打下的checkpoint恢复运行且不影响作业逻辑的准确性。env.enableCheckpointing(5000);DataStreamSinkString sink env.fromSource(source, WatermarkStrategy.noWatermarks(), MySQL Source).addSink(new MySinkHandler());env.execute();}
}项目启动完毕后可以通过8081端口访问Flink UI页面
2.3、数据格式
上述操作完毕后，我对表数据进行了新增、修改、删除操作，控制台可以看到 MySQL 变更监听日志输出信息：
# 新增
{
  "before": null,
  "after": {"id": 3, "from_id": "dog", "to_id": "cat", "relation_id": "dog:cat", "content": "你好啊", "msg_type": 1, "extra_info": null, "create_time": 1729164075000, "deleted": 0},
  "source": {"version": "1.6.4.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1729135279000, "snapshot": "false", "db": "quick_chat", "sequence": null, "table": "quick_chat_msg", "server_id": 1, "gtid": null, "file": "binlog.000002", "pos": 2452, "row": 0, "thread": null, "query": null},
  "op": "c",
  "ts_ms": 1729135278633,
  "transaction": null
}

# 修改
{
  "before": {"id": 3, "from_id": "dog", "to_id": "cat", "relation_id": "dog:cat", "content": "你好啊", "msg_type": 1, "extra_info": null, "create_time": 1729164075000, "deleted": 0},
  "after": {"id": 3, "from_id": "dog", "to_id": "cat", "relation_id": "dog:cat", "content": "你好啊小猫咪", "msg_type": 1, "extra_info": null, "create_time": 1729164075000, "deleted": 0},
  "source": {"version": "1.6.4.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1729135289000, "snapshot": "false", "db": "quick_chat", "sequence": null, "table": "quick_chat_msg", "server_id": 1, "gtid": null, "file": "binlog.000002", "pos": 2825, "row": 0, "thread": null, "query": null},
  "op": "u",
  "ts_ms": 1729135288473,
  "transaction": null
}

# 删除
{
  "before": {"id": 3, "from_id": "dog", "to_id": "cat", "relation_id": "dog:cat", "content": "你好啊小猫咪", "msg_type": 1, "extra_info": null, "create_time": 1729164075000, "deleted": 0},
  "after": null,
  "source": {"version": "1.6.4.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1729135301000, "snapshot": "false", "db": "quick_chat", "sequence": null, "table": "quick_chat_msg", "server_id": 1, "gtid": null, "file": "binlog.000002", "pos": 3247, "row": 0, "thread": null, "query": null},
  "op": "d",
  "ts_ms": 1729135300692,
  "transaction": null
}