网站系统建设思想如何写,做代理记账网站,镇江市远航网络科技有限公司,相册网站怎么做的一#xff0c;Flink 和Flink CDC
1#xff0c; Flink
Apache Flink是一个框架和分布式处理引擎#xff0c;用于对无界和有界数据流进行有状态计算。
中文文档 Apache Flink Documentation | Apache Flink
官方文档 #xff1a;https://flink.apache.org Flink 中文社区… 一Flink 和Flink CDC
1 Flink
Apache Flink是一个框架和分布式处理引擎用于对无界和有界数据流进行有状态计算。
中文文档 Apache Flink Documentation | Apache Flink
官方文档 https://flink.apache.org Flink 中文社区视频课程https://github.com/flink-china/flink-training-course Flink 中文社区 https://www.slidestalk.com/FlinkChina ververica 教程 https://training.ververica.com/ ververica 教程中文文档:https://ci.apache.org/projects/flink/flink-docs-master/zh/ 源码https://github.com/apache/flink Flink 知识图谱 https://developer.aliyun.com/article/744741 2Flink CDC
CDC 的全称是 Change Data Capture 在广义的概念上只要是能捕获数据变更的技术我们都可以称之为 CDC 。
目前通常描述的 CDC 技术主要面向数据库的变更是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛 数据迁移常用于数据库备份、容灾等 数据分发将一个数据源分发给多个下游常用于业务解耦、微服务 数据采集将分散异构的数据源集成到数据仓库中消除数据孤岛便于后续的分析。
目前业界主流的 CDC 实现机制可以分为两种
2.1 基于查询的 CDC
离线调度查询作业批处理。依赖表中的更新时间字段每次执行查询去获取表中最新的数据无法捕获删除事件从而无法保证数据一致性 无法保障实时性基于离线调度存在天然的延迟。
2.2 基于日志的 CDC
实时消费日志流处理。例如 MySQL 的 binlog 日志完整记录了数据库中的变更可以把binlog 文件当作流的数据源 保障数据一致性因为 binlog 文件包含了所有历史变更明细 保障实时性因为类似 binlog 的日志文件是可以流式消费的提供的是实时数据。
2.3 主流CDC技术对比 1) DataX 不支持增量同步Canal 不支持全量同步。虽然两者都是非常流行的数据同步工具但在场景支持上仍不完善。
2) 在全量增量一体化同步方面只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
3) 在架构方面Apache Flink 是一个非常优秀的分布式流处理框架因此 Flink CDC 作为Apache Flink 的一个组件具有非常灵活的水平扩展能力。而 DataX 和 Canal 是个单机架构在大数据场景下容易面临性能瓶颈的问题。
4) 在数据加工的能力上CDC 工具是否能够方便地对数据做一些清洗、过滤、聚合甚至关联打宽。Flink CDC 依托强大的 Flink SQL 流式计算能力可以非常方便地对数据进行加工。而Debezium 等则需要通过复杂的 Java 代码才能完成使用门槛比较高。
5) 另外在生态方面这里指的是上下游存储的支持。Flink CDC 上下游非常丰富支持对接MySQL、PostgreSQL 等数据源还支持写入到 TiDB、HBase、Kafka、Hudi 等各种存储系统中也支持灵活的自定义 connector。 二安装Flink
1为了运行Flink只需提前安装好 Java 11或以上版本 java -version 2下载Flink
下载地址Index of /dist/flinkhttps://archive.apache.org/dist/flink
可以找到你要安装的版本 wget https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz 3解压 tar -xzf flink-1.19.1-bin-scala_2.12.tgz 4进入到flink-1.19.1目录启动集群 ./bin/start-cluster.sh 5如果想访问WebUI可以看下面
【问题解决】Flink在linux上运行成功但是无法访问webUI界面_flink web ui的ip访问-CSDN博客
6停止集群 ./bin/stop-cluster.sh 三运行Flink示例
在examples目录下有很多示例可以试着运行 1运行单词统计示例 ./bin/flink run examples/streaming/WordCount.jar 进入到log目录查看日志 tail -f flink-root-taskexecutor-0-rocky8-template.out 四运行Flink CDC示例
1通过编写sql脚本来实现同步数据
SET execution.checkpointing.interval 5s;drop table if exists order_01;
CREATE TABLE order_01 (id INT NOT NULL,order_id VARCHAR,amount VARCHAR,remark VARCHAR,PRIMARY KEY (id) NOT ENFORCED) WITH (connector mysql-cdc,hostname x.x.x.x,port 3306,username root,password p4ssword,database-name account,server-time-zone UTC,table-name order_01);drop table if exists order_02;
CREATE TABLE order_02 (id INT NOT NULL,order_id VARCHAR,amount VARCHAR,remark VARCHAR,PRIMARY KEY (id) NOT ENFORCED) WITH (connector jdbc,url jdbc:mysql://x.x.x.x:3306/account?useSSLfalseallowPublicKeyRetrievaltrueserverTimezoneUTC,username root,password p4ssword,table-name order_02,driver com.mysql.cj.jdbc.Driver,scan.fetch-size 200);INSERT INTO order_02
SELECT id,order_id,amount,remark
FROM order_01;编写以上脚本命名为flinkCdc2Mysql.sql上传到flink的sql目录下这里的sql是我新建的你可以自己指定。 需要下载flink-connecter和flink-sql-connector包
下载地址Central Repository: com/ververica 通过以下命令执行 ./bin/sql-client.sh -f ./sql/flinkCdc2Mysql.sql 就可以完成数据从order_01表同步到order_02表。 2 编写Java类编译成jar执行jar来实现数据同步
AccountVoucherSumaryDWSSQL类
package com.xxx.demoimport org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** date 2024/10/31 下午3:57*/
public class AccountVoucherSumaryDWSSQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);env.enableCheckpointing(5000);final StreamTableEnvironment tEnv StreamTableEnvironment.create(env);tEnv.executeSql(CREATE DATABASE IF NOT EXISTS account);// 动态表此为source表tEnv.executeSql(CREATE TABLE account.order_01 (\n id INT,\n order_id VARCHAR,\n amount VARCHAR,\n remark VARCHAR,\n PRIMARY KEY (id) NOT ENFORCED\n ) WITH (\n connector mysql-cdc,\n hostname x.x.x.x,\n port 3306,\n username root,\n password p4ssword,\n database-name account,\n table-name order_01,\n server-time-zone UTC,\n scan.incremental.snapshot.enabled false\n ));// 动态表此为sink表。sink表和source表的connector不一样tEnv.executeSql(CREATE TABLE account.order_02 (\n id INT,\n order_id VARCHAR,\n amount VARCHAR,\n remark VARCHAR,\n PRIMARY KEY (id) NOT ENFORCED\n ) WITH (\n connector jdbc,\n url jdbc:mysql://x.x.x.x:3306/account?useSSLfalseallowPublicKeyRetrievaltrueconnectionTimeZoneUTC,\n username root,\n password p4ssword,\n table-name order_02,\n sink.buffer-flush.max-rows 1,\n sink.buffer-flush.interval 1s,\n sink.max-retries 3 \n));tEnv.executeSql(INSERT INTO account.order_02 (id, order_id, amount, remark)\n select t1.id,\n t1.order_id,\n t1.amount,\n t1.remark\n from account.order_01 t1 \n);env.execute();}
}
pom文件
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.xxx.demo/groupIdartifactIdFlinkDemo/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingscala.version2.12/scala.versionjava.version8/java.versionflink.version1.19.1/flink.versionfastjson.version1.2.62/fastjson.versionhadoop.version2.8.3/hadoop.versionscope.modecompile/scope.modeslf4j.version1.7.30/slf4j.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-scala-bridge_${scala.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_${scala.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_${scala.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-csv/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion3.2.0-1.19/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion3.2.0-1.19/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion${fastjson.version}/version/dependency!-- Add log dependencies when debugging locally --dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion${slf4j.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web/artifactIdversion${flink.version}/version/dependencydependencygroupIdcom.google.code.gson/groupIdartifactIdgson/artifactIdversion2.8.7/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.26/versionscopecompile/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-runtime/artifactIdversion${flink.version}/version/dependency!-- Test dependencies --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-test-utils/artifactIdversion${flink.version}/versionscopetest/scope/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.26/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.doris/groupIdartifactIdflink-doris-connector-1.15/artifactIdversion1.2.1/version/dependencydependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion3.0.1/version/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.8.1/versionconfigurationsource8/sourcetarget8/target/configuration/plugin!-- 打fatjar配置 --plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-assembly-plugin/artifactIdversion3.3.0/versionconfigurationarchivemanifest!--这里指定要运行的main类--mainClasscom.xxx.demo.AccountVoucherSumaryDWSSQL/mainClass/manifest/archivedescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/id !-- 此处指定继承合并 --phasepackage/phase !-- 绑定到打包阶段 --goalsgoalsingle/goal/goals/execution/executions/plugin/plugins/build/project
完成之后通过maven来打包。
上传包到flink下examples目录下这里也新建一个目录CDC 通过以下命令来运行 ./bin/flink run examples/cdc/FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar 上面两种运行方式运行之后都可以去Flink的Web页面查看到运行的任务。 五小结
上面只是搭建了Flink并运行了示例项目初步体验了一下Flink的功能还完全到不了实际项目中运用的程度。后续会一步步去探索Flink在项目中应用的技巧。