Flink in Practice: Sink Operations

我就是我 · 2022-11-04 15:28


  • 1. Preface
  • 2. Types
    • 2.1. Collection-based sink
    • 2.2. File-based sink
      • 2.2.1. Writing data to a local file
      • 2.2.2. Writing data to HDFS
    • 2.3. Kafka Sink
    • 2.4. MySQL Sink

1. Preface

2. Types

2.1. Collection-based sink

Goal:

Using the data below, print it to stdout, print it to stderr, and collect() it.

    (19, "zhangsan", 178.8),
    (17, "lisi", 168.8),
    (18, "wangwu", 184.8),
    (21, "zhaoliu", 164.8)

Code:

    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object SinkCollection {
      def main(args: Array[String]): Unit = {
        // 1. Create the stream execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 2. Create a DataStream with fromCollection
        val data = env.fromCollection(List(
          (19, "zhangsan", 178.8),
          (17, "lisi", 168.8),
          (18, "wangwu", 184.8),
          (21, "zhaoliu", 164.8)
        ))
        // 3. (No transformation is needed for this example)
        // 4. Print to stdout and to stderr
        data.print()
        data.printToErr()
        // collect() is only available with the batch (DataSet) API; see the sketch after this example
        // print(data.collect())
        // 5. Execute the job
        env.execute()
      }
    }
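The commented-out collect() call above applies to the batch (DataSet) API rather than to DataStream. A minimal sketch of that batch variant, assuming the legacy DataSet Scala API is on the classpath (the object name BatchCollect is just for illustration):

    import org.apache.flink.api.scala._

    object BatchCollect {
      def main(args: Array[String]): Unit = {
        // The DataSet API builds on ExecutionEnvironment instead of StreamExecutionEnvironment
        val env = ExecutionEnvironment.getExecutionEnvironment
        val data = env.fromCollection(List(
          (19, "zhangsan", 178.8),
          (17, "lisi", 168.8),
          (18, "wangwu", 184.8),
          (21, "zhaoliu", 164.8)
        ))
        // collect() triggers execution and returns the result to the driver as a local Seq
        println(data.collect())
      }
    }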

2.2. File-based sink

  • Flink supports files on several storage systems, including the local file system and HDFS.
  • Flink supports several file formats, including plain text and CSV (a CSV sketch follows this list).
  • writeAsText(): TextOutputFormat - writes elements as strings, one per line; the string is obtained by calling each element's toString() method.
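For the CSV case, a minimal sketch using writeAsCsv, which is available (though deprecated in newer releases) on the DataStream API for tuple-like types; the output path is a hypothetical example:

    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.core.fs.FileSystem.WriteMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object CsvFileSink {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1) // keep the output as a single file
        val data = env.fromCollection(List((1, "flink"), (2, "sink")))
        // writeAsCsv writes each tuple as one row, comma-separated by default
        // (the path below is illustrative)
        data.writeAsCsv("/opt/a/tmp/CsvSink.csv", WriteMode.OVERWRITE)
        env.execute()
      }
    }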

2.2.1. Writing data to a local file

Goal:

Write the data below to a file.

    List((1, "flink"), (2, "sink"))

Code:

    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.core.fs.FileSystem.WriteMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object LocalFileSink {
      def main(args: Array[String]): Unit = {
        // 1. Create the stream execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Parallelism 1 keeps the output as a single file instead of a directory of per-subtask files
        env.setParallelism(1)
        // 2. Create a DataStream with fromCollection
        val data = env.fromCollection(List((1, "flink"), (2, "sink")))
        // 3. Write to a local text file, overwriting any existing file
        data.writeAsText("/opt/a/tmp/FileSink.txt", WriteMode.OVERWRITE)
        // 4. Execute the job
        env.execute()
      }
    }

2.2.2. Writing data to HDFS

Write the data to HDFS. Writing to an hdfs:// path additionally requires that the Hadoop dependencies are visible to Flink (for example via HADOOP_CLASSPATH); the result can be checked with hdfs dfs -cat.

    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.core.fs.FileSystem.WriteMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object HdfsFileSink {
      def main(args: Array[String]): Unit = {
        // 1. Create the stream execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Parallelism 1 keeps the output as a single file
        env.setParallelism(1)
        // 2. Create a DataStream with fromCollection
        val data = env.fromCollection(List((1, "flink"), (2, "sink")))
        // 3. Write to HDFS as text, overwriting any existing file
        data.writeAsText("hdfs://h23:8020/tmp/test/tmp/FileSink.txt", WriteMode.OVERWRITE)
        // 4. Execute the job
        env.execute()
      }
    }

2.3. Kafka Sink

    kafka-console-consumer.sh --from-beginning --topic test2 --zookeeper node01:2181,node02:2181,node03:2181

Example

Write the data to Kafka. The Kafka CLI commands below can be used to prepare the topic and to verify what the job produces.

    cd /usr/hdp/current/kafka-broker/bin

List topics:

    ./kafka-topics.sh --zookeeper hxx:2181 --list

Create a topic:

    ./kafka-topics.sh --zookeeper hxx:2181 --create --topic test --partitions 3 --replication-factor 1

Produce data to the topic:

    ./kafka-console-producer.sh --broker-list 192.168.xx.xx:9092 --topic test

Consume data from the topic:

    ./kafka-console-consumer.sh --bootstrap-server 192.168.xx.xx:9092 --topic test --from-beginning
Code:

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

    object KafkaSink {
      def main(args: Array[String]): Unit = {
        // 1. Create the stream execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 2. Create a DataStream with fromCollection
        val data = env.fromCollection(List("flink", "Spark"))
        // 3. Build the Kafka sink
        val pro: Properties = new Properties
        pro.setProperty("bootstrap.servers", "192.168.xx.xx:9092")
        val kafkaSink = new FlinkKafkaProducer[String]("test", new SimpleStringSchema(), pro)
        // 4. Add the sink to the stream
        data.addSink(kafkaSink)
        // 5. Execute the job
        env.execute()
      }
    }
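SimpleStringSchema only serializes DataStream[String], so non-String records have to be converted to strings before this sink can be used. A minimal sketch of that pattern, assuming the same broker and topic as above; the object name KafkaTupleSink and the CSV-style formatting are illustrative choices, not part of the original example:

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

    object KafkaTupleSink {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val data = env.fromCollection(List((1, "flink"), (2, "sink")))

        val props = new Properties
        props.setProperty("bootstrap.servers", "192.168.xx.xx:9092")
        val kafkaSink = new FlinkKafkaProducer[String]("test", new SimpleStringSchema(), props)

        data
          .map(t => s"${t._1},${t._2}") // serialize each tuple as a comma-separated line
          .addSink(kafkaSink)

        env.execute()
      }
    }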

2.4. MySQL Sink

Example

Load the following local collection and write it into MySQL.

    List(
      (10, "dazhuang", "123456", "大壮"),
      (11, "erya", "123456", "二丫"),
      (12, "sanpang", "123456", "三胖")
    )

Code:

    import java.sql.{Connection, DriverManager, PreparedStatement}

    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

    object MySqlSink {
      def main(args: Array[String]): Unit = {
        // 1. Create the stream execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 2. Create a DataStream with fromCollection
        val data: DataStream[(Int, String, String, String)] = env.fromCollection(List(
          (10, "dazhuang", "123456", "大壮"),
          (11, "erya", "123456", "二丫"),
          (12, "sanpang", "123456", "三胖")
        ))
        // 3. Add the custom MySQL sink
        data.addSink(new MySqlCustomSink)
        // 4. Execute the job
        env.execute()
      }
    }

    class MySqlCustomSink extends RichSinkFunction[(Int, String, String, String)] {
      private var connection: Connection = null
      private var ps: PreparedStatement = null

      override def open(parameters: Configuration): Unit = {
        // 1. Load the JDBC driver
        Class.forName("com.mysql.jdbc.Driver")
        // 2. Open the connection
        connection = DriverManager.getConnection(
          "jdbc:mysql://127.0.0.1:3306/tmp?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
          "root", "root")
        // 3. Prepare the insert statement
        val sql = "insert into user(id, username, password, name) values(?,?,?,?);"
        ps = connection.prepareStatement(sql)
      }

      override def invoke(value: (Int, String, String, String), context: SinkFunction.Context): Unit = {
        try {
          // 4. Bind the tuple fields and execute the insert
          ps.setInt(1, value._1)
          ps.setString(2, value._2)
          ps.setString(3, value._3)
          ps.setString(4, value._4)
          ps.executeUpdate()
        } catch {
          case e: Exception => println(e.getMessage)
        }
      }

      // Release the statement first, then the connection, when the sink closes
      override def close(): Unit = {
        if (ps != null) {
          ps.close()
        }
        if (connection != null) {
          connection.close()
        }
      }
    }
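The sink assumes that a user table already exists in the tmp database. Its exact schema is not given in the original; a minimal sketch that creates a plausible table over the same JDBC settings, where the column types are an assumption inferred from the INSERT statement:

    import java.sql.DriverManager

    object CreateUserTable {
      def main(args: Array[String]): Unit = {
        Class.forName("com.mysql.jdbc.Driver")
        val connection = DriverManager.getConnection(
          "jdbc:mysql://127.0.0.1:3306/tmp?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
          "root", "root")
        val stmt = connection.createStatement()
        // Assumed schema: column names match the INSERT in MySqlCustomSink; the types are a guess
        stmt.execute(
          """CREATE TABLE IF NOT EXISTS user (
            |  id INT PRIMARY KEY,
            |  username VARCHAR(64),
            |  password VARCHAR(64),
            |  name VARCHAR(64)
            |)""".stripMargin)
        stmt.close()
        connection.close()
      }
    }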

GitHub repo: https://github.com/BoYiZhang/flink-demo
