第三方做农产品价格数据的网站,修改wordpress主题字体大小,做装修那个网站好,设计公司网站设计详情我在用flinkcdc把数据从sqlserver写到doris
正常情况下sqlserver有删除数据#xff0c;doris是能捕获到并很快同步删除的。
但是我现在情况是doris做为数仓#xff0c;数据写到ods#xff0c;ods的数据还会通过flink计算后写入dwd层#xff0c;所以此时ods的数据是删除了…我在用flinkcdc把数据从sqlserver写到doris
正常情况下sqlserver有删除数据doris是能捕获到并很快同步删除的。
但是我现在情况是doris做为数仓数据写到odsods的数据还会通过flink计算后写入dwd层所以此时ods的数据是删除了但是dwd甚至ads的都没删除这样就会有脏数据。此时我们就需要去捕获被删除的数据就要用到debezium插件。
利用cdc把数据从sqlserver发送到kafka并转为debezium的json格式。
source表正常配sink-kafka的表配置如下 然后再拉一个工作流 想获取数据的话就用如下方式 很多人不懂op是什么c是什么接下来解释下
debezium会有四种op的数据
op 表示当前事件的类型取值为c表示insert、u表示update、d表示delete、r表示快照readts_ms: connector处理该事件的本地时间戳可以省略before变化事件发生之前的值after变化事件发生之后的值source事件源的结构信息包括connector版本、事务ID等 举个例子
新增一条数据
insert into inventory.a values (4, n4);控制台输出的是
...payload:{before:null,after:{id:4,name:n4}...op:c...修改一条数据
update inventory.a set name n4-upd where id 4;控制台输出的是
...payload:{before:{id:4,name:n4},after:{id:4,name:n4-upd}...op:u...删除一条数据
delete from inventory.a where id 1;控制台输出的是
...payload:{before:{id:1,name:n1},after:null...op:d...
这样就可以获取到被删除的数据或者是更新前的数据啦~
我在sqlserver测试的时候发现update一条数据实际上会先有一条d然后有一条c。