zookeeper原生客户端api的使用
zookeeper原生api的使用
要使用zookeeper原生的api,需引入下面的jar包:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.2</version>
</dependency>
基本操作
创建会话
package com.morris.zookeeper.zookeeper;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
@Slf4j
public abstract class ZookeeperFactory {
public static ZooKeeper create() throws IOException, InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final ZooKeeper zooKeeper = new ZooKeeper("10.0.4.105:2181,10.0.4.120:2181,10.0.4.129:2182", 6000, new Watcher() {
public void process(WatchedEvent event) {
if(event.getState().equals(Event.KeeperState.SyncConnected)) {
log.info("connected success");
countDownLatch.countDown();
}
}
});
countDownLatch.await();
return zooKeeper;
}
}
ZooKeeper构造方法参数说明:
- connectString:多个zookeeper IP地址用逗号分隔,也可以在后面带上节点,这样后续对节点的所有操作都是在这个节点之下,例如connectString为
10.0.4.105:2181,10.0.4.120:2181,10.0.4.129:2182/xxxx
,然后使用create /app
,这样创建出来的节点的绝对路径为/xxxx/app
,这样操作一定要确保/xxxx
先存在,否则会报错。 - sessionTimeout:session超时时间,单位为ms。
- watcher:监听连接的状态,代码中使用了同步计数器CountDownLatch,因为
new Zookeeper
创建对象立马就会返回了,而客户端连接到服务端是耗时的,这个时候并没有真正的连接成功,如果这个时候拿zk客户端对象去做操作会报错,所以要等待连接建立成功的时候才能使用客户端对象。
创建节点
String app1 = zooKeeper.create("/app1", "app1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// 创建持久节点
String app2 = zooKeeper.create("/app2", "app2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); // 创建持久顺序节点
String app3 = zooKeeper.create("/app3", "app3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 创建临时节点
String app4 = zooKeeper.create("/app4", "app4".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 创建临时顺序节点
节点创建成功后会返回创建节点的真实路径,如果是非顺序节点,那么这个路径就是传入的路径,如果是顺序节点,那么这个路径就是传入的路径+序号。
创建一个已经存在的节点会报错:NodeExistsException,由此可见不能重复创建节点。
zooKeeper.create("/app1", "xxoo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
运行结果如下:
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /app1
at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
... ...
创建一个父节点不存在的节点会报错:NoNodeException,由此可见不会自动递归创建节点。
zooKeeper.create("/a/b", "bbb".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
运行结果如下:
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /a/b
at org.apache.zookeeper.KeeperException.create(KeeperException.java:118)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:1734)
... ...
带回调的异步创建节点的方式:
zooKeeper.create("/app5", "app5".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
log.info("create /app5:{}", name);
}, null);
获取节点数据
byte[] data = zooKeeper.getData("/app1", null, null);
log.info("/app1 data:{}", new String(data));
带回调的异步获取节点数据的方式:
zooKeeper.getData("/app1", null, (rc, path, ctx, data, stat) -> {
log.info("/app1 data:{}", new String(data));
}, null);
更新节点数据
zooKeeper.setData("/app1", "morris".getBytes(), -1);
判断节点是否存在
Stat stat = zooKeeper.exists("/app1", false);
log.info("/app1 stat: {}", stat); // 8589934605,8589934622,1603164678863,1603165733893,1,0,0,0,6,0,8589934605
Stat s = zooKeeper.exists("/ooxx", false);
log.info("/ooxx stat: {}", s); // null 节点不存在返回null
删除节点
zooKeeper.delete("/app1", -1);
删除一个存在子节点的节点会报错:NotEmptyException,由此可见不会自动递归删除节点。
获取子节点数据
zooKeeper.create("/a", "a".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create("/a/b", "ab".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create("/a/c", "ac".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create("/a/d", "ad".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create("/a/b/x", "abx".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
List<String> children = zooKeeper.getChildren("/a", false);
log.info("/a children:{}", children);
运行结果如下:
2020-10-20 14:41:46,054 INFO [main] (ZookeeperDemo.java:164) - /a children:[b, c, d]
getChildren只会返回所有子节点的集合,不包含孙子节点。
实现递归创建节点
private void create(String path, String data) throws KeeperException, InterruptedException {
String[] split = path.split("/");
StringBuilder p = new StringBuilder();
for (int i = 1; i < split.length - 1; i++) {
p.append("/").append(split[i]);
if(null == zookeeper.exists(p.toString(), null)) {
zookeeper.create(p.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
zookeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
实现递归删除节点
private void delete(String path) throws KeeperException, InterruptedException {
List<String> children = zookeeper.getChildren(path, false);
for (String child : children) {
delete(path + "/" + child);
}
zookeeper.delete(path, -1);
}
Watcher监听机制
在创建zookeeper会话时会传入一个Watcher,里面有两个特别关键的类:KeeperState(连接状态)和EventType(事件类型)。
KeeperState
KeeperState表示的是客户端与服务端连接的状态。
连接状态 | 描述 |
---|---|
Disconnected | 客户端与服务器断开连接 |
SyncConnected | 客户端与服务器建立连接 |
AuthFailed | 客户端进行连接认证失败 |
ConnectedReadOnly | 客户端连接到的zookeeper服务是只读的 |
SaslAuthenticated | 用于通知客户端它们是SASL认证的 |
Expired | 客户端心跳检测没有收到服务端的响应时即认定断开连接,session失效 |
EventType
EventType表示的是节点发生变化时所触发的事件类型。
事件类型 | 描述 |
---|---|
NodeCreated | 被监听的节点被创建 |
NodeChildrenChanged | 被监听的节点的直接子节点被创建、被删除、子节点数据发生变更 |
NodeDataChanged | 被监听的节点的数据发生变更 |
NodeDeleted | 被监听的节点被删除 |
None | 客户端的连接状态(KeeperState)发生变更 |
zookeeper中的watcher
zookeeper中给节点添加watcher的方式有两种:
使用默认的watcher:
List
getChildren(String path, boolean watch)
byte[] getData(String path, boolean watch, Stat stat)
Stat exists(String path, boolean watch)
上面三个方法中都有一个boolean类型的watch参数,当watch==true时使用的就是默认的watcher,而默认的watcher就是创建连接的构造方法中的watcher,也可以通过register(Watcher watcher)注册默认的watcher。
给节点指定的watcher:
List
getChildren(String path, Watcher watcher)
byte[] getData(String path, Watcher watcher, Stat stat)
Stat exists(String path, Watcher watcher)
上面三个方法中都有个Watcher类型的watch参数,可以通过传递这个参数给节点指定watcher。
watcher测试
创建会话并注入一个默认的Watcher:
private ZooKeeper zookeeper;
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Before
public void getZkClient() throws InterruptedException, IOException {
if (null == zookeeper) {
zookeeper = new ZooKeeper("10.0.4.105:2181,10.0.4.120:2181,10.0.4.129:2182/xxxx", 6000, new MyWatcher("Default Watcher"));
countDownLatch.await();
}
}
class MyWatcher implements Watcher {
private String name;
public MyWatcher(String name) {
this.name = name;
}
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState(); //获取事件的状态
Event.EventType type = watchedEvent.getType(); //获取事件的类型
if (Event.KeeperState.SyncConnected.equals(state)) {
switch (type) {
case None:
log.info("zookeeper connected success");
countDownLatch.countDown();
break;
case NodeCreated:
log.info("[{}] create node: {}", name, watchedEvent.getPath());
break;
case NodeDeleted:
log.info("[{}] delete node: {}", name, watchedEvent.getPath());
break;
case NodeDataChanged:
log.info("[{}] node data change: {}", name, watchedEvent.getPath());
break;
case NodeChildrenChanged:
log.info("[{}] node children change: {}", name, watchedEvent.getPath());
break;
}
}
}
}
watcher
exists使用默认的watcher:
zookeeper.exists("/p", true);
zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zookeeper.exists("/p", true);
zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zookeeper.exists("/p", true);
zookeeper.setData("/p", "pp".getBytes(), -1);
zookeeper.exists("/p", true);
zookeeper.delete("/p/c", -1);
zookeeper.exists("/p", true);
zookeeper.delete("/p", -1);
运行结果如下:
2020-10-21 09:46:30,653 INFO [main-EventThread] (ZookeeperWatcherDemo.java:42) - zookeeper connected success
2020-10-21 09:46:30,670 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Default Watcher] create node: /p
2020-10-21 09:46:30,681 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher}] node data change: /p
2020-10-21 09:46:30,688 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p
从运行结果可以发现exists注册的watcher能监听节点的事件为:NodeCreated、NodeDataChanged、NodeDeleted。
getData
getData使用默认的watcher:
zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zookeeper.getData("/p", true, null);
zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zookeeper.getData("/p", true, null);
zookeeper.setData("/p", "pp".getBytes(), -1);
zookeeper.getData("/p", true, null);
zookeeper.delete("/p/c", -1);
zookeeper.getData("/p", true, null);
zookeeper.delete("/p", -1);
运行结果如下:
2020-10-21 09:58:51,948 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher] node data change: /p
2020-10-21 09:58:51,956 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p
从运行结果可以发现getData注册watcher能监听节点的事件为:NodeDeleted、NodeDataChanged,不能监听NodeCreated事件(getData一个不存在的节点会抛出异常NoNodeException)。
getChildren
getChildren使用默认的watcher:
zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zookeeper.getChildren("/p", true);
zookeeper.create("/p/c", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zookeeper.getChildren("/p", true);
zookeeper.setData("/p", "pp".getBytes(), -1);
zookeeper.getChildren("/p", true);
zookeeper.delete("/p/c", -1);
zookeeper.getChildren("/p", true);
zookeeper.delete("/p", -1);
运行结果如下:
2020-10-21 10:06:03,887 INFO [main-EventThread] (ZookeeperWatcherDemo.java:55) - [Default Watcher] node children change: /p
2020-10-21 10:06:03,899 INFO [main-EventThread] (ZookeeperWatcherDemo.java:55) - [Default Watcher] node children change: /p
2020-10-21 10:06:03,905 INFO [main-EventThread] (ZookeeperWatcherDemo.java:49) - [Default Watcher] delete node: /p
从运行结果可以发现getChildren注册watcher能监听节点的事件为:NodeDeleted、NodeChildrenChanged。
用同一个方法注册多个watcher
zookeeper.exists("/p", true); // 注册默认的wachter
zookeeper.exists("/p", new MyWatcher("Customer Watcher")); // 再注册一个watcher
zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zookeeper.delete("/p", -1);
运行结果如下:
2020-10-21 10:21:17,377 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Default Watcher] create node: /p
2020-10-21 10:21:17,377 INFO [main-EventThread] (ZookeeperWatcherDemo.java:46) - [Customer Watcher] create node: /p
结论:当一个节点注册了多个watcher,那么多个watcher的方法都会被回调。
用不同的方法注册同一个watcher多次
zookeeper.create("/p", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zookeeper.getData("/p", true, null); // 注册默认的wachter
zookeeper.exists("/p", true); // 注册两次默认的wachter
zookeeper.setData("/p", "ppp".getBytes(), -1);
zookeeper.delete("/p", -1);
运行结果如下:
2020-10-21 10:22:41,025 INFO [main-EventThread] (ZookeeperWatcherDemo.java:52) - [Default Watcher] node data change: /p
结论:当一个节点用不同的方法都注册了同一个watcher时,watcher的方法只会回调一次。
总结
不同注册watcher的方法与可监听事件的关系:
注册方式 | NodeCreated | NodeChildrenChanged | NodeDataChanged | NodeDeleted |
---|---|---|---|---|
getChildren | Y | Y | ||
exists | Y | Y | Y | |
getData | Y | Y |
通过观察运行结果,总结如下:
- 注册一次watcher只会收到一次通知,想一直监听就得收到通知后再次注册。
- 同一个watcher实例被例如exists,getData等方法多次注册,zookeeper客户端也只会收到一次通知。
- 当一个节点注册多个不同的watcher实例时,会通知多次,即每个被注册的watcher都会收到通知。
- exists可以监听一个不存在的节点,但是getData和getChildren不能监控一个不存在的节点,否则会报NoNodeException。
zookeeper原生客户端的缺点
- 不能递归的创建节点和删除节点。
- 对节点的数据操作基于字节数组(二进制安全),经常需要数组和字符串之间的转换。
- 想一直监听某个节点就要一直注册,监听一些不存在的节点可能会抛出异常。
还没有评论,来说两句吧...