当前位置: 首页 > news >正文

三明建设网站南通网站建设找哪家好

三明建设网站,南通网站建设找哪家好,长春seo网站排名,广州企业网站设计方案Apache DolphinScheduler 是一个分布式、易扩展的可视化数据工作流任务调度系统#xff0c;广泛应用于数据调度和处理领域。 在大规模数据工程项目中#xff0c;数据质量的管理至关重要#xff0c;而 DolphinScheduler 也提供了数据质量检查的计算能力。本文将对 Apache Do…Apache DolphinScheduler 是一个分布式、易扩展的可视化数据工作流任务调度系统广泛应用于数据调度和处理领域。 在大规模数据工程项目中数据质量的管理至关重要而 DolphinScheduler 也提供了数据质量检查的计算能力。本文将对 Apache DolphinScheduler 的数据质量模块进行源码分析帮助开发者深入理解其背后的实现原理与设计理念。 数据质量规则 Apache Dolphinscheduler 数据质量模块支持多种常用的数据质量规则如下图所示。 数据质量规则主要包括空值校验、自定义SQL、跨表准确性、跨表值比、字段长度校验、唯一性校验、及时性检查、枚举值校验、表行数校验等。 数据质量工作流程 数据质量运行流程分为2个部分 1在Web端进行数据质量检测的流程定义通过DolphinScheduer进行调度提交到Spark计算引擎 2Spark端负责解析数据质量模型的参数通过读取数据、执行转换、输出三个步骤完成数据质量检测任务工作流程如下图所示。 在Web端进行定义 数据质量定义如下图所示这里只定义了一个节点。 以一个空值检测的输入参数为例在界面完成配置后会生产一个JSON文件。 这个JSON文件会以字符串参数形式提交给Spark集群进行调度和计算。 JSON文件如下所示。 {name: $t(null_check),env: {type: batch,config: null},readers: [{type: JDBC,config: {database: ops,password: ***,driver: com.mysql.cj.jdbc.Driver,user: root,output_table: ops_ms_alarm,table: ms_alarm,url: jdbc:mysql://192.168.3.211:3306/ops?allowLoadLocalInfilefalseautoDeserializefalseallowLocalInfilefalseallowUrlInLocalInfilefalse}}],transformers: [{type: sql,config: {index: 1,output_table: total_count,sql: SELECT COUNT(*) AS total FROM ops_ms_alarm}},{type: sql,config: {index: 2,output_table: null_items,sql: SELECT * FROM ops_ms_alarm WHERE (alarm_time is null or alarm_time ) }},{type: sql,config: {index: 3,output_table: null_count,sql: SELECT COUNT(*) AS nulls FROM null_items}}],writers: [{type: JDBC,config: {database: dolphinscheduler3,password: ***,driver: com.mysql.cj.jdbc.Driver,user: root,table: t_ds_dq_execute_result,url: jdbc:mysql://192.168.3.212:3306/dolphinscheduler3?characterEncodingutf-8allowLoadLocalInfilefalseautoDeserializefalseallowLocalInfilefalseallowUrlInLocalInfilefalse,sql: select 0 as rule_type,$t(null_check) as rule_name,0 as process_definition_id,25 as process_instance_id,26 as task_instance_id,null_count.nulls AS statistics_value,total_count.total AS comparison_value,7 AS comparison_type,3 as check_type,0.95 as threshold,3 as operator,1 as failure_strategy,hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测 as error_output_path,2022-11-16 03:40:32 as create_time,2022-11-16 03:40:32 as update_time from null_count full join total_count}},{type: JDBC,config: {database: dolphinscheduler3,password: ***,driver: com.mysql.cj.jdbc.Driver,user: root,table: t_ds_dq_task_statistics_value,url: jdbc:mysql://192.168.3.212:3306/dolphinscheduler3?characterEncodingutf-8allowLoadLocalInfilefalseautoDeserializefalseallowLocalInfilefalseallowUrlInLocalInfilefalse,sql: select 0 as process_definition_id,26 as task_instance_id,1 as rule_id,ZKTZKDBTRFDKXKQUDNZJVKNX8OIAEVLQ91VT2EXZD3U as unique_code,null_count.nullsAS statistics_name,null_count.nulls AS statistics_value,2022-11-16 03:40:32 as data_time,2022-11-16 03:40:32 as create_time,2022-11-16 03:40:32 as update_time from null_count}},{type: hdfs_file,config: {path: hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测,input_table: null_items}}] } Spark端源码分析 DataQualityApplication.java 是Spark程序入口 public static void main(String[] args) throws Exception {//... //从命令行获取参数String dataQualityParameter args[0]; // 将json参数转为DataQualityConfiguration对象DataQualityConfiguration dataQualityConfiguration JsonUtils.fromJson(dataQualityParameter,DataQualityConfiguration.class);//... //构建 SparkRuntimeEnvironment的参数Config对象EnvConfig envConfig dataQualityConfiguration.getEnvConfig();Config config new Config(envConfig.getConfig());config.put(type,envConfig.getType());if (Strings.isNullOrEmpty(config.getString(SPARK_APP_NAME))) {config.put(SPARK_APP_NAME,dataQualityConfiguration.getName());}SparkRuntimeEnvironment sparkRuntimeEnvironment new SparkRuntimeEnvironment(config); //委托给 DataQualityContext执行DataQualityContext dataQualityContext new DataQualityContext(sparkRuntimeEnvironment,dataQualityConfiguration);dataQualityContext.execute(); } 数据质量配置类 public class DataQualityConfiguration implements IConfig {JsonProperty(name)private String name; // 名称JsonProperty(env)private EnvConfig envConfig; // 环境配置JsonProperty(readers)private ListReaderConfig readerConfigs; // reader配置JsonProperty(transformers)private ListTransformerConfig transformerConfigs; // transformer配置JsonProperty(writers)private ListWriterConfig writerConfigs; // writer配置 //... } DataQualityContext#execute从dataQualityConfiguration类中获取Readers、Transformers、Writers 委托给SparkBatchExecution执行 public void execute() throws DataQualityException { // 将ListReaderConfig转为ListBatchReaderListBatchReader readers ReaderFactory.getInstance().getReaders(this.sparkRuntimeEnvironment,dataQualityConfiguration.getReaderConfigs()); // 将ListTransformerConfig转为ListBatchTransformerListBatchTransformer transformers TransformerFactory.getInstance().getTransformer(this.sparkRuntimeEnvironment,dataQualityConfiguration.getTransformerConfigs()); // 将ListWriterConfig转为ListBatchWriterListBatchWriter writers WriterFactory.getInstance().getWriters(this.sparkRuntimeEnvironment,dataQualityConfiguration.getWriterConfigs()); // spark 运行环境if (sparkRuntimeEnvironment.isBatch()) { // 批模式sparkRuntimeEnvironment.getBatchExecution().execute(readers,transformers,writers);} else { // 流模式 暂不支持throw new DataQualityException(stream mode is not supported now);} } 目前 Apache DolphinScheduler 暂时不支持实时数据的质量检测。 ReaderFactory类采用了单例和工厂方法的设计模式目前支持JDBC和HIVE的数据源的读取 对应Reader类HiveReader、JDBCReader。 WriterFactory类采用了单例和工厂方法的设计模式目前支持JDBC、HDFS、LOCAL_FILE的数据源的输出对应Writer类JdbcWriter、 HdfsFileWriter和 LocalFileWriter 。 TransformerFactory类采用了单例和工厂方法的设计模式目前仅支持TransformerType.SQL的转换器类型。 结合JSON可以看出一个空值检测的Reader、Tranformer、 Writer情况: 1个Reader 读取源表数据 3个Tranformer total_count 行总数 null_items 空值项行数据 null_count 空值数 计算SQL如下 -- SELECT COUNT(*) AS total FROM ops_ms_alarm -- SELECT * FROM ops_ms_alarm WHERE (alarm_time is null or alarm_time ) -- SELECT COUNT(*) AS nulls FROM null_items 3个Writer第一个是JDBC Writer 将比较值、统计值输出t\_ds\_dq\_execute\_result 数据质量执行结果表。 SELECT//...null_count.nulls AS statistics_value,total_count.total AS comparison_value,//...hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测 AS error_output_path,//... FROMnull_countFULL JOIN total_count 第二个是JDBC Writer将statistics\_value写入到表 t\_ds\_dq\_task\_statistics\_value SELECT//...//...null_count.nulls AS statistics_name,null_count.nulls AS statistics_value,//... FROMnull_count 第3个是HDFS Writer将空值项写入到HDFS文件目录 {type: hdfs_file,config: {path: hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测,input_table: null_items} } SparkBatchExecution#execute public class SparkBatchExecution implements ExecutionBatchReader, BatchTransformer, BatchWriter {private final SparkRuntimeEnvironment environment;public SparkBatchExecution(SparkRuntimeEnvironment environment) throws ConfigRuntimeException {this.environment environment;}Overridepublic void execute(ListBatchReader readers, ListBatchTransformer transformers, ListBatchWriter writers) { // 为每一个reader注册输入临时表readers.forEach(reader - registerInputTempView(reader, environment));if (!readers.isEmpty()) { // 取readers列表的第一个reader读取数据集合 reader的实现类有HiveReader、JdbcReaderDatasetRow ds readers.get(0).read(environment);for (BatchTransformer tf:transformers) { // 执行转换ds executeTransformer(environment, tf, ds); // 将转换后结果写到临时表registerTransformTempView(tf, ds);}for (BatchWriter sink: writers) { // 执行将转换结果由writer输出 writer的实现类有JdbcWriter、LocalFileWriter、HdfsFileWriterexecuteWriter(environment, sink, ds);}} // 结束environment.sparkSession().stop();} } SparkBatchExecution#registerInputTempView //注册输入临时表, 临时表表名为OUTPUT_TABLE的名字private void registerInputTempView(BatchReader reader, SparkRuntimeEnvironment environment) {Config conf reader.getConfig();if (Boolean.TRUE.equals(conf.has(OUTPUT_TABLE))) {// ops_ms_alarmString tableName conf.getString(OUTPUT_TABLE); registerTempView(tableName, reader.read(environment));} else {throw new ConfigRuntimeException([ reader.getClass().getName() ] must be registered as dataset, please set \output_table\ config);}} 调用Dataset.createOrReplaceTempView方法 private void registerTempView(String tableName, DatasetRow ds) {if (ds ! null) {ds.createOrReplaceTempView(tableName);} else {throw new ConfigRuntimeException(dataset is null, can not createOrReplaceTempView);} } 执行转换executeTransformer private DatasetRow executeTransformer(SparkRuntimeEnvironment environment, BatchTransformer transformer, DatasetRow dataset) {Config config transformer.getConfig();DatasetRow inputDataset;DatasetRow outputDataset null;if (Boolean.TRUE.equals(config.has(INPUT_TABLE))) { // 从INPUT_TABLE获取表名String[] tableNames config.getString(INPUT_TABLE).split(,);// outputDataset合并了inputDataset数据集合for (String sourceTableName: tableNames) {inputDataset environment.sparkSession().read().table(sourceTableName);if (outputDataset null) {outputDataset inputDataset;} else {outputDataset outputDataset.union(inputDataset);}}} else { // 配置文件无INPUT_TABLEoutputDataset dataset;} // 如果配置文件中配置了TMP_TABLE 将outputDataset 注册到TempViewif (Boolean.TRUE.equals(config.has(TMP_TABLE))) {if (outputDataset null) {outputDataset dataset;}String tableName config.getString(TMP_TABLE);registerTempView(tableName, outputDataset);} // 转换器进行转换return transformer.transform(outputDataset, environment); } SqlTransformer#transform 最终是使用spark-sql进行处理 所以核心还是这个SQL语句SQL需要在web端生成好参考前面的JSON文件。 public class SqlTransformer implements BatchTransformer {private final Config config;public SqlTransformer(Config config) {this.config config;} //...Overridepublic DatasetRow transform(DatasetRow data, SparkRuntimeEnvironment env) {return env.sparkSession().sql(config.getString(SQL));} } 将数据输出到指定的位置executeWriter private void executeWriter(SparkRuntimeEnvironment environment, BatchWriter writer, DatasetRow ds) {Config config writer.getConfig();DatasetRow inputDataSet ds;if (Boolean.TRUE.equals(config.has(INPUT_TABLE))) {String sourceTableName config.getString(INPUT_TABLE);inputDataSet environment.sparkSession().read().table(sourceTableName);}writer.write(inputDataSet, environment); } 总体来讲Apache Dolphinscheduler的数据质量检测实现相对简单明了只要采用Spark SQL进行计算。在本文中我们深入分析了数据质量模块的源码结构和实现逻辑Apache DolphinScheduler 数据质量模块的设计理念强调灵活性和扩展性这使得它可以适应不同企业的多样化需求。 对于开发者而言深入理解其源码不仅有助于更好地使用 DolphinScheduler也为进一步扩展其功能提供了方向和灵感。希望本文能够为您在数据质量控制和开源项目深入探索方面提供帮助。 ​ 本文由 白鲸开源科技 提供发布支持
http://www.w-s-a.com/news/895813/

