当前位置: 首页 > news >正文

佛山网站优化步骤大连做网站 智域

佛山网站优化步骤,大连做网站 智域,宁波seo品牌推广排名,给网站做绝对路径相关文章 【数仓】基本概念、知识普及、核心技术【数仓】数据分层概念以及相关逻辑【数仓】Hadoop软件安装及使用#xff08;集群配置#xff09;【数仓】Hadoop集群配置常用参数说明【数仓】zookeeper软件安装及集群配置【数仓】kafka软件安装及集群配置【数仓】flume软件安…相关文章 【数仓】基本概念、知识普及、核心技术【数仓】数据分层概念以及相关逻辑【数仓】Hadoop软件安装及使用集群配置【数仓】Hadoop集群配置常用参数说明【数仓】zookeeper软件安装及集群配置【数仓】kafka软件安装及集群配置【数仓】flume软件安装及配置【数仓】flume常见配置总结以及示例【数仓】Maxwell软件安装及配置采集mysql数据【数仓】通过Flumekafka采集日志数据存储到Hadoop【数仓】DataX软件安装及配置从mysql同步到hdfs DataX的任务脚本job.json格式基本类似而且我们在实际同步过程中通常都是一个表对应一个job那么如果需要同步的表非常多的话需要编写的job.json文件也非常多。既然是类似文件结构那么我们就有办法通过程序自动生成相关的job.json文件。 居于以上考虑有了下面的SpringBoot项目自动生成job.json的程序 一、job 配置说明 DataX的job配置中的reader、writer和setting是构成数据同步任务的关键组件。 1、reader reader是数据同步任务中的数据源读取配置部分用于指定从哪个数据源读取数据以及如何读取数据。它通常包含以下关键信息 name: 读取插件的名称如mysqlreader、hdfsreader等用于指定从哪种类型的数据源读取数据。parameter: 具体的读取参数配置包括数据源连接信息、读取的表或文件路径、字段信息等。 示例 假设要从MySQL数据库读取数据reader的配置可能如下 reader: {name: mysqlreader,parameter: {username: root,password: password,column: [id, name, age],connection: [{jdbcUrl: jdbc:mysql://localhost:3306/test_db,table: [test_table]}]} }2、writer writer是数据同步任务中的目标数据源写入配置部分用于指定将数据写入哪个目标数据源以及如何写入数据。它通常包含以下关键信息 name: 写入插件的名称如mysqlwriter、hdfswriter等用于指定将数据写入哪种类型的数据源。parameter: 具体的写入参数配置包括目标数据源连接信息、写入的表或文件路径、字段映射等。 示例 假设要将数据写入HDFSwriter的配置可能如下 writer: {name: hdfswriter,parameter: {writeMode: append,fieldDelimiter: ,,compress: gzip,column: [{name: id, type: int}, {name: name, type: string}, {name: age, type: int}],connection: [{hdfsUrl: hdfs://localhost:9000,file: [/user/hive/warehouse/test_table]}]} }3、setting setting是数据同步任务的全局设置部分用于配置影响整个任务行为的参数。它通常包含以下关键信息 speed: 控制数据同步的速度和并发度包括通道数channel和每个通道的数据传输速度如byte。errorLimit: 设置数据同步过程中的错误容忍度包括允许出错的记录数record和错误率percentage。 示例 一个典型的setting配置可能如下 setting: {speed: {channel: 3, // 并发通道数byte: 1048576 // 每个通道的数据传输速度单位是字节1MB},errorLimit: {record: 0, // 允许出错的记录数percentage: 0.02 // 允许出错的记录数占总记录数的百分比} }综上所述reader、writer和setting三个部分共同构成了DataX数据同步任务的配置文件。通过合理配置这些部分用户可以灵活地定义数据源、目标数据源以及数据同步的行为和性能。在实际应用中用户应根据具体的数据源类型、目标数据源类型和数据同步需求来填写和调整这些配置。 二、示例从mysql同步到hdfs 该配置文件定义了从一个 MySQL 数据库读取数据并将这些数据写入到 HDFS 的过程。 {job: {content: [{reader: {name: mysqlreader, parameter: {column: [id,name,msg,create_time,status,last_login_time], connection: [{jdbcUrl: [jdbc:mysql://192.168.56.1:3306/user?characterEncodingUTF-8useUnicodetrueuseSSLfalsetinyInt1isBitfalseallowPublicKeyRetrievaltrueserverTimezoneAsia/Shanghai], table: [t_user]}], password: password, username: test, where: id3}}, writer: {name: hdfswriter, parameter: {column: [{name:id,type:bigint},{name:name,type:string},{name:msg,type:string},{name:create_time,type:date},{name:status,type:string},{name:last_login_time,type:date}], compress: gzip, defaultFS: hdfs://hadoop131:9000, fieldDelimiter: \t, fileName: mysql2hdfs01, fileType: text, path: /mysql2hdfs, writeMode: append}}}], setting: {speed: {channel: 1}}} }参考 mysqlreader参考 hdfswriter 三、通过SpringBoot项目自动生成job文件 本例使用SpringBoot 3.0 结合 JDBC 读取mysql数据库表结构信息生成job.json文件 1、创建SpringBoot项目添加pom依赖以及配置 1增加pom.xml依赖jar包 !-- Spring Boot JDBC Starter -- dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-jdbc/artifactId /dependency !-- MySQL JDBC Driver -- dependencygroupIdcom.mysql/groupIdartifactIdmysql-connector-j/artifactId /dependency dependencygroupIdcn.hutool/groupIdartifactIdhutool-all/artifactIdversion5.8.25/version /dependency2增加application.properties配置项 server.port8080 # mysql 数据库链接 spring.datasource.urljdbc:mysql://127.0.0.1:3306/user?characterEncodingUTF-8useUnicodetrueuseSSLfalsetinyInt1isBitfalseallowPublicKeyRetrievaltrueserverTimezoneAsia/Shanghai spring.datasource.usernametest spring.datasource.passwordpassword spring.datasource.driver-class-namecom.mysql.cj.jdbc.Driver# datax 相关配置在生成文件时使用 datax.hdfs.defaultFShdfs://hadoop131:9000 datax.hdfs.path/origin_data # 需要生成job文件的表多个用逗号隔开 datax.mysql.tablest_user,t_user_test,t_sys_dict # job文件存储位置 datax.savepathd:/temp/2、按照job.json格式创建好各个 vo 1基础结构vo Data public class DataxJobRoot {private Job job; } Data public class Job {private ListContent content;private Setting setting new Setting(); } Data public class Content {private Reader reader;private Writer writer; } Data public class Setting {private Speed speed new Speed();Datapublic static class Speed {private String channel 1;} } Data public class Reader {private String name;private Parameter parameter; } Data public class Writer {private String name;private Parameter parameter;Datapublic static class MysqlParameter {private ListString column;private ListConnection connection;private String password;private String username;private String writeMode replace;}Datapublic static class Connection {private String jdbcUrl;private ListString table;} }public class Parameter { }2mysql2hdfs的vo实现类 EqualsAndHashCode(callSuper true) Data public class MysqlReader extends Reader {public String getName() {return mysqlreader;}EqualsAndHashCode(callSuper true)Datapublic static class MysqlParameter extends Parameter {private ListString column;private ListConnection connection;private String password;private String username;private String where;}Datapublic static class Connection {private ListString jdbcUrl;private ListString table;} }EqualsAndHashCode(callSuper true) Data public class HdfsWriter extends Writer {public String getName() {return hdfswriter;}EqualsAndHashCode(callSuper true)Datapublic static class HdfsParameter extends Parameter {private ListColumn column;private String compress gzip;private String encoding UTF-8;private String defaultFS;private String fieldDelimiter \t;private String fileName;private String fileType text;private String path;private String writeMode append;}Datapublic static class Column {String name;String type;} }3hdfs2mysql的vo实现类 EqualsAndHashCode(callSuper true) Data public class HdfsReader extends Reader {Overridepublic String getName() {return hdfsreader;}public HdfsParameter getParameter() {return new HdfsParameter();}EqualsAndHashCode(callSuper true)Datapublic static class HdfsParameter extends Parameter {private ListString column Collections.singletonList(*);private String compress gzip;private String encoding UTF-8;private String defaultFS;private String fieldDelimiter \t;private String fileName;private String fileType text;private String path;private String nullFormat \\N;} } EqualsAndHashCode(callSuper true) Data public class MysqlWriter extends Writer {public String getName() {return mysqlwriter;}public MysqlParameter getParameter() {return new MysqlParameter();}EqualsAndHashCode(callSuper true)Datapublic static class MysqlParameter extends Parameter {private ListString column;private ListConnection connection;private String password;private String username;private String writeMode replace;}Datapublic static class Connection {private String jdbcUrl;private ListString table;} }3、创建Repository、Service类读取数据库表结构 Repository public class DatabaseInfoRepository {private final JdbcTemplate jdbcTemplate;Autowiredpublic DatabaseInfoRepository(JdbcTemplate jdbcTemplate) {this.jdbcTemplate jdbcTemplate;}// 获取所有表名public ListString getAllTableNames() {String sql SHOW TABLES;return jdbcTemplate.queryForList(sql, String.class);}// 根据表名获取字段信息public ListMapString, Object getTableColumns(String tableName) {String sql SHOW FULL COLUMNS FROM tableName;return jdbcTemplate.queryForList(sql);} }Service public class DatabaseInfoService {private final DatabaseInfoRepository databaseInfoRepository;Autowiredpublic DatabaseInfoService(DatabaseInfoRepository databaseInfoRepository) {this.databaseInfoRepository databaseInfoRepository;}public void printAllTablesAndColumns() {// 获取所有表名ListString tableNames databaseInfoRepository.getAllTableNames();// 遍历表名获取并打印每个表的字段信息for (String tableName : tableNames) {System.out.println(Table: tableName);// 获取当前表的字段信息ListMapString, Object columns databaseInfoRepository.getTableColumns(tableName);// 遍历字段信息并打印for (MapString, Object column : columns) {System.out.println( Column: column.get(Field) (Type: column.get(Type) ) (Comment: column.get(Comment) ));}System.out.println(); // 打印空行作为分隔}}/** 查询指定表的所有字段列表 */public ListString getColumns(String tableName) {ListString list new ArrayList();// 获取当前表的字段信息ListMapString, Object columns databaseInfoRepository.getTableColumns(tableName);// 遍历字段信息并打印for (MapString, Object column : columns) {list.add(column.get(Field).toString());}return list;}/** 查询指定表的所有字段列表封装成HdfsWriter格式 */public ListHdfsWriter.Column getHdfsColumns(String tableName) {ListHdfsWriter.Column list new ArrayList();// 获取当前表的字段信息ListMapString, Object columns databaseInfoRepository.getTableColumns(tableName);// 遍历字段信息并打印for (MapString, Object column : columns) {String name column.get(Field).toString();String typeDb column.get(Type).toString();String type string;if (typeDb.equals(bigint)) {type bigint;} else if (typeDb.startsWith(varchar)) {type string;} else if (typeDb.startsWith(date) || typeDb.endsWith(timestamp)) {type date;}HdfsWriter.Column columnHdfs new HdfsWriter.Column();columnHdfs.setName(name);columnHdfs.setType(type);list.add(columnHdfs);}return list;} }4、创建Service生成job.json文件 Service public class GenHdfs2mysqlJsonService {Value(${spring.datasource.url})private String url;Value(${spring.datasource.password})private String password;Value(${spring.datasource.username})private String username;Value(${datax.mysql.tables})private String tables;Value(${datax.hdfs.defaultFS})private String defaultFS;Value(${datax.hdfs.path})private String path;Value(${datax.savepath})private String savepath;Autowiredprivate DatabaseInfoService databaseInfoService;/*** 生成 hdfs2mysql的job.json* param table*/public void genHdfs2mysqlJson(String table) {DataxJobRoot root new DataxJobRoot();Job job new Job();root.setJob(job);Content content new Content();HdfsReader reader new HdfsReader();MysqlWriter writer new MysqlWriter();content.setReader(reader);content.setWriter(writer);job.setContent(Collections.singletonList(content));HdfsReader.HdfsParameter hdfsParameter reader.getParameter();hdfsParameter.setPath(path);hdfsParameter.setFileName(table _hdfs);hdfsParameter.setDefaultFS(defaultFS);MysqlWriter.MysqlParameter mysqlParameter writer.getParameter();mysqlParameter.setPassword(password);mysqlParameter.setUsername(username);ListString columns databaseInfoService.getColumns(table);mysqlParameter.setColumn(columns);MysqlWriter.Connection connection new MysqlWriter.Connection();connection.setJdbcUrl(url);connection.setTable(Collections.singletonList(table));mysqlParameter.setConnection(Collections.singletonList(connection));String jsonStr JSONUtil.parse(root).toJSONString(2);System.out.println(jsonStr);File file FileUtil.file(savepath, table _h2m.json);FileUtil.appendString(jsonStr, file, utf-8);}/*** 生成 mysql2hdfs 的job.json* param table*/public void genMysql2HdfsJson(String table) {DataxJobRoot root new DataxJobRoot();Job job new Job();root.setJob(job);Content content new Content();HdfsWriter writer new HdfsWriter();MysqlReader reader new MysqlReader();content.setReader(reader);content.setWriter(writer);job.setContent(Collections.singletonList(content));HdfsWriter.HdfsParameter hdfsParameter new HdfsWriter.HdfsParameter();writer.setParameter(hdfsParameter);hdfsParameter.setPath(path);hdfsParameter.setFileName(table _hdfs);hdfsParameter.setDefaultFS(defaultFS);ListHdfsWriter.Column lstColumns databaseInfoService.getHdfsColumns(table);hdfsParameter.setColumn(lstColumns);MysqlReader.MysqlParameter mysqlParameter new MysqlReader.MysqlParameter();reader.setParameter(mysqlParameter);mysqlParameter.setPassword(password);mysqlParameter.setUsername(username);ListString columns databaseInfoService.getColumns(table);mysqlParameter.setColumn(columns);MysqlReader.Connection connection new MysqlReader.Connection();connection.setJdbcUrl(Collections.singletonList(url));connection.setTable(Collections.singletonList(table));mysqlParameter.setConnection(Collections.singletonList(connection));String jsonStr JSONUtil.parse(root).toJSONString(2);System.out.println(jsonStr);File file FileUtil.file(savepath, table _m2h.json);FileUtil.appendString(jsonStr, file, utf-8);}public void genAllTable() {Splitter.on(,).split(tables).forEach(this::genMysql2HdfsJson);}}5、执行测试 调用genAllTable()方法在配置的存储目录中自动生成每个表的job.json文件结构示例如下 {job: {content: [{reader: {name: mysqlreader,parameter: {column: [id,name,msg,create_time,last_login_time,status],connection: [{jdbcUrl: [jdbc:mysql://127.0.0.1:3306/user?characterEncodingUTF-8useUnicodetrueuseSSLfalsetinyInt1isBitfalseallowPublicKeyRetrievaltrueserverTimezoneAsia/Shanghai],table: [t_user]}],password: password,username: test}},writer: {name: hdfswriter,parameter: {column: [{name: id,type: bigint},{name: name,type: string},{name: msg,type: string},{name: create_time,type: date},{name: last_login_time,type: date},{name: status,type: bigint}],compress: gzip,encoding: UTF-8,defaultFS: hdfs://hadoop131:9000,fieldDelimiter: \t,fileName: t_user_hdfs,fileType: text,path: /origin_data,writeMode: append}}}],setting: {speed: {channel: 1}}} }至此通过SpringBoot项目自动生成DataX的job.json文件功能完成 其中细节以及其他的reader\writer转换可以按照例子实现。 参考 【数仓】DataX软件安装及配置从mysql同步到hdfshttps://github.com/alibaba/DataX/blob/master/userGuid.md
http://www.w-s-a.com/news/141912/

