Big Data with Flink (2) | Flink Quick Start (wordcount)

亦凉 · 2022-12-15 14:13 · 145 reads · 0 likes

### Quick Start ###

* 1\. Set up the Maven project Flink-Learning
  * pom file
* 2\. Batch wordcount
* 3\. Streaming wordcount

# 1. Set up the Maven project Flink-Learning #

## pom file ##

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- This plugin compiles the Scala sources into class files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <!-- Bind to Maven's compile phase -->
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

## 2. Batch wordcount ##

```scala
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // Create a batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read data from a file
    val inputDataSet = env.readTextFile("D:\\idea\\Flink-Learning\\word.txt")

    // Transform the DataSet: first split each line on spaces,
    // then group by the word as the key
    val resultDataSet = inputDataSet
      .flatMap(_.split(" ")) // split lines into a data set of words
      .map((_, 1))           // convert to (word, count) tuples
      .groupBy(0)            // group by the tuple's first field (the word)
      .sum(1)                // sum the tuple's second field (the count)

    // Print the result
    resultDataSet.print()
  }
}
```

Result:

![result screenshot][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ2NTQ4ODU1_size_16_color_FFFFFF_t_70_pic_center]

## 3. Streaming wordcount ##

```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Create a streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the hostname and port from the program arguments
    val params = ParameterTool.fromArgs(args)
    val hostname = params.get("host")
    val port = params.getInt("port")

    // Receive a socket text stream
    val inputDataStream = env.socketTextStream(hostname, port)

    // Define the transformations: wordcount
    val resultDataStream = inputDataStream
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1)) // convert to (word, count) tuples
      .keyBy(0)    // key by the tuple's first field (the word)
      .sum(1)      // sum the tuple's second field (the count)

    // Output
    resultDataStream.print().setParallelism(1)

    env.execute("stream word count job")
  }
}
```

Open the hadoop12 virtual machine and type the text you want to count:

```shell
[root@hadoop12 ~]# nc -lk 9999
```

In a production environment the job is packaged and submitted to a cluster, so the hostname and port must not be hard-coded. That is why these three lines read them from the program arguments instead:

```scala
val params = ParameterTool.fromArgs(args)
val hostname = params.get("host")
val port = params.getInt("port")
```

Then, when running the job, set the program arguments in the run configuration:

![run configuration screenshot][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ2NTQ4ODU1_size_16_color_FFFFFF_t_70_pic_center 1]

[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ2NTQ4ODU1_size_16_color_FFFFFF_t_70_pic_center]: /images/20221123/2c0e22942f0b4efd9c029297c3aed212.png
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQ2NTQ4ODU1_size_16_color_FFFFFF_t_70_pic_center 1]: /images/20221123/2532ea183298434dba601b2a22b155a9.png
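The wordcount pipeline itself can be mirrored on plain Scala collections, without any Flink dependency — a minimal sketch (the `wordCount` helper is hypothetical, for illustration only) showing what each `flatMap`/`map`/`groupBy`/`sum` step computes:

```scala
object WordCountLogic {
  // Mirrors the Flink pipeline on ordinary Scala collections:
  // split into words, pair each with 1, group by word, sum the counts.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))    // split lines into words
      .filter(_.nonEmpty)       // drop empty tokens
      .map((_, 1))              // (word, 1) tuples
      .groupBy(_._1)            // group tuples by the word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum counts

  def main(args: Array[String]): Unit = {
    val result = wordCount(Seq("hello flink", "hello world"))
    println(result) // Map(hello -> 2, flink -> 1, world -> 1)
  }
}
```

The difference from the Flink version is only where the computation runs: Flink distributes the same per-word grouping and summing across a cluster and, in the streaming case, updates the counts incrementally as records arrive.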
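For context, `ParameterTool.fromArgs` essentially parses `--key value` pairs out of the argument array. A simplified stand-in in plain Scala (the `Args` object below is hypothetical, not part of Flink, and ignores the extra prefixes and properties files the real class supports) illustrates the idea:

```scala
// Hypothetical, simplified stand-in for Flink's ParameterTool.fromArgs:
// parses "--key value" pairs from the argument array into a Map.
object Args {
  def parse(args: Array[String]): Map[String, String] =
    args.sliding(2, 2).collect {
      case Array(k, v) if k.startsWith("--") => k.drop(2) -> v
    }.toMap

  def main(args: Array[String]): Unit = {
    // e.g. program arguments: --host hadoop12 --port 9999
    val params = parse(Array("--host", "hadoop12", "--port", "9999"))
    println(params("host"))       // hadoop12
    println(params("port").toInt) // 9999
  }
}
```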