flink消费kafka数据
maven配置:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.7.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.7.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

代码如下:
import java.util.Properties
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
// Non-deprecated location of SimpleStringSchema since Flink 1.x
// (org.apache.flink.streaming.util.serialization.SimpleStringSchema is deprecated).
import org.apache.flink.api.common.serialization.SimpleStringSchema

/**
 * Streaming word count over a Kafka topic.
 *
 * Consumes UTF-8 strings from topic "t1" on localhost:9092, splits each
 * record into words, and prints a running (word, count) total every time
 * a word's count is updated.
 */
object FlinKafka {

  def main(args: Array[String]): Unit = {
    // Obtain the streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs

    // Kafka consumer configuration.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // NOTE(review): "zookeeper.connect" is only read by the legacy 0.8 consumer;
    // the universal FlinkKafkaConsumer ignores it. Kept for reference only.
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "a")

    // Flink's Kafka consumer is called FlinkKafkaConsumer08
    // (or 09 for Kafka 0.9.0.x versions, etc.,
    // or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions).
    val stream: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("t1", new SimpleStringSchema(), properties))

    // Tokenize, emit (word, 1) pairs, and keep a running sum per word.
    // keyBy uses a field extractor instead of the positional index form.
    val counts = stream
      .flatMap(_.split("\\W+")) // split on non-word characters
      .filter(_.nonEmpty)       // drop empty tokens produced by leading separators
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    // Sink: print each updated count. The original used
    // println(tup._1+", count-> ", tup._2), which auto-tupled the two
    // arguments and printed "(a, count-> ,1)"; interpolation fixes that.
    counts.addSink { tup =>
      println(s"${tup._1}, count-> ${tup._2}")
    }

    // Launch the job (sinks alone do not trigger execution).
    env.execute("test kafka")
  }
}

输入测试数据: kafka生产者---------
wang@wang-pc:~$ kafka-console-producer.sh --broker-list localhost:9092 --topic t1
a a a
b c

程序运行结果如下: --------
(a, count-> ,1)
(a, count-> ,2)
(a, count-> ,3)
(c, count-> ,1)
(b, count-> ,1)
还没有评论,来说两句吧...