个体户网站备案,城市建设专题新闻发布会,做传销网站后果严重吗,遵义网站设计制作网站视频教程#xff1a;【尚硅谷】大数据Canal教程丨Alibaba数据实时同步神器教程资料#xff1a;https://pan.baidu.com/s/1VhGBcqeywM6jyXJxtytd1w?pwd6666#xff0c;提取码#xff1a;6666本套教程以Canal的底层原理展开讲解#xff0c;细致地介绍了Canal的安装部署及常… 视频教程【尚硅谷】大数据Canal教程丨Alibaba数据实时同步神器教程资料https://pan.baidu.com/s/1VhGBcqeywM6jyXJxtytd1w?pwd6666提取码6666本套教程以Canal的底层原理展开讲解细致地介绍了Canal的安装部署及常见应用详细讲解了如何实现MySQL数据的采集并将数据分别发送至Kafka同时使用TCP模式深层解析封装的数据并实现自定义数据格式。官方文档Home · alibaba/canal Wiki · GitHub MySQL如何实时同步数据到ES试试这款阿里开源的神器 - 简书阿里的数据同步神器——Canal_阿里canal_恒哥~Bingo的博客-CSDN博客目录
P01【01-尚硅谷-大数据采集技术-Canal课程介绍】
P02【02-尚硅谷-大数据采集技术-Canal什么是Canal】
P03【03-尚硅谷-大数据采集技术-CanalMySQL Binlog介绍】
P04【04-尚硅谷-大数据采集技术-Canal工作原理】
P05【05-尚硅谷-大数据采集技术-Canal使用场景】
P06【06-尚硅谷-大数据采集技术-CanalMySQL 环境准备】
P07【07-尚硅谷-大数据采集技术-Canal下载与安装】
P08【08-尚硅谷-大数据采集技术-CanalTCP模式 创建项目Canal封装数据格式分析】
P09【09-尚硅谷-大数据采集技术-CanalTCP模式 代码编写 创建连接拉取数据】
P10【10-尚硅谷-大数据采集技术-CanalTCP模式 代码编写 解析结果数据并打印】
P11【11-尚硅谷-大数据采集技术-CanalTCP模式 代码测试】
P12【12-尚硅谷-大数据采集技术-CanalKafka模式 配置文件修改】
P13【13-尚硅谷-大数据采集技术-CanalKafka模式 案例测试】 P01【01-尚硅谷-大数据采集技术-Canal课程介绍】 canal实时采集mysql中变化的数据新增、修改、删除使用canal实时监控到修改的数据并将修改的数据写到消息队列供实时计算框架(spark streaming、flink)使用。 前置知识 Kafka将采集到的实时数据写入消息队列中大数据领域最为主流的消息队列kafka。Zookeeperkafka的搭建及运行依赖于kafka。MySQLcanal实时抓取mysql中的写数据变化。Java案例代码。尚硅谷大数据技术之Canal 第1章 Canal入门第2章 MySql的准备第3章 Canal的下载和安装第4章 实时监控测试 TCP模式测试Kafka模式测试P02【02-尚硅谷-大数据采集技术-Canal什么是Canal】 Canal是用Java 开发的基于数据库增量日志解析提供增量数据订阅消费的中间件。canal采集日志。 Canal主要支持了MySQL的Binlog解析解析完成后才利用Canal Client来处理获得的相关数据。数据库同步需要阿里的Otter中间件基于Canal。 P03【03-尚硅谷-大数据采集技术-CanalMySQL Binlog介绍】 MySQL Binlog的格式有三种分别是STATEMENT、MIXED、ROW。在配置文件中可以选择配置binlog_format statement|mixed|row。 P04【04-尚硅谷-大数据采集技术-Canal工作原理】 P05【05-尚硅谷-大数据采集技术-Canal使用场景】 P06【06-尚硅谷-大数据采集技术-CanalMySQL 环境准备】 CREATE TABLE user_info(id VARCHAR(255),name VARCHAR(255),sex VARCHAR(255)
);[rootnode1 ~]# mysql -V
mysql Ver 14.14 Distrib 5.7.29, for Linux (x86_64) using EditLine wrapper
[rootnode1 ~]# cat /etc/redhat-release
CentOS Linux release 7.7.1908 (Core)
[rootnode1 ~]# mysql -uroot -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 8
Server version: 5.7.29 MySQL Community Server (GPL)Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.Type help; or \h for help. Type \c to clear the current input statement.mysql show databases;
--------------------
| Database |
--------------------
| information_schema |
| gmall-2021 |
| hive3 |
| mysql |
| performance_schema |
| sys |
--------------------
6 rows in set (0.00 sec)mysql use gmall-2021;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -ADatabase changed
mysql
mysql show tables;
----------------------
| Tables_in_gmall-2021 |
----------------------
| user_info |
----------------------
1 row in set (0.00 sec)mysql select * from user_info;
------------------
| id | name | sex |
------------------
| 001 | aaa | 男 |
| 002 | bbb | 女 |
------------------
2 rows in set (0.00 sec)mysql ERROR 1193 (HY000): Unknown system variable ‘validate_password_policy‘ CREATE TABLE user_info(id VARCHAR(255),name VARCHAR(255),sex VARCHAR(255)
);
INSERT INTO user_info VALUES(1001,zhangsan,male);SET GLOBAL validate_password_length4;SHOW VARIABLES LIKE validate_password%;ALTER USER rootlocalhost IDENTIFIED BY root;INSTALL PLUGIN validate_password SONAME validate_password.so;SELECT plugin_name, plugin_status FROM information_schema.plugins WHERE plugin_name LIKE validate%;SHOW VARIABLES LIKE validate_password%;SET GLOBAL validate_password_policy0;SET GLOBAL validate_password_length4;SET GLOBAL validate_password_policy0;GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal% IDENTIFIED BY canal ;
[rootnode1 ~]# sudo vim /etc/my.cnf
[rootnode1 ~]# sudo systemctl restart mysqld
[rootnode1 ~]# cd /var/lib/mysql
[rootnode1 mysql]# ll
总用量 188508
-rw-r----- 1 mysql mysql 56 2月 23 11:43 auto.cnf
-rw------- 1 mysql mysql 1680 2月 23 11:43 ca-key.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 ca.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 client-cert.pem
-rw------- 1 mysql mysql 1680 2月 23 11:43 client-key.pem
drwxr-x--- 2 mysql mysql 62 3月 1 16:00 gmall002d2021
drwxr-x--- 2 mysql mysql 8192 2月 23 22:02 hive3
-rw-r----- 1 mysql mysql 683 3月 1 16:10 ib_buffer_pool
-rw-r----- 1 mysql mysql 79691776 3月 1 16:10 ibdata1
-rw-r----- 1 mysql mysql 50331648 3月 1 16:10 ib_logfile0
-rw-r----- 1 mysql mysql 50331648 2月 23 11:43 ib_logfile1
-rw-r----- 1 mysql mysql 12582912 3月 1 16:10 ibtmp1
drwxr-x--- 2 mysql mysql 4096 2月 23 11:43 mysql
-rw-r----- 1 mysql mysql 154 3月 1 16:10 mysql-bin.000001
-rw-r----- 1 mysql mysql 19 3月 1 16:10 mysql-bin.index
srwxrwxrwx 1 mysql mysql 0 3月 1 16:10 mysql.sock
-rw------- 1 mysql mysql 6 3月 1 16:10 mysql.sock.lock
drwxr-x--- 2 mysql mysql 8192 2月 23 11:43 performance_schema
-rw------- 1 mysql mysql 1676 2月 23 11:43 private_key.pem
-rw-r--r-- 1 mysql mysql 452 2月 23 11:43 public_key.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 server-cert.pem
-rw------- 1 mysql mysql 1680 2月 23 11:43 server-key.pem
drwxr-x--- 2 mysql mysql 8192 2月 23 11:43 sys
[rootnode1 mysql]# ll
总用量 188508
-rw-r----- 1 mysql mysql 56 2月 23 11:43 auto.cnf
-rw------- 1 mysql mysql 1680 2月 23 11:43 ca-key.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 ca.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 client-cert.pem
-rw------- 1 mysql mysql 1680 2月 23 11:43 client-key.pem
drwxr-x--- 2 mysql mysql 62 3月 1 16:00 gmall002d2021
drwxr-x--- 2 mysql mysql 8192 2月 23 22:02 hive3
-rw-r----- 1 mysql mysql 683 3月 1 16:10 ib_buffer_pool
-rw-r----- 1 mysql mysql 79691776 3月 1 16:12 ibdata1
-rw-r----- 1 mysql mysql 50331648 3月 1 16:12 ib_logfile0
-rw-r----- 1 mysql mysql 50331648 2月 23 11:43 ib_logfile1
-rw-r----- 1 mysql mysql 12582912 3月 1 16:12 ibtmp1
drwxr-x--- 2 mysql mysql 4096 2月 23 11:43 mysql
-rw-r----- 1 mysql mysql 452 3月 1 16:12 mysql-bin.000001
-rw-r----- 1 mysql mysql 19 3月 1 16:10 mysql-bin.index
srwxrwxrwx 1 mysql mysql 0 3月 1 16:10 mysql.sock
-rw------- 1 mysql mysql 6 3月 1 16:10 mysql.sock.lock
drwxr-x--- 2 mysql mysql 8192 2月 23 11:43 performance_schema
-rw------- 1 mysql mysql 1676 2月 23 11:43 private_key.pem
-rw-r--r-- 1 mysql mysql 452 2月 23 11:43 public_key.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 server-cert.pem
-rw------- 1 mysql mysql 1680 2月 23 11:43 server-key.pem
drwxr-x--- 2 mysql mysql 8192 2月 23 11:43 sys
P07【07-尚硅谷-大数据采集技术-Canal下载与安装】 tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal P08【08-尚硅谷-大数据采集技术-CanalTCP模式 创建项目Canal封装数据格式分析】 P09【09-尚硅谷-大数据采集技术-CanalTCP模式 代码编写 创建连接拉取数据】 修改Linux虚拟机的IP地址 package com.atguigu;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;public class CanalClient {public static void main(String[] args) {//TODO 获取连接//1.获取canal连接对象CanalConnector canalConnector CanalConnectors.newSingleConnector(newInetSocketAddress(test001, 11111), example, , );while (true) {//TODO 连接canalConnector.connect();//TODO 订阅数据库canalConnector.subscribe(gmall-2021.*);//TODO 获取数据Message message canalConnector.get(100);}}
}
P10【10-尚硅谷-大数据采集技术-CanalTCP模式 代码编写 解析结果数据并打印】 idea快速获取变量名 .varctrlvpackage com.atguigu;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;import java.net.InetSocketAddress;
import java.util.List;public class CanalClient {public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {//TODO 获取连接//获取canal连接对象CanalConnector canalConnector CanalConnectors.newSingleConnector(newInetSocketAddress(test001, 11111), example, , );while (true) {//TODO 连接canalConnector.connect();//TODO 订阅数据库canalConnector.subscribe(gmall-2021.*);//TODO 获取数据Message message canalConnector.get(100);//TODO 获取Entry集合ListCanalEntry.Entry entries message.getEntries();//TODO 判断集合是否为空如果为空则等待一会儿继续拉取数据if (entries.size() 0) {System.out.println(当次抓取没有数据休息一会儿...);Thread.sleep(1000);} else {//TODO 遍历entries单条解析for (CanalEntry.Entry entry : entries) {//1.获取表名String tableName entry.getHeader().getTableName();//2.获取类型CanalEntry.EntryType entryType entry.getEntryType();//3.获取序列化后的数据ByteString storeValue entry.getStoreValue();//4.判断当前entryType类型是否为ROWDATAif (CanalEntry.EntryType.ROWDATA.equals(entryType)) {//5.反序列化数据CanalEntry.RowChange rowChange CanalEntry.RowChange.parseFrom(storeValue);//6.获取当前事件的操作类型CanalEntry.EventType eventType rowChange.getEventType();//7.获取数据集ListCanalEntry.RowData rowDataList rowChange.getRowDatasList();//8.遍历rowDataList并打印数据集for (CanalEntry.RowData rowData : rowDataList) {JSONObject beforeData new JSONObject();ListCanalEntry.Column beforeColumnsList rowData.getBeforeColumnsList();for (CanalEntry.Column column : beforeColumnsList) {beforeData.put(column.getName(), column.getValue());}JSONObject afterData new JSONObject();ListCanalEntry.Column afterColumnsList rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {afterData.put(column.getName(), column.getValue());}//数据打印System.out.println(Table: tableName ,EventType: eventType ,Before: beforeData ,After: afterData);}} else {System.out.println(当前操作类型为 entryType);}}}}}
}
P11【11-尚硅谷-大数据采集技术-CanalTCP模式 代码测试】 启动canal服务端 连接成功
Last login: Thu Mar 2 14:43:07 2023 from 192.168.88.1
[rootnode1 ~]# cd /opt/module/canal/bin
[rootnode1 bin]# cd ../
[rootnode1 canal]# ll
总用量 4
drwxr-xr-x 2 root root 76 3月 1 16:43 bin
drwxr-xr-x 5 root root 93 3月 1 16:49 conf
drwxr-xr-x 2 root root 4096 3月 1 16:43 lib
drwxrwxrwx 2 root root 6 11月 26 2018 logs
[rootnode1 canal]# bin/startup.sh
cd to /opt/module/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/module/canal/bin/../conf/logback.xml
canal conf : /opt/module/canal/bin/../conf/canal.properties
CLASSPATH :/opt/module/canal/bin/../conf:/opt/module/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/module/canal/bin/../lib/zkclient-0.10.jar:/opt/module/canal/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-core-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-context-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/snappy-java-1.1.7.1.jar:/opt/module/canal/bin/../lib/slf4j-api-1.7.12.jar:/opt/module/canal/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient_httpserver-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient_hotspot-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient_common-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient-0.4.0.jar:/opt/module/canal/bin/../lib/scala-reflect-2.11.12.jar:/opt/module/canal/bin/../lib/scala-logging_2.11-3.8.0.jar:/opt/module/canal/bin/../lib/scala-library-2.11.12.jar:/opt/module/canal/bin/../lib/rocketmq-remoting-4.3.0.jar:/opt/module/canal/bin/../lib/rocketmq-logging-4.3.0.jar:/opt/module/canal/bin/../lib/rocketmq-common-4.3.0.jar:/opt/module/canal/bin/../lib/rocketmq-client-4.3.0.jar:/opt/module/canal/bin/../lib/protobuf-java-3.6.1.jar:/opt/module/canal/bin/../lib/oro-2.0.8.jar:/opt/module/canal/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/opt/module/canal/bin/../lib/netty-all-4.1.6.Final.jar:/opt/module/canal/bin/../lib/netty-3.2.2.Final.jar:/opt/module/canal/bin/../lib/mysql-connector-java-5.1.40.jar:/opt/module/canal/bin/../lib/metrics-core-2.2.0.jar:/opt/module/canal/bin/../lib/lz4-java-1.4.1.jar:/opt/module/canal/bin/../lib/logback-core-1.1.3.jar:/opt/module/canal/bin/../lib/logback-classic-1.1.3.jar:/opt/module/canal/bin/../lib/kafka-clients-1.1.1.jar:/opt/module/canal/bin/../lib/kafka_2.11-1.1.1.jar:/opt/module/canal/bin/../lib/jsr305-3.0.2.jar:/opt/module/canal/bin/../lib/jopt-simple-5.0.4.jar:/opt/module/canal/bin/../lib/jctools-core-2.1.2.jar:/opt/module/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/opt/module/canal/bin/../lib/jackson-databind-2.9.6.jar:/opt/module/canal/bin/../lib/jackson-core-2.9.6.jar:/opt/module/canal/bin/../lib/jackson-annotations-2.9.0.jar:/opt/module/canal/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/opt/module/canal/bin/../lib/httpcore-4.4.3.jar:/opt/module/canal/bin/../lib/httpclient-4.5.1.jar:/opt/module/canal/bin/../lib/h2-1.4.196.jar:/opt/module/canal/bin/../lib/guava-18.0.jar:/opt/module/canal/bin/../lib/fastsql-2.0.0_preview_644.jar:/opt/module/canal/bin/../lib/fastjson-1.2.28.jar:/opt/module/canal/bin/../lib/druid-1.1.9.jar:/opt/module/canal/bin/../lib/disruptor-3.4.2.jar:/opt/module/canal/bin/../lib/commons-logging-1.1.3.jar:/opt/module/canal/bin/../lib/commons-lang3-3.4.jar:/opt/module/canal/bin/../lib/commons-lang-2.6.jar:/opt/module/canal/bin/../lib/commons-io-2.4.jar:/opt/module/canal/bin/../lib/commons-compress-1.9.jar:/opt/module/canal/bin/../lib/commons-codec-1.9.jar:/opt/module/canal/bin/../lib/commons-beanutils-1.8.2.jar:/opt/module/canal/bin/../lib/canal.store-1.1.2.jar:/opt/module/canal/bin/../lib/canal.sink-1.1.2.jar:/opt/module/canal/bin/../lib/canal.server-1.1.2.jar:/opt/module/canal/bin/../lib/canal.protocol-1.1.2.jar:/opt/module/canal/bin/../lib/canal.prometheus-1.1.2.jar:/opt/module/canal/bin/../lib/canal.parse.driver-1.1.2.jar:/opt/module/canal/bin/../lib/canal.parse.dbsync-1.1.2.jar:/opt/module/canal/bin/../lib/canal.parse-1.1.2.jar:/opt/module/canal/bin/../lib/canal.meta-1.1.2.jar:/opt/module/canal/bin/../lib/canal.instance.spring-1.1.2.jar:/opt/module/canal/bin/../lib/canal.instance.manager-1.1.2.jar:/opt/module/canal/bin/../lib/canal.instance.core-1.1.2.jar:/opt/module/canal/bin/../lib/canal.filter-1.1.2.jar:/opt/module/canal/bin/../lib/canal.deployer-1.1.2.jar:/opt/module/canal/bin/../lib/canal.common-1.1.2.jar:/opt/module/canal/bin/../lib/aviator-2.2.1.jar:/opt/module/canal/bin/../lib/aopalliance-1.0.jar:/opt/module/canal/bin/../lib/aliware-apache-rocketmq-cloud-1.0.jar:.:/export/server/jdk1.8.0_241/lib/dt.jar:/export/server/jdk1.8.0_241/lib/tools.jar
cd to /opt/module/canal for continue
[rootnode1 canal]# jps
44804 Jps
44284 CanalLauncher
[rootnode1 canal]# P12【12-尚硅谷-大数据采集技术-CanalKafka模式 配置文件修改】 P13【13-尚硅谷-大数据采集技术-CanalKafka模式 案例测试】 kafka依赖zookeeper启动kafka之前先启动zookeeper。 kafka启动消费者bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic canal_test 单条sql 多条sql 一个entry可能包含多条数据操作数据不方便做数据分析搞单行数据处理需要将一行数据拆分为两行数据。