当前位置: 首页 > news >正文

织梦增加网站英文名称网站开发完整的解决方案

织梦增加网站英文名称,网站开发完整的解决方案,军事头条免费下载安装,优化大师介绍 ETL是数据抽取#xff08;Extract#xff09;、转换#xff08;Transform#xff09;、装载#xff08;Load#xff09;的过程。Kettle是一款国外开源的ETL工具#xff0c;有两种脚本文件transformation和job#xff0c;transformation完成针对数据的基础转换Extract、转换Transform、装载Load的过程。Kettle是一款国外开源的ETL工具有两种脚本文件transformation和jobtransformation完成针对数据的基础转换job则完成整个工作流的控制。Job一个作业由不同逻辑功能的entry组件构成数据从一个entry组件传递到另一个entry组件并在entry组件中进行相应的处理。Transformation完成针对数据的基础转换即一个数据转换过程。Entry:实体即job型组件。用来完成特定功能应用是job的组成单元、执行单元。Step:步骤是Transformation的功能单元用来完成整个转换过程的一个特定步骤。Hop:工作流或转换过程的流向指示从一个组件指向另一个组件在kettle源工程中有三种hop无条件流向、判断为真时流向、判断为假时流向。 体系结构 kettle平台是整个系统的基础包括元数据管理引擎、数据集成引擎、UI和插件管理模块。 元数据管理引擎 元数据管理引擎管理ktr、kjb或者元数据库插件通过该引擎获取基本信息主要包括TransMeta、JobMeta和StepMeta三个类。 TransMeta类定义了一个转换对应一个.ktr文件提供了保存和加载该文件的方法JobMeta类同样对应于一个工作(对应一个.kjb文件)提供保存和加载方法StepMeta类保存的是Step的一些公共信息的类每个类的具体的元数据将保存在显示了StepMetaInterface的类里面。 数据集成引擎 数据集成引擎包括Step引擎、Job引擎和数据库访问引擎三大部分主要负责调用插件并返回相应信息。 UI UI显示Spoon这个核心组件的界面通过xul实现菜单栏、工具栏的定制化显示插件界面接口元素其中的TransGraph类和JobGraph类是用于显示转换和Job的类。 TransGraph类 选中转换标签后红框内的编辑区对象对应org.pentaho.di.ui.spoon.trans包中的TransGraph类。 JobGraph类 选中Job标签后红框内的编辑区对象对应org.pentaho.di.ui.spoon.job包中的JobGraph类。 插件管理模块 Kettle是众多“可供插入的地方”扩展点和“可以插入的东西”扩展共同组成的集合体。在我们的生活中电源接线板就是一种“扩展点”很多“扩展”也就是电线插头可以插在它上面。 插件管理引擎主要负责插件的注册在Kettle中不管是以后的扩展还是系统集成的功能本质上来讲都是插件管理方式和运行机制是一致的。系统集成的功能点也均实现了对应的扩展接口只是在插接的说明上略有不同。 Kettle的扩展点包括step插件、job entry插件、Database插件、Partioner插件、debugging插件。 功能模块 Kettle的主要包括四大功能模块 Chef——工作(job)设计工具 (GUI方式)Kitchen——工作(job)执行器(命令行方式)Spoon——转换(transform)设计工具 (GUI方式)Span——转换(trasform)执行器(命令行方式)。 Chef—工作(job)设计器 这是一个GUI工具操作方式主要通过拖拽。 何谓工作多个作业项按特定的工作流串联起来形成一项工作。正如我的工作是软件开发。我的作业项是设计、编码、测试先设计如果成功则编码否则继续设计编码完成则开始设计周而复始作业完成。 Chef中的作业项 转换指定更细的转换任务通过Spoon生成通过Field来输入参数SQLsql语句执行FTP下载ftp文件邮件发送邮件检查表是否存在检查文件是否存在执行shell脚本如dos命令。批处理(注意windows批处理不能有输出到控制台)。Job包作为嵌套作业使用。JavaScript执行如果有自已的Script引擎可以很方便的替换成自定义Script来扩充其功能SFTP安全的Ftp协议传输HTTP方式的上传/下载。 工作流 工作流是作业项的连接方式分为三种无条件成功失败。 为了方便工作流使用KETTLE提供了几个辅助结点单元(也可将其作为简单的作业项) Start单元任务必须由此开始。设计作业时以此为起点。OK单元可以编制做为中间任务单元且进行脚本编制用来控制流程。ERROR单元用途同上。DUMMY单元什么都不做主要是用来支持多分支的情况。 存储方式 支持XML存储或存储到指定数据库中。 一些默认的配置(如数据库存储位置……)在系统的用户目录下单独建立了一个.Kettle目录用来保存用户的这些设置。LogView 可查看执行日志。 Kitchen—作业执行器 是一个作业执行引擎用来执行作业。这是一个命令行执行工具参数如下 rep : Repository name 任务包所在存储名user : Repository username 执行人pass : Repository password 执行人密码job : The name of the job to launch 任务包名称dir : The directory (don’t forget the leading / or /)file : The filename (Job XML) to launchlevel : The logging level (Basic, Detailed, Debug, Rowlevel, Error, Nothing) 指定日志级别log : The logging file to write to 指定日志文件listdir : List the directories in the repository 列出指定存储中的目录结构。listjobs : List the jobs in the specified directory 列出指定目录下的所有任务listrep : List the defined repositories 列出所有的存储norep : Don’t log into the repository 不写日志 Spoon—转换过程设计器 GUI工作用来设计数据转换过程创建的转换可以由Pan来执行也可以被Chef所包含作为作业中的一个作业项。 Input-Steps输入步骤 Text file input文本文件输入 可以支持多文件合并有不少参数基本一看参数名就能明白其意图。 Table input数据表输入 实际上是视图方式输入因为输入的是sql语句。当然需要指定数据源(数据源的定制方式在后面讲一下)Get system info取系统信息 就是取一些固定的系统环境值如本月最后一天的时间本机的IP地址之类。Generate Rows生成多行。 这个需要匹配使用主要用于生成多行的数据输入比如配合Add sequence可以生成一个指定序号的数据列。XBase InputExcel InputXML Input Output-Steps 输出步聚 Text file output文本文件输出。这个用来作测试蛮好呵呵。很方便的看到转换的输出。Table output输出到目的表。Insert/Update目的表和输入数据行进行比较然后有选择的执行增加更新操作。Update同上只是不支持增加操作。XML OutputXML输出。 Look-up查找操作 Data BaseStreamProcedureDatabase join Transform 转换 Select values 对输入的行记录数据的字段进行更改 (更改数据类型更改字段名或删除) 数据类型变更时数据的转换有固定规则可简单定制参数。可用来进行数据表的改装。Filter rows 对输入的行记录进行指定复杂条件的过滤。用途可扩充sql语句现有的过滤功能。但现有提供逻辑功能超出标准sql的不多。 Sort rows 对指定的列以升序或降序排序当排序的行数超过5000时需要临时表。Add sequence 为数据流增加一个序列这个配合其它Step(Generate rows, rows join)可以生成序列表如日期维度表(年、月、日)。Dummy 不做任何处理主要用来作为分支节点。Join Rows 对所有输入流做笛卡儿乘积。Aggregate 聚合分组处理Group by 分组用途可扩充sql语句现有的分组聚合函数。但我想可能会有其它方式的sql语句能实现。Java Script value 使用mozilla的rhino作为脚本语言并提供了很多函数用户可以在脚本中使用这些函数。Row Normaliser 该步骤可以从透视表中还原数据到事实表通过指定维度字段及其分类值度量字段最终还原出事实表数据。Unique rows 去掉输入流中的重复行在使用该节点前要先排序否则只能删除连续的重复行。Calculator 提供了一组函数对列值进行运算用该方式比用户自定义JAVA SCRIPT脚本速度更快。Merge Rows 用于比较两组输入数据一般用于更新后的数据重新导入到数据仓库中。Add constants 增加常量值。Row denormaliser 同Normaliser过程相反。Row flattener 表扁平化处理指定需处理的字段和扃平化后的新字段将其它字段做为组合Key进行扃平化处理。 除了上述基本节点类型外还定义了扩展节点类型SPLIT FIELDS按指定分隔符拆分字段EXECUTE SQL SCRIPT执行SQL语句CUBE INPUTCUBE输入CUBE OUTPUTCUBE输出。 Pan—转换的执行工具 命令行执行方式可以执行由Spoon生成的转换任务。同样不支持调度。参数与Kitchen类似。其它 Connection 可以配置多个数据源在Job或是Trans中使用这意味着可以实现跨数据库的任务。支持大多数市面上流行的数据库。 概念模型 Kettle的执行分为两个层次Job和Transformation。两个层次的最主要区别在于数据传递和运行方式。 Transformation转换 Transformation转换是由一系列被称之为step步骤的逻辑工作的网络。转换本质上是数据流。 下图是一个转换的例子这个转换从文本文件中读取数据过滤然后排序最后将数据加载到数据库。本质上转换是一组图形化的数据转换配置的逻辑结构。 转换的两个相关的主要组成部分是step步骤和hops节点连接。 转换文件的扩展名是.ktr。 转换机制 每个转换步骤都是ETL数据流里面的一个任务。 转换步骤包括输入、处理和输出。 输入步骤从外部数据源获取数据例如文件或者数据库 处理步骤处理数据流字段计算流处理等例如整合或者过滤。 输出步骤将数据写回到存储系统里面例如文件或者数据库。 Steps步骤 Steps步骤是转换的建筑模块比如一个文本文件输入或者一个表输出就是一个步骤。每个步骤用于完成某种特定的功能通过配置一系列的步骤就可以完成你所需要完成的任务。 Step类图简介 每个类都有其特定的目的及扮演的角色。以TableInput为例下图说明了这4个类的继承体系。 StepInterface继承体系 实现StepInterface接口的类在转换运行时将是数据实际处理的位置。每个执行线程都表示一个实现StepInterface的实例。 BaseStep实现了StepInterface是各step具体实现类的基类。完成了公用的处理函数如putRow()但是对于更具体的processRow()在StepBase的子类中。StepBase的主要成员有 public ArrayList inputRowSetsoutputRowSets StepBase的子类每次从inputRowSets中取出一行数据向outputRowSets中写入一行数据。 StepDataInterface继承体系 实现StepDataInterface接口的类为数据类当插件执行时对于每个执行执行的线程都是唯一的。保存于step相关的数据信息比如行的元数据信息。 StepMetaInterface继承体系 实现了StepMetaInterface接口的类为元数据类。它的职责是保存和序列化特定步骤的实例配置例如保存步骤的名称、字段名称等如何生成加载xml或者读写数据库。 StepDialogInterface继承体系 实现了StepDialogInterface接口的类为对话框类该类实现了该步骤与用户交互的界面它显示一对话框通过对话框用户可以根据自己的要求进行步骤的设定。该对话框类与元数据类关系非常密切对话框里面的配置数据均会保存在元数据类里面。 步骤间交互通信类 步骤之间通信机制 RowSet的实现类负责步骤之间的相互通信rowset对象即是前一个step的成员也是后一个step的成员访问是线程安全的。 RowSet实现类内存快照 RowSet类中包含源step目标step和由源向目标发送的一个rowMeta和一组data。其中data数据是以行为单位的队列queArray。一个RowSet作为此源step的outputrowsets的一部分。同时作为目标step的inputRowsets一部分。源Step每次向队列中写一行数据目标step每次从队列中读取一行数据。 RowSet实现类 行元数据 行元数据UML类图 所有的data均为object对象。步骤与步骤之间以行为单位进行处理自然需要知道每行的结构即行元数据。行元数据至少需要包括类型、名称当然还可能包括字段长度、精度等常见内容。 行元数据不仅在执行的时候需要而且在转换设置的时候同样需要。每个步骤的行元数据都会保存在.ktr文件或者数据库里面所以可以根据步骤名称从TransMeta对象中获取行元数据。 行元数据的UML类图结构如下所示主要有单元格元数据组成行元数据。在现有的版本中支持的数据类型有String、Date、BigNumber、Boolean、SerializableType、Binary、Integer、Numberic。 Hops节点连接 Hops节点连接是数据的通道用于连接两个步骤使得元数据从一个步骤传递到另一个步骤。节点连接决定了贯穿在步骤之间的数据流步骤之间的顺序不是转换执行的顺序。当执行一个转换时每个步骤都以自己的线程启动并不断的接受和推送数据。 注意所有的步骤是同步开启和运行的所以步骤的初始化的顺序是不可知的。因为我们不能在第一个步骤中设置一个变量然后在接下来的步骤中使用它。 在一个转换中一个步骤可以有多个连接数据流可以从一个步骤流到多个步骤。在Spoon中hops就想是箭它不仅允许数据从一个步骤流向另一个步骤也决定了数据流的方向和所经步骤。如果一个步骤的数据输出到了多个步骤那么数据既可以是复制的也可以是分发的。Trans配置及开启 Trans执行时序图 在真正运行trans之前还需要对运行模式进行一个设置。设置结果会传给TransGraph.start(executionConfiguration)。配置界面如下所示 执行转换模式设置 实例化Trans的基本流程如下Trans类是最后真正执行转换的类。实例化之前需要配置启动项保持.ktr文件同步然后实例化Trans类。最后开启后台程序这样不会影响UI的操作真正的转换在后台执行。 实例化Trans流程图 Trans执行 trans类的执行由execute()负责主要包含两个步骤转换执行前的准备工作和所有线程的开启。Trans每一个步骤都会对应一个独立的线程线程之间通过RowSet进行通信交互。 执行准备(prepareExecution) 主要完成对通信类的初始化对步骤的包装初始化。最后启动各个步骤初始化线程即调用各个步骤的init()方法。准备结束之后步骤之间的通信机制完成了各个步骤的初始化工作也完成了。具体的流程如下所示 准备执行流程图 转换处理执行 Trans转换执行引擎类通过startThreads启动步骤线程。为所有步骤添加监听器在开启监听进程对所有线程进行监听。具体的步骤如下图所示 启动所有步骤线程 Jobs工作 Jobs工作是基于工作流模型的协调数据源、执行过程和相关依赖性的ETL活动。 Jobs工作将功能性和实体过程聚合在了一起由工作节点连接、工作实体和工作设置组成工作文件的扩展名是.kjb。 下图是一个工作的例子。 一个工作中展示的任务有从FTP获取文件、核查一个必须存在的数据库表是否存在、执行一个转换、发送邮件通知一个转换中的错误等。最终工作的结果可能是数据仓库的更新等。机制 一个job项代表ETL控制流中的一项逻辑任务。Job项将会顺序执行每个job项会产生一个结果能作为别的分支上job项的条件。类图简介 Job entry类图结构 Job配置及开启 Job开启时序图 Job的开启与Trans相类似配置执行的参数检查.kjb文件是否发生变化实例化一个Job对象开启该线程。Job执行步骤 kettle数据流图 程序正常工作需要3种类型的文件 配置文件 程序首先是从系统中读取相关的配置文件用来初始化程序。然后加载组件库到程序中组件的加载需要用到相关的XML文件和图标。工作文件 Kjb文件是job的工作文件ktr文件是transformation的工作文件要打开或者运行一个job或者transformation需要存在相应的kjb或ktr文件。数据文件 需要处理的数据。顶层数据流图 第一层数据流图 第二层数据流图 作业执行 转换执行 插件扩展机制 Kettle并不能做到热插拔每次添加或者删除插件的时候都需要重启。安装或删除插件只需要在plugins文件夹下添加或删除对应的文件即可。 Kettle的扩展点包括step插件、job entry插件、Database插件、Partioner插件、debugging插件这里我们重点介绍step、job entry、database插件。暴露的扩展点如下表所示 插件的建立 Kettle中的插件包含两部分一是系统本身就已经实现的功能点在源码目录src中说明如kettle-steps.xml二是系统之外开发的插件在plugins目录对应插件目录下的plugins.xml说明plugins/steps/S3CsvInput/plugins.xml。 系统集成插件定义 插件说明信息中包括描述信息、类名包括package反射用、父级目录Spoon左侧栏目录、提示信息和图片信息。Kettle使用国家化方式编程所以软件中的所有文字描述均由messages_**.properties提供。 系统集成插件说明xml结构如下 扩展插件定义 所有新开发的扩展插件均放在同一的目录下进行管理插件管理模块会自动去该目录下进行搜索查找。插件目录结构如下所示 扩展插件与系统集成插件的说明内容相似扩展插件增加ID属性和依赖属性同时他的目录结构、描述信息和提示信息均能进行国际化配置。 扩展插件说明xml结构如下 插件的注册 Spoon在启动的时候会对所有插件进行注册并保存在PluginRegistry类里面。平台通过查找PluginRegistry注册表获取插件信息。Kettle安装插件需要进行重启卸载插件也只需简单的删除plugins目录结构下对应的文件即可。 插件注册时序图 plugin注册相关的UML类图 插件查找 PluginRegistry提供了插件查找功能准确的来说是插件信息的查找功能。以steps在左侧功能栏里面的显示为例进行插件查找的说明。提供了getPlugins获取指定插件类型列表、getPlugin获取指定成名插件、getCateories获取目录结构、getClass获取指定插件类等方法。 Spoon中step列表 左侧显示由Spoon.refreshCoreObjects函数实现如果选择时trans相关的内容将显示所有的step插件。流程图如下所示 spoon界面step插件显示流程 插件调用 Kettle中调用插件时平台通过元素管理引擎获取对应的插件信息通过反射生成插件对象调用对应的函数。Kettle以外观模式的方式调用插件我们以双击某个插件图表弹出对应配置界面为例进行说明。 Spoon界面交互相关的处理器都封装到SpoonDelegates中根据不同的事件类型调用对应的事件处理函数。UML类图如下所示。 事件代理类 SpoonStepsDelegate提供了与UI交互相关的处理事件如复制、删除、粘贴、编辑等。双击某个step时会调用编辑功能编辑功能是对插件StepDialogInterface的封装。时序图如下 双击是TransGraph对象注册的时间双击是根据页面上的坐标信息获取双击的stepmeta对象来自于*.ktr。然后将这个对象传给事件代理类处理根据stepmeta对象获取对应的插件类名通过反射生成StepDialogInterface的实例并调用open()方法。插件间通信 Kettle插件之间天生就具有通信共享数据的特点kettle中最主要通信方式是通过插件之间共同关联一个数据类对象的方式进行通信使用单例模式实现插件间信息共享。 核心包模块、类和接口 kettle包含的核心包模块、类和接口分为辅助层(底层)、逻辑层和界面层。其对应的源码包结构是coresrc ui的顺序如下列表所示。 界面层 核心面板模块 Spoon类 Spoon类是图形界面的入口类它处于整个程序运行的核心地位连接界面和逻辑功能从界面获取配置。 Spoon类作用主要是主界面的Handle实现界面的显示、刷新、菜单加载、菜单及界面按钮功能的实现、工作流信息的收集等。即与界面相关的、与delegates相关的、与props相关的。 Spoon的一些重要的成员函数如下表 Spoon的一些成员变量 Spoon的一些重要函数 菜单加载 Spoon中加载菜单是在界面初始化的时候完成的Spoon.init函数完成界面的初始化然后调用addMenu()添加菜单。 菜单加载原理Kettle通过读取配置文件的方式加载主菜单。当Kettle建立Shell对象时需要为Shell对象增加菜单。这个任务有XulHelper完成。详细过程见6.9 XulHelper.createMenuBar(XUL_FILE_MENUBAR, shell,new XulMessages()); XulHelper解析配置文件根据配置文件生成相应的menubarmenumenuitem或menuseparator。每一项都可能有自己的id快捷键和label并且id和快捷键在handler中注册但是没有说明每一个菜单项调用哪个方法。 加载完之后需要对每一个id和快捷键加上事件的触发的函数。该过程通过读取ui/menubar.properties文件获得一个id对应的事件函数体名称Handler在把id和对应的函数体关联起来这样每次事件触发都有hanhler统一根据id调用相应的函数。树面板显示 Kettle主界面中的树面板是通过addTree()函数添加的。 Spoon类中的mainComposite变量就代表树面板view、design 、expandAll、collapseAll、selectionFilter和coreObjectsTree分别表示主对象树、核心对象、编辑框、展开按钮、收起按钮和树列表。如下图 标签页管理 标签页实际上就是一个TabItem类标签页的管理涉及到两个类对象 Spoon中的delegates.tabs和Spoon中的delegates.jobs和delegates.trans它们都管理着所有的正在编辑的页面但是各有各的职责。 tabs的职责的是维护标签页列表List tabMap (TabMapEntry中有变量TabItem tabItem)包括列表元素的增删查找等以及实现标签选择、不选择和关闭等事件。 delegates.jobs和delegates.trans类似主要的职责是记录job标签页的信息delegates.jobs根据标签页维护这一个jobMeta列表 MapString, JobMeta jobMap; 原理以增加一个job为例 当触发一个增加job的事件时Spoon会将首先事件委托给SpoonJobDelegate处理。 SpoonJobDelegate会把增加标签页面的工作交给SpoonTabsDelegate处理。 同理当关闭一个标签页时Spoon会首先事件委托给SpoonJobDelegate处理。 SpoonJobDelegate会把去除标签页面的工作交给SpoonTabsDelegate处理。拖拽事件 这里拖拽事件是指将树面板中的组件拖拽到标签页中。 原理SpoonTreeDelegate监听树的事件信息当有拖拽离开事件发生时SpoonTreeDelegate会把算法在核心对象树的文本展示保存到全局变量当在编辑面板有拖拽放下事件发生时会根据得到的全局变量在算法列表里索引相应的算法实例化一个相应的算法类和算法界面类。 SpoonTreeDelegate的dragSetData内有拖拽离开事件发生后的代码。 data treeObject.getItemText(); event.data new DragAndDropContainer(type, data); 最后将事件和携带的组件信息传递到JobGraph类中的drop函数中这个函数是处理拖拽放下事件的。该函数的其中三个语句如下 DragAndDropContainer container (DragAndDropContainer) event.data; String entry container.getData(); container.getType() … JobEntryCopy jge spoon.newJobEntry(jobMeta, entry, false); 可见首先根据传入的事件数据event.data构建DragAndDropContainer对象然后从中获取组件类型描述信息entry、组件类型container.getType。最后根据类型描述构建JobEntryCopy对象。 这样便实现了拖拽添加组件。 JobGraph类 是标签tabItem页的控制者Controller代表工作区中的一个标签页。负责标签页上的各种触发事件的传递或处理比如说将组件拖拽到工作区事件、编辑entry事件、删除entry或者hop事件、移动组件位置等等还有获取当前工作区中的工作流界面信息还有响应标签页的菜单选择事件菜单如下图 JobGraph类重要成员如下 //相应的jobMeta对象每一个标签页就是一个工作流也就是一个jobMeta protected JobMeta jobMeta // Job类对象它是一个线程对象负责当前jobMeta的执行 private Job job private Repository rep; private Job parentJob; private JobTracker jobTracker;用于跟踪日志记录 private Date startDate, endDate, currentDate, logDate, depDate; private boolean active, stopped;状态位 private List sourceRows; 返回结果的数据内容 private Result result;每次执行完一个jobentry返回结果 一些重要的函数 public JobGraph(Composite par, final Spoon spoon, final JobMeta jobMeta) 这是它的构造函数。该函数不仅构造了JobGraph的对象而且在这里同时设置了很多的监听器比如监听鼠标的按下、移动、放开事件、选择事件等。双击事件 canvas.addMouseListener(new MouseAdapter() 双击事件主要是指双击工作区的entry进入entry的编辑对话框。当然也可能是双击工作区的其他地方。拖拽事件 ddTarget.addDropListener(new DropTargetListener() 拖拽事件的一个主要功能是选择需要添加的entry加入到job中来即创建新的entry并加入到jobMeta中。当然也可能是拖动已有的entry改变其位置。显示标签页上的新建菜单的下拉菜单 public void newFileDropDown() 标签页上的打开菜单 public void openFile()标签页的保存菜单 public void saveFile()运行工作流 public void runJob() {Start()函数 函数主要是开启job的线程执行当前工作流。主要代码如下 job new Job(log, jobMeta.getName(), jobMeta.getFilename(), null); …… job.start(); 组件适配模块 该模块的主要作用是把swt开发包内的界面组件做适当的修改可以更加灵活地以配置文件的方式动态加载菜单工具栏更加灵活地处理事件信息。 其包结构如下 使用的原理每一个XulMenuBar或XulToolBar都有自己的一个EventHandler成员所有的MenuItem和XulToolBarButton都有唯一的id这些内容可以通过配置文件读取。所有的MenuItem和XulToolBarButton会把自己的id和处理函数在EventHandler中注册EventHandler维护一个列表表示不同事件的id和对应的处理函数快捷键处理对象。如下图 当触发相应的Item或Button时它们同一把id作为参数调用handler的处理函数handler根据索引得到相应的函数名对象实例通过反射调用对象实例的函数方法。完成触发动作。测试结果展示模块 结果的展示就是将执行结果展示到界面上。要展示的内容包括三个方面的信息历史信息、日记信息和执行过程信息。执行完任务后调用JobGraph.addAllTabs()添加三个标签来分别显示这三个方面的内容。 JobHistoryDelegate类表示历史信息。 JobLogDelegate类表示日记信息。 JobGridDelegate类表示执行过程信息。 当job运行时会设置 jobGridDelegate.jobTracker job.getJobTracker(); 当每次job运行结束之后会将结果保存到jobTracker对象内。JobGridDelegate会循环读取这个运行结果内容并且把运行结果展示在界面上。ui其他重要的类 SpoonDelegates类 该类是一个代理类代理工作任务的相关操作。它的部分成员变量如下 该类只有一个成员函数用来初始化生成所有成员变量 public SpoonDelegates(Spoon spoon) SpoonJobDelegate类 该类是job型代理类控制所有的job。其成员变量如下 private MapString, JobMeta jobMap 以及从父类SpoonDelegate继承的protected Spoon spoon jobMap是所有job的映射列表存储了所有的jobJobMeta代表一个job下面会有相关介绍。 该类的函数主要是对jobMap进行增删修改复制jobMeta等操作比如 添加一个job public String addJob(JobMeta jobMeta) 复制jobMeta对象 public void copyJobEntries(JobMeta jobMeta, JobEntryCopy jec[]) 根据tab名称标签页名获取job public JobMeta getJob(String tabItemText) 执行所给的jobMeta public void executeJob(JobMeta jobMeta, boolean local, boolean remote, Date replayDate, boolean safe) throws KettleExceptionui中类的关系图 逻辑层 Job模块 主要介绍src.org.pentaho.di.job包下的一些主要的类。这些类主要作用是实现job作业的逻辑功能包括创建job、维护job及执行job等后续有详细介绍。Trans模块 这个模块和job模块类似主要是处理transformation任务后续有详细介绍。Src中其他重要的类和接口 EnvUtil类 EnvUtil类中的所有成员都是静态的最主要的两个函数是 public static Properties readProperties(String fileName) public static void environmentInit() 第一个函数是读取kettle运行时主目录下的kettle.properties文件文件是关于kettle应用程序的一些设置 第二个函数首先调用第一个读取文件后加载文件中的所有设置并加入一些临时的设置。 这个类负责kettle的环境设置。 PropsUI类 该类继承了Props类Props类用来储存各种用户设置信息并且有一个自身静态对象protected static Props props。PropsUI中还可储存最近用户文件。 下面介绍该类的一些重要函数 public static final void init(Display d, String filename) 该函数在Spoon的main中执行最后创建了props静态对象。props是PropsUI的父类Props对象。 PropsUI的其他函数一般是用于加载和获取设置信息。PluginLoader类 该类通过load接口函数对外提供加载插件jar包功能。有成员变量locs表示插件所在的目录地址。 底层辅助层 对文件的操作 以下各类封装了对文件的基本操作几乎所有的模块都利用到以下的类。是最底层的类。 org.pentaho.di.core.vfs. KettleVFS.java 实现了对文件的基本操作基本是静态方法通过使用开源jar包org.apache.commons.vfs可以对不同操作系统的文件系统处理。 org.pentaho.di.core.xml.XMLHandler 实现了对xml的基本操作基本是静态方法得到对应标签的值或者获取变量对应的xml字符串。多语言应用 实现原理在子包X中如果子包x中所有的类需要多国语言支持那么必须需要在子包x中增加一个Messages类和相应的不用语言的文件支持。 如下图 语言设置可以是默认语言也可以用户指定当用户指定时Kettle把设置信息保存到用户目录下这样下次启动时读取信息设置依然生效。 Kettle中多语言是通过Messages类来实现的。在需要实现多语言的类包下编写相应的Messages类。而在Messages通过getString(“需要国际化的字符串”)方法调用基类BaseMessages获取给定别名的字符串值BaseMessages交给GlobalMessages处理GlobalMessages会根据语言设置查找相应的文件比如messages_zh_CN.properties文件得到相应的语言信息。 例如GlobalMessages的方法。如下 public String getString(String packageName, String key) {Object[] parameters new Object[] {};return calculateString(packageName, key, parameters);} protected String calculateString(String packageName, String key, Object[] parameters){String stringnull;try { string findString(packageName, langChoice.getDefaultLocale(), key, parameters); } catch(MissingResourceException e) {};if (string!null) return string; //首先从本包内搜寻相应的语言文件try { string findString(SYSTEM_BUNDLE_PACKAGE, langChoice.getDefaultLocale(), key, parameters); } catch(MissingResourceException e) {};if (string!null) return string; //如果没有就从i18n包内搜寻相应的语言 //搜索不到那么就尝试用默认的第二语言 。。。。。。return string;}上述类的关系如下图 异常处理 在org.pentaho.di.core.exception内 日志模块 核心代码在src-core内org.pentaho.di.core.logging。 核心类是LogWriter.java。在应用中依赖于语言支持包和文件操作类其输出方式问文件和控制台端口两种方式默认是控制台也可以同时输出到文件中只要调用不同的构造函数即可。 例如在LogWriter()中有pentahoLogger.addAppender(consoleAppender);表示日志输出到控制台。 在LogWriter(String filename, boolean exact, int level) { this();调用LogWriter()构造函数 this.level level; fileAppender createFileAppender(filename, exact); addAppender(fileAppender);除了控制台的输出也增加了文件输出。 … 在日志记录中用以下函数记录不同的日志级别。 元数据表示模块 元数据表示模块是Kettle的Transformation部分的元数据表示类用于Transformation的任务中由于修改后的源码没有采用Kettle的元数据而是采用了PDM的元数据这里对Kettle的元数据只做大概介绍。 这是Kettle的基本的数据类型。 这是Kettle用于元数据处理的类核心类是RowMeta’和ValueMeta主要作用是读取kettle文件生成元数据信息或保存元数据包含在org.pentaho.di.core.row子包内。core中的其他重要的类 工具类ResolverUtil解析类多种用途例如读取配置文件加载他们的信息搜寻有标记的类判断一个类和另一个类的关系不是对象的关系。Props类该类主要用来存储所有的用户与程序间的交互信息例如界面颜色、布局等。算法组件描述类Plugin是所有算法组件的基本的元数据信息子类是JobPlugin和StepPlugin类。算法结果类Result保存Job运算结果详细如下 private boolean result;执行是否出现异常 private int exitStatus; 执行结果状态 private List rows;一个jobEntry完成处理后的数据若存在 private MapString, ResultFile resultFiles;常量类Props:保存用户的节本设置信息PropsUI保存用户的界面层的设置信息。Const。保存常量信息。 PDIClassLoader装载类装载插件jar包的类。界面字符显示BaseMessages、GlobalMessages、MessageHandler、LanguageChoice和LAFMessageHandler这几个类负责界面字符串的显示。 目录结构 总体目录结构 Kettle源代码主要由9个source folder组成其中core、engine、ui和test实现了主要功能。core包 core包中是kettle的核心类。 compatibility compatibility是系统用到的数值类型及对应接口。core core包是整个应用程序处理的基础。 core.config应用程序的核心配置类 core.exception包异常处理类。 org.pentaho.di.core.xml包XML相关接口及封装类。 org.pentaho.di.core.logging包日志。 org.pentaho.di.core.plugins包组件加载类。 org.pentaho.di.core.row包行的数据、元信息、操作。 org.pentaho.di.core.gui包界面接口类和一些基础图形类。 org.pentaho.di.core.database下包含各数据库对应的类以及对应的元数据类还包括不同类型数据库必须继承的基类和必须实现的接口。 engine包 包含调度逻辑和具体的执行代码。最重要的两个包为engine.src.org.pentaho.di.job和engine.src.org.pentaho.di.trans。 Job中包含各个功能组件entry的逻辑实现每一个entry具有特定的不同功能实现不同的数据处理和应用。例如文件输入组件fileinput entry实现从外部数据源导入数据文件到程序。 Trans类似job每个特定的步骤step实现整个转换transformation中的特定步骤。 job包 Job的每个执行单元称为entry。 engine.src.org.pentaho.di.job.entry包中存放了每个entry必须继承的基类以及必须实现的接口。 engine.src.org.pentaho.di.job.entries包中存放了不同entry的具体实现。 trans包 Transfromation中的每个执行步骤称为step。 engine.src.org.pentaho.di.trans.step包中存放了每个step必须继承的基类以及必须实现的接口。 engine.src.org.pentaho.di.trans.steps包中存放了不同step的具体实现。 kitchen包 Job的命令行执行器类。 pan包 Transformation的命令行执行器类。 logging包 日志模块。 ui包 这是关于界面部分的最主要的就是其下的org.pentaho.di.ui包。 ui子包结构如上图所示其中最重要的3个子包为org.pentaho.di.ui.joborg.pentaho.di.ui.trans和org.pentaho.di.ui.spoon。 Job包中是各个entry参数配置对话框的实现一般为一个弹出对话框。 Trans包和job包类似为step的参数配置对话框的实现。 Spoon是整个kettle程序的主界面相关类所在的包主要功能是初始化主界面、相关工作参数配置以及实现界面响应事件。 几个比较重要的类是 org.pentaho.di.ui.spoon包中的Spoon类对应整个程序的入口 org.pentaho.di.ui.spoon.job包中的JobGraph类编辑区的Job类型的标签页负责标签页的菜单、组件等的显示和事件响应 org.pentaho.di.ui.spoon.delegates包此包中的类是负责作业或转换的装载、运行的控制。 整个包的功能比较多也比较复杂下面会有更详细的说明。 job包 每个entry的参数设置面板类、进程对话类及需要继承的基类。 trans包 每个step的参数设置面板类、进程对话类及需要继承的基类。 spoon包 整个软件的入口对应ui.src.org.pentaho.di.ui.spoon包中的Spoon类。 test包 测试代码。 主要源码分析 用户界面工作过程 初始化过程 图形界面的应用程序的初始化包括程序环境的初始化、组件加载、用户历史信息初始化以及界面初始化。 环境初始化的内容主要包括加载kettle.properties文件获取和设置一些系统变量和路径值。 组件加载的内容主要包括读取kettle-config.xml指明下面的三个文件的路径、处理文件如kettle-job.xml所用的类、处理规则文件、处理起点、kettle-job.xml所有可用的entry组件均在此注册、kettle-step.xml所有可用的step组件均在此注册、kettle-plugin.xml等文件将文件中注册的组件加载到程序中存放在对应的列表中如entry要存放在jobEntryLoader.pluginList中其中jobEntryLoader是JobEntryLoader类对象。 用户历史信息加载内容主要包括最近打开的一些文件、上次关闭时kettle的状态打开的标签页、编辑区域的内容等、用户对应用程序的偏好设置颜色、字体、布局等。 界面初始化内容主要包括界面的初始化 创建PropsUI对象props存储之前加载的用户历史信息、创建Spoon对象并赋值给staticSpoon表示整个应用程序界面、绘制窗口、添加菜单及各种面板等、注册热键、将内容组件等显示到相应位置。设置监听器 用来监听界面的按键或者鼠标等事件。main函数 初始化在主函数开始时进行spoon类的main如下 public static void main(String[] a) throws KettleException { try { // Do some initialization of environment variables //环境初始化 EnvUtil.environmentInit(); ListString args new ArrayListString(java.util.Arrays.asList(a)); Display display new Display(); //这是程序的加载界面 Splash splash new Splash(display); CommandLineOption[] commandLineOptions getCommandLineArgs(args); initLogging(commandLineOptions); //组件加载 initPlugins(); //用户配置信息加载 PropsUI.init(display, Props.TYPE_PROPERTIES_SPOON); // things to // // remember... staticSpoon new Spoon(display); //主界面初始化 staticSpoon.init(null); SpoonFactory.setSpoonInstance(staticSpoon); staticSpoon.setDestroy(true); //打开界面并显示 GUIFactory.setThreadDialogs(new ThreadGuiResources()); … // listeners //设置监听器 try { staticSpoon.lcsup.onStart(staticSpoon); } catch (LifecycleException e) { …… } staticSpoon.setArguments(args.toArray(new String[args.size()])); //程序初始化完毕进入响应-处理阶段程序进行正常工作 staticSpoon.start(splash, commandLineOptions); } catch (Throwable t) { …… } // Kill all remaining things in this VM! System.exit(0); }环境初始化 调用类org.pentaho.di.ui.spoon包中的Spoon类 调用位置public static void main(String[] a) throws KettleException { … //初始化Kettle运行环境 EnvUtil.environmentInit(); …} 初始化流程首先主函数Spoon调用EnvUtil的静态函数environmentInit()。在environmentInit()中使用下面语句来读取加载运行环境。 //下面的语句是从kettle的主目录中读取配置文件kettle.properties Map kettleProperties EnvUtil.readProperties(Const.KETTLE_PROPERTIES);System.getProperties().putAll(kettleProperties); 接着同时还要设定使用语句System.getProperties().put(key,value)一些临时的环境变量(Also put some default values for obscure environment variables in there)。运行环境初始化结束返回主函数Spoon。 对应engine.src.org.pentaho.di.core包中KettleEnvironment类负责初始化Kettle运行环境主要包括调用cort.src.org.pentaho.di.core.plugins.PluginRegistry类的init()方法加载组件。 public static void init(boolean simpleJndi) throws KettleException { //若未曾初始化 if (initializednull) {//若不存在则创建一个home文件夹 createKettleHome();//加载home中kettle.properties文件的内容 EnvUtil.environmentInit();//Log初始设置容量大小、超时时长 CentralLogStore.init(); …//加载Kettle需要用到的变量值从Kettle-variables.xml文件 KettleVariablesList.init();initialized true; } }组件初始化: 因为job和transformation类似这里只拿job的entry来做说明组件加载过程。 在main函数中调用函数initPlugins()初始化JobEntryLoader最后返回主函数。 整个过程如下 Spoon.main - Spoon.initPlugins()- JobEntryLoader.init() 下面是init()函数体 public static final void init() throws KettleException { JobEntryLoader loader getInstance(); loader.readNatives(); loader.readPlugins(); loader.initialized true; }getInstance()用来获取一个JobEntryLoader对象进入函数体 public static final JobEntryLoader getInstance() { if (jobEntryLoader ! null) return jobEntryLoader; jobEntryLoader new JobEntryLoader(); return jobEntryLoader; } 其中jobEntryLoader是JobEntryLoader的自身静态对象private static JobEntryLoader jobEntryLoader null;可见getInstance()函数创建并返回了JobEntryLoader类的自身静态对象此对象创建后便唯一存在。loader.readNatives()读取并解析kettle-config.xml、kettle-job.xmlb等相关xml文件最终将所有entry都加入到列表jobEntryLoader. pluginList中实现了组件的加载。 public boolean readNatives() { try { // annotated classes first ConfigManager? jobsAnntCfg KettleConfig.getInstance().getManager(jobs-annotation-config); //读取文件kettle-config.xml文件中关于//job-xml-config的部分,即标签config idjobs-xml-config CollectionJobPluginMeta jobs jobsAnntCfg.loadAs(JobPluginMeta.class); ConfigManager? jobsCfg KettleConfig.getInstance().getManager(jobs-xml-config); CollectionJobPluginMeta cjobs jobsCfg.loadAs(JobPluginMeta.class);jobs.addAll(cjobs);//加入到pluginList 类型为JobPlugin.TYPE_NATIVE for (JobPluginMeta job : jobs) if (job.getType() ! JobEntryType.NONE) pluginList.add(new JobPlugin(JobPlugin.TYPE_NATIVE, job.getId(), job.getType(), job.getTooltipDesc(), null, null, job.getImageFileName(), job.getClassName().getName(), job.getCategory())); } catch (KettleConfigException e) {… } return true; } loader.readPlugins()读取并解析kettle-config.xml、和kettle-plugin.xml等相关xml文件最终将一些特别的组件如dummy组件这些组件以jar包的形式存放在plugin/jobentries目录下加载到jobEntryLoader.pluginList中类型为JobPlugin.TYPE_PLUGIN 。用户历史信息初始化用户历史信息包括最近打开的一些文件、最近一次打开的kjb、用户对应用程序的偏好设置颜色、字体、布局等 在main函数中调用语句PropsUI.init(display, Props.TYPE_PROPERTIES_SPOON); 实现配置信息初始化函数代码如下 public static final void init(Display d, int t) { … props new PropsUI(t);// Also init the colors and fonts to use...GUIResource.getInstance(); … }public Spoon(Display d, Repository rep) { … props PropsUI.getInstance(); … } public static final PropsUI getInstance() { if (props!null) return (PropsUI) props; … }PropsUI是Props的子类props是Props的自身静态对象。 在PropsUI.init()函数中首先实例化props这其实是加载了一些用户的历史信息存放在props对象中。然后使用GUIResource.getInstance()来初始化应用程序的颜色、字体、风格等。 初始化spoon时(main函数中语句staticSpoon new Spoon(display);)在Spoon(display)函数中语句props PropsUI.getInstance(); 界面初始化 staticSpoon new Spoon(display); staticSpoon.init(null); SpoonFactory.setSpoonInstance(staticSpoon); staticSpoon.setDestroy(true); GUIFactory.setThreadDialogs(new ThreadGuiResources()); … // listeners try { staticSpoon.lcsup.onStart(staticSpoon); } catch (LifecycleException e) { … } staticSpoon.start(splash, commandLineOptions); …第1-5行是界面的初始化后面的第8行则是设置监听器用来监听界面的按键或者鼠标等事件。而第10行是进入事件监听-处理的循环中直到程序关闭。 staticSpoon new Spoon(display)获取Spoon对象以及程序初始化所需要的配置信息 public Spoon(Display d, Repository rep) { this.log LogWriter.getInstance(); … //获取系统配置信息和包括用户配置信息 props PropsUI.getInstance();shell new Shell(display); shell.setText(APPL_TITLE); staticSpoon this;JndiUtil.initJNDI();SpoonFactory.setSpoonInstance(this); } staticSpoon.init(null)函数用来初始化界面 public void init(TransMeta ti) { //界面布局 FormLayout layout new FormLayout(); layout.marginWidth 0; layout.marginHeight 0; shell.setLayout(layout); … // Load settings in the props设置配置信息 loadSettings(); … …//其他设置主要是热键的设置initFileMenu(); //主界面 sashform new SashForm(shell, SWT.HORIZONTAL); …//主界面的设置 //加入菜单 addMenu(); //加入左边的树面板 addTree(); //加入标签即上次未关闭的job addTabs(); … }主界面标题 找到spoon.java下面的 public static final String APP_NAME BaseMessages.getString(PKG, Spoon.Application.Name);初始化Spoon界面 读取ui*.xul文件进行部署。 对应Spoon类的init()方法 public void init(TransMeta ti) { //对界面布局进行设置 shell.setLayout(layout);//加入ktr、kjb文件读写监听器 addFileListener(new TransFileListener()); addFileListener(new JobFileListener()); … //加载初始化一些变量 … try { //SwtXulLoader类没有提供src xulLoader new SwtXulLoader(); … } catch (…) { … } //加载部分固定组件及对应监听器 … }初始化过程时序图 Job、Transformation界面调用 通过用户界面操作点击运行按钮触发监听器调用Spoon的runFile()方法。Spoon类runFile( ) public void runFile() {executeFile(true, false, false, false, false, null, false);}Spoon类executeFile( ) 在这一层决定执行Job还是Transformation。 public void executeFile(boolean local, boolean remote, boolean cluster, boolean preview, boolean debug,Date replayDate, boolean safe) { //获取当前活跃的Transformation元信息TransMeta transMeta getActiveTransformation();if (transMeta ! null)executeTransformation(transMeta, local, remote, cluster, preview, debug, replayDate, safe); //获取当前活跃的Job元信息JobMeta jobMeta getActiveJob();if (jobMeta ! null)executeJob(jobMeta, local, remote, replayDate, safe); }Spoon类getActiveTransformation( )、getActiveJob( ) public TransMeta getActiveTransformation() { EngineMetaInterface meta getActiveMeta(); if (meta instanceof TransMeta) { return (TransMeta) meta;}return null; } getActiveJob()类的实现同getActiveTransformation()。如上图JobMeta和TransMeta都实现了EngineMetaInterface。 上图可见EngineMetaInterface包含Job、Transformation的整体操作。 Spoon类getActiveMeta( ) public EngineMetaInterface getActiveMeta() { if (tabfolder null) return null; TabItem tabItem tabfolder.getSelected(); if (tabItem null) return null; //通过当前活跃的Tab标签确定返回类型。 TabMapEntry mapEntry delegates.tabs.getTab(tabfolder.getSelected()); EngineMetaInterface meta null; if (mapEntry ! null) { if (mapEntry.getObject() instanceof TransGraph) meta (mapEntry.getObject()).getMeta(); if (mapEntry.getObject() instanceof JobGraph) meta (mapEntry.getObject()).getMeta(); } return meta; }下图红色方框内为活跃的Tab。 Spoon类Splash 初始化显示的界面 Splash splash new Splash(display);选择执行时序图 下图总结了系统如何判定执行Job还是Transformation。 Job调用后续步骤 最终转交给JobGraph负责执行。Transformation调用后续步骤 最终转交给TransGraph负责执行。Spoon类executeTransformation( ) public void executeTransformation(…) {new Thread() {… //利用trans执行代理执行transdelegates.trans.executeTransformation(transMeta, local, remote, cluster, preview, debug, replayDate, safe);…}.start(); }SpoonTransformationDelegate类executeTransformation( ) public void executeTransformation(…){ … //获取当前活跃的trans TransGraph activeTransGraph spoon.getActiveTransGraph(); … //将配置设置入executionConfiguration后调用TransGraph实例执行 activeTransGraph.start(executionConfiguration); … }Job 主要介绍src.org.pentaho.di.job包下的一些主要的类。这些类主要作用是实现job作业的逻辑功能包括创建job、维护job及执行job等。 创建job Job有创建有两种方式一种是从kjb文件中加载到程序另一种是用户手动通过菜单创建。两种过程都差不多只是手动创建的初始时是一个空的没有组件的job。 创建job实质上就是创建一个jobMeta对象。 所有的job是由SpoonDelegates类来控制的 SpoonDelegates中有job和step两种类型的工作流。SpoonJobDelegate 是SpoonDelegatesd的子类表示job类型的工作流控者。SpoonJobDelegate 中MapString, JobMeta jobMap 存放着多个job一个JobMeta就是一个job。打开job打开kjb 打开job就是打开kjb文件首先是由jobMeta调用loadXML加载作业的一些公共信息然后调用具体组件的loadXML函数加载它们自己的组件信息最后jobMeta加载hop信息 有两种情况 1、初始化时打开上次程序关闭前没有关闭的标签页一个标签页代表一个job 2、通过菜单项打开 两种情况下创建jobMeta的过程都是通过调用Spoon.openFile()来实现不同的是初始化时加载需要使用handleStartOptions ()和loadLastUsedFile()获取上次关闭的信息。 handleStartOptions(options)中的相关代码如下 ListLastUsedFile lastUsedFiles props.getOpenTabFiles(); for (LastUsedFile lastUsedFile : lastUsedFiles) { RepositoryMeta repInfo (rep null) ? null : rep.getRepositoryInfo(); loadLastUsedFile(lastUsedFile, repInfo, false); }首先是从props中获取上次关闭前打开的文件已经在用户历史信息初始化阶段完成加载然后逐个将工作流加载到当前应用程序。 private void loadLastUsedFile函数中调用openFile函数来打开job文件调用语句如下 if (lastUsedFile.isJob()) { openFile(lastUsedFile.getFilename(), false);}Spoon中的openFile函数 public void openFile(String fname, boolean importfile) { … loaded listener.open(root, fname, importfile); … }fname是文件kjb路径和名称。 JobFileListrener类中的open函数 public boolean open(Node jobNode, String fname, boolean importfile){Spoon spoon Spoon.getInstance();try{//生成jobMeta此时里面没有entryJobMeta jobMeta new JobMeta(spoon.getLog()); //从kjb文件中加载entry到jobMeta中jobMeta.loadXML(jobNode, spoon.getRepository(), spoon);spoon.setJobMetaVariables(jobMeta);//添加当前job为最近打开文件到菜单中 spoon.getProperties().addLastFile(LastUsedFile.FILE_TYPE_JOB, fname, null, false, null);spoon.addMenuLast(); //如果job的名字改变则刷新标签页上的名称if (!importfile) jobMeta.clearChanged();jobMeta.setFilename(fname); //加入到spoon.delegates.jobs中为当前打开的一个jobspoon.delegates.jobs.addJobGraph(jobMeta);spoon.refreshTree();spoon.refreshHistory();return true;}catch(KettleException e){…}return false; }函数首先是创建一个jobMeta对象然后通过所给的文件构建entry并加入到jobMeta中然后将整个jobMeta加入当前工作流集合spoon.delegates.jobs中。最后刷新主对象树面板。 jobMeta.loadXML(jobNode, spoon.getRepository(), spoon)函数从文件由spoon.getRepository()提供加载所有entry并加入jobMeta。详细如何加载kjb文件请见“打开kjb”一节。 spoon.delegates.jobs.addJobGraph(jobMeta)函数的部分代码如下 String key addJob(jobMeta){… jobMap.put(key, jobMeta);…}; JobGraph jobGraph new JobGraph(spoon.tabfolder.getSwtTabset(), spoon, jobMeta); spoon.delegates.tabs.addTab(new TabMapEntry(tabItem, tabName, jobGraph,TabMapEntry.OBJECT_TYPE_JOB_GRAPH));所以此函数不仅是将jobMeta加入到jobs 列表变量jobMap中中还生成相应的JobGraph标签页。 打开kjb文件并创建job的过程如下图所示 新建job 通过菜单或者鼠标新建job实际上是调用函数Spoon.newJobFile()来实现的。 下面我们来看看这个函数 public void newJobFile() { try { //生成jobMeta对象 JobMeta jobMeta new JobMeta(log); … //将jobMeta加入delegates.jobs中并创建标签页tab delegates.jobs.addJobGraph(jobMeta); … refreshTree(); } } catch (Exception e) { … } }和初始化创建那里的openFile()函数差不多只是openFile()函数还有从xml文件加载组件这部分内容。newJobFile()这里只是新建一个JobMeta类的对象jobMeta然后调用delegates.jobs.addJobGraph(jobMeta)将jobMeta对象加入到delegates.jobs的jobMap列表中然后添加相应的标签页。 整个过程函数的调用关系如下 保存job保存kjb 保存job就是保存kjb文件。 kjb等文件是kettle的工作文件kettle将作业信息存放到kjb文件中。包括输入输出数据信息、当前所有组件配置组件之间的连接关系等说明。 和打开kjb文件过程比较类似保存kjb首先是由jobMeta调用getXML函数写入一些公共信息当要写入具体的组件时再调用特定的组件由它们的getXML函数写入自己特定信息的内容最后由jobMeta写入hop信息。 JobGraph.saveFile()响应鼠标单击保存按钮事件。 public void saveFile() {spoon.saveFile(); } Spoon.saveFile()完成保存kjb文件的功能。 public boolean saveFile() { TransMeta transMeta getActiveTransformation(); if (transMeta ! null) return saveToFile(transMeta); //获取当前活动jobMeta JobMeta jobMeta getActiveJob(); if (jobMeta ! null) //保存到kjb文件 return saveToFile(jobMeta); return false; }Spoon.saveToFile(EngineMetaInterface meta)函数 public boolean saveToFile(EngineMetaInterface meta) { if (meta null) return false; boolean saved false; … if (rep ! null) { saved saveToRepository(meta); } else { if (meta.getFilename() ! null) { //保存 saved save(meta, meta.getFilename(), false); } else {//另存为 saved saveFileAs(meta); } } … return saved; }另存为和保存类似下面拿保存作例子说明 saved save(meta, meta.getFilename(), false)的调用函数如下 public boolean save(EngineMetaInterface meta, String fname, boolean export) { boolean saved false; FileListener listener null; // match by extension first //获取扩展名 int idx fname.lastIndexOf(.); if (idx ! -1) { String extension fname.substring(idx 1); listener fileExtensionMap.get(extension); } if (listener null) { String xt meta.getDefaultExtension(); listener fileExtensionMap.get(xt); } //如果当前job名称被修改则更改标签页上的名称 if (listener ! null) { String sync BasePropertyHandler. getProperty(SYNC_TRANS); if (Boolean.parseBoolean(sync)) { listener.syncMetaName(meta, Const.createName(fname)); delegates.tabs.renameTabs(); }//保存到文件 saved listener.save(meta, fname, export); … } return saved; }上面的函数主要作用是获取文件的扩展名并按照给定的job名称改变标签页的名称。最后保存到文件。 FileListener.save()函数代码如下 public boolean save(EngineMetaInterface meta, String fname,boolean export) {Spoon spoon Spoon.getInstance();EngineMetaInterface lmeta;if (export){lmeta (JobMeta)((JobMeta)meta).realClone(false);}elselmeta meta;//保存到文件return spoon.saveMeta(lmeta, fname);}spoon.saveMeta(lmeta, fname)语句的调用函数如下 public boolean saveMeta(EngineMetaInterface meta, String fname) { //获取文件名 meta.setFilename(fname); if (Const.isEmpty(meta.getName()) || delegates.jobs.isDefaultJobName(meta.getName())) { meta.nameFromFilename(); }boolean saved false; try {//构造XML文件内容 String xml XMLHandler.getXMLHeader() meta.getXML();DataOutputStream dos new DataOutputStream(KettleVFS.getOutputStream(fname, false)); //将内容写入文件 dos.write(xml.getBytes(Const.XML_ENCODING)); dos.close(); saved true; … return saved; }先写入文件的头部XMLHandler.getXMLHeader()然后写入job即标签。 meta.getXML()调用JobMeta.getXML()函数详细代码如下 public String getXML() { Props props null; if (Props.isInitialized()) props Props.getInstance();DatabaseMeta ci getLogConnection(); StringBuffer retval new StringBuffer(500); //添加一些标签 retval.append().append(XML_TAG).append().append(Const.CR); //$NON-NLS-1$retval.append( ).append(XMLHandler.addTagValue(name, getName())); //$NON-NLS-1$ //$NON-NLS-2$ …//添加其他的很多标签//添加所有属于该job的entry信息 retval.append( entries).append(Const.CR); //$NON-NLS-1$一一添加所有属于该job的entry信息 for (int i 0; i nrJobEntries(); i) { JobEntryCopy jge getJobEntry(i); //写入当前entry的信息 retval.append(jge.getXML()); } retval.append( /entries).append(Const.CR); //$NON-NLS-1$ //hop信息 retval.append( hops).append(Const.CR); //$NON-NLS-1$ for (JobHopMeta hi : jobhops) // Look at all the hops { retval.append(hi.getXML()); } retval.append( /hops).append(Const.CR); //$NON-NLS-1$//其他标签 return retval.toString(); }retval.append(jge.getXML())语句是具体类型的组件调用自己的getXML()函数写入自己的特定的信息,整个过程时序图如下 执行job 程序中可能存在多个jobMeta因此首先应该找到当前活动的jobMeta。然后用Job类来运行这个jobMeta按照连接顺序依次从jobMeta中取出entry调用entry自己的execute函数执行将执行结果存储到Result类中。 JobGraph.runJob()响应鼠标单击运行按钮事件,然后调用Spoon.runFile()执行运行当前job。 public void runFile() { executeFile(true, false, false, false, false, null, false); } public void executeFile(boolean local, boolean remote, boolean cluster, boolean preview, boolean debug, Date replayDate, boolean safe) { … //找到当前活动的jobMeta JobMeta jobMeta getActiveJob(); if (jobMeta ! null) //运行该jobMeta executeJob(jobMeta, local, remote, replayDate, safe); } public void executeJob(JobMeta jobMeta, boolean local, boolean remote, Date replayDate, boolean safe) { try { delegates.jobs.executeJob(jobMeta, local, remote, replayDate, safe); } catch (Exception e) {…} }delegates.jobs.executeJob调用函数如下 public void executeJob(JobMeta jobMeta, boolean local, boolean remote, Date replayDate, boolean safe) throws KettleException { … JobExecutionConfiguration executionConfiguration spoon.getJobExecutionConfiguration(); … //运行参数配置对话框 JobExecutionConfigurationDialog dialog new JobExecutionConfigurationDialog(spoon.getShell(), executionConfiguration, jobMeta); if (dialog.open()) { // addJobLog(jobMeta); //获取当前活动标签即当前jobMeta JobGraph jobGraph spoon.getActiveJobGraph(); … // Is this a local execution? //运行当前job if (executionConfiguration.isExecutingLocally()) { jobGraph.startJob(executionConfiguration); } … } }先弹出运行参数配置对话框配置完毕并确定后接着获取当前活动的标签对象也就是获取当前活动的工作流。然后运行这个job。 jobGraph.startJob函数代码如下: public synchronized void startJob(JobExecutionConfiguration executionConfiguration) {if (job null) // Not running, start the transformation...{// Auto save feature... //自动保存如果当前job未保存则必须先保存为kjb文件if (jobMeta.hasChanged()) {if (spoon.props.getAutoSave()) {… job new Job(log, jobMeta.getName(), jobMeta.getFilename(), null);…job.start();…}job 是JobGraph类的成员变量用来控制jobMeta的执行它是一个线程类对象。这个函数首先如果当前jobMeta没有保存则需要保存接着新建一个Job类对象将job与当前jobMeta关联起来最后启动线程job.start()。 上面的过程的时序图如下 接下来我们看看Job.run()函数 public void run() { try { stoppedfalse; finishedfalse;initialized true;…// Run the job, dont fire the job listeners at the end//执行result execute(false); } catch(Throwable je) {… }} Result用来保存执行结果result execute(false)函数代码如下 private Result execute(boolean fireJobListeners) throws KettleException{ …// Where do we start?寻找开始组件JobEntryCopy startpoint;beginProcessing();startpoint jobMeta.findJobEntry(JobMeta.STRING_SPECIAL_START, 0, false);if (startpoint null) { throw new KettleJobException(Messages.getString(Job.Log.CounldNotFindStartingPoint)); }JobEntrySpecial jes (JobEntrySpecial) startpoint.getEntry();Result res null;boolean isFirst true;while ( (jes.isRepeat() || isFirst) !isStopped()){isFirst false; //进入递归执行组件res execute(0, null, startpoint, null, Messages.getString(Job.Reason.Started));}… return res;} 递归方式执行job中的entry自动的从一个entry到另一个entry执行。使用回溯算法。 private Result execute(final int nr, Result prev_result, final JobEntryCopy startpoint, JobEntryCopy previous, String reason) throws KettleException { …// Execute this entry...JobEntryInterface cloneJei (JobEntryInterface)jei.clone();((VariableSpace)cloneJei).copyVariablesFrom(this); //调用cloneJei自己的execute函数做和自己功能相关的处理。 // cloneJei是一个有确定类型的JobEntryfinal Result result cloneJei.execute(prevResult, nr, rep, this);…// Same as before: blocks until its done//递归执行该函数运行下一个entryres execute(nr1, result, nextEntry, startpoint, nextComment);… }该过程的时序图如下 结果展示 结果显示是指运行job后将运行结果展示到应用程序的Execution results面板中如下图 包括三个标签History、Logging和Job metrics分别展示不同的信息。 JobGraph.startJob()函数中运行job运行结束后获取结果信息。 jobGraph.startJob函数代码如下 public synchronized void startJob(JobExecutionConfiguration executionConfiguration) {if (job null) // Not running, start the transformation...{// Auto save feature...if (jobMeta.hasChanged()) {if (spoon.props.getAutoSave()) {…job new Job(log, jobMeta.getName(), jobMeta.getFilename(), null);…//运行job.start();… // Show the execution results views //添加结果显示的三个标签页 addAllTabs(); …}添加显示结果的三个标签页 public void addAllTabs() { CTabItem tabItemSelection null; if (extraViewTabFolder ! null !extraViewTabFolder.isDisposed()) { tabItemSelection extraViewTabFolder.getSelection(); } //三个标签页 jobHistoryDelegate.addJobHistory(); jobLogDelegate.addJobLog(); jobGridDelegate.addJobGrid(); … }整个时序图过程如下 执行示例说明 调试一个Job其拓扑如下 黑色Hop代表无条件执行下一个JobEntry红色代表结果为false执行下一jobEntry绿色代表结果为true执行下一跳。 General-Change log Processing和General-Change log Processin 2是两个trans。 job的执行顺序为jobEntry的深度遍历如标号所示。JobEntry 具体每个组件的执行体对应engine.src.org.pentaho.di.job.entries包内每个entry的具体实现。 execute()方法2中调用jobEntry的execute()完成jobEntry的具体功能。向job添加entry 首先创建job即jobMeta变量它存储着整个job的相关信息和资源。然后将你所要的entry加入JobMeta.jobcopies中用户界面表现为将组件拖拽到编辑区并且连接好各个组件。 entry就是job类型的组件区别于stepentry的添加也有两种不同的方式分别使用在不同的场合从kjb文件中加载原有的entry创建新的entry并加入到job中。从kjb文件中加载entry 从上面的打开job过程可知entry的加载就是读取kjb文件达的相关部分。用JobMeta.loadXML函数来读取kjb文件加载所有entry并加入jobMeta。 调用语句jobMeta.loadXML(jobNode, spoon.getRepository(), spoon)由spoon.getRepository()提供文件路径和名称。 函数的部分代码如下 public void loadXML(Node jobnode, Repository rep, OverwritePrompter prompter) throws KettleXMLException { Props props null; … // get job info: setName( XMLHandler.getTagValue(jobnode, name) ); … /** read the job entries...读取xml文件中的相应节点entries*/ Node entriesnode XMLHandler.getSubNode(jobnode, entries); //$NON-NLS-1$ int tr XMLHandler.countNodes(entriesnode, entry); //逐个读取节点entry,并根据节点信息构建JobEntryCopy对象加 //入到工作流中 for (int i 0; i tr; i) { Node entrynode XMLHandler.getSubNodeByNr(entriesnode, entry, i); //$NON-NLS-1$JobEntryCopy je new JobEntryCopy(entrynode, databases, slaveServers, rep); … // Add the JobEntryCopy... addJobEntry(je); } //add hop连线 Node hopsnode XMLHandler.getSubNode(jobnode, hops); //$NON-NLS-1$ int ho XMLHandler.countNodes(hopsnode, hop); //$NON-NLS-1$ for (int i 0; i ho; i) { Node hopnode XMLHandler.getSubNodeByNr(hopsnode, hop, i); //$NON-NLS-1$ JobHopMeta hi new JobHopMeta(hopnode, this); jobhops.add(hi); } … }函数首先根据所给的根节点逐个读取节点子节点,并根据节点信息构建JobEntryCopy对象然后加入到jobMeta中。最后加入hop信息。 语句JobEntryCopy je new JobEntryCopy(entrynode, databases, slaveServers, rep)调用下面函数 public JobEntryCopy(Node entrynode, ListDatabaseMeta databases, ListSlaveServer slaveServers, Repository rep) throws KettleXMLException { try {//获取当前节点代表的组件类型 String stype XMLHandler.getTagValue(entrynode, type); //根据该类型从pluginList 中获取对应的JobPlugin 对象 JobPlugin jobPlugin JobEntryLoader.getInstance().findJobEntriesWithID(stype); … // Get an empty JobEntry of the appropriate class... //根据jobPlugin获取与之相应的entry类对象此时的entry并没有任何 //信息只是一个特定类型的entry entry JobEntryLoader.getInstance().getJobEntryClass(jobPlugin); if (entry ! null) { //调用该entry自身的loadXML()函数从文件中加载相应的组件信息 entry.loadXML(entrynode, databases, slaveServers, rep); … } }JobEntryCopy的构造函数能够根据节点构建节点中描述的相应组件实例。 JobEntryLoader.getInstance().getJobEntryClass(jobPlugin)函数下面有说明。 加载entry组件的时序图如下 创建新的entry 通过从组件树中拖拽需要的组件到工作区中可以创建新的组件。拖拽响应函数如下 public void drop(DropTargetEvent event)是一个事件响应函数响应鼠标拖拽放开事件。当我们从核心对象树中将组件拖拽到工作区并放下时便会触发这个事件。它的部分代码如下 public void drop(DropTargetEvent event) {… //获取当前坐标Point p getRealPosition(canvas, event.x, event.y); //获取组件类型try {DragAndDropContainer container (DragAndDropContainer) event.data; //entry是一个string对象表示该类组件的描述用来区别不同的组件String entry container.getData();switch (container.getType()) {case DragAndDropContainer.TYPE_BASE_JOB_ENTRY: // Create a new Job Entry on the canvas{//创建新的组件对象并加入到jobMeta中JobEntryCopy jge spoon.newJobEntry(jobMeta, entry, false); …newJobEntry函数参数typeDesc是类型描述信息jobMeta是当前job public JobEntryCopy newJobEntry(JobMeta jobMeta, String typeDesc, boolean openit) { return delegates.jobs.newJobEntry(jobMeta, typeDesc, openit); }delegates.jobs.newJobEntry函数,参数和上面的函数意义相同 public JobEntryCopy newJobEntry(JobMeta jobMeta, String type_desc, boolean openit) { //获取JobEntryLoader的自身静态变量jobEntryLoader JobEntryLoader jobLoader JobEntryLoader.getInstance(); JobPlugin jobPlugin null;try {//从jobEntryLoader.pluginList中获取响应的jobPlugin jobPlugin jobLoader.findJobEntriesWithDescription(type_desc); if (jobPlugin null) {//一些特殊组件 // Check if its not START or DUMMY if (JobMeta.STRING_SPECIAL_START.equals(type_desc) || JobMeta.STRING_SPECIAL_DUMMY.equals(type_desc)) {//特殊的组件 jobPlugin jobLoader.findJobEntriesWithID(JobMeta.STRING_SPECIAL);} } if (jobPlugin ! null) {// // Determine name number for this entry. String basename type_desc; int nr jobMeta.generateJobEntryNameNr(basename); String entry_name basename nr; //$NON-NLS-1$// Generate the appropriate class... //根据jobPlugin 的信息构建响应的JobEntry 对象 JobEntryInterface jei jobLoader.getJobEntryClass(jobPlugin); … //创建JobEntryCopy对象初始化一些相关信息 JobEntryCopy jge new JobEntryCopy(); jge.setEntry(jei); jge.setLocation(50, 50); jge.setNr(0); //加入到jobMeta中 jobMeta.addJobEntry(jge); …大概说说这个函数的整个过程首先是传入参数jobMeta和String type_desc一个是当前工作流对象一个是要添加组件的描述信息。要构建一个entry必须要从jobEntryLoader.pluginList列表初始化部分有介绍中获取和描述信息相符合的JobPlugin对象然后再根据这个对象生成一个实现JobEntryInterface接口的对象到此已经获取了一个特定类型的entry如本例中的JobEntrySuccess最后将其装换成JobEntryCopy对象并加入到jobMeta中。 下面是JobEntryInterface jei jobLoader.getJobEntryClass(jobPlugin) ;语句调用函数getJobEntryClass public JobEntryInterface getJobEntryClass(JobPlugin sp) throws KettleStepLoaderException { … //根据 sp 的类型获取该类型的组件的加载类cl switch (sp.getType()) { case JobPlugin.TYPE_NATIVE:{ cl Class.forName(sp.getClassname()); }break; case JobPlugin.TYPE_PLUGIN: { ClassLoader ucl getClassLoader(sp); cl ucl.loadClass(sp.getClassname()); }break; … } //cl 对应着特定的类型组件因此下面语句可以返回一个特定的entry对象 JobEntryInterface res (JobEntryInterface)cl.newInstance(); … return res; … }总的来说就是根据传入的JobPlugin获取特定类型的entry加载类然后使用加载类获取该类型的entry对象。 该过程时序图如下 b)编辑entry 双击工作区域中的组件弹出组件参数配置对话框实现entry的编辑。 首先捕捉双击事件 canvas.addMouseListener(new MouseAdapter() {public void mouseDoubleClick(MouseEvent e) {clearSettings(); //获取坐标Point real screen2real(e.x, e.y); //尝试根据坐标从当前的jobMeta中获取entryJobEntryCopy jobentry jobMeta.getJobEntryCopy(real.x, real.y, iconsize);if (jobentry ! null) { //如果当前双击的区域存在组件if (e.button 1) { editEntry(jobentry);} …}} JobGraph中的editEntry函数 protected void editEntry(JobEntryCopy je) {spoon.editJobEntry(jobMeta, je);} Spoon中的editJobEntry函数 public void editJobEntry(JobMeta jobMeta, JobEntryCopy je) { delegates.jobs.editJobEntry(jobMeta, je); }delegates.jobs.editJobEntry(jobMeta, je)语句调用的函数如下 public void editJobEntry(JobMeta jobMeta, JobEntryCopy je) { try{ … JobEntryCopy before (JobEntryCopy) je.clone_deep(); JobEntryInterface jei je.getEntry(); … //获取相应的对话框类 JobEntryDialogInterface d getJobEntryDialog(jei, jobMeta); if (d ! null) { if (d.open() ! null) { // First see if the name changed. // If so, we need to verify that the name is not already used in the job. // … spoon.refreshGraph(); spoon.refreshTree(); } } else{… } } catch (Exception e) {… } }根据特定的entry获取对应的对话框并且打开对话框编辑entry。 JobEntryDialogInterface d getJobEntryDialog(jei, jobMeta)语句调用下列函数 public JobEntryDialogInterface getJobEntryDialog(JobEntryInterface jei, JobMeta jobMeta) { //获取对话框类的名称 String dialogClassName jei.getDialogClassName(); try { Class? dialogClass;Class?[] paramClasses new Class[] { spoon.getShell().getClass(), JobEntryInterface.class, Repository.class, JobMeta.class };Object[] paramArgs new Object[] { spoon.getShell(), jei, spoon.getRepository(), jobMeta };Constructor? dialogConstructor; //获取对话框类 dialogClass jei.getPluginID()!null?JobEntryLoader.getInstance().loadClassByID(jei.getPluginID(), dialogClassName):JobEntryLoader.getInstance(). loadClass(jei.getJobEntryType().getDescription(),dialogClassName); dialogConstructor dialogClass.getConstructor(paramClasses); //返回对话框类对象 return (JobEntryDialogInterface) dialogConstructor.newInstance(paramArgs); } catch (Throwable t) {… } return null; }getDialogClassName()用来获取对话框类名它在JobEntryInterface中定义在JobEntryBase中实现代码如下 public String getDialogClassName(){ //当前类名 String className getClass().getCanonicalName(); //加上前缀di.ui.className className.replaceFirst(\\.di\\., .di.ui.); //加上后缀Dialog className Dialog; //最后的className为di.ui.当前类名Dialogreturn className;}在函数中根据当前的entry类名及其所在包返回其对应的Dialog类所在的包及类名。 可见entry编辑时才去查找其对应的参数配置对话框类。 整个过程时序图如下 相关类和接口 JobMeta 维护整个Job的元数据用来定义一个Job及进行与该Job加载、保存、修改等相关的操作。 该类用来定义一个job及进行与该job加载、保存、修改等相关的操作。 它的部分成员变量如下表 该类的部分成员函数如下 //添加entry实际上是添加一个JobEntryCopy对象 public void addJobEntry //添加hop public void addJobHop //查找特定的entry public JobEntryCopy findJobEntry //查找特定的hop public JobHopMeta findJobHop //查找下一个entry从当前entry到hop指出的entry public JobEntryCopy findNextJobEntry //查找前一个entry public JobEntryCopy findPrevJobEntry //找到start组件 public JobEntryCopy findStart() //获取指定的entry public JobEntryCopy getJobEntry //获取指定的hop public JobHopMeta getJobHop //获取用户选择的entry public JobEntryCopy getSelected //写xml文件将该工作流的信息写入到xml文件不包括entry的实际内容 public String getXML() //加载xml文件 public void loadXML() 该类的部分成员函数如下 //添加entry实际上是添加一个JobEntryCopy对象 public void addJobEntry //添加hop public void addJobHop //查找特定的entry public JobEntryCopy findJobEntry //查找特定的hop public JobHopMeta findJobHop //查找下一个entry从当前entry到hop指出的entry public JobEntryCopy findNextJobEntry //查找前一个entry public JobEntryCopy findPrevJobEntry //找到start组件 public JobEntryCopy findStart() //获取指定的entry public JobEntryCopy getJobEntry //获取指定的hop public JobHopMeta getJobHop //获取用户选择的entry public JobEntryCopy getSelected //写xml文件将该工作流的信息写入到xml文件不包括entry的实际内容 public String getXML() //加载xml文件 public void loadXML()JobEntryCopy 该类代表一个entry及额外的关于图像界面的相关信息如位置、选择状态。主要用于维护每一个不同entry或者相同entry的不同副本的信息。 主要成员 private JobEntryInterface entry;当前组件具体entry,执行入口 private int nr;副本数一个编辑区里可以出现多个相同组件 private boolean selected;是否被选择 private Point location;图标位置工作区中的坐标 private boolean draw; private long id;相关函数如下 //写xml文件主要是写entry的共性部分比如坐标 public String getXML //判断是否被选择 public boolean isSelected //设置entry对象 public void setEntry(JobEntryInterface je) //获取坐标 public Point getLocation //设置坐标 public void setLocation(Point pt);JobEntryLoader 这个类作用是加载所有的entryentry的来源有两种一种是内置的一种是插件形式。 JobEntryLoader的一些重要成员变量如下 JobEntryLoader的一些重要成员函数如下 加载所有组件 public static final void init() throws KettleException 包括加载内置组件和加载插件 加载内置组件 public boolean readNatives() 内置组件在xml文件中定义。 加载插件 public boolean readPlugins() 插件在plugin目录下一个plugin为一个jar 根据特定的plugin返回相应的entry public JobEntryInterface getJobEntryClass(String desc) throws 因为工作流中真正执行单元功能的是entrypluginList中储存的并非entry只是一些加载entry所需要的信息比如加载类、entry类型、ID等所以我们必须根据plugin加载相应的entry并返回。 JobEntryInterface JobEntryInterface是Job Entry插件的主要实现接口是所有具体entry必须实现的接口。需要实现的功能主要是获取entry的一些共性信息比如名称、ID、类型、描述信息、是否是特殊entry、参数配置对话框类名、写xml、读xml等。 每个具体engine.src.org.pentaho.di.job.entries包下的 entry类需要实现的接口包含execute方法。 几个比较重要的函数如下 //获取该entry对应的参数配置对话框类 public String getDialogClassName() //读kjb中entry标签 public void loadXML //写kjb中的 entry标签 public String getXML //执行entry public Result execute(Result prev_result, int nr, Repository rep, Job parentJob) throws KettleException;JobEntryDialogInteface 负责构建和打开参数设置对话框。Spoon通过调用open函数打开该对话框spoon是使用swt框架的所以对话框也应使用swt来实现。JobEntryBase 该类是所有entry的基类包含了entry一些共性的变量和操作。因为每一个具体的entry如prime组件都必须继续JobEntryBase类并且实现JobEntryInterface接口所以可以在JobEntryBase类中实现JobEntryInterface接口中的一些函数。这些函数应该不区别具体的entry对每一种不同的entry都是一样的操作。 JobEntryBase中的一部分变量如下 主要的函数和JobEntryInterface差不多。JobEntrySpecial 功能是开启一个job只是简单地对传递来的preResult设置它的的result属性值为trueJob项目据此判断前一结果执行完毕。返回该对象即可。JobEntryTableExit 功能是判断一个table是否存在数据库中。JobEntryTableExit Job项目有属性tablename和DatabaseMeta对数据库的元数据信息描述根据DatabaseMeta得到一个Dabase对象db建立连接db.connect(); 调用db.checkTableExists(tablename)根据此返回值设置preResult的result属性为否为true。返回preResult对象。JobEntryTrans JobEntryJob和JobEntryTrans是嵌套job或trans的Job项目JobEntry。它们是比较复杂的job项目。 作用是执行一个trans。首先实例化一个TransMeta,之后实例化Trans。调用trans.start()当执行完毕后调用函数trans.getResult()并把结果加到preResult中返回该对象即可。组件JobEntryXXX Entry有很多种每种不同的entry代表不同功能。但是各个entry所具有的函数差别不大只是内容有比较大的差别。下面拿prime组件来介绍一个具体的组件的实现类构成。 JobEntryPrime是主键约束组件的实现类其主要功能是检查数据中的主键约束关系是否能够满足。支持多字段属性的主键检查支持异常数据不满足约束的数据保存到其他文件。 类定义public class JobEntryPrime extends JobEntryBase implements Cloneable,JobEntryInterface 继承JobEntryBase类实现JobEntryInterface接口。 //克隆函数 public Object clone() //写kjb文件函数 public String getXML() //读取kjb文件 public void loadXML(Node entrynode, ListDatabaseMeta databases,ListSlaveServer slaveServers, Repository rep) throws KettleXMLException //执行该组件 public Result execute(Result previousResult, int nr, Repository rep,Job parentJob) throws KettleJobExceptionJobEntryType枚举 列举所有内置entry类型及其描述每一个内置entry必须注册一个类型以区别其他entry。Job类 Job的执行类是一个线程类本身实现了Thread是一个单独的线程。Job entry可以是单独的线程也可以是顺序执行大多数情况都是顺序执行下一步以上一步的执行结果为基础。Job类也包括转换加载、相关插件的实例化、初始化、运行、监视Job执行。 public class Job extends Thread implements VariableSpace, NamedParams 其主要功能是负责作业的执行。该类的一些重要成员变量如下 private JobMeta jobMeta//当前工作流 private Job parentJob// private Result result//执行结果保存 private boolean finished//结束标志线程的run函数 public void run()执行entry public Result execute(Boolean) 函数最终一一地从jobMeta中取出entry并调用entry它自己的execute方法。 m)Result 每一个jobEntryInterface的实现类在完成相应功能时返回结果的类型。 Result中也可以有处理数据这些处理数据可以作为下一个Job项目JobEntry的输入。但是容量受内存容量限制。 主要成员变量 private boolean result;执行是否出现异常 private int exitStatus; 执行结果状态 private ListRowMetaAndData rows;一个jobEntry完成处理后的数据若存在 private MapString, ResultFile resultFiles; Transformation 这个模块和Job模块类似主要负责转换执行相关的所有任务包括转换加载、相关插件的实例化、初始化、运行、监视转换执行并把内容放置到TransInfo类中。 执行过程概述 Trans的执行机制是搭建一个结构使得每一个step能够从自己的inputRowsets读处理一行将结果输出到自己的outputRowsets中。 注意一个rowset对象既属于前一个step成员outputRowsets的一部分也属于后一个对象的inputRowsets的一部分。所有的rowset信息都在Trans对象中以List形式维护。 Trans执行过程时序图 由于trans可以有TransGraph实例化也可以由JobEntryTrans实例化。但基本过程是一样的先实例化TransMeta再实例化Trans最终调用trans的start方法。 由TransGraph实例化如下图所示 由JobEntryTrans实例化如下图所示 Trans代码解释 JobEntryTrans类execute( ) 首先获取元数据然后以此作为参数实例化trans TransMeta transMeta getTransMeta(rep); …… Trans trans new Trans(transMeta); …… trans.execute(args); b)Trans类execute( ) 具体执行前需要进行准备工作 public void execute(String[] arguments) throws KettleException{prepareExecution(arguments);startThreads();}Trans的prepareExecution 搭建以下结构 对每一个step根据hop信息进行找到下一个step或多个step。对于每一个this step和nex tstep生成一个RowSet对象作为缓存供this step写同时供next step读取数据。把此RowSet对象加入到Trans的List成员中保存。 List hopstepstransMeta.getTransHopSteps(false); 得到step列表 …… 对每一个step进行如下设置 for (int i0;ihopsteps.size();i) { StepMeta thisStephopsteps.get(i); …… //找到下一个step的列表 ListStepMeta nextSteps transMeta.findNextSteps(thisStep); int nrTargets nextSteps.size(); for (int n0;nnrTargets;n) { StepMeta nextStep nextSteps.get(n); …… 对于每一个hop信息生成RowSet,并设置RowSet把 RowSet rowSetnew RowSet(transMeta.getSizeRowset()); rowSet.setThreadNameFromToCopy(thisStep.getName(),0, nextStep.getName(), 0);rowsets.add(rowSet);} } ……根据TransMeta的step信息生成相应的StepMetaDataCombi即steps信息加到steps列表中。 StepMetaDataCombi combi new StepMetaDataCombi(); combi.stepname stepMeta.getName(); combi.copy c; combi.stepMeta stepMeta; combi.meta stepMeta.getStepMetaInterface(); StepDataInterface data combi.meta.getStepData(); combi.data data; …… StepInterface stepcombi.meta.getStep(stepMeta, data, c, transMeta, this); 在step初始化时会把Trans中的ListRowSet的相应的rowset加入到step的inputRowSets和outputRowSets中。 combi.step step; steps.add(combi);Trans类startThreads( ) 打开了所有的step线程核心代码如下 for (int i0;isteps.size();i){steps.get(i).step.start(); }Step执行 实现StepInterface的不同的step各个功能个不一样但是它们之间也有一定的规律性。下图只列举了两个stepTextInput文本输入和Uniquerow(去重)。 启动 每一个具体的step启动线程时自动调用run函数它们统一调用基类的静态方法 public void run(){BaseStep.runStepThread(this, meta, data); }- 处理基类BaseStep采取了统一的处理方式调用子类processRow以行为单位处理核心代码如下: while (stepInterface.processRow(meta, data) !stepInterface.isStopped()); processRow( )通用过程是调用基类BaseStep 的getRow( )得到数据对一行数据处理处理之后调用基类putRow( )方法数据保存至outputRowSets即next step的inputRowSets元数据与数据关系。 Trans中的ETL过程每个step以行为单位处理其中行的元数据信息RowMeta和数据信息统一保存在RowSet对象中。 在RowSet中RowMeta的成员的调试结果如下。可见rowMeta储存了每列数据的名称和类型。第一列列名flag数据是长度为1的String第二列列名id… RowSet的数据信息在queArray队列中调试结果如下可以看出第一个数据元素是一个Object包含了3列数据内容为N1a… 相关类和接口 JobEntryTrans 实现了JobEntryInterface的execute方法被job执行。由JobEntryTrans实例化Trans并执行。TransGraph 当点击trans面板的run时由TransGraph实例化Trans并执行。 Trans主要成员有 private TransMeta transMeta; private Repository repository; private Job parentJob; private Trans parentTrans; private ListRowSet rowsets; private ListStepMetaDataCombi steps其中最重要的是rowsets、steps。rowsets保存了所有hop对应的行元数据和数据信息。List steps封装了一个step的主要内容。 TransMeta 描述了整个Trans的元数据信息。 主要的属性成员有 private ListStepMeta steps; private ListTransHopMeta hops; private String name; private Result previousResult;上一个jobentry的执行结果。 private ListRowMetaAndData resultRows;这次trans执行后的数据结果。 private ListResultFile resultFiles; resultRows成员将作为result比部分返回多行的元数据和数据如果有的话需要返回数据结果时。把resultRows加入Result结果的rows列表并返回。StepMetaDataCombi 提取了一个step所需的主要信息把插件的主要实现类全部存储在这个类中方便集中调用。 public class StepMetaDataCombi {public StepMeta stepMeta;public String stepname;public int copy;public StepInterface step;public StepMetaInterface meta;public StepDataInterface data; }TransHopMeta 描述hop信息。StepMeta 描述step的公有基本信息stepidstepname对于每一个具体的step由成员变量StepMetaInterface step来描述。StepInterface 主要成员函数 processRow()对一行的数据处理。 putRow()把处理后的数据放入下一个step的inputrowsets中。StepBase 实现了StepInterface是各step具体实现类的基类。完成了公用的处理函数如putRow()但是对于更具体的processRow()在StepBase的子类中。StepBase的主要成员有 public ArrayList inputRowSetsoutputRowSets StepBase的子类每次从inputRowSets中取出一行数据向outputRowSets中写入一行数据。StepDataInterface 与step相关的数据信息。比如行的元数据信息。StepMetaInterface的实现类是与具体step相关的元数据信息与StepMeta配合使用共同描述具体step的元数据信息。RowSet RowSet类中包含源step目标step和由源向目标发送的一个rowMeta和一组data。其中data数据是以行为单位的队列queArray。一个RowSet作为此源step的outputrowsets的一部分。同时作为目标step的inputRowsets一部分。源Step每次向队列中写一行数据目标step每次从队列中读取一行数据。RowMetaAndData public class RowMetaAndData implements Cloneable{ private RowMetaInterface rowMeta;//行的元数据描述了每行的数据名字数据类型。 private Object[] data;//数据 }StepInitThread类 Step初始化线程包装类使用多线程调用所有StepInterface实现类的Init函数。RunThread类 步骤处理线程包装类这个类能够处理异常并将其记录到日志中。同时也能够在异常发生或者执行结束后记录相关内容、关闭相关资源。菜单加载 有三种菜单程序工具栏菜单、标签页上的菜单和右键菜单。 程序菜单栏 这种菜单在界面初始化时就被生成并显示到程序的菜单栏上。 调用Spoon.init(TransMeta ti)实现菜单的初始化。 public void init(TransMeta ti) { … initFileMenu(); ... addMenu(); addTree(); // addCoreObjectsExpandBar(); addTabs(); … }Spoon.addMenu()函数 public void addMenu() { … try { //从ui/menubar.xul文件中生成菜单栏 menuBar XulHelper.createMenuBar(XUL_FILE_MENUBAR, shell, new XulMessages()); ListString ids new ArrayListString(); //菜单项 ids.add(trans-class); … this.menuMap XulHelper.createPopupMenus(XUL_FILE_MENUS, shell, new XulMessages(), ids);// createMenuBarFromXul(); } catch (Throwable t) {… } //监听器 addMenuListeners(); //添加最近打开菜单项 addMenuLast(); }XulHelper.createMenuBar(XUL_FILE_MENUBAR, shell, new XulMessages())函数代码如下 public static MenuBar createMenuBar(String menuFile, Shell shell,Messages xulMessages) throws KettleException {// first get the XML document try { URL xulFile getAndValidate(menuFile); Document doc XMLHandler.loadXMLFile(xulFile); //此处从xul文件中加载菜单项 MenuBar menuBar MenuHelper.createMenuBarFromXul(doc, shell, xulMessages); shell.setMenuBar((org.eclipse.swt.widgets.Menu) menuBar.getNativeObject()); return menuBar; } catch (IOException e){ … } }Xul文件格式如下项目的在根目录ui/menubar.xul 取其中的一个标签来说明: menu valueview label%Spoon.Menu.View menupopup menuitem valueview-zoom-in acceltextPAGE-UP accesskeypageup label%Spoon.Menu.View.ZoomIn/ menuitem valueview-zoom-out acceltextPAGE-DOWN accesskeypagedown label%Spoon.Menu.View.ZoomOut/ menuseparator/ menuitem valueview-zoom-100pct acceltextHOME accesskeyhome label%Spoon.Menu.View.Zoom100/ /menupopup /menu标签表示视图view菜单 是其下拉菜单 menuitem 是一个个菜单项 标签页上的菜单 因为一个JobGraph对象表示一个标签页所以我们可以在JobGraph类中找到与菜单相关的成员变量private XulToolbar toolbar; 它在下面函数中被初始化加载菜单项 private void addToolBar() {try {toolbar XulHelper.createToolbar(XUL_FILE_JOB_TOOLBAR, JobGraph.this, JobGraph.this, new XulMessages()); … } }XUL_FILE_JOB_TOOLBAR指明的文件是程序根目录下的ui.job-toolbar.xul其内容如下 window xmlnshttp://www.mozilla.org/keymaster/gatekeeper/there.is.only.xul alignvertical toolbox toolbar idnav-toolbar modeicons !—- 一些菜单项 / toolbarbutton valuefile-new label%Spoon.Menu.File.New imageui/images/new.png tooltiptext%Spoon.Tooltip.NewFile/ toolbarbutton valuefile-open label%Spoon.Menu.File.Open imageui/images/open.png tooltiptext%Spoon.Tooltip.OpenTranformation / toolbarseparator / toolbarbutton valuefile-save label%Spoon.Menu.File.Save imageui/images/save.png tooltiptext%Spoon.Tooltip.SaveCurrentTranformation / …!—- 其他菜单项 / /toolbar /toolbox /window右键菜单 工作区中的右键菜单由JobGraph.setMenu函数构建它的菜单文件在根目录的ui.menus.xul中 右击工作区中的组件 右击工作区的空白区域 Kettle资源文件及配置文件 kettle.properties 一般存放在应用程序的工作主目录C:\Documents and Settings\Administrator.kettle下里面有几个文件存放用户对于系统的喜好配置以及历史信息这些信息会在启动时被读取。其中kettle.properties较为重要它在程序启动环境初始化过程被读取和.languageChoice文件它指明了当前的语言设定。 格式如下 #Language Choice #Tue Apr 12 16:13:38 CST 2011 #这个是当前默认语言 LocaleDefaultzh_cn #这个是第二语言 LocaleFailoveren_uskettle-config.xml 在src.org.pentaho.di.core.config包中 整个文件分为5个部分5个标签提供程序初始化时加载组件的一些必要信息。5个部分包括对job类型组件的加载配置说明、step类型组件的加载配置说明和plugin类型组件的加载配置说明。 比如加载entry组件需要下面的信息 config idjobs-xml-config config-class org.pentaho.di.core.config.DigesterConfigManager /config-class property nameconfigURL valuekettle-jobs.xml / property namerulesURL valueorg/pentaho/di/core/config/jobs-rules.xml / property namesetNext valuejobs/job / /config标签config-class指明处理加载entry的类 标签property nameconfigURL valuekettle-jobs.xml /指明组件信息所在的文件 标签property namerulesURLvalue org/pentaho/di/core/config/jobs-rules.xml /指明加载entry的规则 标签property namesetNext valuejobs/job /指明处理的起点节点在property nameconfigURL ...标签中指明的xml文件中。kettle-job.xml 在src包下 所有job组件都必须在此注册每一个标签就是一个组件提供加载一个组件的必要信息。 jobs job idWAIT_FOR_FILE type ognl:org.pentaho.di.job.JobEntryTypeWAIT_FOR_FILE /type class-name org.pentaho.di.job.entries.waitforfile.JobEntryWaitForFile /class-name categoryognl:org.pentaho.di.job.JobEntryCategoryFILE_MANAGEMENT.getName()/category tooltip ognl:org.pentaho.di.job.entry.MessagesgetString(JobEntry.WaitForFile.Tooltip) /tooltip image-uriui/images/WFF.png/image-uri /job …其他job /jobs标签job idWAIT_FOR_FILE表示一个特定id的组件 标签type表明了该组件的类型这个类型必须出现在org.pentaho.di.job.JobEntryType枚举中。 标签class-name指明该组件的实现类 标签tooltip指明当鼠标移动到组件图标上时出现的提示信息 标签image-uri指明该组件的图标文件。Message文件 在你使用的类所在的包下子包Message中。 格式为名称 值 不同的语言我们创建不同的message文件。名称格式为Message_语言简称_国家简称.properties例如中文中国Message_zh_CN. Properties。 .kjb文件格式 .kjb是Job元数据文件的后缀。主要有三部分构成job基本信息job的entris组件信息和组件之间的链接hop信息。标签为job。 job标签 job的基本信息包括name描述信息创建和修改日期等。 entries标签 job的信息有两部分 公有信息每个entry都具有的信息。如namedescriptionjobentrytype等由JobEntryBase保存。 私有信息每个具体step继承了JobEntryBase并各自特有的信息如开始entry的xml中其他信息如下图所示。 每个标签内容对应界面层元素如下图 hops标签 Hops信息主要有源和目标entry信息链接的状态是否开启链接的类型条件执行还是无条件执行等。如下图所示 .ktr文件格式 一个UniqueRiws.ktr 文件的Transformation顶层有Transformation信息 , 信息。信息组成。 info标签 info节点包括了一个Trans的信息。如trans名字trans描述信息日志设置信息设置的最大RowSet数量修改时间。 对应的界面层设置是 order标签 order对象是Step之间的链接信息包括一个链接的源Step和目标Step以及链接的使能状态。 step标签 step节点信息描述了一个节点的信息。包括两部分信息 每个step都有的信息如stepnamestep类型信息所在面板位置信息 具体step特有的信息如去重是否对重复的行计数以及计数变量名。 文本输出的格式分隔符数据名和数据类型信息。 以下是去重step对应的内容。 对应的GUI设置为 图标 图标位置 位于ui\images文件夹中。 位于ui.package-res.ui.images文件夹中。 配置文件 ui\下LAF.properties文件内容如下 splash_imageui/images/kettle_splash.png 其中splash_image为程序中用到的property的key值返回值为ui/images/kettle_splash.png。代码说明 设置文件 package org.pentaho.di.laf; public class OverlayPropertyHandler implements PropertyHandler { protected static final String propFile ui/laf.properties; …… } 使用 final Image kettle_image ImageUtil.getImageAsResource(display, BasePropertyHandler.getProperty(splash_image));背景图片 找到ui.package-res.ui.images.kettle_splash.png替换该图片版本信息 找到e.gc.drawText(versionText, 290, 205, true); 改为e.gc.drawText(“Kettle V1.0”, 290, 205, true); 插件配置文件 plugin.xml的功能是告诉kettle插件的元数据类插件的名称及描叙还有需要加载的jar包。
http://www.w-s-a.com/news/965361/

