Apache Nifi入门篇03--Nifi 基础案例
再试试其他的案例:
https://mp.weixin.qq.com/s/m2eyFaG_j0S8sgPeNUfdNA
https://mp.weixin.qq.com/s/EJMaFG-f2KadFw7kLHsvlQ
讲真,Nifi用起来体验感觉真不错~~~ 快速找到文档链接…………….
对了,备注一下,Nifi的进程名称,免得下次挂了我们还不知道:
Flink 程序读取Nifi的数据:
依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-nifi_2.11</artifactId>
<version>1.7.2</version>
</dependency>
Flink的程序代码:
package aliyun.product.customer_analysis_system
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.nifi.{NiFiDataPacket, NiFiSource}
import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}
import org.apache.flink.streaming.api.scala._
object Nifi2Flink {
def main(args: Array[String]): Unit = {
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
.url("http://172.10.4.95:9901/nifi")
.portName("test")
// .portName("flink2")
.requestBatchCount(2)
.buildConfig()
val nifiSource: NiFiSource = new NiFiSource(clientConfig)
val NifiData: DataStream[NiFiDataPacket] = streamExecEnv.addSource(nifiSource)
NifiData.print()
NifiData.map(x=> {
println(x)
x.getContent.toString
}).print()
streamExecEnv.execute()
}
}
输出到Nifi:
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data from Flink")
.requestBatchCount(5)
.buildConfig()
val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder<T>() {...})
streamExecEnv.addSink(nifiSink)
下一篇文章讲Nifi+Flink
还没有评论,来说两句吧...