相关文章:

  • 网站301跳转效果商丘网站公司
  • 公司网站建设西安网站的架构与建设
  • 食品科技学校网站模板花溪村镇建设银行网站
  • 图片渐隐 网站头部flash地方志网站建设自查报告
  • 深圳做商城网站视觉品牌网站建设
  • 永康电子商务网站建设弹幕网站怎么做
  • 百川网站企业做网站要注意哪些
  • 球迷类的网站如何做网站建设需要哪些素材
  • 请问有重庆有做网站吗电子政务系统网站建设的基本过程
  • 建设银行管方网站官网最新版cmsv6
  • 网站开发工程师需要会写什么深圳网站(建设信科网络)
  • 台州网站搭建网站建设需求计划
  • 网站app免费下载软件大全大连百度推广哪家好
  • 网站建设的面试要求iis做的网站手机怎么访问
  • 定州市住房保障和城乡建设局网站上海网站建设排行
  • 网站发帖百度收录网站改版后不收录
  • 昆明建设局网站号码网站开发 浏览器兼容性
  • 湖北专业网站建设大全室内设计联盟app下载
  • 网站建设的意义和作用江苏城市建设档案馆网站
  • 华为云速建站贴心的广州网站建设
  • 网页网站开发公司天津seo推广
  • 网站线框图用什么做共享门店新增礼品卡兑换模式
  • 互联网建站是什么seo服务公司上海
  • 象山县城乡建设局网站做网站客户要求分期
  • 什么是网络营销型网站手机网站 图标
  • 全国新农村建设网站外包和劳务派遣哪个好
  • 网站权限控制什么软件做网站描述
  • 建网络商城网站wordpress关于
  • 专业网站建设分类标准重庆网站开发哪家专业
  • 织梦的网站关键词如何自己搭建微信小程序