flink集群搭建

灰太狼 2022-10-11 06:18 262阅读 0赞

#

  • 1 flink下载
  • 2.上传安装包
  • 3 解压
  • 4 修改配置文件
  • 4.1 配置slaves
    • 4.2 修改配置文件flink-conf.yaml
    • 5 传送安装包到集群其他服务器
    • 6 启动集群
  • 7 验证
    • 7.1 本地提交作业
    • 7.2 打包提交作业

1 flink下载

flink 官方下载地址:https://flink.apache.org/downloads.html
在这里插入图片描述
本示例下载1.7.1
在这里插入图片描述
在这里插入图片描述

2.上传安装包

在这里插入图片描述

3 解压

tar -zxvf flink-1.7.1-bin-hadoop26-scala_2.11.tgz -C /opt/app/

4 修改配置文件

4.1 配置slaves

  1. cd /opt/app/flink-1.7.1/conf
  2. vim slaves

添加 TaskManager

  1. node1
  2. node2
  3. node3

设置Master节点地址

  1. cd /opt/app/flink-1.7.1/conf
  2. vim flink-conf.yaml

设置master 节点为node0
在这里插入图片描述

5 传送安装包到集群其他服务器

  1. cd /opt/app/
  2. scp -r flink-1.7.1/ node1:`pwd`
  3. scp -r flink-1.7.1/ node2:`pwd`
  4. scp -r flink-1.7.1/ node3:`pwd`

6 启动集群

在node0上

  1. cd /opt/app/flink-1.7.1/bin
  2. ./start-cluster.sh

在这里插入图片描述

7 验证

访问web页面:http://node0:8081
在这里插入图片描述

7.1 本地提交作业

在文章 基于flink实现的worldcount创建的maven工程基础上,创建java类ReadSockte

  1. public class ReadSockte {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
  4. ParameterTool parameterTool=ParameterTool.fromArgs(args);
  5. String node="";
  6. int port=0;
  7. if(parameterTool.has("node")&&parameterTool.has("port")){
  8. node=parameterTool.get("node");
  9. port=Integer.valueOf(parameterTool.get("port"));
  10. }else{
  11. System.out.println("no param get");
  12. System.exit(1);
  13. }
  14. DataStreamSource<String>dss=env.socketTextStream(node,port);
  15. SingleOutputStreamOperator<String> words=dss.flatMap(new FlatMapFunction<String, String>() {
  16. @Override
  17. public void flatMap(String value, Collector<String> out) throws Exception {
  18. String []split=value.split(" ");
  19. for (String word : split) {
  20. out.collect(word);
  21. }
  22. }
  23. });
  24. SingleOutputStreamOperator<Tuple2<String,Integer>>map=words.map(new MapFunction<String, Tuple2<String, Integer>>() {
  25. @Override
  26. public Tuple2<String, Integer> map(String value) throws Exception {
  27. return new Tuple2<>(value,1);
  28. }
  29. });
  30. KeyedStream<Tuple2<String,Integer>,Tuple>keyby=map.keyBy(0);
  31. SingleOutputStreamOperator<Tuple2<String,Integer>>sum=keyby.sum(1);
  32. sum.print(); //流里面 不会触发execute
  33. env.execute();
  34. }
  35. }

添加参数
在这里插入图片描述
在这里插入图片描述
然后在node1 监听 9999端口

  1. nc -lk 9999
  2. hello flink

运行java代码
在这里插入图片描述

再输入
hello bigdata
hello java
看到 java 代码 实时获取字符串 进行计算
在这里插入图片描述

7.2 打包提交作业

maven 打包代码
在这里插入图片描述
在这里插入图片描述

上传打包代码
在这里插入图片描述

node2上 监听 9999端口

  1. nc -lk -9999

提交任务
node1 上进入

  1. cd /opt/app/flink-1.7.1/bin
  2. ./flink run -c com.wh.flink.ReadSockte /root/flink-1.0-SNAPSHOT-jar-with-dependencies.jar --node node2 --port 9999

在这里插入图片描述
在node2上输入 hello world

  1. hello world
  2. hello flink

在这里插入图片描述

进入web 查看任务情况
http://node0:8081/
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,262人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Flink

    3.Flink 集群搭建    **Flink 可以选择的部署方式有:**   Local、Standalone(资源利用率低)、Yarn、Mesos、Docker...