淄博网站建设找李光明,建筑设计说明模板100字,深圳福田区,今天重要新闻1.Maxwell简介 Maxwell 是由美国Zendesk公司开源#xff0c;用Java编写的MySQL变更数据抓取软件。它会实时监控Mysql数据库的数据变更操作#xff08;包括insert、update、delete#xff09;#xff0c;并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流数据处理平台。 官…1.Maxwell简介 Maxwell 是由美国Zendesk公司开源用Java编写的MySQL变更数据抓取软件。它会实时监控Mysql数据库的数据变更操作包括insert、update、delete并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流数据处理平台。 官网地址http://maxwells-daemon.io/ Maxwell输出数据格式 注Maxwell输出的json字段说明
字段解释database变更数据所属的数据库table表更数据所属的表type数据变更类型ts数据变更发生的时间xid事务idcommit事务提交标志可用于重新组装事务data对于insert类型表示插入的数据对于update类型标识修改之后的数据对于delete类型表示删除的数据old对于update类型表示修改之前的数据只包含变更字段
2.Maxwell原理
Maxwell的工作原理是实时读取MySQL数据库的二进制日志Binlog从中获取变更数据再将变更数据以JSON格式发送至Kafka等流处理平台。
2.1 MySQL二进制日志
二进制日志Binlog是MySQL服务端非常重要的一种日志它会保存MySQL数据库的所有数据变更记录。Binlog的主要作用包括主从复制和数据恢复。Maxwell的工作原理和主从复制密切相关。
2.2 MySQL主从复制
MySQL的主从复制就是用来建立一个和主数据库完全一样的数据库环境这个数据库称为从数据库。
主从复制的应用场景如下 做数据库的热备主数据库服务器故障后可切换到从数据库继续工作。读写分离主数据库只负责业务数据的写入操作而多个从数据库只负责业务数据的查询工作在读多写少场景下可以提高数据库工作效率。 主从复制的工作原理如下 Master主库将数据变更记录写到二进制日志(binary log)中Slave从库向mysql master发送dump协议将master主库的binary log events拷贝到它的中继日志(relay log)Slave从库读取并回放中继日志中的事件将改变的数据同步到自己的数据库。 2.3 原理总结
很简单就是将自己伪装成slave并遵循MySQL主从复制的协议从master同步数据。
3.Maxwell部署
3.1 安装Maxwell 下载安装包 地址https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz 注Maxwell-1.30.0及以上版本不再支持JDK1.8 将安装包上传到hadoop102节点的/opt/software目录 注此处使用教学版安装包教学版对原版进行了改造增加了自定义Maxwell输出数据中ts时间戳的参数生产环境请使用原版。 将安装包解压至/opt/module*
[roothadoop102 maxwell]$ tar -zxvf maxwell-1.29.2.tar.gz -C /opt/module/修改名称
[roothadoop102 module]$ mv maxwell-1.29.2/ maxwell3.2 配置MySQL
3.2.1 启用MySQL Binlog
MySQL服务器的Binlog默认是未开启的如需进行同步需要先进行开启。
修改MySQL配置文件/etc/my.cnf增加如下配置
[mysqld]#数据库id
server-id 1
#启动binlog该参数的值会作为binlog的文件名
log-binmysql-bin
#binlog类型maxwell要求为row类型
binlog_formatrow
#启用binlog的数据库需根据实际情况作出修改
binlog-do-dbgmall注MySQL Binlog模式 Statement-based基于语句Binlog会记录所有写操作的SQL语句包括insert、update、delete等。 优点 节省空间 缺点 有可能造成数据不一致例如insert语句中包含now()函数。 Row-based基于行Binlog会记录每次写操作后被操作行记录的变化。 优点保持数据的绝对一致性。 缺点占用较大空间。 mixed混合模式默认是Statement-based如果SQL语句可能导致数据不一致就自动切换到Row-based。
Maxwell要求Binlog采用Row-based模式。
重启MySQL服务
[roothadoop102 ~]$ sudo systemctl restart mysqld3.2.2 创建Maxwell所需数据库和用户
Maxwell需要在MySQL中存储其运行过程中的所需的一些数据包括binlog同步的断点位置Maxwell支持断点续传等等故需要在MySQL为Maxwell创建数据库及用户。
创建数据库
msyql CREATE DATABASE maxwell;调整MySQL数据库密码级别
mysql set global validate_password_policy0;
mysql set global validate_password_length4;创建Maxwell用户并赋予其必要权限
mysql CREATE USER maxwell% IDENTIFIED BY maxwell;
mysql GRANT ALL ON maxwell.* TO maxwell%;
mysql GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO maxwell%;3.3 配置Maxwell
修改Maxwell配置文件名称
[roothadoop102 maxwell]$ cd /opt/module/maxwell
[roothadoop102 maxwell]$ cp config.properties.example config.properties修改Maxwell配置文件
[roothadoop102 maxwell]$ vim config.properties#Maxwell数据发送目的地可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producerkafka
#目标Kafka集群地址
kafka.bootstrap.servershadoop102:9092,hadoop103:9092
#目标Kafka topic可静态配置例如:maxwell也可动态配置例如%{database}_%{table}
kafka_topicmaxwell#MySQL相关配置
hosthadoop102
usermaxwell
passwordmaxwell
jdbc_optionsuseSSLfalseserverTimezoneAsia/Shanghai4.Maxwell使用
4.1 启动Kafka集群
若Maxwell发送数据的目的地为Kafka集群则需要先确保Kafka集群为启动状态。
4.2 Maxwell启停
启动Maxwell
[roothadoop102 ~]$ /opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon停止Maxwell
[roothadoop102 ~]$ ps -ef | grep maxwell | grep -v grep | grep maxwell | awk {print $2} | xargs kill -9Maxwell启停脚本 创建并编辑Maxwell启停脚本 [roothadoop102 bin]$ vim mxw.sh脚本内容如下 #!/bin/bashMAXWELL_HOME/opt/module/maxwellstatus_maxwell(){resultps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -lreturn $result
}start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho 启动Maxwell$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho Maxwell正在运行fi
}stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho 停止Maxwellps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk {print $2} | xargs kill -9elseecho Maxwell未在运行fi
}case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;;
esac4.3 增量数据同步
启动Kafka消费者
[roothadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell模拟生成数据
[roothadoop102 db_log]$ java -jar gmall2020-mock-db-2021-01-22.jar观察Kafka消费者
{database: gmall,table: comment_info,type: insert,ts: 1634023510,xid: 1653373,xoffset: 11998,data: {id: 1447825655672463369,user_id: 289,nick_name: null,head_img: null,sku_id: 11,spu_id: 3,order_id: 18440,appraise: 1204,comment_txt: 评论内容12897688728191593794966121429786132276125164551411,create_time: 2020-06-16 15:25:09,operate_time: null}
}
{database: gmall,table: comment_info,type: insert,ts: 1634023510,xid: 1653373,xoffset: 11999,data: {id: 1447825655672463370,user_id: 774,nick_name: null,head_img: null,sku_id: 25,spu_id: 8,order_id: 18441,appraise: 1204,comment_txt: 评论内容67552221621263422568447438734865327666683661982185,create_time: 2020-06-16 15:25:09,operate_time: null}
}4.4 历史数据全量同步
我们已经实现了使用Maxwell实时增量同步MySQL变更数据的功能。但有时只有增量数据是不够的我们可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。
把全部的数据输入到Kafka
4.4.1 Maxwell-bootstrap
Maxwell提供了bootstrap功能来进行历史数据的全量同步命令如下
指定数据库名指定表名指定配置文件一般上线的时候用一次就行了
[roothadoop102 maxwell]$ /opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties4.4.2 boostrap数据格式
采用bootstrap方式同步的输出数据格式如下
{database: fooDB,table: barTable,type: bootstrap-start,ts: 1450557744,data: {}
}
{database: fooDB,table: barTable,type: bootstrap-insert,ts: 1450557744,data: {txt: hello}
}
{database: fooDB,table: barTable,type: bootstrap-insert,ts: 1450557744,data: {txt: bootstrap!}
}
{database: fooDB,table: barTable,type: bootstrap-complete,ts: 1450557744,data: {}
}注意事项
第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据是bootstrap开始和结束的标志不包含数据中间的type为bootstrap-insert的数据才包含数据。一次bootstrap输出的所有记录的ts都相同为bootstrap开始的时间。