09. Zookeeper JavaAPI-常用操作
Zookeeper 的JavaAPI对常用的节点操作都进行了封装, 和Shell 提供的命令差不多.
1. 创建节点API
1.1 创建默认权限节点
zooKeeper.create("/node-anyone", "test node anyone".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
1.2 创建auth 权限模式节点
// 添加用户认证信息
zooKeeper.addAuthInfo("digest", "admin:admin".getBytes());
zooKeeper.addAuthInfo("digest", "admin:123456".getBytes());
// 创建节点
zooKeeper.create("/node-auth", "test node auth".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
1.3 创建digest 模式权限节点
// 权限位使用或运算符拼接
int aclVal = ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE | ZooDefs.Perms.READ | ZooDefs.Perms.WRITE;
// 生成加密的用户认证信息
String authentication = DigestAuthenticationProvider.generateDigest("admin:admin");
// 创建权限模式
ACL acl = new ACL(aclVal, new Id("digest", authentication));
// 创建节点
zooKeeper.create("/node-digest", "test node auth".getBytes(), Collections.singletonList(acl), CreateMode.PERSISTENT);
1.4 创建短暂有序节点
// 创建节点, 返回节点名称. 因为有序, 所以节点名称和指定名称不同
String nodeName = zooKeeper.create("/node-es", "test node anyone".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("nodeName:" + nodeName);
2. 节点操作API
2.1 获取节点数据
// 用于保存节点信息
Stat stat = new Stat();
// 获取节点数据
byte[] data = zooKeeper.getData("/node-anyone", false, stat);
System.out.println("data:" + new String(data));
System.out.println("stat.version:" + stat.getVersion());
2.2 获取子节点
List<String> children = zooKeeper.getChildren("/", false);
2.3 判断节点是否存在
Stat stat = zooKeeper.exists("/sss", false);
2.4 删除节点
删除节点时, 必须指定节点版本号.
// 获取节点状态
Stat stat = new Stat();
zooKeeper.getData("/test", false, stat);
// 删除节点
zooKeeper.delete("/test", stat.getVersion());
3. 权限操作API
3.1 获取权限
List<ACL> aclList = zooKeeper.getACL("/node-auth", new Stat());
3.2 设置权限
设置权限时, 需要指定节点版本号.
// 创建节点信息对象, 用于保存节点信息
Stat nodeStat = new Stat();
zooKeeper.getData(nodePath, false, nodeStat);
// 更新节点权限
zooKeeper.setACL("/node-auth", ZooDefs.Ids.READ_ACL_UNSAFE, nodeStat.getVersion());
4. 批量操作
zookeeper 提供了multi() 和 transaction()API 来进行批量操作. 而Transaction 对象其实就是将multi 进行了一层封装而已.
//批量操作
List<Op> opList = new ArrayList<>();
opList.add(Op.create("/nodes", "nodes".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.create("/nodes/A", "aa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.create("/nodes/B", "bb".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.create("/nodes/C", "cc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
// 如果添加此行, 则全部失败
// opList.add(Op.create("/nodes/D/dd", "dd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
List<OpResult> multi = zooKeeper.multi(opList);
for (OpResult opResult : multi) {
}
multi.stream()
.map(opResult -> (OpResult.CreateResult) opResult)
.forEach(result -> System.out.println(result.getPath() + "-> " + result.getType()));
5. 完整测试用例
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.BeforeClass;
import org.junit.Test;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/** * @Description: 测试同步API * @author: zongf * @date: 2019-02-17 10:20 */
public class TestAPI {
// zookeeper 地址, 多个地址以逗号割开
private static String zkServer = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
// 会话超时时间
private static int sessionTimeout = 3000;
// zookeeper 客户端连接
private static ZooKeeper zooKeeper;
@BeforeClass
public static void init() throws Exception{
zooKeeper = new ZooKeeper(zkServer, sessionTimeout, watchedEvent ->{ });
System.out.println("******************* init start ***********************");
System.out.println("state:" + zooKeeper.getState());
System.out.println("sessionPwd:" + zooKeeper.getSessionPasswd());
System.out.println("sessionId:" + zooKeeper.getSessionId());
System.out.println("timeout:" + zooKeeper.getSessionTimeout());
System.out.println("******************* init end ***********************");
}
// 创建shell默认模式节点, 访问权限为- world:anyone:cdrwa
@Test
public void create_world() throws KeeperException, InterruptedException{
// 创建节点
zooKeeper.create("/node-anyone", "test node anyone".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 创建auth权限模式节点, 访问权限为- auth::cdrwa
@Test
public void create_auth() throws KeeperException, InterruptedException{
// 添加用户认证信息
zooKeeper.addAuthInfo("digest", "admin:admin".getBytes());
zooKeeper.addAuthInfo("digest", "admin:123456".getBytes());
// 创建节点
zooKeeper.create("/node-auth", "test node auth".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
}
// 创建digest模式节点, 访问权限为: cdrw, 只能admin用户访问
@Test
public void cteate_digest() throws KeeperException, InterruptedException, NoSuchAlgorithmException {
// 权限位使用或运算符拼接
int aclVal = ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE | ZooDefs.Perms.READ | ZooDefs.Perms.WRITE;
// 生成加密的用户认证信息
String authentication = DigestAuthenticationProvider.generateDigest("admin:admin");
// 创建权限模式
ACL acl = new ACL(aclVal, new Id("digest", authentication));
// 创建节点
zooKeeper.create("/node-digest", "test node auth".getBytes(), Collections.singletonList(acl), CreateMode.PERSISTENT);
}
// 创建短暂有序节点
@Test
public void create_ephemeral_sequential() throws KeeperException, InterruptedException{
// 创建节点, 返回节点名称. 因为有序, 所以节点名称和指定名称不同
String nodeName = zooKeeper.create("/node-es", "test node anyone".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("nodeName:" + nodeName);
// 休眠线程, 否则会话结束短暂节点就销毁了
Thread.sleep(10000);
}
// 异步创建
@Test
public void create_ansy() throws KeeperException, InterruptedException {
zooKeeper.create("/learn2", "just for learn async".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("rc:" + rc);
System.out.println("path:" + path);
System.out.println("name:" + name);
System.out.println("ctx:" + ctx);
}
}, "回调结果");
Thread.sleep(2000);
}
// 获取节点数据 和 状态
@Test
public void test_getData() throws KeeperException, InterruptedException {
// 用于保存节点信息
Stat stat = new Stat();
// 获取节点数据
byte[] data = zooKeeper.getData("/node-anyone", false, stat);
System.out.println("data:" + new String(data));
System.out.println("stat.version:" + stat.getVersion());
}
@Test
public void test_getChildren() throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren("/", false);
System.out.println(children);
}
// 测试节点是否存在
@Test
public void test_exsists() throws KeeperException, InterruptedException {
Stat stat = zooKeeper.exists("/sss", false);
System.out.println(stat != null? stat : "/sss节点不存在" );
}
// 删除节点: 必须指定版本号, 只能删除空节点. 不支持shell的rmr 操作
@Test
public void test_delete() throws KeeperException, InterruptedException {
// 创建测试节点
zooKeeper.create("/test", "test delete".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 获取节点状态
Stat stat = new Stat();
zooKeeper.getData("/test", false, stat);
// 删除节点
zooKeeper.delete("/test", stat.getVersion());
}
// 获取权限
@Test
public void test_getACL() throws KeeperException, InterruptedException {
Stat nodeStat = new Stat();
List<ACL> aclList = zooKeeper.getACL("/node-auth", nodeStat);
aclList.forEach(acl -> System.out.println(acl.getId().toString().trim() + "->" + convertPerms(acl.getPerms())));
}
// 更新权限
@Test
public void test_setACL() throws KeeperException, InterruptedException {
String nodePath = "/node-anyone";
// 首先认证用户
zooKeeper.addAuthInfo("digest", "admin:123456".getBytes());
// 输出节点权限
List<ACL> aclList = zooKeeper.getACL(nodePath, new Stat());
aclList.forEach(acl -> System.out.println(acl.getId().toString().trim() + "->" + convertPerms(acl.getPerms())));
// 创建节点信息对象, 用于保存节点信息
Stat nodeStat = new Stat();
zooKeeper.getData(nodePath, false, nodeStat);
// 更新节点权限
zooKeeper.setACL("/node-auth", ZooDefs.Ids.READ_ACL_UNSAFE, nodeStat.getVersion());
System.out.println("修改后:");
// 重新输出节点权限
List<ACL> aclListAfter = zooKeeper.getACL(nodePath, new Stat());
aclListAfter.forEach(acl -> System.out.println(acl.getId().toString().trim() + "->" + convertPerms(acl.getPerms())));
// 输出结果
// 'digest,'admin:0uek/hZ/V9fgiM35b0Z2226acMQ=->rwcda
// 'digest,'admin:x1nq8J5GOJVPY6zgzhtTtA9izLc=->rwcda
// 修改后:
// 'world,' anyone -> r
}
// 将权限位转换为shell 字母
public static String convertPerms(int perms) {
if (perms == ZooDefs.Perms.ALL) {
return "rwcda";
}
StringBuffer sb = new StringBuffer();
if (perms - ZooDefs.Perms.DELETE >= 0) {
sb.append("d");
perms = perms - ZooDefs.Perms.DELETE;
}
if (perms - ZooDefs.Perms.CREATE >= 0) {
sb.append("c");
perms = perms - ZooDefs.Perms.CREATE;
}
if (perms - ZooDefs.Perms.WRITE >= 0) {
sb.append("w");
perms = perms - ZooDefs.Perms.WRITE;
}
if (perms - ZooDefs.Perms.READ >= 0) {
sb.append("r");
perms = perms - ZooDefs.Perms.READ;
}
return sb.toString();
}
// 批量执行: 批量创建节点, 要么全部成功, 要么全部失败
@Test
public void multi() throws KeeperException, InterruptedException {
//批量操作
List<Op> opList = new ArrayList<>();
opList.add(Op.create("/nodes", "nodes".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.create("/nodes/A", "aa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.create("/nodes/B", "bb".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.create("/nodes/C", "cc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
// 如果添加此行, 则全部失败
// opList.add(Op.create("/nodes/D/dd", "dd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
List<OpResult> multi = zooKeeper.multi(opList);
for (OpResult opResult : multi) {
}
multi.stream()
.map(opResult -> (OpResult.CreateResult) opResult)
.forEach(result -> System.out.println(result.getPath() + "-> " + result.getType()));
}
}
还没有评论,来说两句吧...