使用Kettle批量同步数据库表

╰+哭是因爲堅強的太久メ 2022-02-21 16:21 869阅读 0赞

转自:https://blog.csdn.net/lstcui/article/details/81664774

目的 (Kettle 版本5.2或以上 可由官网下载,免费开源)
从源库中同步若干张数据表至目标库或目标文件中
(鉴于Kettle工具对数据库连接的插件支持不是太好,最好是在源库与目标库类型相同的情况下使用,如:都是Oracle或都是Mysql等,当然不同的数据库也是可以的)

直接点:下载源代码 下载详细设计文档

数据同步方式 暂支持以下两种(即表2表 表2文件)可做配置
1)从源库表中同步数据至目标库表中 (目标表名可配置)
2)从源库表中同步数据至目标文件中 (目标文件名可配置)

数据源可配置 工具连接数据库统一使用JNDI的连接池来连接数据库;
JNDI统一配置在 data-integration/simple-jndi/jdbc.properties文件中,在向ETL执行源表: etl_kettle_tables 插入数据表信息的同时指定数据源;

执行结果校验
在工具同步数据结束后,会有数据校验步骤,校验的原则为,以下情况视为数据同步失败:
1)日志状态为 N-失败;
2)源库表中影响的行数与插入目标库表时影响的行数不同;
后续可添加其它校验规则;校验位置: tran4.2-check-failure 中

日志记录
工具在同步每张数据表时,都有对应的日志文件生成;

以下为详细配置步骤:

1、流程图
1.1 Kettle数据同步总流程

总流程图

1.2 Kettle数据同步详细流程

详细流程图

2、 数据模型设计

Kettle数据同步设计模型设计下载

简要说明:
ETL执行源表 etl_kettle_tables (Kettle从该表中动态加载需要执行的数据表信息)
ETL临时手动执行源表: etl_kettle_tables_error (手动处理时Kettle从该表中动态加载需要执行的数据表信息)
ETL日志表: etl_kettle_log (记录每次同步结果)
ETL同步结果表:etl_kettle_result (记录每次同步的成功与失败表数量)
具体详细表结构请下载模型设计查看

3、 同步工具配置详细步骤
3.1 启动Kettle工具
3.1.1 创建kettle文件资源库(若已有资源库可跳过)

1)在打开的如下图中点击【+】号按钮, 如下图

新建资源库

2)弹出如下窗口 选择文件资源库类型 Kettle file repository

选择文件资源库

3)在弹出的如下窗口中选择文件资源库的目录位置并设置资源库名称(注:最好起有意义的英文名称,下面的描述Description最好也是使用英文,中文可能会有乱码)

创建文件资源库

4)在上图中点击确定即可完成文件资源库创建

3.1.2 打开文件资源库
1)启动Kettle工具 弹出如下窗口 选择已经创建好的资源库名称,输入登录用户名及密码并点击确认按钮

连接资源库

即可成功登录资源库,弹出如下欢迎界面窗口

3.2 Kettle工具配置
3.2.1 主作业入口 Job-Tools-Main

主作业内容如下图

主作业

现详细说下主作业的各项内容

A)变量配置 tran1.1-set-commen-variables

a) 主要用来设置一些变量供后面使用 内容如下图

变量定义

b) 生成随机数 里面配置为空(也可以不用使用这个插件直接删除掉)

c) 公共变量定义 此处主要来设置变量,是A)步骤的核心设置 如下图

变量设计

需要注意的是 下半部分的字段位置 需要选择出已做好的变量 (其中“改名为”字段 可默认也可以修改,一般在定义的名称前面添加 v_ 用以标识变量)

现粘贴出里面的代码供查看

Date.prototype.Format = function (fmt) { //author: meizz
var o = {
“M+”: this.getMonth() + 1, //月份
“d+”: this.getDate(), //日
“h+”: this.getHours(), //小时
“m+”: this.getMinutes(), //分
“s+”: this.getSeconds(), //秒
“q+”: Math.floor((this.getMonth() + 3) / 3), //季度
“S”: this.getMilliseconds() //毫秒
};
if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + “”).substr(4 - RegExp.$1.length));
for (var k in o)
if (new RegExp(“(“ + k + “)”).test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : ((“00” + o[k]).substr((“” + o[k]).length)));
return fmt;
}

// 全局变量:当前系统日期 如:20170911
var global_date =new Date(new Date().getTime()).Format(“yyyyMMdd”);

