Related articles in this series:

[Data Warehouse] Basic concepts, background knowledge, and core technologies
[Data Warehouse] Data layering concepts and related logic
[Data Warehouse] Hadoop installation and usage (cluster setup)
[Data Warehouse] Common Hadoop cluster configuration parameters
[Data Warehouse] ZooKeeper installation and cluster setup
[Data Warehouse] Kafka installation and cluster setup
[Data Warehouse] Flume installation and configuration
[Data Warehouse] Common Flume configurations and examples
[Data Warehouse] Maxwell installation and configuration (capturing MySQL data)
[Data Warehouse] Collecting log data into Hadoop via Flume + Kafka
[Data Warehouse] DataX installation and configuration (syncing from MySQL to HDFS)
DataX job scripts (job.json) all share essentially the same structure, and in practice each table to be synchronized usually gets its own job. When many tables need to be synchronized, that means writing a large number of job.json files by hand. Since the files follow a common structure, we can generate them programmatically.

Based on this observation, the following Spring Boot project automatically generates the job.json files.
I. Job configuration overview
In a DataX job configuration, the reader, writer, and setting sections are the key components that make up a data synchronization task.
1. reader
The reader section configures the source side of the task: which data source to read from and how to read it. It typically contains the following:

name: the name of the reader plugin, such as mysqlreader or hdfsreader; it determines the type of data source being read.
parameter: the concrete read parameters, including connection details, the tables or file paths to read, column definitions, and so on.
Example: to read data from a MySQL database, the reader configuration might look like this:
```json
"reader": {
  "name": "mysqlreader",
  "parameter": {
    "username": "root",
    "password": "password",
    "column": ["id", "name", "age"],
    "connection": [
      {
        "jdbcUrl": ["jdbc:mysql://localhost:3306/test_db"],
        "table": ["test_table"]
      }
    ]
  }
}
```

2. writer
The writer section configures the target side of the task: which data source to write to and how to write it. It typically contains the following:

name: the name of the writer plugin, such as mysqlwriter or hdfswriter; it determines the type of target data source.
parameter: the concrete write parameters, including target connection details, the tables or file paths to write, column mappings, and so on.
Example: to write data to HDFS, the writer configuration might look like this:
```json
"writer": {
  "name": "hdfswriter",
  "parameter": {
    "defaultFS": "hdfs://localhost:9000",
    "path": "/user/hive/warehouse/test_table",
    "fileName": "test_table",
    "fileType": "text",
    "writeMode": "append",
    "fieldDelimiter": ",",
    "compress": "gzip",
    "column": [
      {"name": "id", "type": "int"},
      {"name": "name", "type": "string"},
      {"name": "age", "type": "int"}
    ]
  }
}
```

3. setting
The setting section holds the global options of the task, configuring behavior that affects the whole run. It typically contains the following:

speed: controls throughput and concurrency, including the number of channels (channel) and the per-channel transfer rate (byte).
errorLimit: the error tolerance of the run, including the number of failed records allowed (record) and the allowed error rate (percentage).
Example: a typical setting configuration might look like this:
```json
"setting": {
  "speed": {
    "channel": 3,       // number of concurrent channels
    "byte": 1048576     // per-channel transfer rate in bytes (1 MB)
  },
  "errorLimit": {
    "record": 0,        // number of failed records allowed
    "percentage": 0.02  // failed records allowed as a fraction of the total
  }
}
```

Together, the reader, writer, and setting sections make up a DataX job file. By configuring them appropriately, you can flexibly define the source, the target, and the behavior and performance of the synchronization. In practice, fill in and tune these sections according to your source type, target type, and synchronization requirements.
II. Example: syncing from MySQL to HDFS
The following configuration file defines a job that reads data from a MySQL database and writes it to HDFS.
```json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id", "name", "msg", "create_time", "status", "last_login_time"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://192.168.56.1:3306/user?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai"],
                "table": ["t_user"]
              }
            ],
            "password": "password",
            "username": "test",
            "where": "id > 3"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id", "type": "bigint"},
              {"name": "name", "type": "string"},
              {"name": "msg", "type": "string"},
              {"name": "create_time", "type": "date"},
              {"name": "status", "type": "string"},
              {"name": "last_login_time", "type": "date"}
            ],
            "compress": "gzip",
            "defaultFS": "hdfs://hadoop131:9000",
            "fieldDelimiter": "\t",
            "fileName": "mysql2hdfs01",
            "fileType": "text",
            "path": "/mysql2hdfs",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
```

For the full parameter reference, see the DataX documentation for the mysqlreader and hdfswriter plugins.
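Once saved, a job file is submitted through DataX's Python launcher, as described in the DataX user guide (the install and job paths below are assumptions for illustration):

```bash
# Submit the job with the DataX launcher script (DataX home path assumed)
python /opt/module/datax/bin/datax.py /opt/module/datax/job/mysql2hdfs01.json
```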
III. Generating the job files automatically with a Spring Boot project
This example uses Spring Boot 3.0 with JDBC to read MySQL table structure information and generate the job.json files.
1. Create the Spring Boot project and add dependencies and configuration
1增加pom.xml依赖jar包
!-- Spring Boot JDBC Starter --
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-jdbc/artifactId
/dependency
!-- MySQL JDBC Driver --
dependencygroupIdcom.mysql/groupIdartifactIdmysql-connector-j/artifactId
/dependency
dependencygroupIdcn.hutool/groupIdartifactIdhutool-all/artifactIdversion5.8.25/version
/dependency2增加application.properties配置项
```properties
server.port=8080
# MySQL connection
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/user?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
spring.datasource.username=test
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# DataX settings used when generating the job files
datax.hdfs.defaultFS=hdfs://hadoop131:9000
datax.hdfs.path=/origin_data
# Tables to generate job files for, comma separated
datax.mysql.tables=t_user,t_user_test,t_sys_dict
# Directory where the generated job files are saved
datax.savepath=d:/temp/
```

2. Create the VO classes that mirror the job.json structure
(1) Base structure VOs:
```java
@Data
public class DataxJobRoot {
    private Job job;
}

@Data
public class Job {
    private List<Content> content;
    private Setting setting = new Setting();
}

@Data
public class Content {
    private Reader reader;
    private Writer writer;
}

@Data
public class Setting {
    private Speed speed = new Speed();

    @Data
    public static class Speed {
        private String channel = "1";
    }
}

@Data
public class Reader {
    private String name;
    private Parameter parameter;
}

@Data
public class Writer {
    private String name;
    private Parameter parameter;
}

// Marker base class; each plugin defines its own parameter fields in a subclass
public class Parameter {
}
```
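As a quick sanity check, these base VOs already serialize into the job skeleton via Hutool, which the project pulls in above (a sketch; the class name is made up for illustration):

```java
import cn.hutool.json.JSONUtil;
import java.util.Collections;

public class VoSmokeTest {
    public static void main(String[] args) {
        DataxJobRoot root = new DataxJobRoot();
        Job job = new Job();
        job.setContent(Collections.singletonList(new Content()));
        root.setJob(job);
        // Prints the skeleton with the default setting, along the lines of:
        // {"job": {"content": [{}], "setting": {"speed": {"channel": "1"}}}}
        System.out.println(JSONUtil.parse(root).toJSONString(2));
    }
}
```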
(2) VO implementation classes for mysql2hdfs:
```java
@EqualsAndHashCode(callSuper = true)
@Data
public class MysqlReader extends Reader {

    @Override
    public String getName() {
        return "mysqlreader";
    }

    @EqualsAndHashCode(callSuper = true)
    @Data
    public static class MysqlParameter extends Parameter {
        private List<String> column;
        private List<Connection> connection;
        private String password;
        private String username;
        private String where;
    }

    @Data
    public static class Connection {
        // mysqlreader expects jdbcUrl as an array
        private List<String> jdbcUrl;
        private List<String> table;
    }
}

@EqualsAndHashCode(callSuper = true)
@Data
public class HdfsWriter extends Writer {

    @Override
    public String getName() {
        return "hdfswriter";
    }

    @EqualsAndHashCode(callSuper = true)
    @Data
    public static class HdfsParameter extends Parameter {
        private List<Column> column;
        private String compress = "gzip";
        private String encoding = "UTF-8";
        private String defaultFS;
        private String fieldDelimiter = "\t";
        private String fileName;
        private String fileType = "text";
        private String path;
        private String writeMode = "append";
    }

    @Data
    public static class Column {
        String name;
        String type;
    }
}
```

(3) VO implementation classes for hdfs2mysql:
```java
@EqualsAndHashCode(callSuper = true)
@Data
public class HdfsReader extends Reader {

    public HdfsReader() {
        // Create the parameter once so values set through getParameter() are kept
        setParameter(new HdfsParameter());
    }

    @Override
    public String getName() {
        return "hdfsreader";
    }

    @Override
    public HdfsParameter getParameter() {
        return (HdfsParameter) super.getParameter();
    }

    @EqualsAndHashCode(callSuper = true)
    @Data
    public static class HdfsParameter extends Parameter {
        private List<String> column = Collections.singletonList("*");
        private String compress = "gzip";
        private String encoding = "UTF-8";
        private String defaultFS;
        private String fieldDelimiter = "\t";
        private String fileName;
        private String fileType = "text";
        private String path;
        private String nullFormat = "\\N";
    }
}

@EqualsAndHashCode(callSuper = true)
@Data
public class MysqlWriter extends Writer {

    public MysqlWriter() {
        // Create the parameter once so values set through getParameter() are kept
        setParameter(new MysqlParameter());
    }

    @Override
    public String getName() {
        return "mysqlwriter";
    }

    @Override
    public MysqlParameter getParameter() {
        return (MysqlParameter) super.getParameter();
    }

    @EqualsAndHashCode(callSuper = true)
    @Data
    public static class MysqlParameter extends Parameter {
        private List<String> column;
        private List<Connection> connection;
        private String password;
        private String username;
        private String writeMode = "replace";
    }

    @Data
    public static class Connection {
        // mysqlwriter expects jdbcUrl as a single string
        private String jdbcUrl;
        private List<String> table;
    }
}
```

Note that each parameter object is created once in the constructor and reused by getParameter(); if a fresh instance were returned on every call, the values set by the generator service below would be lost when the job is serialized.
3. Create the Repository and Service classes that read the table structure
```java
@Repository
public class DatabaseInfoRepository {

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public DatabaseInfoRepository(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    // List all table names in the current schema
    public List<String> getAllTableNames() {
        String sql = "SHOW TABLES";
        return jdbcTemplate.queryForList(sql, String.class);
    }

    // Get the column metadata of the given table
    public List<Map<String, Object>> getTableColumns(String tableName) {
        String sql = "SHOW FULL COLUMNS FROM " + tableName;
        return jdbcTemplate.queryForList(sql);
    }
}
```

```java
@Service
public class DatabaseInfoService {

    private final DatabaseInfoRepository databaseInfoRepository;

    @Autowired
    public DatabaseInfoService(DatabaseInfoRepository databaseInfoRepository) {
        this.databaseInfoRepository = databaseInfoRepository;
    }

    public void printAllTablesAndColumns() {
        // Fetch all table names, then print the columns of each table
        List<String> tableNames = databaseInfoRepository.getAllTableNames();
        for (String tableName : tableNames) {
            System.out.println("Table: " + tableName);
            List<Map<String, Object>> columns = databaseInfoRepository.getTableColumns(tableName);
            for (Map<String, Object> column : columns) {
                System.out.println("  Column: " + column.get("Field")
                        + " (Type: " + column.get("Type") + ")"
                        + " (Comment: " + column.get("Comment") + ")");
            }
            System.out.println(); // blank line between tables
        }
    }

    /** List the column names of the given table */
    public List<String> getColumns(String tableName) {
        List<String> list = new ArrayList<>();
        for (Map<String, Object> column : databaseInfoRepository.getTableColumns(tableName)) {
            list.add(column.get("Field").toString());
        }
        return list;
    }

    /** List the columns of the given table, mapped to the HdfsWriter column format */
    public List<HdfsWriter.Column> getHdfsColumns(String tableName) {
        List<HdfsWriter.Column> list = new ArrayList<>();
        for (Map<String, Object> column : databaseInfoRepository.getTableColumns(tableName)) {
            String name = column.get("Field").toString();
            String typeDb = column.get("Type").toString();
            // Map MySQL column types to HDFS/Hive types; anything unrecognized becomes string
            String type = "string";
            if (typeDb.equals("bigint")) {
                type = "bigint";
            } else if (typeDb.startsWith("varchar")) {
                type = "string";
            } else if (typeDb.startsWith("date") || typeDb.endsWith("timestamp")) {
                type = "date";
            }
            HdfsWriter.Column columnHdfs = new HdfsWriter.Column();
            columnHdfs.setName(name);
            columnHdfs.setType(type);
            list.add(columnHdfs);
        }
        return list;
    }
}
```
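A quick illustration of the resulting type mapping, assuming the t_user table from section II (a hypothetical demo component, not part of the original project):

```java
import java.util.List;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Prints the mapped columns of t_user on startup
@Component
public class ColumnMappingDemo implements CommandLineRunner {

    private final DatabaseInfoService databaseInfoService;

    public ColumnMappingDemo(DatabaseInfoService databaseInfoService) {
        this.databaseInfoService = databaseInfoService;
    }

    @Override
    public void run(String... args) {
        List<HdfsWriter.Column> cols = databaseInfoService.getHdfsColumns("t_user");
        // Expected output along the lines of: id -> bigint, name -> string, create_time -> date
        cols.forEach(c -> System.out.println(c.getName() + " -> " + c.getType()));
    }
}
```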
4. Create the Service that generates the job.json files
```java
@Service
public class GenHdfs2mysqlJsonService {

    @Value("${spring.datasource.url}")
    private String url;
    @Value("${spring.datasource.password}")
    private String password;
    @Value("${spring.datasource.username}")
    private String username;
    @Value("${datax.mysql.tables}")
    private String tables;
    @Value("${datax.hdfs.defaultFS}")
    private String defaultFS;
    @Value("${datax.hdfs.path}")
    private String path;
    @Value("${datax.savepath}")
    private String savepath;

    @Autowired
    private DatabaseInfoService databaseInfoService;

    /**
     * Generate the hdfs2mysql job.json
     * @param table table name
     */
    public void genHdfs2mysqlJson(String table) {
        DataxJobRoot root = new DataxJobRoot();
        Job job = new Job();
        root.setJob(job);
        Content content = new Content();
        HdfsReader reader = new HdfsReader();
        MysqlWriter writer = new MysqlWriter();
        content.setReader(reader);
        content.setWriter(writer);
        job.setContent(Collections.singletonList(content));

        HdfsReader.HdfsParameter hdfsParameter = reader.getParameter();
        hdfsParameter.setPath(path);
        hdfsParameter.setFileName(table + "_hdfs");
        hdfsParameter.setDefaultFS(defaultFS);

        MysqlWriter.MysqlParameter mysqlParameter = writer.getParameter();
        mysqlParameter.setPassword(password);
        mysqlParameter.setUsername(username);
        List<String> columns = databaseInfoService.getColumns(table);
        mysqlParameter.setColumn(columns);
        MysqlWriter.Connection connection = new MysqlWriter.Connection();
        connection.setJdbcUrl(url);
        connection.setTable(Collections.singletonList(table));
        mysqlParameter.setConnection(Collections.singletonList(connection));

        // Serialize with a 2-space indent and write to the configured save path
        String jsonStr = JSONUtil.parse(root).toJSONString(2);
        System.out.println(jsonStr);
        File file = FileUtil.file(savepath, table + "_h2m.json");
        FileUtil.appendString(jsonStr, file, "utf-8");
    }

    /**
     * Generate the mysql2hdfs job.json
     * @param table table name
     */
    public void genMysql2HdfsJson(String table) {
        DataxJobRoot root = new DataxJobRoot();
        Job job = new Job();
        root.setJob(job);
        Content content = new Content();
        HdfsWriter writer = new HdfsWriter();
        MysqlReader reader = new MysqlReader();
        content.setReader(reader);
        content.setWriter(writer);
        job.setContent(Collections.singletonList(content));

        HdfsWriter.HdfsParameter hdfsParameter = new HdfsWriter.HdfsParameter();
        writer.setParameter(hdfsParameter);
        hdfsParameter.setPath(path);
        hdfsParameter.setFileName(table + "_hdfs");
        hdfsParameter.setDefaultFS(defaultFS);
        List<HdfsWriter.Column> lstColumns = databaseInfoService.getHdfsColumns(table);
        hdfsParameter.setColumn(lstColumns);

        MysqlReader.MysqlParameter mysqlParameter = new MysqlReader.MysqlParameter();
        reader.setParameter(mysqlParameter);
        mysqlParameter.setPassword(password);
        mysqlParameter.setUsername(username);
        List<String> columns = databaseInfoService.getColumns(table);
        mysqlParameter.setColumn(columns);
        MysqlReader.Connection connection = new MysqlReader.Connection();
        connection.setJdbcUrl(Collections.singletonList(url));
        connection.setTable(Collections.singletonList(table));
        mysqlParameter.setConnection(Collections.singletonList(connection));

        String jsonStr = JSONUtil.parse(root).toJSONString(2);
        System.out.println(jsonStr);
        File file = FileUtil.file(savepath, table + "_m2h.json");
        FileUtil.appendString(jsonStr, file, "utf-8");
    }

    // Generate a mysql2hdfs job for every configured table
    public void genAllTable() {
        // Guava's Splitter (requires the guava dependency on the classpath)
        Splitter.on(",").split(tables).forEach(this::genMysql2HdfsJson);
    }
}
```

5. Run a test
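One minimal way to trigger the generation is a CommandLineRunner in the application entry point (a sketch; the application class name is an assumption, not part of the original project):

```java
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class DataxJobGenApplication {

    public static void main(String[] args) {
        SpringApplication.run(DataxJobGenApplication.class, args);
    }

    // Generate a job.json for every table listed in datax.mysql.tables on startup
    @Bean
    public CommandLineRunner genJobsRunner(GenHdfs2mysqlJsonService service) {
        return args -> service.genAllTable();
    }
}
```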
Calling the genAllTable() method generates a job.json file for each configured table in the configured save directory. The generated structure looks like this:
```json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id", "name", "msg", "create_time", "last_login_time", "status"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/user?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai"],
                "table": ["t_user"]
              }
            ],
            "password": "password",
            "username": "test"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id", "type": "bigint"},
              {"name": "name", "type": "string"},
              {"name": "msg", "type": "string"},
              {"name": "create_time", "type": "date"},
              {"name": "last_login_time", "type": "date"},
              {"name": "status", "type": "bigint"}
            ],
            "compress": "gzip",
            "encoding": "UTF-8",
            "defaultFS": "hdfs://hadoop131:9000",
            "fieldDelimiter": "\t",
            "fileName": "t_user_hdfs",
            "fileType": "text",
            "path": "/origin_data",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}
```

With that, automatic generation of DataX job.json files from a Spring Boot project is complete.
The finer details, and conversions for other reader/writer pairs, can be implemented following the same examples.
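For instance, a reader for another JDBC source can reuse the shape of MysqlReader almost verbatim (a sketch assuming DataX's postgresqlreader plugin, whose parameters mirror mysqlreader's; check the plugin documentation for the exact parameter list):

```java
@EqualsAndHashCode(callSuper = true)
@Data
public class PostgresqlReader extends Reader {

    @Override
    public String getName() {
        // Plugin name as registered in DataX (verify against the plugin docs)
        return "postgresqlreader";
    }

    @EqualsAndHashCode(callSuper = true)
    @Data
    public static class PostgresqlParameter extends Parameter {
        private List<String> column;
        private List<Connection> connection;
        private String password;
        private String username;
        private String where;
    }

    @Data
    public static class Connection {
        private List<String> jdbcUrl;
        private List<String> table;
    }
}
```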
References
[Data Warehouse] DataX installation and configuration (syncing from MySQL to HDFS)
DataX user guide: https://github.com/alibaba/DataX/blob/master/userGuid.md