zookeeper原生客户端api的使用

川长思鸟来 2022-12-17 01:20 363阅读 0赞

zookeeper原生api的使用

要使用zookeeper原生的api,需引入下面的jar包:

  1. <dependency>
  2. <groupId>org.apache.zookeeper</groupId>
  3. <artifactId>zookeeper</artifactId>
  4. <version>3.6.2</version>
  5. </dependency>

基本操作

创建会话

  1. package com.morris.zookeeper.zookeeper;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.ZooKeeper;
  6. import java.io.IOException;
  7. import java.util.concurrent.CountDownLatch;
  8. @Slf4j
  9. public abstract class ZookeeperFactory {
  10. public static ZooKeeper create() throws IOException, InterruptedException {
  11. final CountDownLatch countDownLatch = new CountDownLatch(1);
  12. final ZooKeeper zooKeeper = new ZooKeeper("10.0.4.105:2181,10.0.4.120:2181,10.0.4.129:2182", 6000, new Watcher() {
  13. public void process(WatchedEvent event) {
  14. if(event.getState().equals(Event.KeeperState.SyncConnected)) {
  15. log.info("connected success");
  16. countDownLatch.countDown();
  17. }
  18. }
  19. });
  20. countDownLatch.await();
  21. return zooKeeper;
  22. }
  23. }

ZooKeeper构造方法参数说明:

  • connectString:多个zookeeper IP地址用逗号分隔,也可以在后面带上节点,这样后续对节点的所有操作都是在这个节点之下,例如connectString为10.0.4.105:2181,10.0.4.120:2181,10.0.4.129:2182/xxxx,然后使用create /app,这样创建出来的节点的绝对路径为/xxxx/app,这样操作一定要确保/xxxx先存在,否则会报错。
  • sessionTimeout:session超时时间,单位为ms。
  • watcher:监听连接的状态,代码中使用了同步计数器CountDownLatch,因为new Zookeeper创建对象立马就会返回了,而客户端连接到服务端是耗时的,这个时候并没有真正的连接成功,如果这个时候拿zk客户端对象去做操作会报错,所以要等待连接建立成功的时候才能使用客户端对象。