var trade_date = date2str(dateAdd(new Date(),’d’, -1),’yyyyMMdd’);

// 全局变量:当前系统日期时间 如:20170911141421
var global_datetime =new Date(new Date().getTime()).Format(“yyyyMMddhhmmss”);

// 全局变量:当前资源库绝对路径
var base_path = “E:/98KettleRepositoryPro/Kettle_Tools_1.0/“;

// 全局变量:当前资源库绝对路径
var context_path = “kettle_tools/“;

// 全局变量:FTP服务器路径
var ftp_base_path = base_path + “files/“;

// 全局变量:日志文件所在路径
var log_file_path = base_path + “logs/log_day/“;

注 如下图中左上部分的 Java Script 函数是Kettle工具内置的函数 可以直接使用

d) 设置变量 在上一步骤的设置的变量 需要在此设置后,后面的程序才能找到这些变量 如下图

设置变量

其中变量活动类型字段 需要注意 有多种类型如下

活动类型

字如其意 有开发语言经验的一看即懂 在此不过多描述 如果实在不知 统一都选择 Valid in the root job 即可;不过为也程序的严紧性 本人推荐应该正确选择变量的活动类型

结束 ….

B)配置文件加载 tran1.2-load-config

在Kettle工具中有内置支持加载properties配置文件功能,即在指定目录创建一个properties配置文件并把路径指给Kettle即可 如下图配置

配置文件

注意:

属性文件名处本人使用的是变量 在A)步骤中有配置

变量替换 请勾选 表示配置文件中的Key-Value键值对会替换掉与在A)步骤中配置的相同变量名称的值(有点绕 不过不难理解 即后者覆盖前者之意)

结束 ….

C) 加载需要同步的数据表信息 tran2.1-load-tables

加载数据表

本工具的数据表同步信息是配置在一张落地表中,需要连接数据库并加载出来供使用

a) 加载需要同步的表信息

查询数据表信息

查询表数据SQL语句

注意选择数据库连接(数据库连接需要提前创建好 详细创建步骤请参考下面的数据库连接创建)
注意勾选”替换SQL语句里面的变量” (如果未使用变量 则不用勾选)
b) 字段选择 从上步骤中选择出需要使用的查询字段并在此配置

选择查询字段

注意:在此作名称修改 是为了把这些值在后面当成变量使用并方便查看

c) 复制记录到结果 即将查询结果添加到内存中 供后面拿取使用

复制记录到结果

注意 此步骤一定不能少 否则后面无法使用

D) 数据同步 Job3.0-Sub-Tools-Transfer

此步骤是Kettle数据同步工具的核心步骤 内容较多 不过也不难理解

简单来说就是以下同种情况(详细请参考我的详细流程图设计)

表2表 从源表查询数据插入到目标库表时 如果目标库表已经存在 则直接插入数据;如果不存在则在目标库中依照源表先创建表结构再插入数据;
表2文件 无论目标文件是否存在 新生成文件直接覆盖目标文件

核心图

图看着有点乱(故意做成了像一只低头的鸟) 不过理解上面的说明就好办了;

下面作详细说明:

a) 将查询出来的数据表信息设置成变量 tran3.1-set-tableName-variables

设置表信息

生成随机数 生成一个流水号 现使用UUID作为流水号

自定义拼接流水号 在上一步骤获取的UUID基础上添加时间戳拼接成后面使用的流水号 (具体语法不在详解)

流水号变量设置 同样上步骤设计的变量需要配置到环境变量中

从结果获取记录 即从程序内存中拿取上一步骤查询的数据表信息(注意 使用多少就要拿多少)

自定义变量处理 对前面拿取的变量可以作一些特殊的处理(下图是对日志的文件路径作一些处理 使之存储绝对路径)

自定义变量设置 同样上步骤设计的变量需要配置到环境变量中

至此 把加载的数据表信息配置成变量工作已结束

需要特殊注意的是 在配置变量的时候 要在一个独立的转换(Transformation)中配置,否则的话会导致T+1问题即在T+1次时使用的变量值

b) 执行前先插入一条初始日志(默认状态N-失败) tran3.2.1-log-insert

插入初始日志

内容如下 逻辑很简单 即一条SQL语句

插入初始日志

存储过程 pro_etl_log 内容如下 :

