当前位置: 首页 > news >正文

国内最大的摄影网站贵州便宜网站推广优化电话

国内最大的摄影网站,贵州便宜网站推广优化电话,深圳网站建设ejiew,惠州网站制作找哪家简介 基于doris官方用doris构建实时仓库的思路#xff0c;从flinkcdc到doris实时数仓的实践。 原文 Apache Flink X Apache Doris 构建极速易用的实时数仓架构 (qq.com) 前提-Flink CDC 原理、实践和优化 CDC 是什么 CDC 是变更数据捕获#xff08;Change Data Captur…简介 基于doris官方用doris构建实时仓库的思路从flinkcdc到doris实时数仓的实践。 原文  Apache Flink X Apache Doris 构建极速易用的实时数仓架构 (qq.com)  前提-Flink CDC 原理、实践和优化 CDC 是什么 CDC 是变更数据捕获Change Data Capture技术的缩写它可以将源数据库Source的增量变动记录同步到一个或多个数据目的Sink。在同步过程中还可以对数据进行一定的处理例如分组GROUP BY、多表的关联JOIN等。 例如对于电商平台用户的订单会实时写入到某个源数据库A 部门需要将每分钟的实时数据简单聚合处理后保存到 Redis 中以供查询B 部门需要将当天的数据暂存到 Elasticsearch 一份来做报表展示C 部门也需要一份数据到 ClickHouse 做实时数仓。随着时间的推移后续 D 部门、E 部门也会有数据分析的需求这种场景下传统的拷贝分发多个副本方法很不灵活而 CDC 可以实现一份变动记录实时处理并投递到多个目的地。 下图是一个示例通过腾讯云 Oceanus 提供的 Flink CDC 引擎可以将某个 MySQL 的数据库表的变动记录实时同步到下游的 Redis、Elasticsearch、ClickHouse 等多个接收端。这样大家可以各自分析自己的数据集互不影响同时又和上游数据保持实时的同步。 CDC 的实现原理 通常来讲CDC 分为主动查询和事件接收两种技术实现模式。 对于主动查询而言用户通常会在数据源表的某个字段中保存上次更新的时间戳或版本号等信息然后下游通过不断的查询和与上次的记录做对比来确定数据是否有变动是否需要同步。这种方式优点是不涉及数据库底层特性实现比较通用缺点是要对业务表做改造且实时性不高不能确保跟踪到所有的变更记录且持续的频繁查询对数据库的压力较大。 事件接收模式可以通过触发器Trigger或者日志例如 Transaction log、Binary log、Write-ahead log 等来实现。当数据源表发生变动时会通过附加在表上的触发器或者 binlog 等途径将操作记录下来。下游可以通过数据库底层的协议订阅并消费这些事件然后对数据库变动记录做重放从而实现同步。这种方式的优点是实时性高可以精确捕捉上游的各种变动缺点是部署数据库的事件接收和解析器例如 Debezium、Canal 等有一定的学习和运维成本对一些冷门的数据库支持不够。 综合来看事件接收模式整体在实时性、吞吐量方面占优如果数据源是 MySQL、PostgreSQL、MongoDB 等常见的数据库实现建议使用 Debezium 来实现变更数据的捕获下图来自 Debezium 官方文档。如果使用的只有 MySQL则还可以用 Canal。 为什么选 Flink 从上图可以看到Debezium 官方架构图中是通过 Kafka Streams 直接实现的 CDC 功能。而我们这里更建议使用 Flink CDC 模块因为 Flink 相对 Kafka Streams 而言有如下优势 Flink 的算子和 SQL 模块更为成熟和易用Flink 作业可以通过调整算子并行度的方式轻松扩展处理能力Flink 支持高级的状态后端State Backends允许存取海量的状态数据Flink 提供更多的 Source 和 Sink 等生态支持Flink 有更大的用户基数和活跃的支持社群问题更容易解决Flink 的开源协议允许云厂商进行全托管的深度定制而 Kafka Streams 只能自行部署和运维 而且 Flink Table / SQL 模块将数据库表和变动记录流例如 CDC 的数据流看做是同一事物的两面因此内部提供的 Upsert 消息结构I 表示新增、-U 表示记录更新前的值、U 表示记录更新后的值-D 表示删除可以与 Debezium 等生成的变动记录一一对应。 Flink CDC 的使用方法 目前 Flink CDC 支持两种数据源输入方式。 输入 Debezium 等数据流进行同步 例如 MySQL - Debezium - Kafka - Flink - PostgreSQL。适用于已经部署好了 Debezium希望暂存一部分数据到 Kafka 中以供多次消费只需要 Flink 解析并分发到下游的场景。 在该场景下由于 CDC 变更记录会暂存到 Kafka 一段时间因此可以在这期间任意启动/重启 Flink 作业进行消费也可以部署多个 Flink 作业对这些数据同时处理并写到不同的数据目的Sink库表中实现了 Source 变动与 Sink 的解耦。  CREATE TABLE Data_Input (id BIGINT,actor VARCHAR,alias VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (connector kafka, -- 可选 kafka,kafka-0.11. 注意选择对应的内置 Connectortopic YourDebeziumTopic, -- 替换为您要消费的 Topicscan.startup.mode earliest-offset -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种properties.bootstrap.servers 10.0.1.2:9092, -- 替换为您的 Kafka 连接地址properties.group.id YourGroup, -- 必选参数, 一定要指定 Group ID-- 定义数据格式 (Debezium JSON 格式)format debezium-json,debezium-json.schema-include false, );CREATE TABLE Data_Output (id BIGINT,actor VARCHAR,alias VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (connector jdbc,url jdbc:postgresql://postgresql.example:50060/myDatabase?currentSchemamySchemareWriteBatchedInsertstrue, -- 请替换为您的实际 PostgreSQL 连接参数table-name MyTable, -- 需要写入的数据表username user, -- 数据库访问的用户名需要提供 INSERT 权限password helloworld -- 数据库访问的密码 );INSERT INTO Data_Output SELECT * FROM Data_Input; 如果在流计算 Oceanus 界面上可以勾选 kafka 和 jdbc 两个内置的 Connector 随后直接开始运行作业Flink 就会源源不断的消费 YourDebeziumTopic 这个 Kafka 主题中 Debezium 写入的记录然后输出到下游的 MySQL 数据库中实现了数据同步。 直接对接上游数据库进行同步 我们还可以跳过 Debezium 和 Kafka 的中转使用 Flink CDC Connectors 对上游数据源的变动进行直接的订阅处理。从内部实现上讲Flink CDC Connectors 内置了一套 Debezium 和 Kafka 组件但这个细节对用户屏蔽因此用户看到的数据链路如下图所示 用法示例 同样的这次我们有个 MySQL 数据库需要实时将内容同步到 PostgreSQL 中。但我们没有也不想安装 Debezium 等额外组件那我们可以新建一个 Flink SQL 作业然后输入如下 SQL 代码连接参数都是虚拟的仅供参考 CREATE TABLE Data_Input (id BIGINT,actor VARCHAR,alias VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (connector mysql-cdc, -- 可选 mysql-cdc 和 postgres-cdchostname 192.168.10.22, -- 数据库的 IPport 3306, -- 数据库的访问端口username debezium, -- 数据库访问的用户名需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限password helloworld!, -- 数据库访问的密码database-name YourData, -- 需要同步的数据库table-name YourTable -- 需要同步的数据表名 );CREATE TABLE Data_Output (id BIGINT,actor VARCHAR,alias VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (connector jdbc,url jdbc:postgresql://postgresql.example:50060/myDatabase?currentSchemamySchemareWriteBatchedInsertstrue, -- 请替换为您的实际 PostgreSQL 连接参数table-name MyTable, -- 需要写入的数据表username user, -- 数据库访问的用户名需要提供 INSERT 权限password helloworld -- 数据库访问的密码 );INSERT INTO Data_Output SELECT * FROM Data_Input; 如果在流计算页面可以选择内置的 mysql-cdc 和 jdbc Connector 注意 需要使用 Flink CDC Connectors 附加组件。腾讯云 Oceanus 已经自带了 MySQL-CDC Connector如果自行部署的话需要下载 jar 包并将其放入 Flink 的 lib 目录下。 访问数据库时请确保连接的用户足够权限PostgreSQL 用户看这里MySQL 用户看这里 Flink CDC 模块的实现 Debezium JSON 格式解析类探秘 flink-json 模块中的 org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory 是负责构造解析 Debezium JSON 格式的工厂类同样地org.apache.flink.formats.json.canal.CanalJsonFormatFactory 负责 Canal JSON 格式。这些类已经内置在 Flink 1.11 的发行版中直接可以使用无需附加任何程序包。 对于 Debezium JSON 格式而言Flink 将具体的解析逻辑放在了 org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema#DebeziumJsonDeserializationSchema 类中。 上图表示 Debezium JSON 的一条更新Update消息它表示上游已将 id123 的数据更新且字段内包含了更新前的旧值以及更新后的新值。 那么Flink 是如何解析并生成对应的 Flink 消息呢我们看下这个类的 deserialize 方法 GenericRowData before (GenericRowData) payload.getField(0); // 更新前的数据 GenericRowData after (GenericRowData) payload.getField(1); // 更新后的数据 String op payload.getField(2).toString(); // 获取 op 字段的类型if (OP_CREATE.equals(op) || OP_READ.equals(op)) { // 如果是创建 (c) 或快照读取 (r) 消息after.setRowKind(RowKind.INSERT); // 设置消息类型为新建 (I)out.collect(after); // 发送给下游 } else if (OP_UPDATE.equals(op)) { // 如果是更新 (u) 消息before.setRowKind(RowKind.UPDATE_BEFORE); // 把更新前的数据类型设置为撤回 (-U)after.setRowKind(RowKind.UPDATE_AFTER); // 把更新后的数据类型设置为更新 (U)out.collect(before); // 发送两条数据给下游out.collect(after); } else if (OP_DELETE.equals(op)) { // 如果是删除 (d) 消息before.setRowKind(RowKind.DELETE); // 将消息类型设置为删除 (-D)out.collect(before); // 发送给下游 } else {... // 异常处理逻辑 } 从上述逻辑可以看出对于每一种 Debezium 的操作码op 字段的类型都可以用 Flink 的 RowKind 类型来表示。对于插入 I 和删除 D都只需要一条消息即可而对于更新则涉及删除旧数据和写入新数据因此需要 -U 和 U 两条消息来对应。 特别地在 MySQL、PostgreSQL 等支持 Upsert原子操作的 Update or Insert语义的数据库中通常前一个 -U 消息可以省略只把后一个 U 消息用作实际的更新操作即可这个优化在 Flink 中也有实现。 因此可以看到Debezium 到 Flink 消息的转换逻辑是非常简单和自然的这也多亏了 Flink 先进的设计理念很早就提出并实现了 Upsert 数据流和动态数据表之间的映射关系。 Flink CDC Connectors 的实现 flink-connector-debezium 模块 我们在使用 Flink CDC Connectors 时也会好奇它究竟是如何做到的不需要安装和部署外部服务就可以实现 CDC 的。当我们阅读 flink-connector-mysql-cdc 的源码时可以看到它内部依赖了 flink-connector-debezium 模块而这个模块将 Debezium Embedded 嵌入到了 Connector 中。 flink-connector-debezium 的数据源实现类为 com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction它集成了 Flink 中的 RichSourceFunction 并实现了 CheckpointedFunction 以支持快照保存状态。 通常而言对于 SourceFunction我们可以从它的 run 方法入手分析。它的核心代码如下 this.engine DebeziumEngine.create(Connect.class).using(properties) // 初始化 Debezium 所需的参数.notifying(debeziumConsumer) // 收到批量的变更消息, 则 Debezium 会回调 DebeziumChangeConsumer 来反序列化并向下游输出数据.using(OffsetCommitPolicy.always()).using((success, message, error) - {if (!success error ! null) {this.reportError(error);}}).build(); ...executor.execute(engine); // 向 Executor 提交 Debezium 线程以启动运行 可以看到这个 SourceFunction 使用一些预先定义的参数初始化了一个嵌入式的 DebeziumEngineJava 的 Runnable然后提交给线程池executor去执行。这个 Debezium 线程会批量接收 binlog 信息并回调传入的 debeziumConsumer 以反序列化消息并交给 Flink 来处理。本类的其他方法主要负责初始化状态和保存快照这里略过。 这里我们再来看一下 DebeziumChangeConsumer 的实现它的最核心的方法是 handleBatch 。当 Debezium 收到一批新的事件时会调用这个方法来通知我们的 Connector 进行处理。这里有个 for 循环轮询的逻辑 for (ChangeEventSourceRecord, SourceRecord event : changeEvents) { // 轮询各个事件SourceRecord record event.value();if (isHeartbeatEvent(record)) { // 如果时心跳包// 只更新当前 offset 信息, 然后继续不进行实际处理synchronized (checkpointLock) {debeziumOffset.setSourcePartition(record.sourcePartition());debeziumOffset.setSourceOffset(record.sourceOffset());}continue;}deserialization.deserialize(record, debeziumCollector); // 反序列化这条消息if (isInDbSnapshotPhase) { // 如果处于数据库快照期, 需要阻止 Flink 检查点Checkpoint生成if (!lockHold) {MemoryUtils.UNSAFE.monitorEnter(checkpointLock);lockHold true;...}if (!isSnapshotRecord(record)) { // 如果已经不在数据库快照期了, 就释放锁, 允许 Flink 正常生成检查点CheckpointMemoryUtils.UNSAFE.monitorExit(checkpointLock);isInDbSnapshotPhase false;...}}// 更新当前 offset 信息, 并向下游 Flink 算子发送数据emitRecordsUnderCheckpointLock(debeziumCollector.records, record.sourcePartition(), record.sourceOffset()); } 可以看到逻辑比较简单只需要关注 checkpointLock 这个对象只有持有这个对象的锁时才允许 Flink 进行检查点的生成。 当作业处于数据库快照期即作业刚启动时需全量同步源数据库的一份完整快照此时收到的数据类型是 Debezium 的 SnapshotRecord则不允许 Flink 进行 Checkpoint 即检查点的生成以避免作业崩溃恢复后状态不一致同样地如果正在向下游算子发送数据并更新 offset 信息时也不允许快照的进行。这些操作都是为了保证 Exacly-Once精确一致语义。 这里也解释了在作业刚启动时如果数据库较大同步时间较久Flink 刚开始的 Checkpoint 永远失败超时的原因只有当 Flink 完整同步了全量数据后才可以进行增量数据的处理以及 Checkpoint 的生成。 flink-connector-mysql-cdc 模块 而对于 flink-connector-mysql-cdc 模块而言它主要涉及到 MySQLTableSource 的声明和实现。 我们知道Flink 是通过 Java 的 SPIService Provider Interface机制动态加载 Connector 的因此我们首先看这个模块的 src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory 文件里面内容指向 com.alibaba.ververica.cdc.connectors.mysql.table.MySQLTableSourceFactory。 打开这个工厂类我们可以看到它定义了该 Connector 所需的参数例如 MySQL 数据库的用户名、密码、表名等信息并负责 MySQLTableSource 实例的具体创建而 MySQLTableSource 类对这些参数做转换最终会生成一个上文提到的 DebeziumSourceFunction 对象。 因此我们可以发现这个模块作用是一个 MySQL 参数的封装和转换层最终的逻辑实现仍然是由 flink-connector-debezium 完成的。 MySQL CDC 常见问题 优化(重点Maxwell如果不设置成Full那么update就可能没有old数据) 由于 Flink 的 CDC 功能还比较新1.11 版本刚开始支持1.12 版本逐步完善因而在应用过程中很可能会遇到有各种问题。鉴于大多数客户的数据源都是 MySQL我们这里整理了客户常见的一些问题和优化方案希望能够帮助到大家。 Debezium 报错binlog probably contains events generated with statement or mixed based replication format 或 The MySQL server is not configured to use a FULL binlog_row_image 当前的 Binlog 格式被设置为了 STATEMENT 或者 MIXED 这两种都不被 Debezium 支持。为了使用 Flink CDC 功能需要把 MySQL 的 binlog-format 设置为 ROW SET GLOBAL binlog_format ROW; SET GLOBAL binlog_row_image FULL; 如果您使用的是腾讯云的 TencentDB for MySQL请确认下面设置 Debezium 报错User does not have the LOCK TABLES privilege required to obtain a consistent snapshot 或 Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) 请对作业中指定的 MySQL 用户赋予如下权限SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT例如 GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 用户名 IDENTIFIED BY 密码; FLUSH PRIVILEGES; 如果您使用的数据库不允许或者不希望使用 RELOAD 进行全局锁则还需要授予 LOCK TABLES 权限以令 Debezium 尝试进行表级锁。注意表级锁会导致更长的数据库锁定时间 如果希望彻底跳过锁对数据的一致性要求不高但要求数据库不能被锁则可以在 WITH 参数中设置 debezium.snapshot.locking.mode none 参数来跳过锁操作。但请注意同步过程中千万不要随意变更库表的结构。 作业刚启动期间Flink Checkpoint 一直失败/重启 前文讲过Flink CDC Connector 在初始的全量快照同步阶段会屏蔽掉快照的执行因此如果 Flink Checkpoint 需要执行的话就会因为一直无法获得 checkpointLock 对象的锁而超时。 可以设置 Flink 的 execution.checkpointing.tolerable-failed-checkpoint 参数以容忍更多的 Checkpoint 失败事件同时可以调大 Checkpoint 周期避免作业因 Checkpoint 失败而一直重启。 JDBC Sink 批量写入时数据会缺失几条 如果发现数据库中的某些数据在 CDC 同步后有缺失请确认是否仍在使用 Flink 旧版 1.10 的 Flink SQL WITH 语法例如 WITH 参数中的 connector.type 是旧语法connector 是新语法。 旧版语法的 Connector 在 JDBC 批量写入 Upsert 数据例如数据库的更新记录时并未考虑到 Upsert 与 Delete 消息之间的顺序关系因此会出现错乱的问题请尽快迁移到新版的 Flink SQL 语法。 异常数据造成作业持续重启 默认情况下如果遇到异常的数据例如消费的 Kafka topic 在无意间混入了其他数据Flink 会立刻崩溃重启然后从上个快照点Checkpoint重新消费。由于某条异常数据的存在作业会永远因为异常而重启。可以在 WITH 参数中加入 debezium-json.ignore-parse-errors true 来应对这个问题。 上游 Debezium 崩溃导致写入重复数据结果不准 Debezium 服务端发生异常并恢复后由于可能没有及时记录崩溃前的现场可能会退化为 At least once 模式即同样的数据可能被发送多次造成下游结果不准确。 为了应对这个问题新版的 Flink 1.12 增加了一个 table.exec.source.cdc-events-duplicate 配置项可以编辑 flink-conf.yaml 文件来配置建议将其设置为 true 以对这些重复数据进行去重。 但是需要注意该选项需要数据源表定义了主键否则也无法进行去重操作。 未来展望 在 Flink 1.11 版本中CDC 功能首次被集成到内核中。由于 Flink 1.11.0 版本有个 严重 Bug 造成 Upsert 数据无法写入下游我们建议使用 1.11.1 及以上版本。 在 1.12 版本上Flink 还在配置项中增加了前文提到的 table.exec.source.cdc-events-duplicate 等选项以更好地支持 CDC 去重还支持 Avro 格式的 Debezium 数据流而不仅仅限于 JSON 了。另外这个版本增加了对 Maxwell 格式的 CDC 数据流支持 为了更好地完善 CDC 功能模块Flink 社区创建了 FLINK-18822 以追踪关于该模块的进展。可以从中看到Flink 1.13 主要着力于支持更多的类型FLINK-18758以及允许从 Debezium Avro、Canal 等数据流中读取一些元数据信息等。 而在更远的规划中Flink 还可能支持基于 CDC 的内存数据库缓存这样我们可以在内存中动态地 JOIN 一个数据库的副本而不必每次都查询源库这将极大地提升作业的处理能力并降低数据库的查询压力。 参考文章 Flink CDC 原理、实践和优化 - 腾讯云开发者社区-腾讯云 FlinkCDC-MySql到Doris FlinkCDC将MySQL接入Doris实战_wangleigiser的博客-CSDN博客 基于Flink CDC 和 Doris Connector 实现 MySQL分库分表数据数据实时入Doris - 知乎  Doris构建简易实时数仓  Apache Flink X Apache Doris 构建极速易用的实时数仓架构 (qq.com)
http://www.w-s-a.com/news/105534/

