北京网站建设哪家设计好,标识标牌网站怎么做,wordpress无法访问上传的图面,wordpress写文章报错1、相关环境
Flink作为当前流行的流式计算框架#xff0c;在对接StarRocks时#xff0c;若直接使用JDBC的方式流式写入数据#xff0c;对StarRocks是不友好的#xff0c;StarRocks作为一款MVCC的数据库#xff0c;其导入的核心思想还是攒微批降频率在对接StarRocks时若直接使用JDBC的方式流式写入数据对StarRocks是不友好的StarRocks作为一款MVCC的数据库其导入的核心思想还是攒微批降频率。为此StarRocks单独开发了flink-connector-starrocks其内部实现仍是通过对数据缓存攒批后执行Stream Load导入。
1.1、StarRocks相关下载
https://www.mirrorship.cn/zh-CN/download/community 1.2、Flink CDC连接器
参考地址 https://ververica.github.io/flink-cdc-connectors/release-2.0/content/about.html#supported-flink-versions
https://github.com/StarRocks/starrocks-connector-for-apache-flink
https://docs.starrocks.io/zh-cn/main/loading/Flink-connector-starrocks
1.3、搭建环境
StarRocksFlinkKafkaZookeeperMySQL
2、Flink读取Kafka数据写入StarRocks
Routine Load是StarRocks自带的可以消费Kafka数据的导入方式其特点是简单易用不依赖外部组件但若需要对Kafka中的数据进行复杂的ETLRoutine Load可能就不能胜任了这时就可以考虑使用Flink去消费Kafka中的数据进行清洗转换后再sink至StarRocks。
常见的实时报表的例子使用Flink对Kafka中追加写入的数据进行实时处理然后将数据源源不断的同步入库StarRocks。
2.1、数据准备
2.1.1、在Kafka中创建主题behavior和province
kafka-topics.sh --zookeeper 192.168.110.101:2181 --create --replication-factor 1 --partitions 1 --topic behaviorkafka-topics.sh --zookeeper 192.168.110.101:2181 --create --replication-factor 1 --partitions 1 --topic province2.1.2、向主题behavior生产数据
kafka-console-producer.sh --broker-list 192.168.110.101:9092 --topic behavior2.1.3、生产数据
10001,zs,18,11,shopping
10002,ls,19, 11,add
10003,ww,19,61,star2.1.4、向主题province生产数据
kafka-console-producer.sh --broker-list 192.168.110.101:9092 --topic province
2.1.5、生产数据
11,北京
61,陕西2.2、StarRocks准备
2.2.1、创建主键模型表s_province
create database starrocks;
use starrocks;
CREATE TABLE IF NOT EXISTS starrocks.s_province (uid int(10) NOT NULL COMMENT ,p_id int(2) NOT NULL COMMENT ,p_name varchar(30) NULL COMMENT
)
PRIMARY KEY(uid)
DISTRIBUTED BY HASH(uid) BUCKETS 1
PROPERTIES (
replication_num 1,
-- 限主键模型
enable_persistent_index true
);2.3、Flink准备
2.3.1、启动Flink ./start-cluster.sh2.3.2、启动sql-client
/sql-client.sh embedded2.3.3、执行Flink SQL创建上下游的映射表
1、Source部分创建Flink向Kafka的映射表kafka_source_behavior
CREATE TABLE kafka_source_behavior (uuid int,name string,age int,province_id int,behavior string
) WITH (connector kafka,topic behavior,properties.bootstrap.servers 192.168.110.101:9092,properties.group.id source_behavior,scan.startup.mode earliest-offset,format csv
);2、创建映射表kafka_source_province
CREATE TABLE kafka_source_province (pid int,p_name string
) WITH (connector kafka,topic province,properties.bootstrap.servers 192.168.110.101:9092,properties.group.id source_province,scan.startup.mode earliest-offset,format csv
);3、Sink部分创建Flink向StarRocks的映射表sink_province
CREATE TABLE sink_province (uid INT,p_id INT,p_name STRING,PRIMARY KEY (uid) NOT ENFORCED
)WITH (connector starrocks,jdbc-urljdbc:mysql://192.168.110.101:9030,load-url192.168.110.101:8030,database-name starrocks,table-name s_province,username root,password root,sink.buffer-flush.interval-ms 5000,sink.properties.column_separator \x01,sink.properties.row_delimiter \x02
);2.3.4、执行同步任务
执行Flink SQL开始同步任务
insert into sink_province select b.uuid as uid, b.province_id as p_id, p.p_name from kafka_source_behavior b join kafka_source_province p on b.province_id p.pid;2.4、StarRocks查看数据
mysql -h192.168.110.101 -P9030 -uroot –prootuse starrocks;
select * from s_province;3、Flink JDBC读取MySQL数据写入StarRocks
使用Flink JDBC方式读取MySQL数据的实时场景不多因为JDBC下Flink只能获取执行命令时MySQL表的数据所以更适合离线场景。假设有复杂的MySQL数据就可以在Flink中跑定时任务来获取清洗后的数据完成后写入StarRocks。
3.1、MySQL准备
3.1.1、MySQL中创建表s_user
use ODS;
CREATE TABLE s_user (id INT(11) NOT NULL,name VARCHAR(32) DEFAULT NULL,p_id INT(2) DEFAULT NULL,PRIMARY KEY (id)
);3.1.2、插入数据
insert into s_user values(10086,lm,61),(10010, ls,11), (10000,ll,61);3.2、StarRocks准备
3.2.1、StarRocks创建表s_user
use starrocks;
CREATE TABLE IF NOT EXISTS starrocks.s_user (id int(10) NOT NULL COMMENT ,name varchar(20) NOT NULL COMMENT ,p_id INT(2) NULL COMMENT
)
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
replication_num 1,
-- 限主键模型
enable_persistent_index true
);3.3、Flink创建映射表
3.3.1、启动Flink服务未停止可以跳过 ./start-cluster.sh3.3.2、启动sql-client
./sql-client.sh embedded3.3.3、Source部分创建映射至MySQL的映射表source_mysql_suser
CREATE TABLE source_mysql_suser (id INT,name STRING,p_id INT,PRIMARY KEY (id) NOT ENFORCED
)WITH (connector jdbc,url jdbc:mysql://192.168.110.102:3306/ODS,table-name s_user,username root,password root
);3.3.4、Sink部分创建至StarRocks的映射表sink_starrocks_suser
CREATE TABLE sink_starrocks_suser (id INT,name STRING,p_id INT,PRIMARY KEY (id) NOT ENFORCED
)WITH (connector starrocks,jdbc-urljdbc:mysql://192.168.110.101:9030,load-url192.168.110.101:8030,database-name starrocks,table-name s_user,username root,password root,sink.buffer-flush.interval-ms 5000,sink.properties.column_separator \x01,sink.properties.row_delimiter \x02
);3.3.5、Flink清洗数据并写入StarRocks
只是简单做一个where筛选实际业务可能是多表join的复杂场景
insert into sink_starrocks_suser select id,name,p_id from source_mysql_suser where p_id 61;数据写入StarRocks后Flink任务完成并结束。此时若再对MySQL中s_user表的数据进行增删或修改操作Flink亦不会感知。
4、Flink读取StarRocks数据写入MySQL
还使用MySQL 中的s_user表和StarRocks的s_user表将业务流程反转一下读取StarRocks中的数据写入其他业务库例如MySQL。
4.1、Flink创建映射表
4.1.1、启动Flink服务未停止可以跳过
./start-cluster.sh4.1.2、启动sql-client
./sql-client.sh embedded4.1.3、Source部分创建StarRocks映射表source_starrocks_suser
CREATE TABLE source_starrocks_suser (id INT,name STRING,p_id INT
)WITH (connector starrocks,scan-url192.168.110.101:8030,jdbc-urljdbc:mysql://192.168.110.101:9030,database-name starrocks,table-name s_user,username root,password root
);4.1.4、Sink部分创建向MySQL的映射表sink_mysql_suser
CREATE TABLE sink_mysql_suser (id INT,name STRING,p_id INT,PRIMARY KEY (id) NOT ENFORCED
)WITH (connector jdbc,url jdbc:mysql://192.168.110.102:3306/ODS,table-name s_user,username root,password root
);4.2、MySQL准备
4.2.1、清空MySQL s_user表数据为一会儿导入新数据做准备 use ODS;
truncate table s_user;4.3、Flink执行导入任务
简单梳理操作实际业务可能会对StarRocks中多个表的数据进行分组或者join等处理然后再导入。 insert into sink_mysql_suser select id,name,p_id from source_starrocks_suser;4.4、查看MySQL数据 select * from s_user;5、Flink CDC同步MySQL数据至StarRocks
使用FlinkJDBC来读取MySQL数据时JDBC的方式是“一次性”的导入若希望让Flink感知MySQL数据源的数据变化并近实时的实现据 同步就需要使用Flink CDC。CDC是变更数据捕获Change Data Capture技术的缩写它可以将源数据库Source的数据变动记录同步到一个或多个数据目的地中Sink。直观的说就是当数据源的数据变化时通过CDC可以让目标库中的数据同步发生变化仅限于DML操作。还使用前面MySQL的s_user表以及StarRocks的s_user表来演示。
5.1、MySQL准备
5.1.1、MySQL开启binlog格式为ROW模式
vi /etc/my.cnf
log-binmysql-bin # 开启binlog
binlog-formatROW # 选择ROW模式
server_id1 # 配置MySQL replaction5.1.2、重启MySQL服务
systemctl restart mysqld5.2、StarRocks准备
5.2.1、StarRocks中清空s_user表中的数据
mysql -h192.168.110.101 -P9030 -uroot –prootuse starrocks;
truncate table s_user;5.3、Flink准备
5.3.1、启动Flink服务未停止可以跳过
./start-cluster.sh5.3.2、启动sql-client
./sql-client.sh embedded5.3.3、Source部分创建MySQL映射表cdc_mysql_suser
CREATE TABLE cdc_mysql_suser (id INT,name STRING,p_id INT
) WITH (connector mysql-cdc,hostname 192.168.110.102,port 3306,username root,password root,database-name ODS,scan.incremental.snapshot.enabledfalse,table-name s_user
);5.3.4、Sink部分创建向StarRocks的cdc_starrocks_suser
CREATE TABLE cdc_starrocks_suser (id INT,name STRING,p_id INT,PRIMARY KEY (id) NOT ENFORCED
)WITH (connector starrocks,jdbc-urljdbc:mysql://192.168.110.101:9030,load-url192.168.110.101:8030,database-name starrocks,table-name s_user,username root,password root,sink.buffer-flush.interval-ms 5000,sink.properties.column_separator \x01,sink.properties.row_delimiter \x02
);5.4、执行同步任务
insert into cdc_starrocks_suser select id,name,p_id from cdc_mysql_suser;在CDC场景下Flink SQL执行后同步任务将会持续进行当MySQL中数据出现变化Flink会快速感知并将变化同步至StarRocks中。
5.5、数据观察
5.5.1、MySQL库中观察数据
mysql -uroot –prootuse ODS;
select * from s_user;5.5.2、StarRocks库中观察数据
mysql -h192.168.110.101 -P9030 -uroot –prootuse starrocks;
select * from s_user;5.5.3、MySQL中对数据进行增删改操作 INSERT INTO s_user VALUES(12345,SR,61);DELETE FROM s_user WHERE id 10010;UPDATE s_user SET nameNo.1 WHERE id 10086;5.5.4、查看StarRocks中表的数据 select * from s_user;可以确认对MySQL源表数据的增加、修改和删除操作引起的数据变化都能同步至StarRocks目标表中。
6、通过CDCSMT实现MySQL多表数据的秒级同步
StarRocks Migration Tool为了友好的解决多表同步时的问题StarRocks发布了StarRocks-migrate-tools简称smt工具来快捷生成StarRocks表结构和Flink-SQL映射表及同步语句。Smt目前可用于MySQL、PostgreSQL、Oracle和hive后面三个数据库的同步还在公测中先以MySQL来进行演示。
6.1 MySQL准备
已开启binlog的MySQL中创建数据库CDC并在其中创建表departments和jobs创建完成后再导入少量数据。
6.1.1、创建表departments
CREATE DATABASE CDC;
USE CDC;CREATE TABLE departments (department_id int(4) NOT NULL AUTO_INCREMENT,department_name varchar(3) DEFAULT NULL,manager_id int(6) DEFAULT NULL,location_id int(4) DEFAULT NULL,PRIMARY KEY (department_id)
);6.1.2、为表departments插入数据
insert into departments(department_id,department_name,manager_id,location_id)
values (10,Adm,200,1700),(20,Mar,201,1800),(30,Pur,114,1700),(40,Hum,203,2400),(50,Shi,121,1500),(60,IT,103,1400),(70,Pub,204,2700),(80,Sal,145,2500),(90,Exe,100,1700),(100,Fin,108,1700),(110,Acc,205,1700),(120,Tre,NULL,1700),(130,Cor,NULL,1700),(140,Con,NULL,1700),(150,Sha,NULL,1700),(160,Ben,NULL,1700),(170,Man,NULL,1700),(180,Con,NULL,1700),(190,Con,NULL,1700),(200,Ope,NULL,1700),(210,IT ,NULL,1700),(220,NOC,NULL,1700),(230,IT ,NULL,1700),(240,Gov,NULL,1700),(250,Ret,NULL,1700),(260,Rec,NULL,1700),(270,Pay,NULL,1700);6.1.3、创建表jobs
CREATE TABLE jobs (job_id varchar(10) NOT NULL,job_title varchar(35) DEFAULT NULL,min_salary int(6) DEFAULT NULL,max_salary int(6) DEFAULT NULL,PRIMARY KEY (job_id)
);6.1.4、为表jobs插入数据
insert into jobs(job_id,job_title,min_salary,max_salary)
values (AC_ACCOUNT,Public Accountant,4200,9000),(AC_MGR,Accounting Manager,8200,16000),(AD_ASST,Administration Assistant,3000,6000),(AD_PRES,President,20000,40000),(AD_VP,Administration Vice President,15000,30000),(FI_ACCOUNT,Accountant,4200,9000),(FI_MGR,Finance Manager,8200,16000),(HR_REP,Human Resources Representative,4000,9000),(IT_PROG,Programmer,4000,10000),(MK_MAN,Marketing Manager,9000,15000),(MK_REP,Marketing Representative,4000,9000),(PR_REP,Public Relations Representative,4500,10500),(PU_CLERK,Purchasing Clerk,2500,5500),(PU_MAN,Purchasing Manager,8000,15000),(SA_MAN,Sales Manager,10000,20000),(SA_REP,Sales Representative,6000,12000),(SH_CLERK,Shipping Clerk,2500,5500),(ST_CLERK,Stock Clerk,2000,5000),(ST_MAN,Stock Manager,5500,8500);6.2 配置SMT工具
6.2.1 下载smt工具解压后修改配置文件
vi conf/config_prod.conf
1、配置MySQL部分
[db]host 192.168.110.102 #MySQL所在服务器IP
port 3306 #MySQL服务端口
user root #用户名
password root #密码
# currently available types: mysql, pgsql, oracle, hive
type mysql #类型选择MySQL目前PostgreSQL、Oracle和Hive正在公测中
# # only takes effect on type hive.
# # Available values: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
# authentication kerberos
[other]
# number of backends in StarRocks
be_num 1 #配置StarRocks BE的节点数以便生成更合理bucket数量的建表语句
# decimal_v3 is supported since StarRocks-1.18.1
use_decimal_v3 true #使用更高精度的Decimal类型1.18后的版本都支持
# file to save the converted DDL SQL
output_dir ./result #后续生成sql文件的保存目录
# !!!database table schema are case sensitive in oracle!!!
[table-rule.1]
# pattern to match databases for setting properties
# !!! database should be a whole instance(or pdb) name but not a regex when it comes with an oracle db !!!
database CDC #配置需要同步的数据库需使用正则表达式的写法
# pattern to match tables for setting properties
table departments|jobs #配置需要同步的表需使用正则表达式的写法
# schema only takes effect on postgresql and oracle
schema ^public$ #同步MySQL时不需要管这个2、配置StarRocks集群信息
############################################
### flink sink configurations #这部分与Flink Sink部分写法相似
### DO NOT set connector, table-name, database-name, they are auto-generated
############################################
flink.starrocks.jdbc-urljdbc:mysql://192.168.110.101:9030
flink.starrocks.load-url192.168.110.101:8030
flink.starrocks.usernameroot
flink.starrocks.passwordroot
flink.starrocks.sink.properties.formatjson #以json格式攒批
flink.starrocks.sink.properties.strip_outer_arraytrue #展开为数组
flink.starrocks.sink.buffer-flush.interval-ms10000 #攒批10秒导入一次
# # used to set the server-id for mysql-cdc jobs instead of using a random server-id
# flink.cdc.server-id 50006.3 SMT工具使用
参考地址 https://docs.starrocks.io/zh-cn/latest/loading/Flink_cdc_load#%E4%BB%8E-mysql-%E5%AE%9E%E6%97%B6%E5%90%8C%E6%AD%A5
6.3.1 执行smt工具
./starrocks-migrate-tool6.3.2 在配置的./result路径下生成sql语句文件
flink-create.1.sql
flink-create.all.sql
starrocks-create.1.sql
starrocks-create.all.sql
starrocks-external-create.1.sql
starrocks-external-create.all.sql6.4 生成Flink 任务
6.4.1 同步库表结构
如果数据需要经过 Flink 处理后写入目标表目标表与源表的结构不一样则您需要修改 SQL 文件 starrocks-create.all.sql 中的建表语句。
mysql -h192.168.110.101 -P9030 -uroot -proot /opt/module/smt/result/starrocks-create.all.sql
6.4.2、同步数据
进入 Flink 目录执行如下命令
./bin/sql-client.sh -f /opt/module/smt/result/flink-create.all.sql6.4.3、处理同步数据
在同步过程中如果您需要对数据进行一定的处理例如 GROUP BY、JOIN 等则可以修改 SQL 文件 flink-create.all.sql。可以通过执行 count(*) 和 GROUP BY 计算。
INSERT INTO default_catalog.demo.orders_sink SELECT product_id,product_name, COUNT(*) AS cnt FROM default_catalog.demo.orders_src WHERE order_date 2021-01-01 00:00:01 GROUP BY product_id,product_name;执行同步数据命令5.4.2如果返回如下结果则表示 Flink job 已经提交开始同步全量和增量数据。
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5ae005c4b3425d8bb13fe660260a35da6.5 观察任务状况 ./flink listWaiting for response...------------------ Running/Restarting Jobs -------------------19.01.2022 21:55:30 : 80c4e81de2d0d7e34c8f1aac1c22a8c4 : insert-into_default_catalog.CDC.departments_sink (RUNNING)19.01.2022 21:55:34 : b2b76afe7d33196a09a274142d9128cf : insert-into_default_catalog.CDC.jobs_sink (RUNNING)6.6 数据观察
就不再演示改变数据了与场景四中的情况相同当数据源中的数据变化时StarRocks中的数据也会同步变化实现数据的近实时同步。
这个场景特别适合维度表的数据同步因为当前StarRocks还不支持update语法就可以将数据需要频繁更新的维度表放在MySQL中使用Flink CDCSMT实时的在StarRocks中同步数据实现灵活的多表关联查询。