CREATE OR REPLACE PROCEDURE pro_etl_log — 日志信息插入日志表
(
v_flow_id IN VARCHAR2, — 主键:流水号
v_etl_date in VARCHAR2, — 执行日期
v_etl_time_stamp in VARCHAR2, — 执行时间戳
v_src_table_name in VARCHAR2, — 源表名称
v_src_instan_user_name in VARCHAR2, — 源库实例用户名称
v_src_system_name in VARCHAR2, — 源表所在系统名称
v_src_table_count in INTEGER, — 源表数据量(插入行数)
v_tgt_table_name in VARCHAR2, — 目标表名称
v_tgt_instan_user_name in VARCHAR2, — 目标库实例用户名称
v_tgt_table_count in INTEGER, — 目标表数据量(插入行数)
v_exe_collect_type IN VARCHAR2, — 数据同步方式: TT-表到表 TF-表到文件
v_etl_status in VARCHAR2, — 执行状态 Y-成功 N-失败
v_etl_error_file_path in VARCHAR2 — 错误信息描述
) IS

BEGIN

— 存在即更新 不存在即插入
MERGE INTO etl_kettle_log ekl
USING ( SELECT v_flow_id AS flow_id,
v_etl_date AS etl_Date,
v_etl_time_stamp AS etl_time_stamp,
v_src_table_name AS src_table_name,
v_src_instan_user_name AS src_instan_user_name,
v_src_system_name AS src_system_name,
v_src_table_count AS src_table_count,
v_tgt_table_name AS tgt_table_name,
v_tgt_instan_user_name AS tgt_instan_user_name,
v_tgt_table_count AS tgt_table_count,
v_exe_collect_type AS exe_collect_type,
v_etl_status AS etl_status,
v_etl_error_file_path AS etl_error_file_path
FROM dual
) yq
ON (ekl.flow_id = yq.flow_id)
WHEN MATCHED THEN
UPDATE SET ekl.etl_end_time = SYSDATE,
ekl.src_table_count = yq.src_table_count,
ekl.tgt_table_count = CASE WHEN yq.exe_collect_type = ‘TF’ AND yq.tgt_table_count > 0
THEN yq.tgt_table_count - 1
ELSE yq.tgt_table_count END,
ekl.etl_status = yq.etl_status,
ekl.etl_error_file_path = yq.etl_error_file_path
WHEN NOT MATCHED THEN
INSERT (flow_id, etl_date, etl_time_stamp, src_table_name, src_instan_user_name, src_system_name, src_table_count, tgt_table_name, tgt_instan_user_name, tgt_table_count,
exe_collect_type,etl_status,etl_error_file_path, etl_begin_time, insert_time )
VALUES (yq.flow_id,
yq.etl_date,
yq.etl_time_stamp,
yq.src_table_name,
yq.src_instan_user_name,
yq.src_system_name,
yq.src_table_count,
yq.tgt_table_name,
yq.tgt_instan_user_name,
yq.tgt_table_count,
yq.exe_collect_type,
yq.etl_status,
yq.etl_error_file_path,
SYSDATE,
SYSDATE ) ;
COMMIT ;

END pro_etl_log;
c) 条件判断(表2表 表2文件)条件分支 用来判定数据同步方式

如果是表2表的同步方式

d) 检查表是否存在 数据插入前先判定目标库中目标表是否存在

e) 创建数据表结构 tran3.4.1-target-create-tables

若目标库表不存在 则需要先创建目标表结构

获取表结构 (获取表结构SQL是个变量 在初始查询数据表信息时赋值)

创建表结构 (需要手动写JAVA代码程序来创建数据表结构 )

创建数据表结构JAVA代码如下:

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {

  1. Object\[\] r = getRow(); //
  2. org.pentaho.di.core.database.DatabaseMeta dbmeta = getTransMeta().findDatabase("DB\_TARGET");
  3. if(dbmeta != null) \{
  4. org.pentaho.di.core.database.Database db = new org.pentaho.di.core.database.Database(dbmeta);
  5. try \{
  6. db.connect();
  7. String username = getVariable("v\_tgt\_instan\_user\_name"); // 注意是 目标库实例用户名
  8. String tablename = getVariable("v\_tgt\_table\_name"); // 注意是 目标库表名称
  9. logBasic("开始创建表:" + tablename);
  10. // 获取数据库名称 如: Oracle、Mysql、 Teradata 等 如果需要 可以作判断
  11. String dbProName = db.getConnection().getMetaData().getDatabaseProductName();
  12. logBasic("数据库类型:" + dbProName);
  13. if(tablename != null && tablename.trim().length() > 0) \{
  14. String sql = db.getDDL(username + "." + tablename, data.inputRowMeta);// 获取创建数据表结构的SQL脚本
  15. logBasic("创建表:" + sql);
  16. db.execStatement(sql.replace(";", ""));
  17. \}
  18. logBasic("创建表结束。");
  19. \} catch(Exception e) \{
  20. logError("创建表出现异常",e);
  21. \} finally \{
  22. db.disconnect();
  23. \}
  24. \}
  25. return false;

}
说明:其中的DB_TARGET是目标数据库连接 注意不要使用错误

