ZooKeeper 客户端开发

灰太狼 2022-05-19 09:43 363阅读 0赞

上篇文章 ZooKeeper 原理与服务器集群部署 完成了 ZooKeeper 服务器集群的部署,本文以官方 API 和 zkClient 两种方式,演示了 ZooKeeper 数据的修改和状态监视。并以代码模拟了 ZooKeeper 在 Dubbo 中的作用。

作者:王克锋
出处:https://kefeng.wang/2017/11/10/zookeeper-development/
版权:自由转载-非商用-非衍生-保持署名,转载请标明作者和出处。

1.应用开发

API文档: https://zookeeper.apache.org/doc/current/api/index.html
Java示例: https://zookeeper.apache.org/doc/current/javaExample.html
程序员指南: https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html
通常,ZooKeeper应用程序分为两个单元,一个维护连接,另一个监视数据。

1.1 主要API

  • create: 创建结点;
  • delete: 删除结点;
  • exists: 判断结点是否存在;
  • get data: 读取结点数据;
  • set data: 写入结点数据;
  • get children: 获取结点的子结点;
  • sync: 数据同步;

1.2 Zookeeper 开发组件

Document: http://zookeeper.apache.org/doc/r3.4.11/
ZooKeeper 3.4.11 API: https://zookeeper.apache.org/doc/r3.4.11/api/index.html

是 zookeeper 自带的组件,繁琐且不可靠:

  • 会话超时异常时,重新连接繁琐;
  • watcher 是一次性的,需要额外编码把一次性订阅改为持久订阅;
  • 节点数据是二进制,对象数据都需要转换为二进制保存。

1.2.1 pom.xml

  1. <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
  2. <dependency>
  3. <groupId>org.apache.zookeeper</groupId>
  4. <artifactId>zookeeper</artifactId>
  5. <version>3.4.11</version>
  6. </dependency>

1.2.2 ZookeeperClient.java

  1. public class ZookeeperClient {
  2. private static Log logger = LogFactory.getLog(ZookeeperClient.class);
  3. public static void main(String args[]) throws IOException, KeeperException, InterruptedException {
  4. // 连接服务器
  5. final String serverUrl = "centos:2181,centos:2182,centos:2183";
  6. ZooKeeper zk = new ZooKeeper(serverUrl, 30000, new Watcher() {
  7. public void process(WatchedEvent event) { // 监控所有被触发的事件(一次性有效,必须再次注册 watcher)
  8. String path = event.getPath(); // 节点路径
  9. Event.EventType type = event.getType(); // 事件类型(节点的创建与删除、数据改变,子节点改变)
  10. logger.info("*** 观察者事件: path=" + path + ", type=" + type);
  11. }
  12. });
  13. // 创建节点
  14. List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; // 访问权限(开放,所有人可访问)
  15. CreateMode createMode = CreateMode.PERSISTENT; // 节点类型(持久节点,客户端连接断开后该节点不会删除,临时节点会被删除)
  16. zk.create("/parentNode", "parentData".getBytes(), acl, createMode);
  17. zk.create("/parentNode/childNode1", "childData1".getBytes(), acl, createMode); // 父节点必须存在
  18. zk.create("/parentNode/childNode2", "childData2".getBytes(), acl, createMode);
  19. // 查询数据
  20. logger.info("子节点列表: " + zk.getChildren("/parentNode", true)); // 获取子节点列表
  21. logger.info("节点存在性: " + (zk.exists("/parentNode/childNode1", true) != null));
  22. logger.info("节点原数据: " + new String(zk.getData("/parentNode/childNode1", true, null))); // 获取节点数据
  23. zk.setData("/parentNode/childNode1", "childData1-X".getBytes(), -1); // 设置节点数据(上限为 1M, 匹配所有版本)
  24. logger.info("节点新数据: " + new String(zk.getData("/parentNode/childNode1", true, null))); // 获取节点数据
  25. // 删除节点(不指定匹配的版本号时,匹配所有版本)
  26. zk.delete("/parentNode/childNode2", -1);
  27. zk.delete("/parentNode/childNode1", -1);
  28. zk.delete("/parentNode", -1);
  29. // 关闭连接
  30. zk.close();
  31. }
  32. }

