当前位置: 首页 > news >正文

备案 非网站东营有做网站的公司

备案 非网站,东营有做网站的公司,网页模版下载器,大型网站建设需要What’s Flink-cdc? Flink CDC 是基于Apache Flink的一种数据变更捕获技术#xff0c;用于从数据源#xff08;如数据库#xff09;中捕获和处理数据的变更事件。CDC技术允许实时地捕获数据库中的增、删、改操作#xff0c;将这些变更事件转化为流式数据#xff0c;并能够…What’s Flink-cdc? Flink CDC 是基于Apache Flink的一种数据变更捕获技术用于从数据源如数据库中捕获和处理数据的变更事件。CDC技术允许实时地捕获数据库中的增、删、改操作将这些变更事件转化为流式数据并能够对这些事件进行实时处理和分析。 Flink CDC提供了与各种数据源集成的功能包括常见的关系型数据库如MySQL、PostgreSQL、Oracle等以及NoSQL数据库如MongoDB、HBase等。它通过监控数据库的日志或轮询方式来捕获数据变更并将变更事件作为数据流发送到Flink的任务中进行处理。 Flink CDC 深度集成并由 Apache Flink 驱动提供以下核心功能 ✅ 端到端的数据集成框架 ✅ 为数据集成的用户提供了易于构建作业的 API ✅ 支持在 Source 和 Sink 中处理多个表 ✅ 整库同步 ✅具备表结构变更自动同步的能力Schema Evolution 在使用者的角度就是Flink-cdc可以简化流处理的流程: 引入Flink-cdc之前流处理流程 引入Flink-cdc之后后流处理流程 如上所示在flink-cdc被引入后大大简化了流处理流程 Flink-cdc支持的链接及对应的版本 Pipeline Connectors Source Connectors 截止目前2024-05-23 Flink-cdc与Flink对应对影版本的关系 截止目前2024-05-23 flink-connector-mysql-cdc 实例分析 示例代码 demo代码 import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MySqlSourceDemo {public static void main(String[] args) throws Exception {MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(mysql-server-host).port(3306).databaseList(mydb) // 设置捕获的数据库.tableList(mydb.products) // 设置捕获的表,如果需要同步整个数据库请将 tableList 设置为 .*. // .tableList(.*) // 捕获整个数据库的表 // .tableList(^(?!mysql|information_schema|performance_schema).*) // 设置捕获的表排除系统库 // .tableList(mydb.(?!products|orders).*) // 同步排除products和orders表之外的整个my_db库.username(flink-cdc).password(xxx).serverId(5400-5405).deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串.serverTimeZone(Asia/Shanghai) // 设置时区.startupOptions(StartupOptions.initial()).scanNewlyAddedTableEnabled(true) // 启用扫描新添加的表功能 // .includeSchemaChanges(true) // 包括 schema 变更.build();org.apache.flink.configuration.Configuration config new org.apache.flink.configuration.Configuration();config.setString(rest.port, 8081); // StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironment(config); //本地环境调试用StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置 3s 的 checkpoint 间隔env.enableCheckpointing(3000);env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage(file:///tmp/ck);//本地文件系统 // env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 1.14.0 版本开始支持env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), MySQL Source)// 设置 source 节点的并行度为 4.setParallelism(5).print().setParallelism(1); // 设置 sink 节点并行度为 1env.execute(Print MySQL Snapshot Binlog);} } maven依赖 propertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingflink.version1.14.5/flink.versionscala.binary.version2.12/scala.binary.version/propertiesdependenciesdependencygroupIdjunit/groupIdartifactIdjunit/artifactIdscopetest/scope/dependency!-- 将 Apache Flink 的 Web 运行时模块添加到项目中 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope !--provided生命周期在test模式才可以运行在main模式会找不到包--/dependencydependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.3.0/versionscopecompile/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-common/artifactIdversion${flink.version}/versionscopecompile/scope/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion1.7.25/versionscopeprovided/scope/dependency/dependencies日志配置文件 log4j.properties log4j.rootCategoryerror,stdout log4j.appender.stdoutorg.apache.log4j.ConsoleAppender log4j.appender.stdout.targetSystem.out log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern%d{yyyy-MM-dd HH:mm:ss} %p %c{1}:%L - %m%n 启动standalone Flink级群 # jobmanager docker run -d \ --name flink-jm \ --hostname flink-jm \ -p 8082:8081 \ --env FLINK_PROPERTIESjobmanager.rpc.address: flink-jm \ --network flink-network-standalone \ ponylee/flink:1.15.0-java8 \ jobmanager# taskmanager docker run -d \ --name flink-tm \ --hostname flink-tm \ --env FLINK_PROPERTIESjobmanager.rpc.address: flink-jm \ --network flink-network-standalone \ ponylee/flink:1.15.0-java8 \ taskmanager \ -Dtaskmanager.memory.process.size1024m \ -Dtaskmanager.numberOfTaskSlots5 \ -Drest.flamegraph.enabledtrue 分析说明 为每个 Reader 设置不同的 Server id 每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id称为 Server id。 MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。 因此如果不同的作业共享相同的 Server id 则可能导致从错误的 binlog 位置读取数据。 因此建议通过为每个 Reader 设置不同的 Server id , 假设 Source 并行度为 4server id 配置必须serverId(“5400-5405”)5405-54005 4。来为 4 个 Source readers 中的每一个分配唯一的 Server id。 查看mysql链接发现 select * from information_schema.processlist where user ‘flink-cdc’; Flink-cdc对mysql的影响 正常情况下Flink-cdc是No-lock Read主库可以继续处理事务和查询而不会导致主库进程阻塞不会对主库产生直接影响。但是在某些情况下数据同步的过程中可能会对主库产生一些间接影响比如网络、IO、CPU负载以及mysql的并发连接数等资源消耗。但这些对主库的开销影响相对较小全量同步阶段可能比较耗能但时间相对比较短。 从上图mysql资源使用情况来看flink-cdc对内存和CPU负载影响微乎极微是No-lock Read主要表现对网络和IO的资源消耗。 断点续传 通过从checkpoint/savepoint 恢复flink-cdc可以保证断点续传。 从checkpoint/savepoint恢复缩小同步范围例如从tableList(“mydb.products,mydb.orders”)或tableList(“.*”) 缩小到 tableList(“mydb.products”)应用更新生效。 应用从checkpoint/savepoint恢复扩大同步范围的部分不会生效例如从tableList(“mydb.products”) 到 tableList(“mydb.products,mydb.orders”)或tableList(“.*”)应用更新不生效生效。若想使动态加表生效可以显示制定scanNewlyAddedTableEnabled(true) 来启用扫描新添加的表功能。如没有特殊情况建议在开发环境开启此配置。 flink-cdc包名变更 Flink CDC 项目 从 2.0.0 版本将 group id 从com.alibaba.ververica 改成 com.ververica, 自 3.1 版本从将 group id 从 com.ververica 改成 org.apache.flink。 这是为了让项目更加社区中立让各个公司的开发者共建时更方便。所以在maven仓库找 2.x 的包时路径是 /com/ververica找3.1及以上版本的包时路径是/org/apache/flink 参考 flink-cdc flink-cdc docs
http://www.w-s-a.com/news/979175/