创建节点

  1. String app1 = zooKeeper.create("/app1", "app1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// 创建持久节点
  2. String app2 = zooKeeper.create("/app2", "app2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); // 创建持久顺序节点
  3. String app3 = zooKeeper.create("/app3", "app3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 创建临时节点
  4. String app4 = zooKeeper.create("/app4", "app4".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 创建临时顺序节点

节点创建成功后会返回创建节点的真实路径,如果是非顺序节点,那么这个路径就是传入的路径,如果是顺序节点,那么这个路径就是传入的路径+序号。

创建一个已经存在的节点会报错:NodeExistsException,由此可见不能重复创建节点。

  1. zooKeeper.create("/app1", "xxoo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

运行结果如下:

  1. org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /app1
  2. at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
  3. at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
  4. ... ...

创建一个父节点不存在的节点会报错:NoNodeException,由此可见不会自动递归创建节点。

  1. zooKeeper.create("/a/b", "bbb".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

运行结果如下:

  1. org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /a/b
  2. at org.apache.zookeeper.KeeperException.create(KeeperException.java:118)
  3. at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
  4. at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:1734)
  5. ... ...

带回调的异步创建节点的方式:

  1. zooKeeper.create("/app5", "app5".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
  2. log.info("create /app5:{}", name);
  3. }, null);

获取节点数据

  1. byte[] data = zooKeeper.getData("/app1", null, null);
  2. log.info("/app1 data:{}", new String(data));

带回调的异步获取节点数据的方式:

  1. zooKeeper.getData("/app1", null, (rc, path, ctx, data, stat) -> {
  2. log.info("/app1 data:{}", new String(data));
  3. }, null);

更新节点数据

  1. zooKeeper.setData("/app1", "morris".getBytes(), -1);

判断节点是否存在

  1. Stat stat = zooKeeper.exists("/app1", false);
  2. log.info("/app1 stat: {}", stat); // 8589934605,8589934622,1603164678863,1603165733893,1,0,0,0,6,0,8589934605
  3. Stat s = zooKeeper.exists("/ooxx", false);
  4. log.info("/ooxx stat: {}", s); // null 节点不存在返回null

删除节点

  1. zooKeeper.delete("/app1", -1);

删除一个存在子节点的节点会报错:NotEmptyException,由此可见不会自动递归删除节点。

获取子节点数据

  1. zooKeeper.create("/a", "a".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  2. zooKeeper.create("/a/b", "ab".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  3. zooKeeper.create("/a/c", "ac".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  4. zooKeeper.create("/a/d", "ad".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  5. zooKeeper.create("/a/b/x", "abx".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  6. List<String> children = zooKeeper.getChildren("/a", false);
  7. log.info("/a children:{}", children);

运行结果如下:

  1. 2020-10-20 14:41:46,054 INFO [main] (ZookeeperDemo.java:164) - /a children:[b, c, d]

getChildren只会返回所有子节点的集合,不包含孙子节点。

实现递归创建节点

  1. private void create(String path, String data) throws KeeperException, InterruptedException {
  2. String[] split = path.split("/");
  3. StringBuilder p = new StringBuilder();
  4. for (int i = 1; i < split.length - 1; i++) {
  5. p.append("/").append(split[i]);
  6. if(null == zookeeper.exists(p.toString(), null)) {
  7. zookeeper.create(p.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  8. }
  9. }
  10. zookeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  11. }

实现递归删除节点

  1. private void delete(String path) throws KeeperException, InterruptedException {
  2. List<String> children = zookeeper.getChildren(path, false);
  3. for (String child : children) {
  4. delete(path + "/" + child);
  5. }
  6. zookeeper.delete(path, -1);
  7. }

Watcher监听机制

在创建zookeeper会话时会传入一个Watcher,里面有两个特别关键的类:KeeperState(连接状态)和EventType(事件类型)。

KeeperState

KeeperState表示的是客户端与服务端连接的状态。


































连接状态 描述
Disconnected 客户端与服务器断开连接
SyncConnected 客户端与服务器建立连接
AuthFailed 客户端进行连接认证失败
ConnectedReadOnly 客户端连接到的zookeeper服务是只读的
SaslAuthenticated 用于通知客户端它们是SASL认证的
Expired 客户端心跳检测没有收到服务端的响应时即认定断开连接,session失效

EventType

EventType表示的是节点发生变化时所触发的事件类型。






























事件类型 描述
NodeCreated 被监听的节点被创建
NodeChildrenChanged 被监听的节点的直接子节点被创建、被删除、子节点数据发生变更
NodeDataChanged 被监听的节点的数据发生变更
NodeDeleted 被监听的节点被删除
None 客户端的连接状态(KeeperState)发生变更

zookeeper中的watcher

zookeeper中给节点添加watcher的方式有两种:

  • 使用默认的watcher:

    List getChildren(String path, boolean watch)
    byte[] getData(String path, boolean watch, Stat stat)
    Stat exists(String path, boolean watch)

上面三个方法中都有一个boolean类型的watch参数,当watch==true时使用的就是默认的watcher,而默认的watcher就是创建连接的构造方法中的watcher,也可以通过register(Watcher watcher)注册默认的watcher。

  • 给节点指定的watcher:

    List getChildren(String path, Watcher watcher)
    byte[] getData(String path, Watcher watcher, Stat stat)
    Stat exists(String path, Watcher watcher)

上面三个方法中都有个Watcher类型的watch参数,可以通过传递这个参数给节点指定watcher。

watcher测试

创建会话并注入一个默认的Watcher:

  1. private ZooKeeper zookeeper;
  2. private CountDownLatch countDownLatch = new CountDownLatch(1);
  3. @Before
  4. public void getZkClient() throws InterruptedException, IOException {
  5. if (null == zookeeper) {
  6. zookeeper = new ZooKeeper("10.0.4.105:2181,10.0.4.120:2181,10.0.4.129:2182/xxxx", 6000, new MyWatcher("Default Watcher"));
  7. countDownLatch.await();
  8. }
  9. }
  10. class MyWatcher implements Watcher {
  11. private String name;
  12. public MyWatcher(String name) {
  13. this.name = name;
  14. }
  15. public void process(WatchedEvent watchedEvent) {
  16. Event.KeeperState state = watchedEvent.getState(); //获取事件的状态
  17. Event.EventType type = watchedEvent.getType(); //获取事件的类型
  18. if (Event.KeeperState.SyncConnected.equals(state)) {
  19. switch (type) {
  20. case None:
  21. log.info("zookeeper connected success");
  22. countDownLatch.countDown();
  23. break;
  24. case NodeCreated:
  25. log.info("[{}] create node: {}", name, watchedEvent.getPath());
  26. break;
  27. case NodeDeleted:
  28. log.info("[{}] delete node: {}", name, watchedEvent.getPath());
  29. break;
  30. case NodeDataChanged:
  31. log.info("[{}] node data change: {}", name, watchedEvent.getPath());
  32. break;
  33. case NodeChildrenChanged:
  34. log.info("[{}] node children change: {}", name, watchedEvent.getPath());
  35. break;
  36. }
  37. }
  38. }
  39. }

watcher

exists使用默认的watcher:

  1. zookeeper.exists("/p", true);
  2. zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  3. zookeeper.exists("/p", true);
  4. zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  5. zookeeper.exists("/p", true);
  6. zookeeper.setData("/p", "pp".getBytes(), -1);
  7. zookeeper.exists("/p", true);
  8. zookeeper.delete("/p/c", -1);
  9. zookeeper.exists("/p", true);
  10. zookeeper.delete("/p", -1);

运行结果如下:

  1. 2020-10-21 09:46:30,653 INFO [main-EventThread] (ZookeeperWatcherDemo.java:42) - zookeeper connected success
  2. 2020-10-21 09:46:30,670 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Default Watcher] create node: /p
  3. 2020-10-21 09:46:30,681 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher}] node data change: /p
  4. 2020-10-21 09:46:30,688 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p

从运行结果可以发现exists注册的watcher能监听节点的事件为:NodeCreated、NodeDataChanged、NodeDeleted。

getData

getData使用默认的watcher:

  1. zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  2. zookeeper.getData("/p", true, null);
  3. zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  4. zookeeper.getData("/p", true, null);
  5. zookeeper.setData("/p", "pp".getBytes(), -1);
  6. zookeeper.getData("/p", true, null);
  7. zookeeper.delete("/p/c", -1);
  8. zookeeper.getData("/p", true, null);
  9. zookeeper.delete("/p", -1);

运行结果如下:

  1. 2020-10-21 09:58:51,948 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher] node data change: /p
  2. 2020-10-21 09:58:51,956 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p

从运行结果可以发现getData注册watcher能监听节点的事件为:NodeDeleted、NodeDataChanged,不能监听NodeCreated事件(getData一个不存在的节点会抛出异常NoNodeException)。

getChildren

getChildren使用默认的watcher:

  1. zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  2. zookeeper.getChildren("/p", true);
  3. zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  4. zookeeper.getChildren("/p", true);
  5. zookeeper.setData("/p", "pp".getBytes(), -1);
  6. zookeeper.getChildren("/p", true);
  7. zookeeper.delete("/p/c", -1);
  8. zookeeper.getChildren("/p", true);
  9. zookeeper.delete("/p", -1);

运行结果如下:

  1. 2020-10-21 10:06:03,887 INFO [main-EventThread] (ZookeeperWatcherDemo.java:55) - [Default Watcher] node children change: /p
  2. 2020-10-21 10:06:03,899 INFO [main-EventThread] (ZookeeperWatcherDemo.java:55) - [Default Watcher] node children change: /p
  3. 2020-10-21 10:06:03,905 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p

从运行结果可以发现getChildren注册watcher能监听节点的事件为:NodeDeleted、NodeChildrenChanged。

用同一个方法注册多个watcher

  1. zookeeper.exists("/p", true); // 注册默认的wachter
  2. zookeeper.exists("/p", new MyWatcher("Customer Watcher")); // 再注册一个watcher
  3. zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  4. zookeeper.delete("/p", -1);

运行结果如下:

  1. 2020-10-21 10:21:17,377 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Default Watcher] create node: /p
  2. 2020-10-21 10:21:17,377 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Customer Watcher] create node: /p

结论:当一个节点注册了多个watcher,那么多个watcher的方法都会被回调。

用不同的方法注册同一个watcher多次

  1. zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  2. zookeeper.getData("/p", true, null); // 注册默认的wachter
  3. zookeeper.exists("/p", true); // 注册两次默认的wachter
  4. zookeeper.setData("/p", "ppp".getBytes(), -1);
  5. zookeeper.delete("/p", -1);

运行结果如下:

  1. 2020-10-21 10:22:41,025 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher] node data change: /p

结论:当一个节点用不同的方法都注册了同一个watcher时,watcher的方法只会回调一次。

总结

不同注册watcher的方法与可监听事件的关系:


































注册方式 NodeCreated NodeChildrenChanged NodeDataChanged NodeDeleted
getChildren Y Y
exists Y Y Y
getData Y Y

通过观察运行结果,总结如下:

  • 注册一次watcher只会收到一次通知,想一直监听就得收到通知后再次注册。
  • 同一个watcher实例被例如exists,getData等方法多次注册,zookeeper客户端也只会收到一次通知。
  • 当一个节点注册多个不同的watcher实例时,会通知多次,即每个被注册的watcher都会收到通知。
  • exists可以监听一个不存在的节点,但是getData和getChildren不能监控一个不存在的节点,否则会报NoNodeException。

zookeeper原生客户端的缺点

  • 不能递归的创建节点和删除节点。
  • 对节点的数据操作基于字节数组(二进制安全),经常需要数组和字符串之间的转换。
  • 想一直监听某个节点就要一直注册,监听一些不存在的节点可能会抛出异常。

发表评论

表情:
评论列表 (有 0 条评论,363人围观)

还没有评论,来说两句吧...

相关阅读