如何让wordpress主页不显示文章,西安优化网站技术,丰台手机网站建设,企业网站优化设计应该把什么放在首位SpringBoot整合Flink CDC实时同步postgresql变更数据#xff0c;基于WAL日志 一、前言二、技术介绍#xff08;Flink CDC#xff09;1、Flink CDC2、Postgres CDC 三、准备工作四、代码示例五、总结 一、前言
在工作中经常会遇到要实时获取数据库#xff08;postgresql、m… SpringBoot整合Flink CDC实时同步postgresql变更数据基于WAL日志 一、前言二、技术介绍Flink CDC1、Flink CDC2、Postgres CDC 三、准备工作四、代码示例五、总结 一、前言
在工作中经常会遇到要实时获取数据库postgresql、mysql等的变更数据主要体现数据的实时性mysql数据库有canal工具实现很简单但是基于postgresql数据库获取实时数据就比较复杂之前已经写过一篇获取postgresql数据库实时数据的文章如下 【技术实现】java实时同步postgresql变更数据基于WAL日志 但是之前的实现方式比较繁琐不利于维护所有本文整合Flink CDC通过一个比较简单的方式实现
二、技术介绍Flink CDC
1、Flink CDC
Flink CDCChange Data Capture是一个基于Apache Flink构建的开源数据变更捕获CDC框架。其核心功能是从各种关系型数据库如MySQL、PostgreSQL、Oracle等中捕获数据变更如增删改操作并将这些变更以流的形式提供给Flink等流处理引擎进行处理 1CDCChange Data Capture数据变更捕获的简称用于监测并捕获数据库的变动然后将这些变更按照发生顺序捕获并写入到目标存储系统如数据仓库、数据湖、消息队列等。 2Flink CDC基于Flink的CDC实现将CDC技术与Flink流处理引擎相结合实现数据的实时捕获、处理和传输。
2、Postgres CDC
1Postgres CDCChange Data Capture连接器是用于从PostgreSQL数据库捕获数据变更如增删改操作并将其以流的形式提供给数据处理引擎如Flink的组件 2PostgreSQL版本Postgres CDC连接器通常支持PostgreSQL的多个版本具体版本可能因连接器版本不同而有所差异。常见的支持版本包括9.6、10、11、12、13、14等
三、准备工作
1、安装postgresql数据库并创建库和测试使用的表这里不再列举详细步骤 2、修改postgresql数据库配置通过wal日志监听变更数据
修改postgresql.conf文件重启服务
wal_levellogical3、springboot关键maven依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-base/artifactIdversion1.19.0/version
/dependency
dependencygroupIdcom.ververica/groupIdartifactIdflink-connector-postgres-cdc/artifactIdversion3.0.1/version
/dependency注其它依赖不在列举可以通过获取源码查看
四、代码示例
InitAction02.java
package com.sk.proxytest.init;import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;Configuration
public class InitAction02 {PostConstructpublic void run() throws Exception {DebeziumDeserializationSchemaString deserializer new JsonDebeziumDeserializationSchema();JdbcIncrementalSourceString postgresIncrementalSource PostgresSourceBuilder.PostgresIncrementalSource.Stringbuilder().hostname(127.0.0.1).port(5432).database(postgres).schemaList(public).tableList(public.student).username(postgres).password(password).slotName(flink).decodingPluginName(pgoutput) // use pgoutput for PostgreSQL 10.deserializer(deserializer).includeSchemaChanges(true) // output the schema changes as well.splitSize(2) // the split size of each snapshot split.build();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(postgresIncrementalSource,WatermarkStrategy.noWatermarks(),PostgresParallelSource).setParallelism(2).addSink(new CustomSink());//.print();env.execute(Output Postgres Snapshot);}}CustomSink.java
package com.sk.proxytest.init;import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;Log4j2
public class CustomSink extends RichSinkFunctionString {Overridepublic void invoke(String value, Context context) throws Exception {log.info(数据发生变化:{}, value);}
}执行结果
1新增数据
2变更数据输出
2024-07-31T00:00:15,761 INFO [debezium-reader-0] io.debezium.util.Threads$3: Creating thread debezium-postgresconnector-postgres_cdc_source-keep-alive
2024-07-31T00:00:15,761 INFO [debezium-reader-0] io.debezium.connector.postgresql.PostgresStreamingChangeEventSource: Processing messages
2024-07-31T00:00:15,762 INFO [debezium-reader-0] io.debezium.connector.postgresql.connection.WalPositionLocator: Message with LSN LSN{0/3588018} arrived, switching off the filtering
2024-07-31T00:00:16,678 INFO [Sink: Unnamed (1/4)#0] com.sk.proxytest.init.CustomSink: 数据发生变化:{before:null,after:{id:8,name:8,age:8,remark:8},source:{version:1.9.7.Final,connector:postgresql,name:postgres_cdc_source,ts_ms:1722355215252,snapshot:false,db:postgres,sequence:[null,\56131608\],schema:public,table:student,txId:932,lsn:56131608,xmin:null},op:c,ts_ms:1722355216336,transaction:null}五、总结
Postgres CDC 连接器是一个 Flink Source 连接器它将首先读取数据库快照然后继续读取二进制日志即使发生故障也会进行一次处理 Postgres CDC 连接器 注文章源代码关注下面公众号获取