合肥网站建设是什么意思,微信企业app手机下载安装,有什么免费推广项目的好软件,网站怎么在百度做推广我们使用 Debezium 实时同步一个 MySQL 的数据到另一个 MySQL#xff0c;代码网上基本都有#xff0c;都是在引入 debezium-api#xff0c;debezium-embedded 后写 Java 代码#xff0c;做好了基本配置后启动程序#xff0c;Debezium 会自动读取 MySQL 的实时 binlog… 我们使用 Debezium 实时同步一个 MySQL 的数据到另一个 MySQL代码网上基本都有都是在引入 debezium-apidebezium-embedded 后写 Java 代码做好了基本配置后启动程序Debezium 会自动读取 MySQL 的实时 binlog然后触发相应的事件让我们处理我们就把事件里的数据读取出来插入到目标库即可。我们的 MySQL 的版本是 5.7 。 但我们在其中发现了一个很奇怪的问题目标库存在多个相同的 sql 我们以为是 Debezium 重复消费了 binlog 里的事件就记录下每个事件的 position 并判重但 sql 还是重复了我们一开始觉得 MySQL 写的 binlog 肯定没问题一个事务对应一个事件。之后我们使用 binlog2sql 这个 python 工具读取了已归档的 binlog 文件发现里面没有重复的 sql 这说明 MySQL binlog 还是没有问题的问题在 Debezium但 Debezium 作为一个成熟的 cdc 工具应该也不会有什么大的问题可能是 Debezium 的配置问题但检查了 Debezium 的所有配置后还是没发现有什么问题配置改了后重新运行结果还是一样。 后面我们怀疑可能和 gtid 有关我们发现 “Insert into xxx values (xxx) ” 会产生一个 binlog 事件因为一个事务会产生一个 binlog 事件但 “Insert into xxx values (xxx)xxx)xxx...” 会产生多个事件但这些事件的 gtid 还是同一个事件里的 query 属性还是同一个事件的 query 属性即原始 sql 这就破案了我们一直消费每个事件的query但可能多个事件里的 query 属性是一样的因为它们的 gtid 属性相同它们属于同一个全局事务。后面我们使用 gtid 过滤相同属性就解决了数据重复问题。至于为什么一个批量插入会产生一个多个事件并且多个事件的 gtid 是同一个我们猜测 MySQL 的 binlog 就是这样写日志的修改一行数据就产生一个事件要是批量修改就产生多个事件但这些批量事件同属于一个全局事务。 怎么过滤重复 gtid 问题因为 gtid 是递增的相同的 gtid 都会一起出现所以可以使用自动老元素的 Map或是设置键过期的 redis或是 带有 gtid 属性的数据库表并设置它是唯一索引或是插入数据之前先检查数据库里是否有本事件的gtid有就跳过没有就插入并把这个过程加锁保证原子性。 核心代码
// 启动
DebeziumEngineChangeEventString, String engine DebeziumEngine.create(Json.class).using(config) .notifying(DataSync::handleChangeEvent).build();ExecutorService executor Executors.newSingleThreadExecutor();executor.execute(engine);
private static void handleChangeEvent(ChangeEventString, String event) {JSONObject valueJson JSON.parseObject(event.value());if (valueJson ! null) {JSONObject payload valueJson.getJSONObject(payload);JSONObject source payload.getJSONObject(after);// 原始sqlString query source.getString(data_definition);// 对 sql 字符串进行美化query query.replaceAll([\\n\\r\\t\\s], );String database source.getString(database);String table source.getString(table_name);String gtid source.getString(gtid);synchronized (lock) {// 查询数据库该 gtid 的数量long cnt queryGtid(gtid);if (cnt 0) {// 如果数据库不存在该 sql 就插入save(query, database, table, gtid);} else {System.out.println(gtid 有重复);}} }}