相关文章:

  • 手机网站建设价格手机网站模版更换技巧
  • 哈尔滨松北区建设局网站美妆网站建设
  • 不需要网站备案的空间网站推广的基本方法是哪四个
  • 如何检查网站死链劳动仲裁院内部网站建设
  • 江西省住房和城乡建设网站合同管理系统
  • 网站建设质量保证福州网络推广
  • 高唐网站建设公司广州南站在哪个区
  • 广西柳州网站制作公司郴州网红打卡景点
  • 做网站要固定ip拍摄公司宣传片制作
  • 专业微网站电话号码做软件难吗
  • 邢台网站制作哪家强上海做网站设计
  • 大连网站建设外贸wordpress添加文章属性
  • 商城网站建设合同范本网上哪里可以免费学编程
  • 服务器公司网站博客wordpress怎么编辑
  • 网站建设网络推广柯西乡塘网站建设
  • 企业做网站需要多少钱企业资质查询系统官网
  • 网站建设需要知识百度统计数据
  • 自已如何做网站建设通网站会员共享密码
  • 做网站学习什么wordpress 文件夹
  • 前端移动网站开发wordpress图文混排
  • 企业网站建站那种好商城类网站怎么优化
  • 手机微网站怎么制作的网上找设计师
  • 网站建设包括哪些方面学校网站 建设
  • 贵阳网站优化公司建筑设计师用什么软件
  • 网站建设的小说静态网页模板免费网站
  • 芜湖建设厅官方网站wordpress自动设置缩略图
  • 推荐网站网页湛江网站建设哪家优惠多
  • 传奇网站免费空间网店装修店面
  • 网站改版 重新收录湖南建筑信息一体化管理平台
  • 可以做直播卖产品的网站陕西省建设银行网站