Flink Hands-On: Sink Operations
- I. Preface
- II. Types
- 2.1. Collection-based sink
- 2.2. File-based sink
- 2.2.1. Writing data to a local file
- 2.2.2. Writing data to HDFS
- 2.3. Kafka Sink
- 2.4. MySQL Sink
I. Preface
II. Types
2.1. Collection-based sink
Goal:
Using the data below, print it to standard output, print it to standard error, and call collect().
(19, "zhangsan", 178.8),
(17, "lisi", 168.8),
(18, "wangwu", 184.8),
(21, "zhaoliu", 164.8)
Code:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object SinkCollection {
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Create a DataStream with fromCollection
    val data = env.fromCollection(List((19, "zhangsan", 178.8), (17, "lisi", 168.8), (18, "wangwu", 184.8), (21, "zhaoliu", 164.8)))
    // 3. Process the data
    // 4. Print to stdout and to stderr
    data.print()
    data.printToErr()
    // collect() is only available when the data comes from the batch (DataSet) API
    // print(data.collect())
    // 5. Execute the job
    env.execute()
  }
}
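The commented-out collect() call applies only to the batch (DataSet) API, where it pulls the result back to the client as a local collection. A minimal sketch of that batch variant, assuming the same tuple data (the object name BatchCollectSink is illustrative):

import org.apache.flink.api.scala._

object BatchCollectSink {
  def main(args: Array[String]): Unit = {
    // Batch (DataSet) execution environment instead of the streaming one
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromCollection(List((19, "zhangsan", 178.8), (17, "lisi", 168.8)))
    // collect() triggers execution and returns the result as a local Seq
    val result = data.collect()
    println(result)
  }
}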
2.2. File-based sink
- Flink supports files on a variety of storage systems, including the local filesystem and HDFS.
- Flink supports several file formats, including text files and CSV files (a brief writeAsCsv sketch follows this list).
- writeAsText(): TextOutputFormat - writes elements line-wise as strings. The strings are obtained by calling each element's toString() method.
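As a counterpart to writeAsText(), tuple streams can also be written as CSV. A minimal sketch, assuming tuple data like that used below and an illustrative output path (writeAsCsv() only works on tuple types):

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CsvFileSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val data = env.fromCollection(List((1, "flink"), (2, "sink")))
    // Each tuple becomes one CSV line, with fields comma-separated by default
    data.writeAsCsv("/opt/a/tmp/CsvSink.csv", WriteMode.OVERWRITE)
    env.execute()
  }
}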
2.2.1. Writing data to a local file
Goal:
Write the following data to a file:
List((1,"flink"),(2,"sink"))
Code:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object LocalFileSink {
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism 1 produces a single output file instead of a directory of per-subtask files
    env.setParallelism(1)
    // 2. Create a DataStream with fromCollection
    val data = env.fromCollection(List((1, "flink"), (2, "sink")))
    // 3. Process the data
    // 4. Write the data to a local text file, overwriting it if it already exists
    data.writeAsText("/opt/a/tmp/FileSink.txt", WriteMode.OVERWRITE)
    // 5. Execute the job
    env.execute()
  }
}
2.2.2. Writing data to HDFS
Write the data to HDFS. The sink is the same as in the local-file case; only the path changes to an hdfs:// URI (the Hadoop filesystem dependencies must be available on the Flink classpath).
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object HdfsFileSink {
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 2. Create a DataStream with fromCollection
    val data = env.fromCollection(List((1, "flink"), (2, "sink")))
    // 3. Process the data
    // 4. Write the data to HDFS, overwriting the target if it already exists
    data.writeAsText("hdfs://h23:8020/tmp/test/tmp/FileSink.txt", WriteMode.OVERWRITE)
    // 5. Execute the job
    env.execute()
  }
}
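Note that writeAsText() is deprecated in newer Flink releases in favor of the StreamingFileSink. A minimal sketch of that replacement, assuming the same data; checkpointing must be enabled for in-progress part files to be finalized (the 5000 ms interval and the output path are illustrative):

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StreamingFileSinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Part files are only committed on checkpoints
    env.enableCheckpointing(5000)
    val data = env.fromCollection(List((1, "flink"), (2, "sink")))
    // Row-encoded sink: each record is written as its toString() on a separate line
    val fileSink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("/opt/a/tmp/streaming-file-sink"), new SimpleStringEncoder[String]("UTF-8"))
      .build()
    data.map(_.toString).addSink(fileSink)
    env.execute()
  }
}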
2.3. Kafka Sink
Console consumer (legacy ZooKeeper-based form):
kafka-console-consumer.sh --from-beginning --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
Example
Write the data to Kafka.
cd /usr/hdp/current/kafka-broker/bin
List topics:
./kafka-topics.sh --zookeeper hxx:2181 --list
Create a topic:
./kafka-topics.sh --zookeeper hxx:2181 --create --topic test --partitions 3 --replication-factor 1
Produce data to the topic:
./kafka-console-producer.sh --broker-list 192.168.xx.xx:9092 --topic test
Consume data from the topic:
./kafka-console-consumer.sh --bootstrap-server 192.168.xx.xx:9092 --topic test --from-beginning
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object KafkaSink {
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Create a DataStream with fromCollection
    val data = env.fromCollection(List("flink", "Spark"))
    // 3. Build the Kafka sink (each record is serialized as a plain string)
    val pro: Properties = new Properties
    pro.setProperty("bootstrap.servers", "192.168.xx.xx:9092")
    val kafkaSink = new FlinkKafkaProducer[String]("test", new SimpleStringSchema(), pro)
    // 4. Attach the Kafka sink
    data.addSink(kafkaSink)
    // 5. Execute the job
    env.execute()
  }
}
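Besides the console consumer, the produced records can also be read back from within Flink. A minimal verification sketch using FlinkKafkaConsumer, assuming the same broker address and topic (the group.id value is illustrative):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaSinkVerify {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val pro = new Properties
    pro.setProperty("bootstrap.servers", "192.168.xx.xx:9092")
    pro.setProperty("group.id", "flink-sink-verify")
    val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), pro)
    // Read from the beginning so the records written by KafkaSink are visible
    consumer.setStartFromEarliest()
    env.addSource(consumer).print()
    env.execute()
  }
}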
2.4. MySQL Sink
Example
Load the following local collection and write it into MySQL:
List(
(10, "dazhuang", "123456", "大壮"),
(11, "erya", "123456", "二丫"),
(12, "sanpang", "123456", "三胖")
)
Code:
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object MySqlSink {
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Create a DataStream with fromCollection
    val data: DataStream[(Int, String, String, String)] = env.fromCollection(List(
      (10, "dazhuang", "123456", "大壮"),
      (11, "erya", "123456", "二丫"),
      (12, "sanpang", "123456", "三胖")
    ))
    // 3. Attach the custom MySQL sink
    data.addSink(new MySqlCustomSink)
    // 4. Execute the job
    env.execute()
  }
}
class MySqlCustomSink extends RichSinkFunction[(Int, String, String, String)] {
  private var connection: Connection = null
  private var ps: PreparedStatement = null

  override def open(parameters: Configuration): Unit = {
    // 1. Load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    // 2. Create the connection
    connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/tmp?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root")
    // 3. Prepare the insert statement
    val sql = "insert into user(id, username, password, name) values(?, ?, ?, ?);"
    ps = connection.prepareStatement(sql)
  }

  override def invoke(value: (Int, String, String, String), context: SinkFunction.Context): Unit = {
    try {
      // 4. Bind the tuple fields and execute the insert
      ps.setInt(1, value._1)
      ps.setString(2, value._2)
      ps.setString(3, value._3)
      ps.setString(4, value._4)
      ps.executeUpdate()
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }

  // Close the statement, then the connection
  override def close(): Unit = {
    if (ps != null) {
      ps.close()
    }
    if (connection != null) {
      connection.close()
    }
  }
}
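The sink assumes that a user table already exists in the tmp database. A minimal helper to create a matching table; the column types and lengths are assumptions based on the (id, username, password, name) tuples used above:

import java.sql.DriverManager

object CreateUserTable {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val connection = DriverManager.getConnection(
      "jdbc:mysql://127.0.0.1:3306/tmp?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root")
    val stmt = connection.createStatement()
    // Assumed schema; adjust column lengths and charset to your environment
    stmt.executeUpdate(
      """CREATE TABLE IF NOT EXISTS user (
        |  id INT PRIMARY KEY,
        |  username VARCHAR(50),
        |  password VARCHAR(50),
        |  name VARCHAR(50)
        |) DEFAULT CHARSET = utf8""".stripMargin)
    stmt.close()
    connection.close()
  }
}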
GitHub repository: https://github.com/BoYiZhang/flink-demo