相关文章:

  • 做网站的基本功制作网站公司推荐
  • 阿里云快速建站教程个人网站 费用
  • 广东购物网站建设微信公众号制作模板免费
  • 阿里国际站韩语网站怎么做让移动网站
  • 北京外包做网站如何报价中国几大网络推广公司
  • 中国建设部网站关于资质wordpress 建app
  • 程序员找工作的网站哈尔滨建设信息网站
  • 公司 网站 方案高考写作网站
  • 网站后台如何登陆网站开发需求逻辑图
  • 市级档案网站建设情况分析server2008做DNS与网站
  • 公积金门户网站建设方案网站建设代理平台怎么做
  • 网站建设知识论文抖音开放平台是干什么的
  • 网站建设期末试卷大气简洁网站
  • 电子商务网站建设报告范文单位做网站怎么做
  • 优质的外国网站qq小程序在哪里打开
  • 商务网站建设与推广实训报告免费素材网站无水印
  • 外贸站seoapp开发公司历程概述
  • 沈阳网站推广¥做下拉去118cr陶瓷企业 瓷砖地板公司网站建设
  • 医院网站官方微信精神文明建设我做服装设计师的 求推荐资源网站
  • 微信网站建设需要那些资料昆明cms模板建站
  • 安庆网站建设兼职中企动力是500强吗
  • 网站排名优化技巧基于网站的网络营销方法有哪些
  • 摄影素材网站做知识问答的网站
  • 中小企业网站建设济南兴田德润电话门店管理系统软件排行
  • 昆明工程建设信息网站柳州网站建设公司哪家好
  • 如何分析网站关键词北京门户网站网址
  • 做网站与做游戏那个好网站域名怎么起
  • 有没有做cad单的网站银行网站建设方案视频
  • 和各大网站做视频的工作高校网站群管理系统
  • 中国建设人才服务信息网是正规网站怎么注销自己名下的公司