网站推广计划书包含哪些内容,计算机网站建设与维护,国内最炫酷的网站,济南网络优化哪家专业大数据Hadoop之——数据同步工具DataX数据采集工具-DataX datax详细介绍及使用
一、概述 DataX 是阿里云DataWorks数据集成的开源版本#xff0c;在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、…大数据Hadoop之——数据同步工具DataX数据采集工具-DataX datax详细介绍及使用
一、概述 DataX 是阿里云DataWorks数据集成的开源版本在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。 Giteegithub.com/alibaba/Dat…
GitHub地址github.com/alibaba/Dat…
文档github.com/alibaba/Dat… DataX 是一个异构数据源离线同步工具致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 为了解决异构数据源同步问题DataX将复杂的网状的同步链路变成了星型数据链路DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候只需要将此数据源对接到DataX便能跟已有的数据源做到无缝数据同步。DataX在阿里巴巴集团内被广泛使用承担了所有大数据的离线同步业务并已持续稳定运行了6年之久。目前每天完成同步8w多道作业每日传输数据量超过300TB。
二、DataX3.0框架设计 DataX本身作为离线数据同步框架采用Framework plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件纳入到整个同步框架中。 ReaderReader为数据采集模块负责采集数据源的数据将数据发送给Framework。Writer Writer为数据写入模块负责不断向Framework取数据并将数据写入到目的端。FrameworkFramework用于连接reader和writer作为两者的数据传输通道并处理缓冲流控并发数据转换等核心技术问题。
2.1 核心流程架构图 2.2 核心流程
job 是 Datax 一次任务的统称DataX JobContainer 是一个运行 job 的容器是整个同步任务的管理控制中心承担了插件初始化数据清理、 数据检测、任务切分、TaskGroup 管理任务调度监控回收等功能。DataX Job 启动后会根据不同的源端切分策略将 Job 切分成多个小的 Task(子任务)实质上是在切分配置文件以便于并发执行。Task 便是 DataX 同步任务的基础单位每一个 Task 都会负责一部分数据的同步工作。切分后的 task会根据并发要求通过 schedule 方法重新组合成 TaskGroupTaskGroup 线程由一个线程池维护并监控一个 TaskGroup 默 认并发 5 个 Task。每一个 Task 都由 TaskGroup 负责启动Task 启动后会启动读写两 个线程并通过 Record 类作为媒介Reader 不断地读出数据并往传输中 转站 Channel 中存入信息还 Writer 则负责从 Channel 中读出 Record 信 息存入目标数据源。DataX 作业运行时JobContainer 会监控各个 TaskGroup 模块任务 直到所有任务完成并记录日志当有都成功后会返回 0不然会有完整的报错机制异常退出返回非 0。
三、DataX3.0架构 DataX 3.0 开源版本支持单机多线程模式完成同步作业运行本小节按一个DataX作业生命周期的时序图从整体架构设计非常简要说明DataX各个模块相互关系。 3.1 核心模块介绍
DataX完成单个数据同步的作业我们称之为JobDataX接受到一个Job之后将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。DataXJob启动后会根据不同的源端切分策略将Job切分成多个小的Task(子任务)以便于并发执行。Task便是DataX作业的最小单元每一个Task都会负责一部分数据的同步工作。切分多个Task之后DataX Job会调用Scheduler模块根据配置的并发数据量将拆分成的Task重新组合组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task默认单个任务组的并发数量为5。每一个Task都由TaskGroup负责启动Task启动后会固定启动Reader—Channel—Writer的线程来完成任务同步工作。DataX作业运行起来之后 Job监控并等待多个TaskGroup模块任务完成等待所有TaskGroup任务完成后Job成功退出。否则异常退出进程退出值非0
3.2 DataX调度流程
举例来说用户提交了一个DataX作业并且配置了20个并发目的是将一个100张分表的mysql数据同步到odpsOpen Data Processing Service开发数据处理服务里面。 DataX的调度决策思路是
DataXJob根据分库分表切分成了100个Task。根据20个并发DataX计算共需要分配4个TaskGroup。4个TaskGroup平分切分好的100个Task每一个TaskGroup负责以5个并发共计运行25个Task。
3.3 DataX3.0插件体系
经过几年积累DataX目前已经有了比较全面的插件体系主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下
类型数据源Reader(读)Writer(写)文档RDBMS 关系型数据库MySQL√√读 、写Oracle√√读 、写SQLServer√√读 、写PostgreSQL√√读 、写DRDS√√读 、写达梦√√读 、写通用RDBMS(支持所有关系型数据库)√√读 、写阿里云数仓数据存储ODPS√√读 、写ADS√写OSS√√读 、写OCS√√读 、写NoSQL数据存储OTS√√读 、写Hbase0.94√√读 、写Hbase1.1√√读 、写MongoDB√√读 、写Hive√√读 、写无结构化数据存储TxtFile√√读 、写FTP√√读 、写HDFS√√读 、写Elasticsearch√写
DataX Framework提供了简单的接口与插件交互提供简单的插件接入机制只需要任意加上一种插件就能无缝对接其他数据源。详情请看DataX数据源指南
四、环境部署
1下载
$ mkdir -p /opt/bigdata/hadoop/software/datax ; cd /opt/bigdata/hadoop/software/datax
$ wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
$ tar -xf datax.tar.gz -C /opt/bigdata/hadoop/server/2设置环境变量
$ cd /opt/bigdata/hadoop/server/
$ vi /etc/profile
export DATAX_HOME/opt/bigdata/hadoop/server/datax
export PATH$DATAX_HOME/bin:$PATH
$ source /etc/profile五、DataX优化
DataX使用、同步HDFS数据到MySQL案例、DataX优化
5.1 速度控制
DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式可以随意控制你的作业速度让你的作业在数据库可以承受的范围内达到最佳的同步速度。
关键优化参数如下
参数说明job.setting.speed.channel并发数job.setting.speed.record总record限速 (tps每秒处理的条数job.setting.speed.byte总byte限速 (bps每秒处理的字节数)core.transport.channel.speed.record单个channel的record限速默认值为1000010000条/score.transport.channel.speed.byte单个channel的byte限速默认值1024*10241M/s
注意事项
1.若配置了总record限速则必须配置单个channel的record限速
2.若配置了总byte限速则必须配置单个channe的byte限速
3.若配置了总record限速和总byte限速channel并发数参数就会失效。因为配置了总record限速和总byte限速之后实际channel并发数是通过计算得到的
计算公式为:
min(总byte限速/单个channel的byte限速总record限速/单个channel的record限速)
配置示例
{core: {transport: {channel: {speed: {byte: 1048576 //单个channel byte限速1M/s}}}},job: {setting: {speed: {byte : 5242880 //总byte限速5M/s}},...}
}5.2 内存调整
当提升DataX Job内Channel并发数时内存的占用会显著增加因为DataX作为数据交换通道在内存中会缓存较多的数据。例如Channel中会有一个Buffer作为临时的数据交换的缓冲区而在部分Reader和Writer的中也会存在一些Buffer为了防止OOM等错误需调大JVM的堆内存。
建议将内存设置为4G或者8G这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式一种是直接更改datax.py脚本另一种是在启动的时候加上对应的参数如下
python datax/bin/datax.py --jvm-Xms8G -Xmx8G /path/to/your/job.json六、DataX-Web
GitHub地址WeiYe-Jing/datax-web
前端地址https://github.com/WeiYe-Jing/datax-web-ui
作者开源中国博客地址https://segmentfault.com/u/weiye_jing/articles 6.1 DataX-web安装
datax datax_web避坑指南 CentOS7安装DataX和datax-web
6.2 DataX-Web使用
Datax-web 源码阅读记录
6.2.1 前端访问
部署完成后在浏览器中输入 http://ip:port/index.html 就可以访问对应的主界面ip为datax-admin部署所在服务器ip,port为为datax-admin 指定的运行端口9527初始账号admin密码123456。
6.2.2 datax-web API
datax-web部署成功后可以了解datax-web API相关内容网址: http://ip:port/doc.html
6.2.3 路由策略
当执行器集群部署时提供丰富的路由策略包括:
FIRST第一个固定选择第一个机器LAST最后一个固定选择最后一个机器ROUND轮询依次分配任务RANDOM随机随机选择在线的机器CONSISTENT_HASH一致性HASH每个任务按照Hash算法固定选择某一台机器且所有任务均匀散列在不同机器上。LEAST_FREQUENTLY_USED最不经常使用使用频率最低的机器优先被选举LEAST_RECENTLY_USED最近最久未使用最久为使用的机器优先被选举FAILOVER故障转移按照顺序依次进行心跳检测第一个心跳检测成功的机器选定为目标执行器并发起调度BUSYOVER忙碌转移按照顺序依次进行空闲检测第一个空闲检测成功的机器选定为目标执行器并发起调度
阻塞处理策略调度过于密集执行器来不及处理时的处理策略
单机串行调度请求进入单机执行器后调度请求进入FIFO队列并以串行方式运行丢弃后续调度调度请求进入单机执行器后发现执行器存在运行的调度任务本次请求将会被丢弃并标记为失败覆盖之前调度调度请求进入单机执行器后发现执行器存在运行的调度任务将会终止运行中的调度任务并清空队列然后运行本地调度任务
增量增新建议将阻塞策略设置为丢弃后续调度或者单机串行
设置单机串行时应该注意合理设置重试次数(失败重试的次数* 每次执行时间 任务的调度周期)重试的次数如果设置的过多会导致数据重复例如任务30秒执行一次每次执行时间需要20秒设置重试三次如果任务失败了第一个重试的时间段为1577755680-1577756680重试任务没结束新任务又开启那新任务的时间段会是1577755680-1577758680
6.3 DataX-Web打包部署
datax-web在windows环境idea中模块化打包部署操作步骤
6.3.1 pom.xml修改
分别在datax-admin和datax-executor下面的pom.xml文件中添加
plugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-jar-plugin/artifactIdconfigurationexcludesexclude**/*.yml/excludeexclude**/*.properties/excludeexclude**/*.sh/excludeexclude**/*.xml/exclude/excludes/configuration
/plugin
plugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdexecutionsexecutiongoalsgoalrepackage/goal/goals/execution/executions
/plugin6.3.2 将静态资源移入jar包中
将datax-admin下的所有配置资源拷贝进datax-admin-2.1.2.jar的相关目录中。具体如下
1将idea中的datax-admin下的classes下的配置文件application.yml、bootstrap.properties、logback.xml拷贝进datax-admin-2.1.2.jar下的BOOT-INF\classes下。
2将datax-admin下的target/classes/i8n下的message.properties和message_en.properties拷贝进datax-admin-2.1.2.jar下的BOOT-INF\classes\i18n下
3将将datax-admin下的target/classes/mybatis-mapper下的所有xml文件拷贝进datax-admin-2.1.2.jar下的BOOT-INF\classes\mybatis-mapper下
4将idea中的datax-executor编译生成后的classes下的配置文件application.yml、logback.xml拷贝进datax-executor-2.1.2.jar下的BOOT-INF\classes下。
6.4 参考资料
Datax3.0DataX-Web打造分布式可视化ETL系统linux搭建datax、datax-webETL工具datax任务构建可视化管理Datax-web 集群化部署使用图文教程超详细Datax-web入门配置与启动Datax-web的入门使用数据同步工具—DataX—Web部署使用
七、DataX和Sqoop、Kettle等的比较
【知识】ETL大数据集成工具Sqoop、dataX、Kettle、Canal、StreamSets大比拼六种 主流ETL 工具的比较(DataPipelineKettleTalendInformaticaDatax Oracle Goldengate)
以下对开源的Sqoop、dataX、Kettle、Canal、StreamSetst进行简单梳理比较通过分析建议优先DataX更优。
比较维度\产品DataPipelinekettleOracle GoldengateinformaticatalendDataX设计及架构适用场景主要用于各类数据融合、数据交换场景专为超大数据量、高度复杂的数据链路设计的灵活、可扩展的数据交换平台面向数据仓库建模传统ETL工具主要用于数据备份、容灾面向数据仓库建模传统ETL工具面向数据仓库建模传统ETL工具面向数据仓库建模传统ETL工具使用方式全流程图形化界面应用端采用B/S架构Cloud Native为云而生所有操作在浏览器内就可以完成不需要额外的开发和生产发布C/S客户端模式开发和生产环境需要独立部署任务的编写、调试、修改都在本地需要发布到生产环境线上生产环境没有界面需要通过日志来调试、debug效率低费时费力没有图形化的界面操作皆为命令行方式可配置能力差C/S客户端模式开发和生产环境需要独立部署任务的编写、调试、修改都在本地需要发布到生产环境学习成本较高一般需要受过专业培训的工程师才能使用C/S客户端模式开发和生产环境需要独立部署任务的编写、调试、修改都在本地需要发布到生产环境DataX是以脚本的方式执行任务的需要完全吃透源码才可以调用学习成本高没有图形开发化界面和监控界面运维成本相对高。底层架构分布式集群高可用架构可以水平扩展到多节点支持超大数据量架构容错性高可以自动调节任务在节点之间分配适用于大数据场景主从结构非高可用扩展性差架构容错性低不适用大数据场景可做集群部署规避单点故障依赖于外部环境如Oracle RAC等schema mapping非自动可复制性比较差更新换代不是很强支持分布式部署支持单机部署和集群部署两种方式功能CDC机制基于日志、基于时间戳和自增序列等多种方式可选基于时间戳、触发器等主要是基于日志基于日志、基于时间戳和自增序列等多种方式可选基于触发器、基于时间戳和自增序列等多种方式可选离线批处理对数据库的影响基于日志的采集方式对数据库无侵入性对数据库表结构有要求存在一定侵入性源端数据库需要预留额外的缓存空间基于日志的采集方式对数据库无侵入性有侵入性通过sql select 采集数据对数据源没有侵入性自动断点续传支持不支持支持不支持依赖ETL设计的合理性例如T-1指定续读某个时间点的数据非自动不支持依赖ETL设计的合理性例如T-1指定续读某个时间点的数据非自动不支持监控预警可视化的过程监控提供多样化的图表辅助运维故障问题可实时预警依赖日志定位故障问题往往只能是后处理的方式缺少过程预警无图形化的界面预警monitor可以看到报错信息信息相对笼统定位问题仍需依赖分析日志有问题预警定位问题仍需依赖日志依赖工具日志定位故障问题没有图形化运维界面和预警机制需要自定义开发。数据清洗围绕数据质量做轻量清洗围绕数据仓库的数据需求进行建模计算清洗功能相对复杂需要手动编程轻量清洗支持复杂逻辑的清洗和转化支持复杂逻辑的清洗和转化需要根据自身清晰规则编写清洗脚本进行调用DataX3.0 提供的功能。数据转换自动化的schema mapping手动配置schema mapping需手动配置异构数据间的映射手动配置schema mapping手动配置schema mapping通过编写json脚本进行schema mapping映射特性数据实时性实时非实时实时支持实时但是主流应用都是基于时间戳等方式做批量处理实时同步效率未知实时定时应用难度低高中高中高是否需要开发否是是是是是易用性高低中低低低稳定性高低高中中中其他实施及售后服务原厂实施和售后服务开源软件需自客户自行实施、维护原厂和第三方的实施和售后服务主要为第三方的实施和售后服务分为开源版和企业版企业版可提供相应服务阿里开源代码需要客户自动实施、开发、维护
八、DataX与Kettle对比 浅测评DataX与Kettle Datax和Kettle使用场景的对比
较维度\产品KettleDataX设计及架构适用场景面向数据仓库建模传统ETL工具面向数据仓库建模传统ETL工具支持数据源多数关系型数据库少数关系型数据库和大数据非关系型数据库开发语言JavaPython、Java可视化web界面KettleOnline代码收费Kettle-manager代码免费Data-Web代码免费底层架构主从结构非高可用扩展性差架构容错性低不适用大数据场景支持单机部署和第三方调度的集群部署两种方式功能CDC机制基于时间戳、触发器等离线批处理抽取策略支持增量全量抽取支持全量抽取。不支持增量抽取要通过shell脚本自己实现对数据库的影响对数据库表结构有要求存在一定侵入性通过sql select 采集数据对数据源没有侵入性自动断点续传不支持不支持数据清洗围绕数据仓库的数据需求进行建模计算清洗功能相对复杂需要手动编程需要根据自身清晰规则编写清洗脚本进行调用DataX3.0 提供的功能。数据转换手动配置schema mapping通过编写json脚本进行schema mapping映射特性数据实时性非实时定时应用难度高高是否需要开发是是易用性低低稳定性低中抽取速度小数据量的情况下差别不大大数据量时datax比kettle快。datax对于数据库压力比较小其他实施及售后服务开源软件社区活跃度高阿里开源代码社区活跃度低
九、实战案例 dataX案例 读取mysql通过表名或自定义sql语句数据写入到hdfs中 (txt或orc存储gzip或snappy压缩)_ dataX案例 读取hdfs文件写入到mysql中 dataX案例 从Oracle中读取数据自定义sql语句存到MySQL中_ dataX案例-从mysql读取数据写入到hbase中 dataX案例-从hbase中读取数据写入到文本文件中 使用 DataX 实现数据同步高效的数据同步工具
DataX系列博文
DataX 全系列之一 —— DataX 安装和使用DataX全系列之二 —— DataX 总体架构和原理DataX 全系列之三 —— DataX 源码运行流程分析DataX 全系列之四 —— DataX 核心数据结构DataX 全系列之五 —— DataX-web 介绍和使用
9.1 Datax同步MySQL到Hive
dataX同步mysql至hive
9.1.1 前言
以下是我的一个mysql同步到Hive相关的变量都可以通过传参统一脚本处理。
{job: {setting: {speed: {channel: 3}},content: [{reader: {name: mysqlreader,parameter: {username: ${username},password: ${password},connection: [{jdbcUrl: [${jdbcUrl}],querySql: [select id,create_time,update_time from ${sourceTableName} where update_time${endTime} ]}]}},writer: {name: hdfswriter,parameter: {column: [{name: id,type: string},{name: create_time,type: string},{name: update_time,type: string}],isCompress: ${isCompress},defaultFS: ${hdfsPath},fieldDelimiter: ${fieldDelimiter},fileName: ${fileName},fileType: ${fileType},path: ${path},writeMode: ${writeMode}}}}]}
}9.1.2 参数调用和传参
使用dataX调用这个脚本。
python ${DATAX_HOME}/bin/datax.py -p-DtargetDBName$TARGET_DB_NAME -DtargetTableName$TARGET_TABLE_NAME -DjdbcUrl$MYSQL_URL -Dusername$MYSQL_USERNAME -Dpassword$MYSQL_PASSWD -DsourceTableName$SOURCE_TABLE_NAME -DhdfsPath$HDFS_PATH -DstartTime${START_TIME} -DendTime${END_TIME} -DisCompress$ISCOMPRESS -DwriteMode$WRITEMODE -DfileType$FILETYPE -DfieldDelimiter$FIELDDELIMITER -DfileName$TARGET_TABLE_NAME -Dpath${PATH_HIVE}$TARGET_DB_NAME.db/$TARGET_TABLE_NAME/day$DT_TIME $DATAX_JSON_FILE;在这个命令中你会发现将所有的变量都通过shell命令传递进去了。后续的这些变量传递我在更新。之所以这么多变量其主要是为了方便后续的脚本更新和调度运行。
对于开发人员只需要关心主要逻辑就行了。
有了这个基础脚本我们就可以将HIVE上的一些功能一起合并到shell脚本中
增量同步保留全部数据。全量同步全量同步只保留固定周期的历史全量。刷新元数据。通知更新成功。多个mysql业务库的匹配。生产业务库密码的保护。
9.1.3 封装shell调用脚本
基于上面的考虑。封装dataX的调用脚本。
#!/bin/bash
source /etc/profile
DATAX_HOME/home/data/datax
SHELL_PATH/home/data/dw_datax
SCRIPT_PATH${SHELL_PATH}/job
DATAX_LOG${SHELL_PATH}/logs/datax.log.date %Y-%m-%d
HDFS_PATHhdfs://hdfs-cluster
#START_TIME$(date -d -1 day %Y-%m-%d)
#END_TIME$(date %Y-%m-%d)
#DT_TIME$(date -d -1 day %Y%m%d)
START_TIME
END_TIME
DT_TIME
#失效日期
INVALID_DATE
#失效天数
INVALID_DAYS180
#是否清除失效数据默认清除
IS_CLEAR_INVALID_DATA1#参数
ISCOMPRESSfalse
WRITEMODEnonConflict
FIELDDELIMITER|
FILETYPEorc
PATH_HIVE/user/hive/warehouse/
MYSQL_URL
#数据库用户名
MYSQL_USERNAMEadmin
#数据库密码
MYSQL_PASSWD123456
#默认同步目标库名
TARGET_DB_NAMEods
#同步源库名
SOURCE_DB_NAME
#同步源表名
SOURCE_TABLE_NAME
#业务名称
BUSINESS_NAME
#datax json文件
DATAX_JSON_FILE/temp# 数据库实例信息
declare -A db_instance_conf
# 数据库用户名
declare -A db_instance_user_conf
# 数据库密码
declare -A db_instance_pwd_conf
# 数据库实例与库映射关系
declare -A db_instance_maps# 初始化数据库实例配置
function initInstanceConf()
{# 主业务线 ywx1db_instance_conf[db_main_data]jdbc:mysql://192.168.1.1:3306/db_instance_user_conf[db_main_data]admindb_instance_pwd_conf[db_main_data]123456# 业务线2 ywx2db_instance_conf[db_data]jdbc:mysql://192.168.1.2:3306/db_instance_user_conf[db_data]admindb_instance_pwd_conf[db_data]123456...}# 初始化库和数据库实例映射关系
function initDbAndInstanceMaps()
{#主业务线db_instance_maps[ywx1_db_main]db_main_data#业务线2db_instance_maps[ywx2_db_data]db_data#业务线3db_instance_maps[ywx3_db_insurance]db_ywx3......db_instance_maps[dss_db_dss]db_dss}#时间处理 传入参数 yyyy-mm-dd
function DateProcess()
{
echo 日期时间为$1
if echo $1 | grep -Eq [0-9]{4}-[0-9]{2}-[0-9]{2} date -d $1 %Y%m%d /dev/null 21then :START_TIME$(date -d $1 %Y-%m-%d)END_TIME$(date -d $1 1 day %Y-%m-%d)DT_TIME$(date -d $1 %Y%m%d)INVALID_DATE$(date -d $1 -$INVALID_DAYS day %Y%m%d)echo 时间正确: $START_TIME / $END_TIME / $DT_TIME / $INVALID_DATE;
elseecho 输入的日期格式不正确应为yyyy-mm-dd;exit 1;
fi;}function DataConnect()
{db_business_key$BUSINESS_NAME_$SOURCE_DB_NAMEdb_instance_key${db_instance_maps[$db_business_key]}echo $db_business_key $db_instance_keyif [ ! -n $db_instance_key ]; thenecho 当前数据库连接信息不存在请确认业务和数据库连接是否正确或联系管理员添加exit 1;fidb_instance_value${db_instance_conf[$db_instance_key]}MYSQL_USERNAME${db_instance_user_conf[$db_instance_key]}MYSQL_PASSWD${db_instance_pwd_conf[$db_instance_key]}echo $db_instance_valueif [ ! -n $db_instance_value ]; thenecho 当前数据库连接信息不存在请确认业务和数据库连接是否正确或联系管理员添加exit 1;fiMYSQL_URL$db_instance_value$SOURCE_DB_NAME
}#每天运行 执行dataX
function BaseDataxMysql2Hive()
{#清除重复同步数据分区新增分区hive -e ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME DROP IF EXISTS PARTITION(day$DT_TIME);ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME ADD IF NOT EXISTS PARTITION (day$DT_TIME);#执行同步echo 开始执行同步if ! python ${DATAX_HOME}/bin/datax.py -p-DtargetDBName$TARGET_DB_NAME -DtargetTableName$TARGET_TABLE_NAME -DjdbcUrl$MYSQL_URL -Dusername$MYSQL_USERNAME -Dpassword$MYSQL_PASSWD -DsourceTableName$SOURCE_TABLE_NAME -DhdfsPath$HDFS_PATH -DstartTime${START_TIME} -DendTime${END_TIME} -DisCompress$ISCOMPRESS -DwriteMode$WRITEMODE -DfileType$FILETYPE -DfieldDelimiter$FIELDDELIMITER -DfileName$TARGET_TABLE_NAME -Dpath${PATH_HIVE}$TARGET_DB_NAME.db/$TARGET_TABLE_NAME/pt_day$DT_TIME $DATAX_JSON_FILE;thenecho command failedexit 1;fiecho 同步结束#删除定义的失效日期数据if(($IS_CLEAR_INVALID_DATA1));thenecho 清除失效$INVALID_DATE天数的历史数据hive -e ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME DROP IF EXISTS PARTITION (pt_day${INVALID_DATE});fi#同步分区元数据#hive -e ANALYZE TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME PARTITION (day${DT_TIME}) COMPUTE STATISTICS;#删除分区数据
}function parseArgs()
{while getopts :d:ab:s:m:f:t:n:u:p: optdocase $opt ind)echo 参数d的值$OPTARGDateProcess $OPTARG;;a)IS_CLEAR_INVALID_DATA0echo 参数a的值$OPTARG;;b)echo 参数b的值$OPTARGBUSINESS_NAME$OPTARG;;m)echo 参数m的值$OPTARGSOURCE_DB_NAME$OPTARG;;s)echo 参数s的值$OPTARGSOURCE_TABLE_NAME$OPTARG;;f)echo 参数f的值$OPTARGDATAX_JSON_FILE$OPTARG;;n)echo 参数n的值$OPTARGTARGET_DB_NAME$OPTARG;;t)echo 参数t的值$OPTARGTARGET_TABLE_NAME$OPTARG;;u)echo 参数u的值$OPTARGMYSQL_USERNAME$OPTARG;;p)echo 参数t的值$OPTARGMYSQL_PASSWD$OPTARG;;?)echo 未知参数exit 1;;:)echo 没有输入任何选项 $OPTARG;;esac done
}function judgeParams()
{if [ ! -n $DT_TIME ] ;thenecho you have not input a etlDate! format {-d yyyy-mm-dd} exit 1;fiif [ ! -n $BUSINESS_NAME ] ;thenecho you have not input a businessName! incloud(xxx,xxxx,x,xx) example {-b xxx}exit 1;fiif [ ! -n $SOURCE_DB_NAME ] ;thenecho you have not input a sourceDB!exit 1;fiif [ ! -n $SOURCE_TABLE_NAME ] ;thenecho you have not input a sourceTable example {-s user_info}!exit 1;fiif [ ! -n $DATAX_JSON_FILE ] ;thenecho you have not input a dataxJson! example {-f ods_ywx1_user_info_di.json}exit 1;fiif [ ! -n $TARGET_TABLE_NAME ] ;thenecho you have not input a targetTable! example {-t ods_ywx1_user_info_di}exit 1;fi
}function startsync()
{#初始化数据库实例initInstanceConf#初始化库和数据库实例映射关系initDbAndInstanceMaps#解析参数parseArgs $#初始化数据链接DataConnect#判断参数judgeParams#同步数据BaseDataxMysql2Hive
}# -d: 处理时间
# -b业务线 (ywx,ywx1,ywx1,...,ywxn)
# -m源数据库
# -a增量数据不清除分区数据默认清除
# -s源数据表
# -n目标数据库
# -t目标数据表
# -fdatax同步json文件
# -p密码
# -u用户名startsync $有了这个shell脚本后续对于同步的一些同步完成功能的通知以及新功能都可以新增。同时又新形成了一个数据同步的规范性和开发的规范性。
9.1.4 调度平台调度脚本
有了上面的脚本我们就可以只需要写好源表和目的表的名称。同时通过 -a 来区别增量还是全量同步进行处理。
#源表 -s
SOURCE_TABLE_NAMEuser_info
#目标表 -t
TARGET_TABLE_NAMEods_main_user_info_df
#datax 文件 -f
DATAX_FILE${BASE_DIR_PATH}/ods_main_user_info_df.json
ETL_DATE${ETL_DATE}
BUSINESS_NAME${BUSINESS_NAME}
SOURCE_DB_NAME${SOURCE_DB_NAME}
#!/bin/bash
source /etc/profile
sh dataxsync.sh -d $ETL_DATE -b $BUSINESS_NAME -m $SOURCE_DB_NAME -s $SOURCE_TABLE_NAME -t $TARGET_TABLE_NAME -f $DATAX_FILE我们可以发现这个脚本中包含了四个变量
${BASE_DIR_PATH}
${ETL_DATE}
${BUSINESS_NAME}
${SOURCE_DB_NAME}这几个变量主要是通过调度平台传入 BASE_DIR_PATHdataX脚本的统一地址之所以弄这个目录主要是为了区分不同业务线。 ETL_DATE每天同步的时间 yyyy-mm-dd同时我们可以再脚本上多增加几个时间通过这个变量转换出来yyyyMMdd, yyyyMMdd-1… BUSINESS_NAME业务线的标识我们也是可以用主题域区分。主要是用来识别数据库 SOURCE_DB_NAME业务库的表名。