Flink in Practice: DataSource Operations

落日映苍穹つ · 2022-11-04 12:25

* 1. Preface
* 2. The four source types
  * 2.1. Collection-based source
  * 2.2. File-based source
    * 2.2.1. readTextFile
    * 2.2.2. readCsvFile
  * 2.3. Socket-based source
  * 2.4. Custom source
    * 2.4.1. Using MySQL as a data source
    * 2.4.2. Using Kafka as a data source
    * 2.4.3. Writing a fully custom source

# 1. Preface #

This post covers the ways Flink reads data, focusing mainly on the DataStream API. The source APIs are defined on StreamExecutionEnvironment, the entry point of every Flink streaming program; a DataStream is what these source APIs construct.

# 2. The Four Source Types #

Flink's streaming sources fall roughly into four categories:

1. Collection-based source: builds a stream from a local collection.
2. File-based source: reads text files, i.e. files conforming to TextInputFormat, and returns their lines as strings.
3. Socket-based source: reads from a socket; elements can be split by a delimiter.
4. Custom source: a source function you implement yourself.

## 2.1. Collection-based source ##

This simply turns the data in a local collection into a DataStream. Note: the Scala API does not support Iterable, Set, or Map collections.

```scala
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object CollectionSource {
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism (it defaults to the number of CPU cores)
    env.setParallelism(1)

    // 0. DataStream from individual elements (fromElements)
    val ds0: DataStream[Int] = env.fromElements(1, 3234, 55, 65, 74523, 1)
    // ds0.print()

    // 1. DataStream from tuples (fromElements)
    val ds1: DataStream[(Int, String)] = env.fromElements((1, "bo"), (2, "yi"))
    // ds1.print()

    // 2. DataStream from an Array
    val ds2: DataStream[String] = env.fromCollection(Array("bo", "yi"))
    // ds2.print()

    // 3. DataStream from an ArrayBuffer
    val ds3: DataStream[String] = env.fromCollection(ArrayBuffer("bo", "yi"))
    // ds3.print()

    // 4. DataStream from a List
    val ds4: DataStream[String] = env.fromCollection(List("bo", "yi"))
    // ds4.print()

    // 5. DataStream from a ListBuffer
    val ds5: DataStream[String] = env.fromCollection(ListBuffer("BO", "YI"))
    // ds5.print()

    // 6. DataStream from a Vector
    val ds6: DataStream[String] = env.fromCollection(Vector("bo", "yi", "!!!"))
    // ds6.print()

    // 7. DataStream from a Queue
    val ds7: DataStream[String] = env.fromCollection(mutable.Queue("bo", "yi", "flink", "!!!"))
    // ds7.print()

    // 8. DataStream from a Stack
    val ds8: DataStream[String] = env.fromCollection(mutable.Stack("bo", "yi", "flink", "!!!"))
    // ds8.print()

    // 9. DataStream from a Stream
    val ds9: DataStream[String] = env.fromCollection(Stream("bo", "yi", "flink", "!!!"))
    // ds9.print()

    // 10. DataStream from a Seq
    val ds10: DataStream[String] = env.fromCollection(Seq("bo", "yi", "flink", "!!!"))
    // ds10.print()

    // 11. DataStream from a Set (not supported)
    // val ds11: DataStream[String] = env.fromCollection(Set("bo", "yi", "flink", "!!!"))
    // ds11.print()

    // 12. DataStream from an Iterable (not supported)
    // val ds12: DataStream[String] = env.fromCollection(Iterable("bo", "yi", "flink", "!!!"))
    // ds12.print()

    // 13. DataStream from an ArraySeq
    val ds13: DataStream[String] = env.fromCollection(mutable.ArraySeq("bo", "yi", "flink", "!!!"))
    // ds13.print()

    // 14. DataStream from an ArrayStack
    // val ds14: DataStream[String] = env.fromCollection(mutable.ArrayStack("bo", "yi", "flink", "!!!"))
    // ds14.print()

    // 15. DataStream from a Map (not supported)
    // val ds15: DataStream[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    // ds15.print()

    // 16. DataStream from a Range
    val ds16: DataStream[Int] = env.fromCollection(Range(1, 9))
    // ds16.print()

    // 17. DataStream from a number sequence (fromSequence)
    val ds17: DataStream[Long] = env.fromSequence(1, 9)
    ds17.print()

    // In a streaming environment the job must be launched explicitly
    env.execute()
  }
}
```
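Collection sources are bounded: the job processes every element and then finishes on its own, which makes them convenient for smoke-testing a pipeline locally. As a minimal sketch under that idea (the object name and sample data are illustrative, not from the original post), a word count over a collection source:

```scala
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}

object CollectionWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Bounded source: the stream ends once the collection is exhausted
    val lines: DataStream[String] = env.fromCollection(List("bo yi", "bo flink"))

    lines
      .flatMap(_.split(" "))   // split lines into words
      .map((_, 1))             // pair each word with a count of 1
      .keyBy(_._1)             // key by the word
      .sum(1)                  // running sum of the counts
      .print()

    env.execute("collection-wordcount")
  }
}
```

With parallelism 1 this should print the running counts, e.g. `(bo,1)`, `(yi,1)`, `(bo,2)`, `(flink,1)`, and then the job terminates by itself.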
## 2.2. File-based source ##

### 2.2.1. readTextFile ###

Loads data from the local filesystem or from HDFS.

```scala
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FileSource {
  def main(args: Array[String]): Unit = {
    // 1. Get the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Read the file
    val data: DataStream[String] = env.readTextFile("hdfs://h23:8020/tmp/test/score.csv")
    // 3. Print the data
    data.print()
    // 4. Run the job
    env.execute()
  }
}
```

### 2.2.2. readCsvFile ###

readCsvFile parses CSV files directly into case classes. Note that it belongs to the batch DataSet API, so this example uses ExecutionEnvironment rather than StreamExecutionEnvironment, and print() on a DataSet triggers execution by itself.

```scala
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object JoinOp {
  // Subject (subject ID, subject name)
  case class Subject(id: Int, name: String)

  // Score (unique ID, student name, subject ID, score)
  case class Score(id: Int, name: String, subjectId: Int, score: Double)

  def main(args: Array[String]): Unit = {
    // 1. Create the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2. Read the two CSV files into DataSets
    val scoreData = env.readCsvFile[Score]("hdfs://h23:8020/tmp/test/score.csv")
    val subjectData = env.readCsvFile[Subject]("hdfs://h23:8020/tmp/test/subject.csv")
    // 3. Join scores with subjects on the subject ID (field 2 of Score, field 0 of Subject)
    val joinData = scoreData.join(subjectData).where(2).equalTo(0)
    // 4. Print the result (this triggers execution in the DataSet API)
    joinData.print()
  }
}
```

## 2.3. Socket-based source ##

```scala
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object SocketSource {
  def main(args: Array[String]): Unit = {
    // 1. Get a local stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
    // 2. Build the socket source, specifying host and port
    val data: DataStream[String] = env.socketTextStream("localhost", 6666)
    // 3. Transform: split each line into words on spaces
    val res = data.flatMap(_.split(" "))
    // 4. Print the result
    res.print()
    // 5. Start the job
    env.execute("WordCount_Stream")
  }
}
```
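For local testing you can feed this source with netcat (`nc -lk 6666`) and type lines into the terminal. Building on the socket source above, here is a sketch of a tumbling-window word count; the 5-second window size, object name, and job name are my own choices, not from the original post:

```scala
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Count words per 5-second processing-time window
    val counts = env.socketTextStream("localhost", 6666)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)

    counts.print()
    env.execute("SocketWindowWordCount")
  }
}
```

Unlike the file and collection sources, a socket source is unbounded, so the job keeps running until it is cancelled or the socket closes.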
## 2.4. Custom source ##

In Flink you attach a data source to your program with StreamExecutionEnvironment.addSource(source). Flink already ships a number of ready-made source functions, but you can also write your own: implement SourceFunction for a non-parallel source, or implement the ParallelSourceFunction interface / extend RichParallelSourceFunction for a parallel one.

### 2.4.1. Using MySQL as a data source ###

```scala
package com.boyi.datasource

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object MysqlCustomSource {
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Add the custom source
    val mySqlDataStream: DataStream[(Int, String, String, String)] = env.addSource(new MysqlSource)
    // 3. Print the result
    mySqlDataStream.print()
    // 4. Run the job
    env.execute()
  }

  // Custom source extending RichSourceFunction
  class MysqlSource extends RichSourceFunction[(Int, String, String, String)] {
    var connection: Connection = null
    var ps: PreparedStatement = null

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      // 1. Load the JDBC driver
      Class.forName("com.mysql.jdbc.Driver")
      // 2. Open the connection
      connection = DriverManager.getConnection(
        "jdbc:mysql://127.0.0.1:3306/tmp?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
        "root", "root")
      // 3. Prepare the statement
      val sql = "select id,username,password,name from user"
      ps = connection.prepareStatement(sql)
    }

    override def run(sourceContext: SourceFunction.SourceContext[(Int, String, String, String)]): Unit = {
      // 4. Execute the query
      val resultSet: ResultSet = ps.executeQuery()
      // 5. Iterate over the result set and emit each row
      while (resultSet.next()) {
        val id = resultSet.getInt("id")
        val username = resultSet.getString("username")
        val password = resultSet.getString("password")
        val name = resultSet.getString("name")
        sourceContext.collect((id, username, password, name))
      }
    }

    override def cancel(): Unit = {
      // Close the statement before the connection
      if (ps != null) {
        ps.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
  }
}
```

### 2.4.2. Using Kafka as a data source ###

```scala
package com.boyi.datasource

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.CommonClientConfigs

object KafkaCustomSource {
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Kafka connection details
    val kafkaCluster = "k01:9092,k02:9092"
    val kafkaTopic = "test"
    // 3. Build the Kafka consumer
    val props = new Properties()
    props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster)
    val flinkKafkaConsumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), props)
    // 4. Add the source
    val data: DataStream[String] = env.addSource(flinkKafkaConsumer)
    // 5. Print the data
    data.print()
    // 6. Run the job
    env.execute()
  }
}
```

### 2.4.3. Writing a fully custom source ###

The task: a custom source that generates one random order per second, where an order carries an order ID, a user ID, an order amount, and a timestamp.

Requirements:

* random order ID (a UUID)
* random user ID (0-2)
* random order amount (0-100)
* timestamp set to the current system time

Steps: create an Order case class; get the stream execution environment; implement the source (loop 1000 times, build a random order, emit it through the source context, sleep one second per iteration); print the data; execute the job.

```scala
package com.boyi.datasource

import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import scala.util.Random

object OwnCustomSource {
  // Order case class: order ID, user ID, order amount, timestamp
  case class Order(id: String, userId: Int, money: Long, createTime: Long)

  def main(args: Array[String]): Unit = {
    // 1. Get the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Create the custom source
    val ownCustomSource: DataStream[Order] = env.addSource(new RichSourceFunction[Order] {
      override def run(sourceContext: SourceFunction.SourceContext[Order]): Unit = {
        for (i <- 0 until 1000) {
          // Random order ID (UUID)
          val id = UUID.randomUUID().toString
          // Random user ID (0-2)
          val userId = Random.nextInt(3)
          // Random order amount (0-100)
          val money = Random.nextInt(101)
          // Timestamp: current system time
          val createTime = System.currentTimeMillis()
          // Emit the order
          sourceContext.collect(Order(id, userId, money, createTime))
          // Generate one order per second
          TimeUnit.SECONDS.sleep(1)
        }
      }

      override def cancel(): Unit = ()
    })

    // 3. Print the data
    ownCustomSource.print()
    // 4. Run the job
    env.execute()
  }
}
```
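One caveat about the source above: its cancel() is a no-op, so cancelling the job does not interrupt the emit loop; the task only exits once all 1000 iterations finish. The usual pattern, as described in Flink's SourceFunction documentation, is a volatile flag that run() checks and cancel() clears. Below is a minimal sketch of that pattern reusing the Order case class from above; the class name CancellableOrderSource is illustrative, not from the original post:

```scala
package com.boyi.datasource

import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

import scala.util.Random

// Illustrative name; reuses OwnCustomSource.Order from the example above
class CancellableOrderSource extends RichSourceFunction[OwnCustomSource.Order] {
  // Volatile so the write from cancel() is visible to the run() loop
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[OwnCustomSource.Order]): Unit = {
    var emitted = 0
    while (running && emitted < 1000) {
      ctx.collect(OwnCustomSource.Order(
        UUID.randomUUID().toString,   // random order ID
        Random.nextInt(3),            // random user ID (0-2)
        Random.nextInt(101),          // random amount (0-100)
        System.currentTimeMillis()))  // current timestamp
      TimeUnit.SECONDS.sleep(1)
      emitted += 1
    }
  }

  // Called by Flink on job cancellation: stop the loop promptly
  override def cancel(): Unit = running = false
}
```

It plugs in the same way as before: `env.addSource(new CancellableOrderSource)`.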
Code: https://github.com/BoYiZhang/flink-demo