相关文章:

  • 深圳专业建网站公司济南公司做网站的价格
  • 怎么运行自己做的网站网上申请平台怎么申请
  • 旅游公司网站 优帮云新闻近期大事件
  • 电商网站后台报价营销软文小短文
  • 网站建设项目售后服务承诺公司名称邮箱大全
  • 湖南网站建设哪里好做ppt的网站叫什么名字
  • 容城县建设银行网站电子商务网站建设子项目
  • 网站管理助手3.0做淘宝网站用什么软件做
  • 贵阳做网站的公司wordpress趣味插件
  • 自己设置免费网站设计平台南京哪里有做公司网站的
  • 建设公司内网网站的意义自助建站网站的宣传手册
  • 手机建设中网站建立个人网站服务器
  • 网站开发工程师岗位概要网站怎么制作教程
  • 城乡建设主管部门官方网站公司简介模板ppt范文
  • 网站认证必须做么cc0图片素材网站
  • net域名 著名网站国外设计案例网站
  • 淘宝客网站哪里可以做app地推网
  • 宜昌建设厅网站中国最新时事新闻
  • 微网站怎么开发wordpress 发表评论
  • 山东网站建设是什么一页网站首页图如何做
  • 游戏开发与网站开发哪个难万网影
  • 做网站编程语言建筑施工特种证书查询
  • 找人做网站内容自己编辑吗修改wordpress登陆界面
  • 登陆建设银行wap网站湖南网站建设磐石网络答疑
  • 58网站怎么做浏览度才高论坛网站怎么做排名
  • wordpress 手机网站支付京东网站建设的经费预算
  • 自己怎么样做游戏网站做海外贸易网站
  • 建立什么样的网站好制作网页网站代码
  • 岳麓区专业的建设网站公司尚一网常德论坛
  • 电商网站建设实训报告360站长平台链接提交