Contents
1. Environment Setup
  1.1 docker-compose.yml configuration
  1.2 Common docker-compose commands
  1.3 Container status after startup
2. MySQL binlog Configuration
  2.1 Configuring binlog via the docker-compose command
  2.2 Creating the canal user and checking that binlog is enabled
3. canal Configuration Files
  3.1 Complete canal.properties
  3.2 Complete instance.properties
  3.3 Checking that the container config matches the host
  3.4 Opening the required firewall ports
4. Code Implementation
  4.1 Required pom dependencies
  4.2 Complete pom.xml
  4.3 application.yml configuration
  4.4 Complete application.yml
  4.5 RabbitConstants: basic constants
  4.6 CanalMqConfigure: queue and exchange configuration
  4.7 CanalConsumer: the consumer
5. Run and Test

1. Environment Setup
1.1 docker-compose.yml configuration
version: "3.8"
services:
  redis:
    container_name: redis
    image: redis:6.2.7
    restart: always
    networks:
      - app_net
    ports:
      - "6379:6379"
    volumes:
      - /usr/local/docker/redis/data:/data
      - /usr/local/docker/redis/config/redis.conf:/usr/local/redis/config/redis.conf
      - /usr/local/docker/redis/logs:/logs
    command: [ "redis-server", "/usr/local/redis/config/redis.conf" ]
  mysql:
    container_name: mysql
    image: mysql:8.0.30
    restart: always
    networks:
      - app_net
    ports:
      - "3306:3306"
    volumes:
      - /usr/local/docker/mysql/data:/var/lib/mysql
      - /usr/local/docker/mysql/config:/etc/mysql/conf.d
    environment:
      MYSQL_ROOT_PASSWORD: root
      TZ: Asia/Shanghai
    command:
      --default-authentication-plugin=mysql_native_password
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_general_ci
      --explicit_defaults_for_timestamp=true
      --lower_case_table_names=1
      --log-bin=/var/lib/mysql/mysql-bin
      --server-id=1
      --binlog-format=ROW
      --expire_logs_days=7
      --max_binlog_size=500M
  canal:
    image: canal/canal-server:v1.1.5
    container_name: canal
    restart: always
    ports:
      - "11110:11110"
      - "11111:11111"
      - "11112:11112"
    volumes:
      - /usr/local/docker/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - /usr/local/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - /usr/local/docker/canal/logs:/home/admin/canal-server/logs
    networks:
      - app_net
    depends_on:
      - mysql
      - rabbitmq
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    restart: always
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - /usr/local/docker/rabbitmq/data/:/var/lib/rabbitmq/
      - /usr/local/docker/rabbitmq/log/:/var/log/rabbitmq/
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
    networks:
      - app_net
networks:
  app_net:
    driver: bridge

1.2 Common docker-compose commands
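# Start the services defined in the compose file in the background
docker-compose up -d [service]
# Stop the containers started by `up` and remove the network
docker-compose down
# Run a command inside a service container (for example, bash)
docker-compose exec [service] [command]
# List the containers in the project
docker-compose ps [service]
# Restart containers in the project
docker-compose restart [service]
# Remove stopped containers in the project
docker-compose rm -f [service]
# Start all containers in the project, or only the given one
docker-compose start [service]
# Stop all containers in the project, or only the given one
docker-compose stop [service]

1.3 Container status after startup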
[root@lavm-13jmyj9ugf docker]# docker ps -a
CONTAINER ID   IMAGE                       COMMAND                  CREATED          STATUS          PORTS   NAMES
0d4260bc557b   canal/canal-server:v1.1.5   "/alidata/bin/main.s…"   38 minutes ago   Up 38 minutes   9100/tcp, 0.0.0.0:11110-11112->11110-11112/tcp, :::11110-11112->11110-11112/tcp   canal
c66b3f1f13a9   mysql:8.0.30                "docker-entrypoint.s…"   38 minutes ago   Up 38 minutes   0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp   mysql
645e27bd4001   rabbitmq:3-management       "docker-entrypoint.s…"   5 hours ago      Up 49 minutes   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq
f55d42cbbd8e   redis:6.2.7                 "docker-entrypoint.s…"   3 days ago       Up 49 minutes   0.0.0.0:6379->6379/tcp, :::6379->6379/tcp   redis

2. MySQL binlog Configuration
2.1 Configuring binlog via the docker-compose command
--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M

2.2 Creating the canal user and checking that binlog is enabled
mysql> CREATE USER 'canal' IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.05 sec)

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.05 sec)

mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.05 sec)

mysql> select * from mysql.user where User = 'canal';
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| Host | User | Select_priv | Insert_priv | Update_priv | Delete_priv | Create_priv | Drop_priv | Reload_priv | Shutdown_priv | Process_priv | File_priv | Grant_priv | References_priv | Index_priv | Alter_priv | Show_db_priv | Super_priv | Create_tmp_table_priv | Lock_tables_priv | Execute_priv | Repl_slave_priv | Repl_client_priv | Create_view_priv | Show_view_priv | Create_routine_priv | Alter_routine_priv | Create_user_priv | Event_priv | Trigger_priv | Create_tablespace_priv | ssl_type | ssl_cipher | x509_issuer | x509_subject | max_questions | max_updates | max_connections | max_user_connections | plugin | authentication_string | password_expired | password_last_changed | password_lifetime | account_locked | Create_role_priv | Drop_role_priv | Password_reuse_history | Password_reuse_time | Password_require_current | User_attributes |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| % | canal | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | Y | Y | N | N | N | N | N | N | N | N | | | | | 0 | 0 | 0 | 0 | mysql_native_password | *E3619321C1A937C46A0D8BD1DAC39F93B27D4458 | N | 2025-03-10 11:53:49 | NULL | N | N | N | NULL | NULL | NULL | NULL |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 row in set (0.08 sec)

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.30 sec)

3. canal Configuration Files
Key settings in canal.properties:
canal.serverMode = rabbitMQ: selects RabbitMQ as the delivery mode.
rabbitmq.host = rabbitmq: the containers share one Docker network, so the container name can be used in place of a host address.
rabbitmq.queue, rabbitmq.routingKey, rabbitmq.exchange: the RabbitMQ trio that the listener is bound to later on.

#################################################
######### common argument #############
#################################################
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ: supported server modes (direct TCP or an MQ); RabbitMQ is used here
canal.serverMode = rabbitMQ

##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = rabbitmq
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.queue = canal-queue
rabbitmq.routingKey = canal-routing-key
rabbitmq.deliveryMode =

Key settings in instance.properties:
canal.instance.master.address: database address
canal.instance.dbUsername: database username
canal.instance.dbPassword: database password
canal.mq.topic: RabbitMQ routing key

# Database address; `mysql` works here because canal and mysql are on the same network, so the container name can replace the IP
canal.instance.master.address=mysql:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# mq config
canal.mq.topic=canal-routing-key

3.1 Complete canal.properties
#################################################
######### common argument #############
#################################################
canal.ip =
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd =
canal.zkServers =
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
canal.serverMode = rabbitMQ
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
canal.instance.detecting.enable = false
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
canal.instance.transaction.size = 1024
canal.instance.fallbackIntervalInSeconds = 60
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation = false
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize = 256
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
canal.instance.tsdb.snapshot.interval = 24
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
##################################################
######### MQ Properties #############
##################################################
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid =
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = rabbitmq
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.queue = canal-queue
rabbitmq.routingKey = canal-routing-key
rabbitmq.deliveryMode =
##################################################
######### Pulsar #############
##################################################
pulsarmq.serverUrl =
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix =

3.2 Complete instance.properties
#################################################
## mysql serverId , v1.0.26 will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# position info
canal.instance.master.address=mysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# multi stream for polardbx
canal.instance.multi.stream.on=false

# ssl
#canal.instance.master.sslMode=DISABLED
#canal.instance.master.tlsVersions=
#canal.instance.master.trustCertificateKeyStoreType=
#canal.instance.master.trustCertificateKeyStoreUrl=
#canal.instance.master.trustCertificateKeyStorePassword=
#canal.instance.master.clientCertificateKeyStoreType=
#canal.instance.master.clientCertificateKeyStoreUrl=
#canal.instance.master.clientCertificateKeyStorePassword=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=admin123!#
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

3.3 Checking that the container config matches the host
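Enter the container:
docker exec -it canal bash
Check that the configuration files inside the container match the ones on the host:
cat /home/admin/canal-server/conf/canal.properties
cat /home/admin/canal-server/conf/example/instance.properties

3.4 Opening the required firewall ports
canal: 11110, 11111, 11112
mysql: 3306
redis: 6379
RabbitMQ: 15672, 5672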
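
Once the firewall rules are in place, a quick reachability check can confirm that each port actually accepts TCP connections. The following is a minimal sketch that uses only the JDK. The class name `PortCheck`, the default host `localhost`, and the 500 ms timeout are illustrative assumptions and not part of the original setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: probes the ports listed in section 3.4.
public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, blocked by the firewall, or timed out
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the Docker host is reachable as "localhost" unless an argument is given
        String host = args.length > 0 ? args[0] : "localhost";

        Map<String, int[]> services = new LinkedHashMap<>();
        services.put("canal", new int[]{11110, 11111, 11112});
        services.put("mysql", new int[]{3306});
        services.put("redis", new int[]{6379});
        services.put("rabbitmq", new int[]{15672, 5672});

        for (Map.Entry<String, int[]> entry : services.entrySet()) {
            for (int port : entry.getValue()) {
                String state = isOpen(host, port, 500) ? "open" : "closed";
                System.out.printf("%-8s %5d %s%n", entry.getKey(), port, state);
            }
        }
    }
}
```

Running `java PortCheck.java <host>` from a machine outside the Docker host prints one line per port. A `closed` result only says the TCP connection failed; it does not distinguish a firewall rule from a stopped container.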
4. Code Implementation
4.1 Required pom dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.15</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<!-- Spring Boot MQ dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- canal -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

4.2 Complete pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.neo</groupId>
    <artifactId>code-repository</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.15</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <name>code-repository</name>
    <properties>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hutool.version>5.8.20</hutool.version>
        <mysql.version>8.0.30</mysql.version>
        <mybatis-plus.version>3.5.3.1</mybatis-plus.version>
        <redis.version>3.1.0</redis.version>
        <druid.version>1.2.16</druid.version>
        <fastjson.version>1.2.83</fastjson.version>
        <sa-token.version>1.37.0</sa-token.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.dev33</groupId>
            <artifactId>sa-token-spring-boot-starter</artifactId>
            <version>${sa-token.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.dev33</groupId>
            <artifactId>sa-token-redis-jackson</artifactId>
            <version>${sa-token.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

4.3 application.yml configuration
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true

4.4 Complete application.yml
server:
  port: 8088
  servlet:
    context-path: /api
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/db_v1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: root
    druid:
      initial-size: 5
      min-idle: 5
      max-active: 20
      max-wait: 60000
      validation-query: SELECT 1
      test-while-idle: true
      stat-view-servlet:
        enabled: true
        url-pattern: /druid/*
        login-username: admin
        login-password: admin123
      web-stat-filter:
        enabled: true
        url-pattern: /*
        exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"
      filter:
        stat:
          enabled: true
          log-slow-sql: true
          slow-sql-millis: 1000
        wall:
          enabled: true
          config:
            drop-table-allow: false
  redis:
    host: localhost
    port: 6379
    password: 123456
    database: 0
    timeout: 5000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
  mail:
    host: smtp.aliyun.com
    username:
    password:
    port: 25
    properties:
      mail:
        smtp:
          auth: true
          starttls:
            enable: true
            required: true
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
mybatis-plus:
  mapper-locations: classpath*:mapper/*_Mapper.xml
  global-config:
    db-config:
      logic-delete-field: delFlag
      logic-delete-value: 1
      logic-not-delete-value: 0
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

4.5 RabbitConstants: basic constants
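package com.neo.core.constant;

public interface RabbitConstants {

    // Must match rabbitmq.queue / rabbitmq.exchange / rabbitmq.routingKey in canal.properties
    interface Canal {
        String QUEUE = "canal-queue";
        String EXCHANGE = "canal-exchange";
        String ROUTING = "canal-routing-key";
    }

    // Values of the "type" field in canal messages
    interface EventType {
        String INSERT = "INSERT";
        String UPDATE = "UPDATE";
        String DELETE = "DELETE";
    }
}
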
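4.6 CanalMqConfigure: queue and exchange configuration
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CanalMqConfigure {

    // Durable queue that canal publishes to
    @Bean
    public Queue queue() {
        return new Queue(RabbitConstants.Canal.QUEUE, true);
    }

    // Durable, non-auto-delete direct exchange
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(RabbitConstants.Canal.EXCHANGE, true, false);
    }

    // Bind the queue to the exchange with the routing key set in canal.mq.topic
    @Bean
    public Binding bindingCanal() {
        return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitConstants.Canal.ROUTING);
    }
}

4.7 CanalConsumer: the consumer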
package com.neo.core.canal;

import com.neo.core.constant.RabbitConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Slf4j
@Component
@RabbitListener(queues = RabbitConstants.Canal.QUEUE)
public class CanalConsumer {

    // Receiving the payload as a Map presumes a JSON-capable message converter is registered;
    // that bean is not shown in this article.
    @RabbitHandler
    public void execute(Map<String, Object> msg) {
        log.info("canal消息监听事件触发,消息内容{}", msg);
        // Skip DDL events; Boolean.TRUE.equals also avoids a NullPointerException if the key is missing
        if (Boolean.TRUE.equals(msg.get("isDdl"))) {
            return;
        }
        String database = (String) msg.get("database");
        String table = (String) msg.get("table");
        String type = (String) msg.get("type");
        List<?> data = (List<?>) msg.get("data");
        log.info("database:{}.table:{}", database, table);
        if (RabbitConstants.EventType.INSERT.equalsIgnoreCase(type)) {
            System.out.println("INSERT");
        } else if (RabbitConstants.EventType.UPDATE.equalsIgnoreCase(type)) {
            System.out.println("UPDATE");
        } else if (RabbitConstants.EventType.DELETE.equalsIgnoreCase(type)) {
            System.out.println("DELETE");
        } else {
            // other event types
        }
    }
}

5. Run and Test
When data in MySQL changes, the listener on canal-queue fires, and the business-specific handling can be built on top of that callback.
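
As one possible shape for that business handling, the sketch below mirrors row changes into a cache: INSERT and UPDATE store the row, DELETE removes it. It is a minimal illustration under stated assumptions, not the author's implementation. The class name `CanalCacheSync` and the key format `database.table:pk` are hypothetical, a `ConcurrentHashMap` stands in for Redis so the example runs with the JDK alone, and only the first primary-key column from `pkNames` is used.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical handler: keeps a cache in step with canal flat messages.
public class CanalCacheSync {

    // Assumption: an in-memory map stands in for Redis
    private final Map<String, Map<String, Object>> cache = new ConcurrentHashMap<>();

    /** Applies one canal message (already converted to a Map) to the cache. */
    public void apply(Map<String, Object> msg) {
        if (Boolean.TRUE.equals(msg.get("isDdl"))) {
            return; // schema changes carry no row data to cache
        }
        Object pkNamesObj = msg.get("pkNames");
        Object dataObj = msg.get("data");
        if (!(pkNamesObj instanceof List) || !(dataObj instanceof List)) {
            return;
        }
        List<?> pkNames = (List<?>) pkNamesObj;
        if (pkNames.isEmpty()) {
            return;
        }
        String pkColumn = String.valueOf(pkNames.get(0));
        String type = String.valueOf(msg.get("type"));
        String prefix = msg.get("database") + "." + msg.get("table") + ":";

        for (Object rowObj : (List<?>) dataObj) {
            if (!(rowObj instanceof Map)) {
                continue;
            }
            @SuppressWarnings("unchecked")
            Map<String, Object> row = (Map<String, Object>) rowObj;
            String key = prefix + row.get(pkColumn);
            if ("DELETE".equalsIgnoreCase(type)) {
                cache.remove(key);
            } else if ("INSERT".equalsIgnoreCase(type) || "UPDATE".equalsIgnoreCase(type)) {
                cache.put(key, row); // data holds the full row after the change
            }
        }
    }

    /** Returns the cached row for the given key, or null if absent. */
    public Map<String, Object> get(String key) {
        return cache.get(key);
    }
}
```

Inside `CanalConsumer.execute`, a call such as `cacheSync.apply(msg)` would replace the `System.out.println` branches. Because each operation overwrites or removes by key, replaying the same message leaves the cache in the same state.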
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test1, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596660000, id=5, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596660932, type=DELETE}
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
DELETE
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596663000, id=6, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596664038, type=INSERT}
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
INSERT
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容{data=[{id=2, account=test1, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596668000, id=7, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=[{account=test}], pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596668545, type=UPDATE}
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
UPDATE

That completes the RabbitMQ + Canal data-consistency setup: environment configuration, code implementation, and a run-through test. With it in place, row changes in MySQL are delivered to the consumer, which can then keep downstream systems in step with the database.
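
The UPDATE entry in the log shows that `data` carries the full row after the change while `old` lists only the columns that changed, with their previous values. A consumer that only needs the changed columns can pair the two. The following is a minimal sketch using only the JDK. The class name `CanalUpdateDiff` and the `"old -> new"` output format are hypothetical, and the code assumes `old[i]` corresponds to `data[i]`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: extracts changed columns from a canal UPDATE message.
public class CanalUpdateDiff {

    /** For one row: column name -> "old -> new", for every column listed in oldRow. */
    public static Map<String, String> diffRow(Map<?, ?> oldRow, Map<?, ?> newRow) {
        Map<String, String> changes = new LinkedHashMap<>();
        for (Map.Entry<?, ?> entry : oldRow.entrySet()) {
            String column = String.valueOf(entry.getKey());
            changes.put(column, entry.getValue() + " -> " + newRow.get(entry.getKey()));
        }
        return changes;
    }

    /** Returns one change map per row; empty when the message has no "old" list (INSERT, DELETE). */
    public static List<Map<String, String>> diff(Map<String, Object> msg) {
        List<Map<String, String>> result = new ArrayList<>();
        Object data = msg.get("data");
        Object old = msg.get("old");
        if (!(data instanceof List) || !(old instanceof List)) {
            return result;
        }
        List<?> newRows = (List<?>) data;
        List<?> oldRows = (List<?>) old;
        // Assumption: rows in "old" and "data" are aligned by index
        int count = Math.min(newRows.size(), oldRows.size());
        for (int i = 0; i < count; i++) {
            Object oldRow = oldRows.get(i);
            Object newRow = newRows.get(i);
            if (oldRow instanceof Map && newRow instanceof Map) {
                result.add(diffRow((Map<?, ?>) oldRow, (Map<?, ?>) newRow));
            }
        }
        return result;
    }
}
```

For the UPDATE message logged above, `CanalUpdateDiff.diff(msg)` yields a single entry mapping `account` to `test -> test1`; for the INSERT and DELETE messages, where `old` is null, it returns an empty list.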