相关文章:

  • 郑州网站建设公司哪家专业好如何注册一家公司
  • 证券投资网站做哪些内容滨州论坛网站建设
  • 重庆网站建设公司模板广东佛山
  • 中展建设股份有限公司网站做网站备案是什么意思
  • 石家庄网站建设接单wordpress功能小工具
  • 有没有专门做网站的网站镜像上传到域名空间
  • 网站建设中 windows买域名自己做网站
  • 设计英语宁波seo做排名
  • 奉贤网站建设上海站霸深圳几个区
  • c#做网站自已建网站
  • 成都地区网站建设网站设计类型
  • 如何做网站结构优化北京响应式网站
  • 出售源码的网站威海住房建设局网站
  • 网站建设补充报价单网站建设 技术指标
  • 做网站费用分摊入什么科目做网络网站需要三证么
  • 房屋备案查询系统官网杭州排名优化软件
  • 网站地图html网络营销的流程和方法
  • 注册好网站以后怎么做wordpress 获取插件目录下
  • 南京做网站dmooo地方网站需要什么手续
  • 网站开发合同有效期omeka wordpress对比
  • 杭州设计网站的公司广州网站改版领军企业
  • 网站备案系统苏州网站设计网站开发公司
  • 怎么样做微网站著名企业vi设计
  • 三分钟做网站网页设计心得体会100字
  • 网站建设支付宝seo建站是什么
  • 常州做网站的 武进学雷锋_做美德少年网站
  • 怎样建网站赚钱贵州seo和网络推广
  • 创建网站的工具站内seo优化
  • 网站特效 站长查询网网站
  • 百度移动端网站网站建设设计思想