网站改版怎么办,进行网站开发,夸网站做的好怎么夸,展示网站源码下载文章目录 一、Flink CDC、Flink、CDC各有啥关系1.1 概述1.2 和 jdbc Connectors 对比 二、使用2.1 Mysql 打开 bin-log 功能2.2 在 Mysql 中建库建表准备2.3 遇到的坑2.4 测试 三、番外 一、Flink CDC、Flink、CDC各有啥关系 Flink#xff1a;流式计算框架#xff0c;不包含 … 文章目录 一、Flink CDC、Flink、CDC各有啥关系1.1 概述1.2 和 jdbc Connectors 对比 二、使用2.1 Mysql 打开 bin-log 功能2.2 在 Mysql 中建库建表准备2.3 遇到的坑2.4 测试 三、番外 一、Flink CDC、Flink、CDC各有啥关系 Flink流式计算框架不包含 Flink CDC和 Flink CDC没关系 CDC是一种思想理念不涉及某一门具体的技术。CDC 是变更数据捕获Change Data Capture技术的缩写它可以将源数据库Source的增量变动记录同步到一个或多个数据目的Sink。在同步过程中还可以对数据进行一定的处理例如过滤、关联、分组、统计等。目前专业做数据库事件接受和解析的中间件是Debezium如果是捕获Mysql还有Canal。 Flink CDC是 CDC 的一种实现而已不属于 Flink 子版块。这个技术是阿里开发的。目的是为了丰富 Flink 的生态。
1.1 概述 Flink CDC 基于数据库日志的 Change Data Caputre 技术实现了全量和增量的一体化读取能力并借助 Flink 优秀的管道能力和丰富的上下游生态支持捕获多种数据库的变更并将这些变更实时同步到下游存储。
1.2 和 jdbc Connectors 对比 JDBC Connectors 连接器确实可以读取外部的 数据库。比如MySQL、Oracle、SqlServer等。但是JDBC连数据库只是瞬时操作没办法持续监听数据库的数据变化。 Flink CDC Connectors可以实现数据库的变更捕获能够持续不断地把变更数据同步到下游的系统中。
官网概述https://ververica.github.io/flink-cdc-connectors/ github链接https://github.com/ververica/flink-cdc-connectors
二、使用 FlinkCDC 同步数据有两种方式一种是 FlinkSQL 的方式一种是Flink DataStream 和 Table API 的方式。 我这里直接用的是 ieda 测试的 DataStream 方式。 代码来自https://github.com/yclxiao/flink-cdc-demo/tree/main/src/main/java/com/yclxiao/flinkcdcdemo CloudAcctProfit2DwsHdjProfitRecordAPI.java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.xiaoqiang.utils.JdbcUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.*;public class CloudAcctProfit2DwsHdjProfitRecordAPI {private static final Logger LOG LoggerFactory.getLogger(CloudAcctProfit2DwsHdjProfitRecordAPI.class);private static String MYSQL_HOST x.x.x.x;private static int MYSQL_PORT 3306;private static String MYSQL_USER root;private static String MYSQL_PASSWD xiaoqiang;private static String SYNC_DB league_test;private static ListString SYNC_TABLES Arrays.asList(league_test.oc_settle_profit);public static void main(String[] args) throws Exception {MySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(MYSQL_HOST).port(MYSQL_PORT).databaseList(SYNC_DB) // set captured database.tableList(String.join(,, SYNC_TABLES)) // set captured table.username(MYSQL_USER).password(MYSQL_PASSWD).deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(5000);DataStreamSourceString cdcSource env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), CDC Source xiaoqiang-flink);ListString tableList getTableList();System.out.println(tableList---tableList);for (String tbl : tableList) {SingleOutputStreamOperatorString filterStream filterTableData(cdcSource, oc_settle_profit);
// SingleOutputStreamOperatorString cleanStream clean(filterStream);// 流的数据sink出去filterStream.addSink(new CustomDealDataSink()).name(sink tbl);}env.execute(xiaoqiang-flink);}/*** 自定义sink*/private static class CustomDealDataSink extends RichSinkFunctionString {private transient Connection coalitiondbConnection;private transient Statement coalitiondbStatement;private transient Connection cloudConnection;private transient Statement cloudStatement;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 在这里初始化 JDBC 连接coalitiondbConnection DriverManager.getConnection(jdbc:mysql://x.x.x.x:3306/league_test, root, );coalitiondbStatement coalitiondbConnection.createStatement();cloudConnection DriverManager.getConnection(jdbc:mysql://x.x.x.x:3306/cloud_test, root, );cloudStatement cloudConnection.createStatement();}Overridepublic void invoke(String value, Context context) throws Exception {// 解析拿到的CDC-JSON数据JSONObject rowJson JSON.parseObject(value);String outNo rowJson.getString(out_no);Integer userType rowJson.getInteger(user_type);String id rowJson.getString(id);String payOrderNo rowJson.getString(pay_order_no);String title rowJson.getString(title);String fromUserId rowJson.getString(from_user_id);String fromAccountId rowJson.getString(from_account_id);String userId rowJson.getString(user_id);String accountId rowJson.getString(account_id);Integer amount rowJson.getInteger(amount);Integer profitState rowJson.getInteger(profit_state);Date profitTime rowJson.getTimestamp(profit_time);Integer refundState rowJson.getInteger(refund_state);Date refundTime rowJson.getTimestamp(refund_time);Date addTime rowJson.getTimestamp(add_time);String remark rowJson.getString(remark);String acctCircle rowJson.getString(acct_circle);Integer fromUserType rowJson.getInteger(from_user_type);String companyId rowJson.getString(company_id);String bizCompanyId rowJson.getString(biz_company_id);
// if (1 ! profitState || !PG11111.equals(acctCircle)) {
// return;
// }
//
// // 读取相关表的数据与其他表进行关联
// Integer bizType null;
// String contributeUserId null;
// String relationBrandOwnerId null;
// ResultSet virtualOrderResultSet coalitiondbStatement.executeQuery(select * from tc_virtual_order where order_type ! 2 and id outNo );
// // 如果是tc_virtual_order订单上岗卡、安心卡、课程
// if (virtualOrderResultSet.next()) {
// // 处理数据逻辑
// Integer virtualOrder4OrderType virtualOrderResultSet.getInt(order_type);
// String virtualOrder4CompanyId virtualOrderResultSet.getString(company_id);
// String virtualOrder4BrandId virtualOrderResultSet.getString(brand_id);
// // 上岗卡订单排掉因为已经有别的任务处理了
// if (virtualOrder4OrderType 2) {
// return;
// }
// // orderType转换
// if (virtualOrder4OrderType 6) {
// bizType 10;
// } else if (virtualOrder4OrderType 1) {
// bizType 11;
// } else if (virtualOrder4OrderType 5) {
// bizType 12;
// }
// // userType转换
// if (virtualOrder4OrderType 6 userType 92) {
// contributeUserId virtualOrder4CompanyId;
// } else if (virtualOrder4OrderType 1 userType 92) {
// contributeUserId virtualOrder4CompanyId;
// } else if (virtualOrder4OrderType 5 userType 92) {
// contributeUserId virtualOrder4CompanyId;
// }
// // relationBrandOwnerId转换
// if (virtualOrder4OrderType 6 userType 90) {
// relationBrandOwnerId virtualOrder4BrandId;
// } else if (virtualOrder4OrderType 1 userType 90) {
// relationBrandOwnerId virtualOrder4BrandId;
// } else if (virtualOrder4OrderType 5 userType 90) {
// relationBrandOwnerId virtualOrder4BrandId;
// }
// // remark转换
// if (virtualOrder4OrderType 1 || virtualOrder4OrderType 5) {
// remark title;
// }
// } else {
// // 如果不是tc_virtual_order的数据则可能是其他数据此处只保留好到家实物商品数据
// if (StringUtils.isBlank(payOrderNo)) {
// return;
// }
// ResultSet acctPayOrderResultSet cloudStatement.executeQuery(select * from acct_pay_order t where t.id payOrderNo );
// if (!acctPayOrderResultSet.next()) {
// return;
// }
// Integer payCate acctPayOrderResultSet.getInt(pay_cate);
// if (200100 ! payCate) { // 好到家实物商品类型
// return;
// }
//
// bizType 20;
// if (userType 92 StringUtils.isNotBlank(bizCompanyId)) {
// contributeUserId bizCompanyId;
// } else if (userType 90 StringUtils.isNotBlank(bizCompanyId)) {
// ResultSet brandOwnerIdResultSet cloudStatement.executeQuery(select * from uc_brand_partner t where t.company_id bizCompanyId );
// if (brandOwnerIdResultSet.next()) {
// relationBrandOwnerId brandOwnerIdResultSet.getString(brand_owner_id);
// }
// }
// }
// if (StringUtils.isBlank(remark)) {
// remark title;
// }// 数据写入到mysqlString insertSql INSERT INTO dws_profit_record_hdj_flink_api (id, show_profit_id, order_no, from_user_id, from_user_type, user_id,\n user_type, amount, profit_time, state, acct_circle, biz_type,\n contribute_user_id, relation_brand_owner_id, remark, add_time)\n VALUES ( id , JSD id , outNo , fromUserId , fromUserType , userId , userType ,\n amount , DateFormatUtils.format(new Date(), yyyy-MM-dd HH:mm:ss, TimeZone.getTimeZone(GMT)) , profitState , acctCircle , 1 , (StringUtils.isBlank(123) ? null : contributeUserId ) , (StringUtils.isBlank(relationBrandOwnerId) ? null : relationBrandOwnerId ) , remark ,\n DateFormatUtils.format(new Date(), yyyy-MM-dd HH:mm:ss, TimeZone.getTimeZone(GMT)) );;cloudStatement.execute(delete from dws_profit_record_hdj_flink_api where id id );System.out.println(insertSql---insertSql);cloudStatement.execute(insertSql);}Overridepublic void close() throws Exception {super.close();// 在这里关闭 JDBC 连接coalitiondbStatement.close();coalitiondbConnection.close();cloudStatement.close();cloudConnection.close();}}/*** 清晰数据** param source* return*/private static SingleOutputStreamOperatorString clean(SingleOutputStreamOperatorString source) {return source.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String row, CollectorString out) throws Exception {try {LOG.info(row:{}, row);JSONObject rowJson JSON.parseObject(row);String op rowJson.getString(op);//history,insert,updateif (Arrays.asList(r, c, u).contains(op)) {out.collect(rowJson.getJSONObject(after).toJSONString());} else {LOG.info(filter other op:{}, op);}} catch (Exception ex) {LOG.warn(filter other format binlog:{}, row);}}});}/*** 过滤数据** param source* param table* return*/private static SingleOutputStreamOperatorString filterTableData(DataStreamSourceString source, String table) {return source.filter(new FilterFunctionString() {Overridepublic boolean filter(String row) throws Exception {try {JSONObject rowJson JSON.parseObject(row);JSONObject source rowJson.getJSONObject(source);String tbl source.getString(table);return table.equals(tbl);} catch (Exception ex) {ex.printStackTrace();return false;}}});}private static ListString getTableList() {ListString tables new ArrayList();String sql SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA SYNC_DB ;ListJSONObject tableList JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);for (JSONObject jsob : tableList) {String schemaName jsob.getString(TABLE_SCHEMA);String tblName jsob.getString(TABLE_NAME);String schemaTbl schemaName . tblName;if (SYNC_TABLES.contains(schemaTbl)) {tables.add(tblName);}}return tables;}
}JdbcUtil.java
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.*;
import java.util.ArrayList;
import java.util.List;public class JdbcUtil {static {try {
// Class.forName(com.mysql.cj.jdbc.Driver);Class.forName(com.mysql.jdbc.Driver);} catch (ClassNotFoundException e) {e.printStackTrace();}}private static final Logger LOG LoggerFactory.getLogger(JdbcUtil.class);public static void main(String[] args) throws SQLException {}public static ListJSONObject executeQuery(String hostUrl, int port, String user, String password, String sql) {ListJSONObject beJson new ArrayList();String connectionUrl String.format(jdbc:mysql://%s:%s/league_test?useUnicodetruecharacterEncodingutf-8useSSLfalseserverTimezoneAsia/Shanghai, hostUrl, port);Connection con null;try {con DriverManager.getConnection(connectionUrl, user, password);PreparedStatement ps con.prepareStatement(sql);ResultSet rs ps.executeQuery();beJson resultSetToJson(rs);} catch (SQLException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();} finally {try {con.close();} catch (Exception e) {}}return beJson;}private static ListJSONObject resultSetToJson(ResultSet rs) throws SQLException {ListJSONObject list new ArrayList();ResultSetMetaData metaData rs.getMetaData();int columnCount metaData.getColumnCount();while (rs.next()) {JSONObject jsonObj new JSONObject();for (int i 1; i columnCount; i) {String columnName metaData.getColumnLabel(i);String value rs.getString(columnName);jsonObj.put(columnName, value);}list.add(jsonObj);}return list;}
}pom.xml dependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.4.0/version/dependency2.1 Mysql 打开 bin-log 功能 og_bin 的Value如果为ON代表开启如果为OFF代表关闭MySQL8.0默认是开启的
# 查看是否开启binlog
mysql SHOW VARIABLES LIKE %log_bin%;关闭状态 log_bin为ON代表MySQL已经开启binlog日志记录log_bin_basename配置了binlog的文件路径及文件前缀名log_bin_index配置了binlog索引文件的路径 开启状态 # 在centos中mysql的配置文件一般都在/etc/mysql目录下如果不在可以通过 find / -name my.cnf 查找
vi /etc/mysql/my.cnf# 服务ID
server-id1
# binlog 配置 只要配置了log_bin地址 就会开启
log_bin /var/lib/mysql/mysql_bin
# 日志存储天数 默认0 永久保存
# 如果数据库会定期归档建议设置一个存储时间不需要一直存储binlog日志理论上只需要存储归档之后的日志
expire_logs_days 30
# binlog最大值
max_binlog_size 1024M
# 规定binlog的格式binlog有三种格式statement、row、mixad默认使用statement建议使用row格式
binlog_format ROW
# 在提交n次事务后进行binlog的落盘0为不进行强行的刷新操作而是由文件系统控制刷新日志文件如果是在线交易和账有关的数据建议设置成1如果是其他数据可以保持为0即可
sync_binlog 1# 重启MySQL服务使配置生效
systemctl restart mysqld / service mysql restart# 查看日志列表
SHOW MASTER LOGS;可参考MySQL 开启配置binlog以及通过binlog恢复数据
2.2 在 Mysql 中建库建表准备
CREATE DATABASE IF NOT EXISTS cloud_test;
CREATE DATABASE IF NOT EXISTS league_test;CREATE TABLE league_test.oc_settle_profit (id varchar(32),show_profit_id varchar(32),order_no varchar(32),from_user_id varchar(32),from_user_type int(11),user_id varchar(32),user_type int(11),rate int(11),amount int(11),type int(11),add_time datetime,state int(11),expect_profit_time datetime,profit_time datetime,profit_mode int(11),opt_code varchar(32),opt_name varchar(32),acct_circle varchar(32),process_state int(11),parent_id varchar(32),keep_account_from_user_id varchar(32),keep_account_from_bm_user_id varchar(32),keep_account_user_id varchar(32),keep_account_bm_user_id varchar(32),biz_type int(11),remark varchar(32),contribute_user_id varchar(32),relation_brand_owner_id varchar(32),PRIMARY KEY (id) USING BTREE
);CREATE TABLE cloud_test.dws_profit_record_hdj_flink_api (id varchar(32),show_profit_id varchar(32),order_no varchar(32),from_user_id varchar(32),from_user_type int(11),user_id varchar(32),user_type int(11),amount int(11),profit_time datetime,state int(11),acct_circle varchar(32),biz_type int(11),contribute_user_id varchar(32),relation_brand_owner_id varchar(32),remark varchar(32),add_time datetime,PRIMARY KEY (id) USING BTREE);2.3 遇到的坑 用 JDBC 连接 Mysql 的时候报错The MySQL server has a timezone offset (0 seconds ahead of UTC) 原因从错误即可知道是时区的错误。
show variables like %time_zone%;
Variable_name |Value |
----------------------
time_zone |SYSTEM|// 或者下面这条命令
SELECT global.time_zone;解决使用 root 用户登录 mysql再执行 set global time_zone8:00 命令。 注意一开始改成了 SET GLOBAL time_zone Asia/Shanghai但并不好使。
2.4 测试 Idea 启动程序后在 oc_settle_profit 表中插入数据后 dws_profit_record_hdj_flink_api 也可以同步插入相应的数据。
参考 【博学谷学习记录】超强总结用心分享|大数据之flinkCDC 一次打通FlinkCDC同步Mysql数据
三、番外 用 Flink CDC 可以监控 Mysql但无法监控 StarRocks和官方询问过目前 StarRocks 并没有像 Mysql 这样被外部感知 DDL 操作的 bin-log 功能所以暂时还无法用 Flink CDC 监控 StarRocks。