flink集群搭建
#
- 1 flink下载
- 2.上传安装包
- 3 解压
- 4 修改配置文件
- 4.1 配置slaves
- 4.2 修改配置文件flink-conf.yaml
- 5 传送安装包到集群其他服务器
- 6 启动集群
- 7 验证
- 7.1 本地提交作业
- 7.2 打包提交作业
1 flink下载
flink 官方下载地址:https://flink.apache.org/downloads.html
本示例下载1.7.1
2.上传安装包
3 解压
tar -zxvf flink-1.7.1-bin-hadoop26-scala_2.11.tgz -C /opt/app/
4 修改配置文件
4.1 配置slaves
cd /opt/app/flink-1.7.1/conf
vim slaves
添加 TaskManager
node1
node2
node3
4.2 修改配置文件flink-conf.yaml
设置Master节点地址
cd /opt/app/flink-1.7.1/conf
vim flink-conf.yaml
设置master 节点为node0
5 传送安装包到集群其他服务器
cd /opt/app/
scp -r flink-1.7.1/ node1:`pwd`
scp -r flink-1.7.1/ node2:`pwd`
scp -r flink-1.7.1/ node3:`pwd`
6 启动集群
在node0上
cd /opt/app/flink-1.7.1/bin
./start-cluster.sh
7 验证
访问web页面:http://node0:8081
7.1 本地提交作业
在文章 基于flink实现的worldcount创建的maven工程基础上,创建java类ReadSockte
public class ReadSockte {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool=ParameterTool.fromArgs(args);
String node="";
int port=0;
if(parameterTool.has("node")&¶meterTool.has("port")){
node=parameterTool.get("node");
port=Integer.valueOf(parameterTool.get("port"));
}else{
System.out.println("no param get");
System.exit(1);
}
DataStreamSource<String>dss=env.socketTextStream(node,port);
SingleOutputStreamOperator<String> words=dss.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String []split=value.split(" ");
for (String word : split) {
out.collect(word);
}
}
});
SingleOutputStreamOperator<Tuple2<String,Integer>>map=words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value,1);
}
});
KeyedStream<Tuple2<String,Integer>,Tuple>keyby=map.keyBy(0);
SingleOutputStreamOperator<Tuple2<String,Integer>>sum=keyby.sum(1);
sum.print(); //流里面 不会触发execute
env.execute();
}
}
添加参数
然后在node1 监听 9999端口
nc -lk 9999
hello flink
运行java代码
再输入
hello bigdata
hello java
看到 java 代码 实时获取字符串 进行计算
7.2 打包提交作业
maven 打包代码
上传打包代码
node2上 监听 9999端口
nc -lk -9999
提交任务
node1 上进入
cd /opt/app/flink-1.7.1/bin
./flink run -c com.wh.flink.ReadSockte /root/flink-1.0-SNAPSHOT-jar-with-dependencies.jar --node node2 --port 9999
在node2上输入 hello world
hello world
hello flink
进入web 查看任务情况
http://node0:8081/
还没有评论,来说两句吧...