Apache Nifi入门篇03--Nifi 基础案例

£神魔★判官ぃ 2021-12-15 04:13 357阅读 0赞

再试试其他的案例:

https://mp.weixin.qq.com/s/m2eyFaG_j0S8sgPeNUfdNA

https://mp.weixin.qq.com/s/EJMaFG-f2KadFw7kLHsvlQ

讲真,Nifi用起来体验感觉真不错~~~ 快速找到文档链接…………….

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzMxODY2Nzkz_size_16_color_FFFFFF_t_70

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzMxODY2Nzkz_size_16_color_FFFFFF_t_70 1

对了,备注一下,Nifi的进程名称,免得下次挂了我们还不知道:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzMxODY2Nzkz_size_16_color_FFFFFF_t_70 2

Flink 程序读取Nifi的数据:

依赖:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-nifi_2.11</artifactId>
  4. <version>1.7.2</version>
  5. </dependency>

Flink的程序代码:

  1. package aliyun.product.customer_analysis_system
  2. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  3. import org.apache.flink.streaming.connectors.nifi.{NiFiDataPacket, NiFiSource}
  4. import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}
  5. import org.apache.flink.streaming.api.scala._
  6. object Nifi2Flink {
  7. def main(args: Array[String]): Unit = {
  8. val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
  9. val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
  10. .url("http://172.10.4.95:9901/nifi")
  11. .portName("test")
  12. // .portName("flink2")
  13. .requestBatchCount(2)
  14. .buildConfig()
  15. val nifiSource: NiFiSource = new NiFiSource(clientConfig)
  16. val NifiData: DataStream[NiFiDataPacket] = streamExecEnv.addSource(nifiSource)
  17. NifiData.print()
  18. NifiData.map(x=> {
  19. println(x)
  20. x.getContent.toString
  21. }).print()
  22. streamExecEnv.execute()
  23. }
  24. }

输出到Nifi:

  1. val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
  2. val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
  3. .url("http://localhost:8080/nifi")
  4. .portName("Data from Flink")
  5. .requestBatchCount(5)
  6. .buildConfig()
  7. val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder<T>() {...})
  8. streamExecEnv.addSink(nifiSink)

下一篇文章讲Nifi+Flink

发表评论

表情:
评论列表 (有 0 条评论,357人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Apache NiFi简介

    > 一个易用、强大、可靠的数据处理与分发系统。基于Web图形界面,通过拖拽、连接、配置完成基于流程的编程,实现数据采集等功能 一、什么是NiFi? > NiFi是美国国

    相关 Apache NiFi用户指南

    介绍 Apache NiFi是基于流程编程概念的数据流系统。它支持强大且可扩展的数据路由,转换和系统中介逻辑的有向图。NiFi具有基于Web的用户界面,用于设计,控制,反馈和

    相关 Apache Nifi入门01

    事先声明,下面讲的都是从其他文章搬过来的~ 我自己可总结不了这么详细的资料   1,首先 先了解Nifi,不了解它是做啥,磨刀不误砍柴工~ ok 什么是Apache Ni