当前位置: 首页 > news >正文

建站教程视频下载自己做网站卖东西需要交税吗

建站教程视频下载,自己做网站卖东西需要交税吗,无极兼职网,制作网站如何选择主机1.CDC概述 CDC#xff08;Change Data Capture#xff09;是一种用于捕获和处理数据源中的变化的技术。它允许实时地监视数据库或数据流中发生的数据变动#xff0c;并将这些变动抽取出来#xff0c;以便进行进一步的处理和分析。 传统上#xff0c;数据源的变化通常通过…1.CDC概述 CDCChange Data Capture是一种用于捕获和处理数据源中的变化的技术。它允许实时地监视数据库或数据流中发生的数据变动并将这些变动抽取出来以便进行进一步的处理和分析。 传统上数据源的变化通常通过周期性地轮询整个数据集进行检查来实现。但是这种轮询的方式效率低下且不能实时反应变化。而 CDC 技术则通过在数据源上设置一种机制使得变化的数据可以被实时捕获并传递给下游处理系统从而实现了实时的数据变动监控。 Flink 作为一个强大的流式计算引擎提供了内置的 CDC 功能能够连接到各种数据源如数据库、消息队列等捕获其中的数据变化并进行灵活的实时处理和分析。 通过使用 Flink CDC我们可以轻松地构建实时数据管道对数据变动进行实时响应和处理为实时分析、实时报表和实时决策等场景提供强大的支持。 2.CDC 的实现原理 通常来讲CDC 分为主动查询和事件接收两种技术实现模式。对于主动查询而言用户通常会在数据源表的某个字段中保存上次更新的时间戳或版本号等信息然后下游通过不断的查询和与上次的记录做对比来确定数据是否有变动是否需要同步。这种方式优点是不涉及数据库底层特性实现比较通用缺点是要对业务表做改造且实时性不高不能确保跟踪到所有的变更记录且持续的频繁查询对数据库的压力较大。事件接收模式可以通过触发器Trigger或者日志例如 Transaction log、Binary log、Write-ahead log 等来实现。当数据源表发生变动时会通过附加在表上的触发器或者 binlog 等途径将操作记录下来。下游可以通过数据库底层的协议订阅并消费这些事件然后对数据库变动记录做重放从而实现同步。这种方式的优点是实时性高可以精确捕捉上游的各种变动缺点是部署数据库的事件接收和解析器例如 Debezium、Canal 等有一定的学习和运维成本对一些冷门的数据库支持不够。综合来看事件接收模式整体在实时性、吞吐量方面占优如果数据源是 MySQL、PostgreSQL、MongoDB 等常见的数据库实现建议使用Debezium来实现变更数据的捕获下图来自Debezium 官方文档如果使用的只有 MySQL则还可以用Canal。 3.为什么选 Flink 从上图可以看到Debezium 官方架构图中是通过 Kafka Streams 直接实现的 CDC 功能。而我们这里更建议使用 Flink CDC 模块因为 Flink 相对 Kafka Streams 而言有如下优势 强大的流处理引擎 Flink 是一个强大的流处理引擎具备高吞吐量、低延迟、Exactly-Once 语义等特性。它通过基于事件时间的处理模型支持准确和有序的数据处理适用于实时数据处理和分析场景。这使得 Flink 成为实现 CDC 的理想选择。 内置的 CDC 功能 Flink 提供了内置的 CDC 功能可以直接连接到各种数据源捕获数据变化并将其作为数据流进行处理。这消除了我们自行开发或集成 CDC 解决方案的需要使得实现 CDC 变得更加简单和高效。 多种数据源的支持 Flink CDC 支持与各种数据源进行集成如关系型数据库如MySQL、PostgreSQL、消息队列如Kafka、RabbitMQ、文件系统等。这意味着无论你的数据存储在哪里Flink 都能够轻松地捕获其中的数据变化并进行进一步的实时处理和分析。 灵活的数据处理能力 Flink 提供了灵活且强大的数据处理能力可以通过编写自定义的转换函数、处理函数等来对 CDC 数据进行各种实时计算和分析。同时Flink 还集成了 SQL 和 Table API为用户提供了使用 SQL 查询语句或 Table API 进行简单查询和分析的方式。 完善的生态系统 Flink 拥有活跃的社区和庞大的生态系统这意味着你可以轻松地获取到丰富的文档、教程、示例代码和解决方案。此外Flink 还与其他流行的开源项目如Apache Kafka、Elasticsearch深度集成提供了更多的功能和灵活性。 4.支持的连接器 5.支持的 Flink 版本 6.Flink CDC特性 支持读取数据库快照即使出现故障也能继续读取binlog并进行Exactly-once处理 DataStream API 的 CDC 连接器用户可以在单个作业中使用多个数据库和表的更改而无需部署 Debezium 和 Kafka Table/SQL API 的 CDC 连接器用户可以使用 SQL DDL 创建 CDC 源来监视单个表上的更改 下表显示了连接器的当前特性 7.用法实例 7.1DataStream API 的用法(推荐) 请严格按照上面的《5.支持的 Flink 版本》搭配来使用Flink CDC propertiesflink.version1.13.0/flink.versionmaven.compiler.source1.8/maven.compiler.sourcemaven.compiler.target1.8/maven.compiler.target /properties dependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion${flinkcdc.version}/version /dependency !-- flink核心API -- dependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.12/artifactIdversion${flink.version}/version /dependency dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version /dependency dependencygroupIdorg.apache.flink/groupIdartifactIdflink-scala_2.12/artifactIdversion${flink.version}/version /dependency dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.12/artifactIdversion${flink.version}/version /dependency dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_2.12/artifactIdversion${flink.version}/version /dependency dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-common/artifactIdversion${flink.version}/version /dependency dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_2.12/artifactIdversion${flink.version}/version /dependency dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge_2.12/artifactIdversion${flink.version}/version /dependency请提前开启MySQL中的binlog配置my.cnf文件重启mysqld服务即可 my.cnf [client] default_character_setutf8 [mysqld] server-id1 collation_serverutf8_general_ci character_set_serverutf8 log-binmysql-bin binlog_formatrow expire_logs_days30ddldml.sql create table test_cdc (id int not nullprimary key,name varchar(100) null,age int null );INSERT INTO flink.test_cdc (id, name, age) VALUES (1, Daniel, 25); INSERT INTO flink.test_cdc (id, name, age) VALUES (2, David, 38); INSERT INTO flink.test_cdc (id, name, age) VALUES (3, James, 16); INSERT INTO flink.test_cdc (id, name, age) VALUES (4, Robert, 27);FlinkDSCDC.java package com.daniel.util;import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Author Daniel* Date: 2023/7/25 10:03* Description DataStream API CDC**/ public class FlinkDSCDC {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DebeziumSourceFunctionString sourceFunction MySqlSource.Stringbuilder().hostname(localhost).port(3306).username(root).password(123456).databaseList(flink)// 这里一定要是db.table的形式.tableList(flink.test_cdc).deserializer(new StringDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();DataStreamSourceString dataStreamSource env.addSource(sourceFunction);dataStreamSource.print();env.execute(FlinkDSCDC);} } UPDATE flink.test_cdc t SET t.age 24 WHERE t.id 1; UPDATE flink.test_cdc t SET t.name Andy WHERE t.id 3;打印出的日志 SourceRecord{sourcePartition{servermysql_binlog_source}, sourceOffset{transaction_idnull, ts_sec1690272544, filemysql-bin.000001, pos7860, row1, server_id1, event2}} ConnectRecord{topicmysql_binlog_source.flink.test_cdc, kafkaPartitionnull, keyStruct{id1}, keySchemaSchema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, valueStruct{beforeStruct{id1,nameDaniel,age25},afterStruct{id1,nameDaniel,age24},sourceStruct{version1.5.2.Final,connectormysql,namemysql_binlog_source,ts_ms1690272544000,dbflink,tabletest_cdc,server_id1,filemysql-bin.000001,pos7989,row0},opu,ts_ms1690272544122}, valueSchemaSchema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestampnull, headersConnectHeaders(headers)} SourceRecord{sourcePartition{servermysql_binlog_source}, sourceOffset{transaction_idnull, ts_sec1690272544, filemysql-bin.000001, pos7860, row1, server_id1, event4}} ConnectRecord{topicmysql_binlog_source.flink.test_cdc, kafkaPartitionnull, keyStruct{id3}, keySchemaSchema{mysql_binlog_source.flink.test_cdc.Key:STRUCT}, valueStruct{beforeStruct{id3,nameJames,age16},afterStruct{id3,nameAndy,age16},sourceStruct{version1.5.2.Final,connectormysql,namemysql_binlog_source,ts_ms1690272544000,dbflink,tabletest_cdc,server_id1,filemysql-bin.000001,pos8113,row0},opu,ts_ms1690272544122}, valueSchemaSchema{mysql_binlog_source.flink.test_cdc.Envelope:STRUCT}, timestampnull, headersConnectHeaders(headers)}可以得出的结论 1、日志中的数据变化操作类型op可以表示为 ‘u’表示更新操作。在第一条日志中发生了一个更新操作对应的记录的 key 是 id1更新前的数据是 {id1, nameDaniel, age25}更新后的数据是 {id1, nameDaniel, age24}。在第二条日志中也发生了一个更新操作对应的记录的 key 是 id3更新前的数据是 {id3, nameJames, age16}更新后的数据是 {id3, nameAndy, age16}。 2、每条日志还提供了其他元数据信息如数据源source、版本号version、连接器名称connector、时间戳ts_ms等。这些信息可以帮助我们追踪记录的来源和处理过程。 3、日志中的 sourceOffset 包含了一些关键信息如事务IDtransaction_id、文件名file、偏移位置pos等。这些信息可以用于确保数据的准确顺序和一致性。 7.2Table/SQL API的用法 FlinkSQLCDC.java package com.daniel.util;import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row;/*** Author Daniel* Date: 2023/7/25 15:25* Description**/ public class FlinkSQLCDC {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);tableEnv.executeSql(CREATE TABLE test_cdc ( id int primary key, name STRING, age int ) WITH ( connector mysql-cdc, scan.startup.mode latest-offset, hostname localhost, port 3306, username root, password 123456, database-name flink, table-name test_cdc ));Table table tableEnv.sqlQuery(select * from test_cdc);DataStreamTuple2Boolean, Row dataStreamSource tableEnv.toRetractStream(table, Row.class);dataStreamSource.print();env.execute(FlinkSQLCDC);} } UPDATE flink.test_cdc t SET t.age 55 WHERE t.id 2; UPDATE flink.test_cdc t SET t.age 22 WHERE t.id 3; UPDATE flink.test_cdc t SET t.name Alice WHERE t.id 4; UPDATE flink.test_cdc t SET t.age 18 WHERE t.id 1; INSERT INTO flink.test_cdc (id, name, age) VALUES (5, David, 29);打印出的日志 (false,-U[2, David, 38]) (true,U[2, David, 55]) (false,-U[3, Andy, 16]) (true,U[3, Andy, 22]) (false,-U[4, Robert, 27]) (true,U[4, Alice, 27]) (false,-U[1, Daniel, 24]) (true,U[1, Daniel, 18]) (true,I[5, David, 29])
http://www.w-s-a.com/news/498816/

