Consuming Kafka Data with Flink

偏执的太偏执、 2022-02-28 08:36
  • Maven configuration

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
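
If you build with sbt instead of Maven, a minimal equivalent might look like the sketch below. This assumes scalaVersion is set to a 2.11.x release so that %% resolves to the _2.11 artifact suffix; the sbt build file is not part of the original post.

    // build.sbt — hedged sbt equivalent of the Maven dependencies above
    scalaVersion := "2.11.12"

    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-scala"           % "1.7.2",
      "org.apache.flink" %% "flink-streaming-scala" % "1.7.2",
      "org.apache.flink" %% "flink-connector-kafka" % "1.7.2"
    )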
  • The code is as follows

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    object FlinKafka {
      def main(args: Array[String]): Unit = {
        // Get the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.enableCheckpointing(5000) // checkpoint every 5000 msecs

        // Kafka configuration
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092")
        properties.setProperty("zookeeper.connect", "localhost:2181") // only required by the legacy 0.8 consumer
        properties.setProperty("group.id", "a")

        // Consume data from Kafka.
        // Flink's Kafka consumer is called FlinkKafkaConsumer08
        // (or 09 for Kafka 0.9.0.x versions, etc.,
        // or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions).
        val stream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("t1", new SimpleStringSchema(), properties))

        // Word count: split lines into words, emit (word, 1), key by word, keep a running sum
        val stream2 = stream.map(_.split("\\W+")).flatMap(_.toSeq).map((_, 1)).keyBy(0).sum(1)

        // Sink: triggers execution of the pipeline
        stream2.addSink(tup => {
          println((tup._1 + ", count-> ", tup._2))
        })

        // Launch the job
        env.execute("test kafka")
      }
    }
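
By default the universal FlinkKafkaConsumer resumes from the offsets committed for the configured group.id. When replaying a topic during testing, it can help to set the start position explicitly; a minimal sketch reusing the topic and properties from above (these setters exist on FlinkKafkaConsumerBase in Flink 1.7.x):

    // Hedged sketch: choosing where the Kafka consumer starts reading
    val consumer = new FlinkKafkaConsumer[String]("t1", new SimpleStringSchema(), properties)
    consumer.setStartFromGroupOffsets() // default: resume from offsets committed for group.id
    //consumer.setStartFromEarliest()   // ignore committed offsets, replay from the beginning
    //consumer.setStartFromLatest()     // read only records produced after the job starts
    val stream: DataStream[String] = env.addSource(consumer)

For quick experiments, the custom println sink could also be replaced with the built-in stream2.print(), which writes each record to the TaskManager's stdout.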

    Test input from the Kafka console producer ---------

    wang@wang-pc:~$ kafka-console-producer.sh --broker-list localhost:9092 --topic t1

    a a a
    b c

    The program output is as follows (sum(1) emits an updated running count for every incoming record, which is why "a" appears three times): --------

    (a, count-> ,1)
    (a, count-> ,2)
    (a, count-> ,3)
    (c, count-> ,1)
    (b, count-> ,1)
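
Because env.enableCheckpointing(5000) is set, the consumer commits its Kafka offsets as part of each completed checkpoint; on recovery, Flink restores from the offsets stored in the checkpoint rather than from the ones committed to Kafka. A hedged sketch of the relevant switch (setCommitOffsetsOnCheckpoints is defined on FlinkKafkaConsumerBase and defaults to true):

    // Hedged sketch: offset committing when checkpointing is enabled
    val consumer = new FlinkKafkaConsumer[String]("t1", new SimpleStringSchema(), properties)
    consumer.setCommitOffsetsOnCheckpoints(true) // commit to Kafka when a checkpoint completes (default)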