f) 同步数据到目标库表中 tran3.4.2-target-insert-table

统计输入影响行数 设置输入影响行数变量

统计输出影响行数 设置输出影响行数变量

表输入 表输出

g) 添加数据表结构属性备注信息 tran3.4.1.1-target-comment-tables

在上一创建目标表结构的过程中仅仅创建了数据表结构 并没有表属性注释信息 需要从该步骤中添加注释信息(如果哪位大神可以共享下在创建数据表结构的同时添加下表属性注释的JAVA代码 感激不尽)

查询表注释

更新表注释

h) 条件判断(是否有后续执行) 本处是个扩展功能 即在把源库表数据同步到目标库表中后还可以有一些其他的操作 如更新/删除等

i) 执行后续操作功能(当有后续执行操作时会被触发) tran3.5.1-last-operate

执行SQL脚本

如果是表2文件的方式

j) 表2文件方式同步 tran3.3.1-generate-db-files

统计输入影响行数 设置统计输入影响行数变量

统计输出影响行数 设置统计输出影响行数变量

配置目标数据文件落地路径 javascript代码

表输入 文本文件输出

k) 目标数据文件上传FTP服务器 SFTP 上传

l)更新成功日志 tran3.2.3-log-update-success

更新日志状态-成功

更新日志状态-失败

结束 ….

E)数据同步结束后的校验 Job4.0-Sub-Tools-Check

a) 查询统计 tran-4.1-check-statistics

统计本次同步的数据表数量

b) 统计同步错误的数据表信息 tran4.2-check-failure

将同步错误的数据表信息插入到错误记录表中(现在仅是记录错误 后续可以配置成邮件发送运维负责人 方便及时知晓并处理)

插入SQL语句如下 :
DELETE FROM etl_kettle_tables_error ;

INSERT INTO etl_kettle_tables_error
(src_table_name, src_instan_user_name,src_instan_datasource, src_system_name, src_system_code, tgt_table_name, tgt_instan_user_name,
tgt_instan_datasource,exe_collect_type, exe_collect_way, exe_frequency, exe_sql_select, exe_sql_update, exe_status)
SELECT ekt.src_table_name,
ekt.src_instan_user_name,
ekt.src_instan_datasource,
ekt.src_system_name,
ekt.src_system_code,
ekt.tgt_table_name,
ekt.tgt_instan_user_name,
ekt.tgt_instan_datasource,
ekt.exe_collect_type,
ekt.exe_collect_way,
ekt.exe_frequency,
ekt.exe_sql_select,
ekt.exe_sql_update,
ekt.exe_status
FROM etl_kettle_tables ekt
WHERE ekt.exe_status = ‘Y’
AND ekt.src_table_name IN (
SELECT ekl.src_table_name
FROM etl_kettle_log ekl
WHERE ekl.etl_date = ${v_global_date}
AND (ekl.etl_status = ‘N’ OR (ekl.src_table_count <> ekl.tgt_table_count ))
AND ekl.etl_time_stamp = ${v_global_datetime}
) ;

COMMIT;
数据库连接创建

1)在主对象树中找到 作业 -> DB连接

2)在DB连接上右键 选择创建 Create 在弹出窗口中配置如下图

3)使用连接池配置如下

4)创建结果如下图

6)选中该 DB_SOURCE 右键 -> 共享 (目的 方便后续其它模块可以直接使用)

特别说明:以上详细步骤基本上说明了整个的配置逻辑,但最新的逻辑请下载完成源代码请不要吝惜自己的积分哦,值得一看;

下载详细设计文档也是需要积分的哦!

至此整个配置结束 谢谢…..
-——————————
作者:遇见小豆
来源:CSDN
原文:https://blog.csdn.net/lstcui/article/details/81664774
版权声明:本文为博主原创文章,转载请附上博文链接!

发表评论

表情:
评论列表 (有 0 条评论,869人围观)

还没有评论,来说两句吧...

相关阅读