1.2.3 运行结果

  1. 15:33:59.215 INFO [ZookeeperClient.java:24] - *** 观察者事件: path=null, type=None
  2. 15:33:59.233 INFO [ZookeeperClient.java:36] - 子节点列表: [childNode1, childNode2]
  3. 15:33:59.237 INFO [ZookeeperClient.java:37] - 节点存在性: true
  4. 15:33:59.240 INFO [ZookeeperClient.java:38] - 节点原数据: childData1
  5. 15:33:59.247 INFO [ZookeeperClient.java:24] - *** 观察者事件: path=/parentNode/childNode1, type=NodeDataChanged
  6. 15:33:59.247 INFO [ZookeeperClient.java:40] - 节点新数据: childData1-X
  7. 15:33:59.252 INFO [ZookeeperClient.java:24] - *** 观察者事件: path=/parentNode, type=NodeChildrenChanged
  8. 15:33:59.254 INFO [ZookeeperClient.java:24] - *** 观察者事件: path=/parentNode/childNode1, type=NodeDeleted

1.3 zkClient 开发组件

是对官方 ZooKeeper API 作了封装,避免了官方 API 的缺点。
https://github.com/sgroschupf/zkclient

1.3.1 pom.xml

  1. <dependency>
  2. <groupId>com.101tec</groupId>
  3. <artifactId>zkclient</artifactId>
  4. <version>0.10</version>
  5. </dependency>

1.3.2 ZookeeperClient.java

  1. public class ZookeeperClient {
  2. private static Log logger = LogFactory.getLog(ZookeeperClient.class);
  3. public static void main(String args[]) {
  4. // 连接服务器
  5. final String serverUrl = "centos:2181,centos:2182,centos:2183";
  6. ZkClient zk = new ZkClient(serverUrl);
  7. // 创建节点、并订阅节点事件
  8. zk.createPersistent("/parentNode", "parentData"); // 持久节点
  9. zk.createEphemeral("/parentNode/childNode1", "childData1"); // 临时节点
  10. zk.createEphemeral("/parentNode/childNode2", "childData2"); // 临时节点
  11. zk.subscribeChildChanges("/parentNode", new IZkChildListener() { // 子节点改变事件
  12. public void handleChildChange(String parentPath, List<String> childList) throws Exception {
  13. logger.info("*** 子节点事件: " + parentPath + ", " + childList);
  14. }
  15. });
  16. zk.subscribeDataChanges("/parentNode/childNode1", new IZkDataListener() { // 数据改变(或删除)事件
  17. public void handleDataChange(String path, Object data) throws Exception {
  18. logger.info("*** 数据改变事件: " + path + ", " + data);
  19. }
  20. public void handleDataDeleted(String path) throws Exception {
  21. logger.info("*** 数据删除事件: " + path);
  22. }
  23. });
  24. // 查询数据
  25. logger.info("子节点列表: " + zk.getChildren("/parentNode"));
  26. logger.info("节点存在性: " + zk.exists("/parentNode/childNode1"));
  27. logger.info("节点原数据: " + zk.readData("/parentNode/childNode1"));
  28. zk.writeData("/parentNode/childNode1", "childData1-X"); // 设置节点数据(上限为 1M, 匹配所有版本)
  29. logger.info("节点新数据: " + zk.readData("/parentNode/childNode1"));
  30. // 删除节点(不指定匹配的版本号时,匹配所有版本)
  31. zk.delete("/parentNode/childNode2");
  32. zk.delete("/parentNode/childNode1");
  33. zk.delete("/parentNode");
  34. // 关闭连接
  35. zk.close();
  36. }
  37. }

