Flink 简单入门示例 £神魔★判官ぃ 2022-04-23 06:08 500阅读 0赞

## Flink 简单入门示例 ##

#### 1.Flink读取Socket流,实现Word Count示例 ####

```scala
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  // One word with its running count; referenced by keyBy("word") / sum("count") below.
  // NOTE: this definition was missing from the original snippet, which therefore
  // could not compile; it matches the official Flink SocketWindowWordCount example.
  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Read '\n'-delimited lines from a local socket.
    val res: DataStream[String] = env.socketTextStream("localhost", 9998, '\n')

    val wordCounts = res
      .flatMap { w => w.split(",") }
      .map(w => WordWithCount(w, 1))
      .keyBy("word")
      // Sliding window: 5 s window size, 1 s slide.
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")

    wordCounts.print()
    env.execute("SocketWindowWordCount")
  }
}
```

#### 2.Flink读取Text文件,实现Word Count示例 ####

```scala
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object TextWindowWordCount {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism.
    env.setParallelism(2)
    val values = env.readTextFile("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data")
    values.print()

    val res = values.flatMap(_.split(","))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    res.writeAsCsv("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data2", FileSystem.WriteMode.NO_OVERWRITE)
    env.execute()
  }
}
```

#### 3.Flink读取csv文件 ####

```scala
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object ReadCsvFile {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Read the CSV rows directly into Student (the case class acts as the schema).
    val values = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")
    values.print()
  }
}
```

Student类

```scala
case class Student(name: String, age: Int, sex: String, id: String)
```

#### 4.Flink读取csv文件,并使用Table sql转换 ####

```scala
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._

object ReadTableCsvFile {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    val input = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")
    input.print()
    // Register the DataSet as a table and query it with SQL.
    tableEnv.registerDataSet("student", input)
    val result = tableEnv.sqlQuery("select * from student")
    result.printSchema()
    result.toDataSet[Student].print()
  }
}
```

或者做些转换,如:

```scala
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object ReadTableCsvFile2 {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Obtain the table environment.
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Read the CSV data.
    val input = env.readCsvFile[Student]("/Users/zhangzhiqiang/Documents/test_project/flinkdemo/data/1.csv")
    input.print()
    // Convert the DataSet into a Table.
    val table = tableEnv.fromDataSet(input)
    // Register the Table.
    tableEnv.registerTable("student", table)
    // SQL query (projects a subset of the columns).
    val result = tableEnv.sqlQuery("select name,age,sex from student")
    result.printSchema()
    // Convert the result back to a DataSet and print it.
    result.toDataSet[People].print()
  }
}
```

People类

```scala
case class People(name: String, age: Int, sex: String)
```

#### 5.Flink读取Kafka流,实现Word Count ####

```scala
// NOTE: this import was missing from the original snippet even though
// Properties is used below (snippets 6 and 7 already had it).
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object KafkaWordCountStreaming {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
    stream.print()

    val result = stream.flatMap(x => x.split(","))
      .map(x => (x, 1)).keyBy(0)
      // Tumbling window of 10 s.
      .timeWindow(Time.seconds(10))
      .sum(1)

    result.print()
    env.execute("KafkaWordCountStreaming")
  }
}
```

#### 6.Flink读取Kafka流,转换后输出 ####

```scala
import java.util.Properties

import com.google.gson.Gson
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object KafkaJsonStreaming {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val p = new Properties()
    p.setProperty("bootstrap.servers", "localhost:9092")
    p.setProperty("group.id", "test")
    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), p))
    stream.print()

    // Parse each JSON record into a People instance.
    val result = stream.map { x =>
      val g = new Gson()
      g.fromJson(x, classOf[People])
    }

    result.print()
    env.execute("KafkaJsonStreaming")
  }
}
```

#### 7.Flink读取Kafka流,并写入Kafka ####

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.api.scala._

object KafkaToKafkaStreaming {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consumer configuration.
    val p = new Properties()
    p.setProperty("bootstrap.servers", "localhost:9092")
    p.setProperty("group.id", "test")
    val input = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), p))
    input.print()

    // Producer configuration.
    val p2 = new Properties()
    p2.setProperty("bootstrap.servers", "localhost:9092")
    p2.setProperty("zookeeper.connect", "localhost:2181")
    p2.setProperty("group.id", "test")
    input.addSink(new FlinkKafkaProducer010[String]("test", new SimpleStringSchema(), p2))

    env.execute("KafkaToKafkaStreaming")
  }
}
```

[具体项目参见github][github]

[github]: https://github.com/zhang3550545/flinkdemo
还没有评论,来说两句吧...