09. Zookeeper JavaAPI-常用操作

灰太狼 2022-03-18 02:28 320阅读 0赞

Zookeeper 的JavaAPI对常用的节点操作都进行了封装, 和Shell 提供的命令差不多.

1. 创建节点API

1.1 创建默认权限节点

  1. zooKeeper.create("/node-anyone", "test node anyone".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

1.2 创建auth 权限模式节点

  1. // 添加用户认证信息
  2. zooKeeper.addAuthInfo("digest", "admin:admin".getBytes());
  3. zooKeeper.addAuthInfo("digest", "admin:123456".getBytes());
  4. // 创建节点
  5. zooKeeper.create("/node-auth", "test node auth".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);

1.3 创建digest 模式权限节点

  1. // 权限位使用或运算符拼接
  2. int aclVal = ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE | ZooDefs.Perms.READ | ZooDefs.Perms.WRITE;
  3. // 生成加密的用户认证信息
  4. String authentication = DigestAuthenticationProvider.generateDigest("admin:admin");
  5. // 创建权限模式
  6. ACL acl = new ACL(aclVal, new Id("digest", authentication));
  7. // 创建节点
  8. zooKeeper.create("/node-digest", "test node auth".getBytes(), Collections.singletonList(acl), CreateMode.PERSISTENT);

1.4 创建短暂有序节点

  1. // 创建节点, 返回节点名称. 因为有序, 所以节点名称和指定名称不同
  2. String nodeName = zooKeeper.create("/node-es", "test node anyone".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  3. System.out.println("nodeName:" + nodeName);

2. 节点操作API

2.1 获取节点数据

  1. // 用于保存节点信息
  2. Stat stat = new Stat();
  3. // 获取节点数据
  4. byte[] data = zooKeeper.getData("/node-anyone", false, stat);
  5. System.out.println("data:" + new String(data));
  6. System.out.println("stat.version:" + stat.getVersion());

2.2 获取子节点

  1. List<String> children = zooKeeper.getChildren("/", false);

2.3 判断节点是否存在

  1. Stat stat = zooKeeper.exists("/sss", false);

2.4 删除节点

删除节点时, 必须指定节点的数据版本号(传入 -1 表示不校验版本), 并且只能删除没有子节点的节点.

  1. // 获取节点状态
  2. Stat stat = new Stat();
  3. zooKeeper.getData("/test", false, stat);
  4. // 删除节点
  5. zooKeeper.delete("/test", stat.getVersion());

3. 权限操作API

3.1 获取权限

  1. List<ACL> aclList = zooKeeper.getACL("/node-auth", new Stat());

3.2 设置权限

设置权限时, 需要指定节点的ACL版本号(即 Stat 中的 aversion, 而不是数据版本号); 传入 -1 表示不校验版本.

  1. // 创建节点信息对象, 用于保存节点信息
  2. Stat nodeStat = new Stat();
  3. zooKeeper.getData(nodePath, false, nodeStat);
  4. // 更新节点权限
  5. zooKeeper.setACL("/node-auth", ZooDefs.Ids.READ_ACL_UNSAFE, nodeStat.getVersion());

4. 批量操作

ZooKeeper 提供了 multi() 和 transaction() 两个 API 来进行批量操作, 批量中的操作要么全部成功, 要么全部失败. Transaction 对象其实只是对 multi() 的一层封装.

  1. //批量操作
  2. List<Op> opList = new ArrayList<>();
  3. opList.add(Op.create("/nodes", "nodes".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
  4. opList.add(Op.create("/nodes/A", "aa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
  5. opList.add(Op.create("/nodes/B", "bb".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
  6. opList.add(Op.create("/nodes/C", "cc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
  7. // 如果添加此行, 则全部失败
  8. // opList.add(Op.create("/nodes/D/dd", "dd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
  9. List<OpResult> multi = zooKeeper.multi(opList);
  10. for (OpResult opResult : multi) {
  11. }
  12. multi.stream()
  13. .map(opResult -> (OpResult.CreateResult) opResult)
  14. .forEach(result -> System.out.println(result.getPath() + "-> " + result.getType()));

5. 完整测试用例

  import java.nio.charset.StandardCharsets;
  import java.security.NoSuchAlgorithmException;
  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;

  import org.apache.zookeeper.*;
  import org.apache.zookeeper.data.ACL;
  import org.apache.zookeeper.data.Id;
  import org.apache.zookeeper.data.Stat;
  import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
  import org.junit.AfterClass;
  import org.junit.BeforeClass;
  import org.junit.Test;
  12. /** * @Description: 测试同步API * @author: zongf * @date: 2019-02-17 10:20 */
  13. public class TestAPI {
  14. // zookeeper 地址, 多个地址以逗号割开
  15. private static String zkServer = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
  16. // 会话超时时间
  17. private static int sessionTimeout = 3000;
  18. // zookeeper 客户端连接
  19. private static ZooKeeper zooKeeper;
  20. @BeforeClass
  21. public static void init() throws Exception{
  22. zooKeeper = new ZooKeeper(zkServer, sessionTimeout, watchedEvent ->{ });
  23. System.out.println("******************* init start ***********************");
  24. System.out.println("state:" + zooKeeper.getState());
  25. System.out.println("sessionPwd:" + zooKeeper.getSessionPasswd());
  26. System.out.println("sessionId:" + zooKeeper.getSessionId());
  27. System.out.println("timeout:" + zooKeeper.getSessionTimeout());
  28. System.out.println("******************* init end ***********************");
  29. }
  30. // 创建shell默认模式节点, 访问权限为- world:anyone:cdrwa
  31. @Test
  32. public void create_world() throws KeeperException, InterruptedException{
  33. // 创建节点
  34. zooKeeper.create("/node-anyone", "test node anyone".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  35. }
  36. // 创建auth权限模式节点, 访问权限为- auth::cdrwa
  37. @Test
  38. public void create_auth() throws KeeperException, InterruptedException{
  39. // 添加用户认证信息
  40. zooKeeper.addAuthInfo("digest", "admin:admin".getBytes());
  41. zooKeeper.addAuthInfo("digest", "admin:123456".getBytes());
  42. // 创建节点
  43. zooKeeper.create("/node-auth", "test node auth".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
  44. }
  45. // 创建digest模式节点, 访问权限为: cdrw, 只能admin用户访问
  46. @Test
  47. public void cteate_digest() throws KeeperException, InterruptedException, NoSuchAlgorithmException {
  48. // 权限位使用或运算符拼接
  49. int aclVal = ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE | ZooDefs.Perms.READ | ZooDefs.Perms.WRITE;
  50. // 生成加密的用户认证信息
  51. String authentication = DigestAuthenticationProvider.generateDigest("admin:admin");
  52. // 创建权限模式
  53. ACL acl = new ACL(aclVal, new Id("digest", authentication));
  54. // 创建节点
  55. zooKeeper.create("/node-digest", "test node auth".getBytes(), Collections.singletonList(acl), CreateMode.PERSISTENT);
  56. }
  57. // 创建短暂有序节点
  58. @Test
  59. public void create_ephemeral_sequential() throws KeeperException, InterruptedException{
  60. // 创建节点, 返回节点名称. 因为有序, 所以节点名称和指定名称不同
  61. String nodeName = zooKeeper.create("/node-es", "test node anyone".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  62. System.out.println("nodeName:" + nodeName);
  63. // 休眠线程, 否则会话结束短暂节点就销毁了
  64. Thread.sleep(10000);
  65. }
  66. // 异步创建
  67. @Test
  68. public void create_ansy() throws KeeperException, InterruptedException {
  69. zooKeeper.create("/learn2", "just for learn async".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, new AsyncCallback.StringCallback() {
  70. @Override
  71. public void processResult(int rc, String path, Object ctx, String name) {
  72. System.out.println("rc:" + rc);
  73. System.out.println("path:" + path);
  74. System.out.println("name:" + name);
  75. System.out.println("ctx:" + ctx);
  76. }
  77. }, "回调结果");
  78. Thread.sleep(2000);
  79. }
  80. // 获取节点数据 和 状态
  81. @Test
  82. public void test_getData() throws KeeperException, InterruptedException {
  83. // 用于保存节点信息
  84. Stat stat = new Stat();
  85. // 获取节点数据
  86. byte[] data = zooKeeper.getData("/node-anyone", false, stat);
  87. System.out.println("data:" + new String(data));
  88. System.out.println("stat.version:" + stat.getVersion());
  89. }
  90. @Test
  91. public void test_getChildren() throws KeeperException, InterruptedException {
  92. List<String> children = zooKeeper.getChildren("/", false);
  93. System.out.println(children);
  94. }
  95. // 测试节点是否存在
  96. @Test
  97. public void test_exsists() throws KeeperException, InterruptedException {
  98. Stat stat = zooKeeper.exists("/sss", false);
  99. System.out.println(stat != null? stat : "/sss节点不存在" );
  100. }
  101. // 删除节点: 必须指定版本号, 只能删除空节点. 不支持shell的rmr 操作
  102. @Test
  103. public void test_delete() throws KeeperException, InterruptedException {
  104. // 创建测试节点
  105. zooKeeper.create("/test", "test delete".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  106. // 获取节点状态
  107. Stat stat = new Stat();
  108. zooKeeper.getData("/test", false, stat);
  109. // 删除节点
  110. zooKeeper.delete("/test", stat.getVersion());
  111. }
  112. // 获取权限
  113. @Test
  114. public void test_getACL() throws KeeperException, InterruptedException {
  115. Stat nodeStat = new Stat();
  116. List<ACL> aclList = zooKeeper.getACL("/node-auth", nodeStat);
  117. aclList.forEach(acl -> System.out.println(acl.getId().toString().trim() + "->" + convertPerms(acl.getPerms())));
  118. }
  119. // 更新权限
  120. @Test
  121. public void test_setACL() throws KeeperException, InterruptedException {
  122. String nodePath = "/node-anyone";
  123. // 首先认证用户
  124. zooKeeper.addAuthInfo("digest", "admin:123456".getBytes());
  125. // 输出节点权限
  126. List<ACL> aclList = zooKeeper.getACL(nodePath, new Stat());
  127. aclList.forEach(acl -> System.out.println(acl.getId().toString().trim() + "->" + convertPerms(acl.getPerms())));
  128. // 创建节点信息对象, 用于保存节点信息
  129. Stat nodeStat = new Stat();
  130. zooKeeper.getData(nodePath, false, nodeStat);
  131. // 更新节点权限
  132. zooKeeper.setACL("/node-auth", ZooDefs.Ids.READ_ACL_UNSAFE, nodeStat.getVersion());
  133. System.out.println("修改后:");
  134. // 重新输出节点权限
  135. List<ACL> aclListAfter = zooKeeper.getACL(nodePath, new Stat());
  136. aclListAfter.forEach(acl -> System.out.println(acl.getId().toString().trim() + "->" + convertPerms(acl.getPerms())));
  137. // 输出结果
  138. // 'digest,'admin:0uek/hZ/V9fgiM35b0Z2226acMQ=->rwcda
  139. // 'digest,'admin:x1nq8J5GOJVPY6zgzhtTtA9izLc=->rwcda
  140. // 修改后:
  141. // 'world,' anyone -> r
  142. }
  143. // 将权限位转换为shell 字母
  144. public static String convertPerms(int perms) {
  145. if (perms == ZooDefs.Perms.ALL) {
  146. return "rwcda";
  147. }
  148. StringBuffer sb = new StringBuffer();
  149. if (perms - ZooDefs.Perms.DELETE >= 0) {
  150. sb.append("d");
  151. perms = perms - ZooDefs.Perms.DELETE;
  152. }
  153. if (perms - ZooDefs.Perms.CREATE >= 0) {
  154. sb.append("c");
  155. perms = perms - ZooDefs.Perms.CREATE;
  156. }
  157. if (perms - ZooDefs.Perms.WRITE >= 0) {
  158. sb.append("w");
  159. perms = perms - ZooDefs.Perms.WRITE;
  160. }
  161. if (perms - ZooDefs.Perms.READ >= 0) {
  162. sb.append("r");
  163. perms = perms - ZooDefs.Perms.READ;
  164. }
  165. return sb.toString();
  166. }
  167. // 批量执行: 批量创建节点, 要么全部成功, 要么全部失败
  168. @Test
  169. public void multi() throws KeeperException, InterruptedException {
  170. //批量操作
  171. List<Op> opList = new ArrayList<>();
  172. opList.add(Op.create("/nodes", "nodes".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
  173. opList.add(Op.create("/nodes/A", "aa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
  174. opList.add(Op.create("/nodes/B", "bb".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
  175. opList.add(Op.create("/nodes/C", "cc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
  176. // 如果添加此行, 则全部失败
  177. // opList.add(Op.create("/nodes/D/dd", "dd".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
  178. List<OpResult> multi = zooKeeper.multi(opList);
  179. for (OpResult opResult : multi) {
  180. }
  181. multi.stream()
  182. .map(opResult -> (OpResult.CreateResult) opResult)
  183. .forEach(result -> System.out.println(result.getPath() + "-> " + result.getType()));
  184. }
  185. }

发表评论

表情:
评论列表 (有 0 条评论,320人围观)

还没有评论,来说两句吧...

相关阅读