1.3.3 运行结果

  1. 16:17:21.904 INFO [ZookeeperClient.java:42] - 子节点列表: [childNode1, childNode2]
  2. 16:17:21.904 INFO [ZookeeperClient.java:43] - 节点存在性: true
  3. 16:17:21.909 INFO [ZookeeperClient.java:44] - 节点原数据: childData1
  4. 16:17:21.919 INFO [ZookeeperClient.java:46] - 节点新数据: childData1-X
  5. 16:17:21.921 INFO [ZookeeperClient.java:33] - *** 数据改变事件: /parentNode/childNode1, childData1-X
  6. 16:17:21.931 INFO [ZookeeperClient.java:28] - *** 子节点事件: /parentNode, []
  7. 16:17:21.951 INFO [ZookeeperClient.java:37] - *** 数据删除事件: /parentNode/childNode1

2.ZooKeeper 在 dubbo 中的应用

ZooKeeper 作为配置中心,提供服务地址的登记和查询;
consumer 订阅服务(订阅事件,动态感知服务地址变化);
provider 注册服务(创建 zk 节点);

Dubbo 在 ZooKeeper 中的存储结构如下(增加 consumer 分支是为了便于统计服务消费情况):

  • 第1级:根节点(configcenter),持久节点;
  • 第2级: 各个服务名称(serviceName),持久节点;
  • 第3级(扩充): 用于对节点分类(nodeType),区分 provider/consumer,持久节点;
  • 第4级[provider]: 特定 serviceName 的提供者地址列表(provideAddress),非持久节点,provider 下线时该节点会自动删除,并自动通知 consumer;
  • 第4级[consumer]: 特定 serviceName 的消费者地址列表(consumerAddress),非持久节点,consumer 下线时该节点会自动删除。

2.1 ServiceProvider.java

ServiceProvider 向注册中心(ZooKeeper集群)注册服务的关键代码:

  1. public class ServiceProvider {
  2. private static Log logger = LogFactory.getLog(ServiceProvider.class);
  3. private static final String rootPath = "/configcenter";
  4. private static final String servicePath = rootPath + "/serviceName";
  5. private static final String zookeeperList = "centos:2181,centos:2182,centos:2183";
  6. public static void main(String args[]) throws UnknownHostException {
  7. ZkClient zk = new ZkClient(zookeeperList);
  8. // 确保根节点存在
  9. if (!zk.exists(rootPath)) {
  10. zk.createPersistent(rootPath);
  11. }
  12. // 确保服务节点存在
  13. if (!zk.exists(servicePath)) {
  14. zk.createPersistent(servicePath);
  15. }
  16. // 向服务节点注册自身IP
  17. String serviceIp = InetAddress.getLocalHost().getHostAddress().toString();
  18. zk.createEphemeral(servicePath + "/" + serviceIp);
  19. // 关闭连接
  20. zk.close();
  21. }
  22. }

2.2 ServiceConsumer.java

ServiceConsumer 从注册中心(ZooKeeper集群)获取服务提供者地址列表的关键代码:

  1. public class ServiceConsumer {
  2. private static Log logger = LogFactory.getLog(ServiceConsumer.class);
  3. private static final String servicePath = "/configcenter/serviceName";
  4. private static final String zookeeperList = "centos:2181,centos:2182,centos:2183";
  5. private static List<String> providerList = null; // 本地缓存的服务提供者地址列表
  6. public static void main(String args[]) throws UnknownHostException {
  7. ZkClient zk = new ZkClient(zookeeperList);
  8. // 检查服务节点存在性
  9. if (zk.exists(servicePath)) {
  10. providerList = zk.getChildren(servicePath);
  11. } else {
  12. throw new RuntimeException("[" + servicePath + "] not exist.");
  13. }
  14. // 订阅子节点改变事件
  15. zk.subscribeChildChanges(servicePath, new IZkChildListener() {
  16. public void handleChildChange(String parentPath, List<String> childList) throws Exception {
  17. providerList = childList;
  18. }
  19. });
  20. // 关闭连接
  21. zk.close();
  22. }
  23. }

发表评论

表情:
评论列表 (有 0 条评论,363人围观)

还没有评论,来说两句吧...

相关阅读