利用ZooKeeper API模拟HDFS节点的监听模式
首先我们需要理解一件事情,虽然大多数人学习ZooKeeper是因为Hadoop和大数据,但实际上ZK只是分布式一致性算法的实现,和大数据以及Hadoop并无任何关系。
ZK本身是一套树桩结构的文件系统,这个系统每个文件节点可以存放一点儿数据。重点是这个文件系统十分敏感,一旦任何数据发生变化,ZK就会检测到并且报告给客户端。
我们利用这个特性来监管分布式系统的服务器,配置文件,命名空间等。
这里我给出一个小例子,希望帮助初学者理解,ZK是如何实现上述功能的
准备工作:
1,通过ZK的客户端窗口,创建一个空的文件,用来作为分布式文件系统的跟目录
create /servergroup ""
2,我们通过ZK API写一个监听程序,如果/servergroup下面节点发生变化,我们就打印出最新的节点列表。
现实中每个节点代表一个服务器,服务器启动时候,就会写入一个节点文件。服务器断开后,节点文件就会消失。我们可以根据这一点判断服务器是否连接。
package com.dynamic.client;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
//客户端监听服务,监控ZK服务器指定节点变化信息
//如果znode(服务器节点)的子节点发生变化,就会更新客户端的serverlist
public class AppClient {
//Application服务器节点路径
private String groupnode = "/servergroup";
private ZooKeeper zk;//客户端实例
private Stat stat= new Stat(); //保存节点状态信息
//保存server列表数据
//volatile多线程的情况下保持数据一致性
private volatile List<String> serverlist;
//创建链接服务器的Client,第三个参数是创建监视器实例
public void connectZK() throws Exception{
String hosts = "server72:2181,server73:2181,server74:2181";//zookeeper服务器列表
final int SESSION_TIMEOUT = 30000;//会话有效时间
//我们获得了一个已经连接ZK服务器的Client,这个Client会话有效时间是30秒,有一个watcher监控该实例
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher(){
@Override
public void process(WatchedEvent event) {
//如果发生子节点发生变化事件,就更新serverlist,并重新绑定watcher
//子节点发生变化 && 时间的路径是 "/servergroup"
if(event.getType()== Watcher.Event.EventType.NodeChildrenChanged && groupnode.equals(event.getPath())){
try {
//调用更新serverlist方法
updateServerlist();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
//创建对象后更新一次server列表
System.out.println("---客户端已经连接服务器---");
updateServerlist();
}
//更新server列表的方法
public void updateServerlist() throws Exception {
List<String> newserverlist = new ArrayList<String>();
//ture表示给groupnode添加watcher(watcher被触发后就失效了)
//返回所有子节点的名称
List<String> childlist = zk.getChildren(groupnode, true);
for(String childname:childlist){
//获取节点的数据
byte data [] =zk.getData(groupnode+"/"+childname, false,stat);
String childdata = new String(data,"utf-8");//按照utf-8转化为字符串
//添加child的全路径
newserverlist.add(groupnode+"/"+childname+"/"+childdata);
}
serverlist = newserverlist;
System.out.println("-----服务器列表已经更新----"+new Date());
for(String ss:serverlist){
System.out.println(ss);
}
}
public void handle() throws InterruptedException{
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
AppClient ac = new AppClient();
ac.connectZK();
ac.handle();//线程等待
}
}
测试:
我在客户端输入命令 | [zk: localhost:2181(CONNECTED) 17] create /servergroup/server2 “192.168.1.2” Created /servergroup/server2 [zk: localhost:2181(CONNECTED) 18] create /servergroup/server2 “192.168.1.5” Node already exists: /servergroup/server2 [zk: localhost:2181(CONNECTED) 19] create /servergroup/server2 “192.168.1.9” Node already exists: /servergroup/server2 [zk: localhost:2181(CONNECTED) 20] create /servergroup/server9 “192.168.1.9” Created /servergroup/server9 [zk: localhost:2181(CONNECTED) 21] create /servergroup/server5 “192.168.1.5” Created /servergroup/server5 [zk: localhost:2181(CONNECTED) 22] delete /servergroup/server5 |
eclipse端输出以下信息 | —-客户端已经连接服务器—- ——-服务器列表已经更新——Thu Nov 02 18:30:24 CST 2017 /servergroup/server3/“192.168.1.3” /servergroup/server1/“192.168.1.1” ——-服务器列表已经更新——Thu Nov 02 18:35:18 CST 2017 /servergroup/server3/“192.168.1.3” /servergroup/server1/“192.168.1.1” /servergroup/server2/“192.168.1.2” ——-服务器列表已经更新——Thu Nov 02 18:35:42 CST 2017 /servergroup/server3/“192.168.1.3” /servergroup/server1/“192.168.1.1” /servergroup/server9/“192.168.1.9” /servergroup/server2/“192.168.1.2” ——-服务器列表已经更新——Thu Nov 02 18:35:52 CST 2017 /servergroup/server5/“192.168.1.5” /servergroup/server3/“192.168.1.3” /servergroup/server1/“192.168.1.1” /servergroup/server9/“192.168.1.9” /servergroup/server2/“192.168.1.2” ——-服务器列表已经更新——Thu Nov 02 18:36:09 CST 2017 /servergroup/server3/“192.168.1.3” /servergroup/server1/“192.168.1.1” /servergroup/server9/“192.168.1.9” /servergroup/server2/“192.168.1.2” |
现在我在重新写一个程序,在eclipse上模拟创建一个临时的,序列化的节点,并在3秒钟后断开连接。临时节点意味着,断开客户端连接后这个节点就自动消失。
根据这个特性,服务器如果断开我们就可以迅速感知到,并且列出剩余可用的服务器。
package com.dynamic.client;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class ServerMissTest {
private String groupnode = "/servergroup";
private ZooKeeper zk;
private Stat stat= new Stat();
void zkConnect() throws IOException{
String hosts = "server72:2181,server73:2181,server74:2181";//zookeeper服务器列表
final int SESSION_TIMEOUT = 30000;//会话有效时间
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher(){
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
System.out.println("--start watching--");
System.out.println(event.toString());
}
});
}
void createNodeServer() throws KeeperException, InterruptedException{
//創建一個臨時的,序列化的節點,模仿HDFS的namenode
zk.create(groupnode+"/"+"HDFS-Namenode", "Namenode1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
ServerMissTest smt = new ServerMissTest();
smt.zkConnect();
//創建了一個臨時的節點,並在3秒鐘后斷開連接。這個臨時的幾點就會消失。
smt.createNodeServer();
Thread.sleep(3000);
//我們希望監聽端口能夠檢測到節點消失,並且打印出現在活著的節點。
}
}
测试结果:
在40分56秒,我启动了程序,客户端监听到了
在41分27秒,线程3秒接收后了,但整个过程持续了30秒左右,客户端监听到了Namenode1消失,并且打印出了可以使用的服务器列表
——-服务器列表已经更新——Thu Nov 02 18:36:09 CST 2017 /servergroup/server3/“192.168.1.3” /servergroup/server1/“192.168.1.1” /servergroup/server9/“192.168.1.9” /servergroup/server2/“192.168.1.2” ——-服务器列表已经更新——Thu Nov 02 18:40:56 CST 2017 /servergroup/server3/“192.168.1.3” /servergroup/server1/“192.168.1.1” /servergroup/server9/“192.168.1.9” /servergroup/server2/“192.168.1.2” /servergroup/HDFS-Namenode0000000013/Namenode1 ——-服务器列表已经更新——Thu Nov 02 18:41:27 CST 2017 /servergroup/server3/“192.168.1.3” /servergroup/server1/“192.168.1.1” /servergroup/server9/“192.168.1.9” /servergroup/server2/“192.168.1.2” |
以上,今天感觉CSDN不能粘贴图片非常不爽,准备搬家博客园吧。
还没有评论,来说两句吧...