做互联网网站待遇,做徽标的网站,网页设计作业成品框架集,买网站主机目录 概述实践代码总结表创建插入一行数据更新一行数据删除一笔数据 概述 本文测试 canal 监控 mysql 表变化。canal 1.1.7 mysql 8.0.x 版本。
实践
代码
public static void main(String[] args) {// 创建一个 CanalConnector 连接器// username:字符串类型,Canal使用该用… 目录 概述实践代码总结表创建插入一行数据更新一行数据删除一笔数据 概述 本文测试 canal 监控 mysql 表变化。canal 1.1.7 mysql 8.0.x 版本。
实践
代码
public static void main(String[] args) {// 创建一个 CanalConnector 连接器// username:字符串类型,Canal使用该用户名验证客户端身份// password:字符串类型,Canal使用该密码验证客户端身份CanalConnector canalConnector CanalConnectors.newSingleConnector(new InetSocketAddress(10.xx.xx.142, 11111), example, canal, canal);try {while (true) {try {// 连接 Canal Server 尝试多次重连canalConnector.connect();break;} catch (Exception e) {System.out.println(重新连接...);Thread.sleep(1000);}}// 订阅数据库表默认监听所有的数据 库、表、等同于: .*\\..*//canalConnector.subscribe(.*\\..*);// 监听指定的数据库、表canalConnector.subscribe(shop.product);// 回滚到上一次的 batchId,取消已经消费过的日志canalConnector.rollback();// 持续监听 Canal Server 推送的数据,并使用自定义的 CanalEventDownStreamHandler 处理器消费数据while (true) {// 允许指定 batchSize 一次可以获取多条 每次返回的对象为 Message 包含的内容为// batch id 唯一标识// entries 具体的数据对象Message message canalConnector.getWithoutAck(100);long batchId message.getId();// 如果没有新数据 则暂停固定时间后 继续获取if (batchId -1 || message.getEntries().isEmpty()) {Thread.sleep(1000);}else {// 解析 binlog 数据输出详细信息for (CanalEntry.Entry entry : message.getEntries()) {if (entry.getEntryType() CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChange null;try {rowChange CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {e.printStackTrace();continue;}String schemaName entry.getHeader().getSchemaName();String tableName entry.getHeader().getTableName();CanalEntry.EventType eventType rowChange.getEventType();System.out.println(String.format(Binlog[%s:%s] schema[%s] table[%s] eventType[%s],entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),schemaName,tableName,eventType));for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {// 根据事件类型 输出变更前、后的列数据if (eventType CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());}else {System.out.println(before);printColumn(rowData.getBeforeColumnsList());System.out.println(after);printColumn(rowData.getAfterColumnsList());}}// 确认消费成功canalConnector.ack(batchId);}}}} catch (Exception e) {e.printStackTrace();}finally {canalConnector.disconnect();}}private static void printColumn(ListCanalEntry.Column columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() : column.getValue() update: column.getUpdated());}}总结
表创建
Binlog[binlog.000002:3153] schema[shop] table[product] eventType[CREATE]插入一行数据
Binlog[binlog.000002:4484] schema[shop] table[product] eventType[INSERT]
id : 1 update:true
title : 测试 update:true
cover_img : 21 update:true
amout : 11.0 update:true
summary : 11 update:true
detail : 11 update:true
phone : 11 update:true
gmt_create : 2024-06-11 03:11:44 update:true
gmt_modified : 2024-06-11 03:11:44 update:true更新一行数据
Binlog[binlog.000002:4847] schema[shop] table[product] eventType[UPDATE]
before
id : 1 update:false
title : 测试 update:false
cover_img : 21 update:false
amout : 11.0 update:false
summary : 11 update:false
detail : 11 update:false
phone : 11 update:false
gmt_create : 2024-06-11 03:11:44 update:false
gmt_modified : 2024-06-11 03:11:44 update:false
after
id : 1 update:false
title : 测试99 update:true
cover_img : 21 update:false
amout : 11.0 update:false
summary : 11 update:false
detail : 11 update:false
phone : 11 update:false
gmt_create : 2024-06-11 03:11:44 update:false
gmt_modified : 2024-06-11 03:12:21 update:true删除一笔数据
Binlog[binlog.000002:5248] schema[shop] table[product] eventType[DELETE]
id : 1 update:false
title : 测试99 update:false
cover_img : 21 update:false
amout : 11.0 update:false
summary : 11 update:false
detail : 11 update:false
phone : 11 update:false
gmt_create : 2024-06-11 03:11:44 update:false
gmt_modified : 2024-06-11 03:12:21 update:false