兴宁电子商务网站建设,做外链那些网站比较好,做网站是不是很麻烦,网页的网站建设在哪里定义
canal组件是一个基于mysql数据库增量日志解析#xff0c;提供增量数据订阅和消费#xff0c;支持将增量数据投递到下游消费者#xff08;kafka#xff0c;rocketmq等#xff09;或者存储#xff08;elasticearch,hbase等#xff09;canal感知到mysql数据变动…定义
canal组件是一个基于mysql数据库增量日志解析提供增量数据订阅和消费支持将增量数据投递到下游消费者kafkarocketmq等或者存储elasticearch,hbase等canal感知到mysql数据变动然后解析变动数据将变动数据发送到mq或者同步到其他数据库等待进一步业务逻辑处理
canal的工作原理
mysql master将数据变更写入到二进制日志binary long简称binlogmysql slave将master的bin log拷贝到它的中继日志relay logmysql slave重放relay log操作将变更数据同步到最新 mysql binlog日志
介绍
mysql的binlog可以说mysql的最重要的日志它记录了所有的DDL(表的创建)和DML(数据发生变化)语句以事件形式记录mysql默认情况下是不开启binlog因为记录binlog日志需要消耗时间官方给出的数据是有1%的性能损耗 一般来说在下面两场场景下会开启binlog日志 mysql主从集群部署时需要将在master端开启binlog方便将数据同步到slaves中数据恢复了通过使用mysql binlog工具来使数据恢复
binlog的分类
分类介绍优点缺点STATEMENT语句级别记录每一次执行写操作的语句相当于row模式节省了空间但是可能产生的数据不一致有疑执行时间不同导致产生的数据不同节省空间可能造成数据不一致ROW行级记录每次操作后每行记录的变化假如一个update的sql执行结果是1万行statement只存一条如果是row的话会把这1万行的结果存着数据绝对一致性因为不管sql是什么引用了什么函数他只记录执行后的效果占用较大空间MIXED是对STATEMENT的升级如当函数中包含UUID()时包含AUTO_INCREMENT字段的表被更新时执行INSERT DELAYED语句时用UDF时会按照ROW的方式进行处理节省空间同时兼顾了一定的一致性还有些极个别情况依旧会造成不一致另外STATEMENT和mixed对于需要对binlog的监控的情况都不方便
canal工作原理
canal将自己伪装为mysql slave,向mysql master发送bump协议mysql master 收到dump请求开始推送binary log给slavecanalcanal接收并解析binlog日志得到变更的数据执行后续逻辑 canal应用场景
数据同步
canal可以帮助用户进行多种数据同步操作如实时同步mysql数据到elasticsearchredis等数据存储介质中
数据库实时监控
canal可以实时监控mysql的更新操作对于敏感数据的修改可以及时通知相关人员
数据分析和挖掘
canal可以将mysql增量数据投递到kafka等消息队列中为数据分析和挖掘提供数据来源
数据库备份
canal可以将mysql主库上的数据增量日志复制到备库上实现数据库的备份
数据集成
canal可以将多个mysql数据库中的数据进行集成为数据处理提供更加高效可靠的解决方案
数据库迁移
canal可以协助完成mysql数据库的辨别升级及数据迁移任务
canal中mysql配置
vim /etc/mysql/my.cnf
[mysqld]
# 设置主服务器唯一ID
server-id1
# 启用二进制日志
log-binmysql-bin
# 设置不要复制的数据库(可以设置多个)
binlog-ignore-dbmysql
# 设置需要监听的数据库,不配置表示所有数据库均开启
#binlog-do-dbcanal-demo
# 设置logbin格式
binlog_formatrow
bind-address 0.0.0.0canal下载配置文件
官网下载
tar -zvxf canal.deployer-1.1.7.tar.gz
cd canal
cd conf
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip
# register ip to zookeeper
canal.register.ip 192.168.7.129 # canal的ip
canal.port 11111
canal.metrics.pull.port 11112
# canal instance user/passwd
# canal.user canal
# canal.passwd E3619321C1A937C46A0D8BD1DAC39F93B27D4458# canal admin config
#canal.admin.manager 127.0.0.1:8089
canal.admin.port 11110
canal.admin.user admin
canal.admin.passwd 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto true
#canal.admin.register.cluster
#canal.admin.register.name canal.zkServers 192.168.5.6:2181 # kafka的端口的和ip
# flush data to zk
canal.zookeeper.flush.period 1000
canal.withoutNetty false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode kafka # 选择同步方式为kafka
# flush meta cursor/parse position to file
canal.file.data.dir ${canal.conf.dir}
canal.file.flush.period 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode MEMSIZE
canal.instance.memory.rawEntry true## detecing config
canal.instance.detecting.enable false
#canal.instance.detecting.sql insert into retl.xdual values(1,now()) on duplicate key update xnow()
canal.instance.detecting.sql select 1
canal.instance.detecting.interval.time 3
canal.instance.detecting.retry.threshold 3
canal.instance.detecting.heartbeatHaEnable false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds 60# network config
canal.instance.network.receiveBufferSize 16384
canal.instance.network.sendBufferSize 16384
canal.instance.network.soTimeout 30# binlog filter config
canal.instance.filter.druid.ddl true
canal.instance.filter.query.dcl false
canal.instance.filter.query.dml false
canal.instance.filter.query.ddl false
canal.instance.filter.table.error false
canal.instance.filter.rows false
canal.instance.filter.transaction.entry false
canal.instance.filter.dml.insert false
canal.instance.filter.dml.update false
canal.instance.filter.dml.delete false# binlog format/image check
canal.instance.binlog.format ROW,STATEMENT,MIXED
canal.instance.binlog.image FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation false# parallel parser config
canal.instance.parser.parallel true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize 256# table meta tsdb info
canal.instance.tsdb.enable true
canal.instance.tsdb.dir ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE1000;MODEMYSQL;
canal.instance.tsdb.dbUsername canal
canal.instance.tsdb.dbPassword canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire 360#################################################
######### destinations #############
#################################################
canal.destinations example,script # 需要进行同步的任务在conf/example文件下和script下
# conf root dir
canal.conf.dir ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan true
canal.auto.scan.interval 5
# set this value to true means that when binlog pos not found, skip to latest.
# WARN: pls keep false in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode falsecanal.instance.tsdb.spring.xml classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode spring
canal.instance.global.lazy false
canal.instance.global.manager.address ${canal.admin.manager}
#canal.instance.global.spring.xml classpath:spring/memory-instance.xml
canal.instance.global.spring.xml classpath:spring/file-instance.xml
#canal.instance.global.spring.xml classpath:spring/default-instance.xml##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey
canal.aliyun.secretKey
canal.aliyun.uidcanal.mq.flatMessage true
canal.mq.canalBatchSize 50
canal.mq.canalGetTimeout 100
# Set this value to cloud, if you want open message trace feature in aliyun.
canal.mq.accessChannel localcanal.mq.database.hash true
canal.mq.send.thread.size 30
canal.mq.build.thread.size 8##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers 192.168.5.6:9092 # kafka的IP地址和端口
kafka.acks all
kafka.compression.type none
kafka.batch.size 16384
kafka.linger.ms 1
kafka.max.request.size 1048576
kafka.buffer.memory 33554432
kafka.max.in.flight.requests.per.connection 1
kafka.retries 0kafka.kerberos.enable false
kafka.kerberos.krb5.file ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file ../conf/kerberos/jaas.conf# sasl demo
# kafka.sasl.jaas.config org.apache.kafka.common.security.scram.ScramLoginModule required \\n username\alice\ \\npasswordalice-secret\;
# kafka.sasl.mechanism SCRAM-SHA-512
# kafka.security.protocol SASL_PLAINTEXT##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group test
rocketmq.enable.message.trace false
rocketmq.customized.trace.topic
rocketmq.namespace
rocketmq.namesrv.addr 127.0.0.1:9876
rocketmq.retry.times.when.send.failed 0
rocketmq.vip.channel.enabled false
rocketmq.tag ##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host
rabbitmq.virtual.host
rabbitmq.exchange
rabbitmq.username
rabbitmq.password
rabbitmq.deliveryMode ##################################################
######### Pulsar #############
##################################################
pulsarmq.serverUrl
pulsarmq.roleToken
pulsarmq.topicTenantPrefix
配置任务
vim canal/conf/example
vim instance.properties
#################################################
## mysql serverId , v1.0.26 will autoGen
# canal.instance.mysql.slaveId0# enable gtid use true/false
canal.instance.gtidonfalse# position info
canal.instance.master.address192.168.7.129:3306 # 数据库的ip和端口
canal.instance.master.journal.name
canal.instance.master.position
canal.instance.master.timestamp
canal.instance.master.gtid# rds oss binlog
canal.instance.rds.accesskey
canal.instance.rds.secretkey
canal.instance.rds.instanceId# table meta tsdb info
canal.instance.tsdb.enabletrue
#canal.instance.tsdb.urljdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsernamecanal
#canal.instance.tsdb.dbPasswordcanal#canal.instance.standby.address
#canal.instance.standby.journal.name
#canal.instance.standby.position
#canal.instance.standby.timestamp
#canal.instance.standby.gtid# username/password
canal.instance.dbUsernamecanal # 数据库的用户名
canal.instance.dbPassworddocker211102 # 数据库的密码
canal.instance.connectionCharset UTF-8
# enable druid Decrypt database password
canal.instance.enableDruidfalse
#canal.instance.pwdPublicKeyMFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ# table regex
canal.instance.filter.regexscript\\..* #监控script这个数据库下面的所有的表
# table black regex
canal.instance.filter.black.regexmysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.fieldtest1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.fieldtest1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topicexample # 将数据发送到example这个主题里面
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopicmytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition0
# hash partition config
#canal.mq.enableDynamicQueuePartitionfalse
#canal.mq.partitionsNum3
#canal.mq.dynamicTopicPartitionNumtest.*:4,mycanal:6
#canal.mq.partitionHashtest.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.onfalse
#################################################
vim canal/conf/script
vim instance.properties
#################################################
## mysql serverId , v1.0.26 will autoGen
# canal.instance.mysql.slaveId0# enable gtid use true/false
canal.instance.gtidonfalse# position info
canal.instance.master.address192.168.7.129:3306 # 数据库的ip和端口
canal.instance.master.journal.name
canal.instance.master.position
canal.instance.master.timestamp
canal.instance.master.gtid# rds oss binlog
canal.instance.rds.accesskey
canal.instance.rds.secretkey
canal.instance.rds.instanceId# table meta tsdb info
canal.instance.tsdb.enabletrue
#canal.instance.tsdb.urljdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsernamecanal
#canal.instance.tsdb.dbPasswordcanal#canal.instance.standby.address
#canal.instance.standby.journal.name
#canal.instance.standby.position
#canal.instance.standby.timestamp
#canal.instance.standby.gtid# username/password
canal.instance.dbUsernamecanal # 数据库的用户名
canal.instance.dbPassworddocker211102 # 数据库的密码
canal.instance.connectionCharset UTF-8
# enable druid Decrypt database password
canal.instance.enableDruidfalse
#canal.instance.pwdPublicKeyMFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ# table regex
canal.instance.filter.regexcanal-demo\\..* # 监控cancl-demo这个库下面的所有表
# table black regex
canal.instance.filter.black.regexmysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.fieldtest1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.fieldtest1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topicscript # 将数据发送到script这个主题里面
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopicmytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition0
# hash partition config
#canal.mq.enableDynamicQueuePartitionfalse
#canal.mq.partitionsNum3
#canal.mq.dynamicTopicPartitionNumtest.*:4,mycanal:6
#canal.mq.partitionHashtest.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.onfalse
#################################################