利用ZooKeeper API模拟HDFS节点的监听模式

- 日理万妓 2022-06-06 07:38 213阅读 0赞

首先我们需要理解一件事情,虽然大多数人学习ZooKeeper是因为Hadoop和大数据,但实际上ZK只是分布式一致性算法的实现,和大数据以及Hadoop并无任何关系。

ZK本身是一套树桩结构的文件系统,这个系统每个文件节点可以存放一点儿数据。重点是这个文件系统十分敏感,一旦任何数据发生变化,ZK就会检测到并且报告给客户端。

我们利用这个特性来监管分布式系统的服务器,配置文件,命名空间等。

这里我给出一个小例子,希望帮助初学者理解,ZK是如何实现上述功能的

准备工作:

1,通过ZK的客户端窗口,创建一个空的文件,用来作为分布式文件系统的跟目录

  1. create /servergroup ""

2,我们通过ZK API写一个监听程序,如果/servergroup下面节点发生变化,我们就打印出最新的节点列表。

现实中每个节点代表一个服务器,服务器启动时候,就会写入一个节点文件。服务器断开后,节点文件就会消失。我们可以根据这一点判断服务器是否连接。

  1. package com.dynamic.client;
  2. import java.util.ArrayList;
  3. import java.util.Date;
  4. import java.util.List;
  5. import org.apache.zookeeper.WatchedEvent;
  6. import org.apache.zookeeper.Watcher;
  7. import org.apache.zookeeper.ZooKeeper;
  8. import org.apache.zookeeper.data.Stat;
  9. //客户端监听服务,监控ZK服务器指定节点变化信息
  10. //如果znode(服务器节点)的子节点发生变化,就会更新客户端的serverlist
  11. public class AppClient {
  12. //Application服务器节点路径
  13. private String groupnode = "/servergroup";
  14. private ZooKeeper zk;//客户端实例
  15. private Stat stat= new Stat(); //保存节点状态信息
  16. //保存server列表数据
  17. //volatile多线程的情况下保持数据一致性
  18. private volatile List<String> serverlist;
  19. //创建链接服务器的Client,第三个参数是创建监视器实例
  20. public void connectZK() throws Exception{
  21. String hosts = "server72:2181,server73:2181,server74:2181";//zookeeper服务器列表
  22. final int SESSION_TIMEOUT = 30000;//会话有效时间
  23. //我们获得了一个已经连接ZK服务器的Client,这个Client会话有效时间是30秒,有一个watcher监控该实例
  24. zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher(){
  25. @Override
  26. public void process(WatchedEvent event) {
  27. //如果发生子节点发生变化事件,就更新serverlist,并重新绑定watcher
  28. //子节点发生变化 && 时间的路径是 "/servergroup"
  29. if(event.getType()== Watcher.Event.EventType.NodeChildrenChanged && groupnode.equals(event.getPath())){
  30. try {
  31. //调用更新serverlist方法
  32. updateServerlist();
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
  38. });
  39. //创建对象后更新一次server列表
  40. System.out.println("---客户端已经连接服务器---");
  41. updateServerlist();
  42. }
  43. //更新server列表的方法
  44. public void updateServerlist() throws Exception {
  45. List<String> newserverlist = new ArrayList<String>();
  46. //ture表示给groupnode添加watcher(watcher被触发后就失效了)
  47. //返回所有子节点的名称
  48. List<String> childlist = zk.getChildren(groupnode, true);
  49. for(String childname:childlist){
  50. //获取节点的数据
  51. byte data [] =zk.getData(groupnode+"/"+childname, false,stat);
  52. String childdata = new String(data,"utf-8");//按照utf-8转化为字符串
  53. //添加child的全路径
  54. newserverlist.add(groupnode+"/"+childname+"/"+childdata);
  55. }
  56. serverlist = newserverlist;
  57. System.out.println("-----服务器列表已经更新----"+new Date());
  58. for(String ss:serverlist){
  59. System.out.println(ss);
  60. }
  61. }
  62. public void handle() throws InterruptedException{
  63. Thread.sleep(Long.MAX_VALUE);
  64. }
  65. public static void main(String[] args) throws Exception {
  66. AppClient ac = new AppClient();
  67. ac.connectZK();
  68. ac.handle();//线程等待
  69. }
  70. }

测试:












我在客户端输入命令 [zk: localhost:2181(CONNECTED) 17] create /servergroup/server2 “192.168.1.2”
Created /servergroup/server2
[zk: localhost:2181(CONNECTED) 18] create /servergroup/server2 “192.168.1.5”
Node already exists: /servergroup/server2
[zk: localhost:2181(CONNECTED) 19] create /servergroup/server2 “192.168.1.9”
Node already exists: /servergroup/server2
[zk: localhost:2181(CONNECTED) 20] create /servergroup/server9 “192.168.1.9”
Created /servergroup/server9
[zk: localhost:2181(CONNECTED) 21] create /servergroup/server5 “192.168.1.5”
Created /servergroup/server5
[zk: localhost:2181(CONNECTED) 22] delete /servergroup/server5

eclipse端输出以下信息 —-客户端已经连接服务器—-
——-服务器列表已经更新——Thu Nov 02 18:30:24 CST 2017
/servergroup/server3/“192.168.1.3”
/servergroup/server1/“192.168.1.1”
——-服务器列表已经更新——Thu Nov 02 18:35:18 CST 2017
/servergroup/server3/“192.168.1.3”
/servergroup/server1/“192.168.1.1”
/servergroup/server2/“192.168.1.2”
——-服务器列表已经更新——Thu Nov 02 18:35:42 CST 2017
/servergroup/server3/“192.168.1.3”
/servergroup/server1/“192.168.1.1”
/servergroup/server9/“192.168.1.9”
/servergroup/server2/“192.168.1.2”
——-服务器列表已经更新——Thu Nov 02 18:35:52 CST 2017
/servergroup/server5/“192.168.1.5”
/servergroup/server3/“192.168.1.3”
/servergroup/server1/“192.168.1.1”
/servergroup/server9/“192.168.1.9”
/servergroup/server2/“192.168.1.2”
——-服务器列表已经更新——Thu Nov 02 18:36:09 CST 2017
/servergroup/server3/“192.168.1.3”
/servergroup/server1/“192.168.1.1”
/servergroup/server9/“192.168.1.9”
/servergroup/server2/“192.168.1.2”

现在我在重新写一个程序,在eclipse上模拟创建一个临时的,序列化的节点,并在3秒钟后断开连接。临时节点意味着,断开客户端连接后这个节点就自动消失。

根据这个特性,服务器如果断开我们就可以迅速感知到,并且列出剩余可用的服务器。

  1. package com.dynamic.client;
  2. import java.io.IOException;
  3. import org.apache.zookeeper.CreateMode;
  4. import org.apache.zookeeper.KeeperException;
  5. import org.apache.zookeeper.WatchedEvent;
  6. import org.apache.zookeeper.Watcher;
  7. import org.apache.zookeeper.ZooDefs.Ids;
  8. import org.apache.zookeeper.ZooKeeper;
  9. import org.apache.zookeeper.data.Stat;
  10. public class ServerMissTest {
  11. private String groupnode = "/servergroup";
  12. private ZooKeeper zk;
  13. private Stat stat= new Stat();
  14. void zkConnect() throws IOException{
  15. String hosts = "server72:2181,server73:2181,server74:2181";//zookeeper服务器列表
  16. final int SESSION_TIMEOUT = 30000;//会话有效时间
  17. zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher(){
  18. @Override
  19. public void process(WatchedEvent event) {
  20. // TODO Auto-generated method stub
  21. System.out.println("--start watching--");
  22. System.out.println(event.toString());
  23. }
  24. });
  25. }
  26. void createNodeServer() throws KeeperException, InterruptedException{
  27. //創建一個臨時的,序列化的節點,模仿HDFS的namenode
  28. zk.create(groupnode+"/"+"HDFS-Namenode", "Namenode1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  29. }
  30. public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
  31. ServerMissTest smt = new ServerMissTest();
  32. smt.zkConnect();
  33. //創建了一個臨時的節點,並在3秒鐘后斷開連接。這個臨時的幾點就會消失。
  34. smt.createNodeServer();
  35. Thread.sleep(3000);
  36. //我們希望監聽端口能夠檢測到節點消失,並且打印出現在活著的節點。
  37. }
  38. }

测试结果:

在40分56秒,我启动了程序,客户端监听到了

在41分27秒,线程3秒接收后了,但整个过程持续了30秒左右,客户端监听到了Namenode1消失,并且打印出了可以使用的服务器列表







——-服务器列表已经更新——Thu Nov 02 18:36:09 CST 2017
/servergroup/server3/“192.168.1.3”
/servergroup/server1/“192.168.1.1”
/servergroup/server9/“192.168.1.9”
/servergroup/server2/“192.168.1.2”
——-服务器列表已经更新——Thu Nov 02 18:40:56 CST 2017
/servergroup/server3/“192.168.1.3”
/servergroup/server1/“192.168.1.1”
/servergroup/server9/“192.168.1.9”
/servergroup/server2/“192.168.1.2”
/servergroup/HDFS-Namenode0000000013/Namenode1
——-服务器列表已经更新——Thu Nov 02 18:41:27 CST 2017
/servergroup/server3/“192.168.1.3”
/servergroup/server1/“192.168.1.1”
/servergroup/server9/“192.168.1.9”
/servergroup/server2/“192.168.1.2”

以上,今天感觉CSDN不能粘贴图片非常不爽,准备搬家博客园吧。

发表评论

表情:
评论列表 (有 0 条评论,213人围观)

还没有评论,来说两句吧...

相关阅读

    相关 HDFSAPI

    此文档用来提醒自己编写API的步骤  1.获取与服务器集群上HDFS的连接 a.获取环境变量Configuration conf=new Configur