相关文章:

  • 宁夏网站建设怎么样互联网 网站设计
  • 成都关键词seo推广平台手机端关键词排名优化软件
  • 学做软件的网站卡盟平台
  • 网站构建建设案例展示关于做服饰网站的首页
  • 如何建设网站论坛凡科建站手机版登录
  • 建设银行门户网站惠州公司网站建设价格
  • 用python开发网站网站如何取消验证码
  • 公司做企业网站互联网建网站
  • 建网站需要的费用公司注册后怎么做网站
  • 宣传电脑的网站开发运动网站建设教程
  • 网站建设公司都会有哪些花销做网站公司商丘
  • 网站风格有哪些软件定制和开发
  • 公司网络维护具体做什么河南网站推广优化公司哪家好
  • 中学生制作的网站常平哪里有招计算机网站开发的
  • 原创网站模版苏州响应式网站建设
  • 做海报在哪个网站可以找素材网址申请注册方法
  • 网站建设分哪些类别别人做的网站不能用
  • 做网站网站会怎么样全国高校校园网站联盟建设
  • 整站下载器 做网站地图地产项目网站设计
  • 创意设计网站公司手机wap网站建设多少钱
  • 甘肃省第八建设集团公司网站seo高级优化方法
  • 精美的商城网站介绍最多人用的wordpress子主题
  • 检察门户网站建设情况俄外长抵达北京
  • 老电脑做网站服务器网站在线留言如何做
  • 南宁广告公司网站建设小程序源码破解
  • 沛县做网站xlec网站建设开发方式包括哪些方面
  • 山西网站建设 哪家好四川城乡和建设厅网站
  • 有瀑布流的网站小型商城网站
  • 百石网怎么做网站二次开发软件
  • 网站域名是什么东西制作网页哪家好