Streamsets自定义组件开发

谁践踏了优雅 2021-09-28 15:42 838阅读 0赞

需求痛点

在实际项目的使用过程中,有些情况下现有的组件不能完全满足具体的业务需求,比如JDBC插入数据不是真正的batch提交的、较低版本的没有提供FieldMapper和FTP/SFTP写入客户端等。这就需要我们自己编写需要的组件实现想要的功能。

开发步骤

按照官方文档提供的指南实现起来还是很容易的,下面就以JdbcQueryExecutor为例,详细介绍一下自定义开发的过程:

首先生成项目

  1. mvn archetype:generate -DarchetypeGroupId=com.streamsets \
  2. -DarchetypeArtifactId=streamsets-datacollector-stage-lib-tutorial \
  3. -DarchetypeVersion=3.9.0-SNAPSHOT -DinteractiveMode=true
  4. #执行上面的代码之后,会提示需要输入项目的包路径名和项目名称,我这里输入如下,你可以根据需要修改成自己的
  5. com.embrace
  6. jdbcbatch

编译项目为IDEA项目

  1. cd jdbcbatch/
  2. mvn idea:idea

然后使用IDEA打开项目,编写组件的代码,我这里是对JdbcQueryExecutor做一些修改,所以在Streamsets源代码中找到对应的组件类,拷贝过来,做一些修改就可以了。

组件代码导入

组件的核心代码如下图所示
在这里插入图片描述
这些代码拷贝过来之后,会有报错的地方,因为依赖的其他类,这里就需要在pom文件中引入需要的包:

  1. <properties>
  2. <hikaricp.version>3.2.0</hikaricp.version>
  3. </properties>
  4. <dependency>
  5. <groupId>com.streamsets</groupId>
  6. <artifactId>streamsets-datacollector-api</artifactId>
  7. <version>3.9.0-SNAPSHOT</version>
  8. <scope>provided</scope>
  9. </dependency>
  10. <dependency>
  11. <groupId>com.streamsets</groupId>
  12. <artifactId>streamsets-datacollector-stagesupport</artifactId>
  13. <version>3.9.0-SNAPSHOT</version>
  14. </dependency>
  15. <dependency>
  16. <groupId>com.streamsets</groupId>
  17. <artifactId>streamsets-datacollector-jdbc-protolib</artifactId>
  18. <version>3.9.0-SNAPSHOT</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>com.zaxxer</groupId>
  22. <artifactId>HikariCP</artifactId>
  23. <version>${hikaricp.version}</version>
  24. </dependency>

组件修改

首先修改组件名称,在JdbcQueryDExecutor类中

  1. @StageDef(
  2. version = 2,
  3. label = "Batch JDBC Query", # JDBC Query改为 Batch JDBC Query
  4. description = "Executes queries against JDBC compliant database",
  5. upgrader = JdbcQueryExecutorUpgrader.class,
  6. icon = "rdbms-executor.png",
  7. onlineHelpRefUrl ="index.html?contextID=task_ym2_3cv_sx"
  8. )

然后修改核心代码,在JdbcQueryExecutor类中的write方法

  1. @Override
  2. public void write(Batch batch) throws StageException {
  3. ELVars variables = getContext().createELVars();
  4. ELEval eval = getContext().createELEval("query");
  5. try (Connection connection = config.getConnection()) {
  6. connection.setAutoCommit(false);//close auto commit
  7. Iterator<Record> it = batch.getRecords();
  8. Statement stmt = connection.createStatement();//one Statement for one batch
  9. try {
  10. while (it.hasNext()) {
  11. Record record = it.next();
  12. RecordEL.setRecordInContext(variables, record);
  13. String query = eval.eval(variables, config.query, String.class);
  14. LOG.debug("Executing query: {}", query);
  15. stmt.addBatch(query); //change stmt.execute(query) to stmt.addBatch(query)
  16. }
  17. int[] updateCounts= stmt.executeBatch(); //execute batch
  18. LOG.info("execute batch count:"+ updateCounts.length);
  19. } catch (SQLException ex) {
  20. LOG.error("Execute batch query failed", ex);
  21. } finally {
  22. try {
  23. if (stmt != null)
  24. stmt.close();
  25. } catch (SQLException se2) {
  26. }
  27. }
  28. if (config.batchCommit) {
  29. connection.commit();
  30. }
  31. } catch (SQLException ex) {
  32. LOG.error("Can't get connection", ex);
  33. throw new StageException(QueryExecErrors.QUERY_EXECUTOR_002, ex.getMessage());
  34. }
  35. }

编译打包

  1. mvn package -DskipTests

部署自定义组件

把target目录下生成的jdbcbatch-1.0-SNAPSHOT.tar.gz压缩包解压到Streamsets安装目录下的user_libs下

  1. cd ~/IdeaProjects/datacollector/dist/target/streamsets-datacollector-3.9.0-SNAPSHOT/streamsets-datacollector-3.9.0-SNAPSHOT/user-libs/
  2. tar xvfz ~/IdeaProjects/samplestage/target/jdbcbatch-1.0-SNAPSHOT.tar.gz jdbcbatch/lib/

修改安全策略文件

修改文件etc/sdc-security.policy,增加如下内容

  1. grant codebase "file://${sdc.dist.dir}/user-libs/batchjdbc/-" {
  2. permission java.security.AllPermission;
  3. };

测试自定义组件

增加新组建后需要重启Streamsets,然后在页面就可以看到
在这里插入图片描述
测试后速度比原来的组件有所提升,需要的话可以在我的github上查看代码,欢迎提出意见,地址https://github.com/WanZhang1/jdbcbatch

总结

既然有些组件不能满足实际的需求,他们也提供的自定义的方法,就尝试去做修改,发现也不难,编程的乐趣也在于不断解决问题。

发表评论

表情:
评论列表 (有 0 条评论,838人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Uni-app 定义组件开发

    \场景\ 今天开发的时候遇到一个问题,小程序扫普通二维码进来通过参数判断显示不同的页面。但是又不能有跳转的效果。 思考了半天都没有找到合适的办法,后面前端的同事突然和我