zookeeper客户端curator简易使用

客官°小女子只卖身不卖艺 2022-05-15 07:49 399阅读 0赞

写在前面:目前Curator有2.x.x和3.x.x两个系列的版本,支持不同版本的Zookeeper。其中Curator 2.x.x兼容Zookeeper的3.4.x和3.5.x。而Curator 3.x.x只兼容Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch删除等新特性。本文是基于docker容器创建zookeeper环境,读者请自行选择zookeeper环境。如若读者想在centos安装docker,请参考《centos中简易安装docker》https://blog.csdn.net/belonghuang157405/article/details/80774506

本文参考文章:https://blog.csdn.net/haoyuyang/article/details/53469269

完整代码地址:https://github.com/Blankwhiter/curator

第一步 搭建zookeeper环境

在centos中,拉取zookeeper镜像,以及创建zookeeper容器。

  1. docker pull zookeeper
  2. docker run -d -v /home/soft/zookeeperhost/zookeeperDataDir:/data -v /home/soft/zookeeperhost/zookeeperDataLogDir:/datalog -e ZOO_MY_ID=1 -e ZOO_SERVERS='server.1=192.168.9.129:2888:3888' --net=host --name zookeeperhost --privileged zookeeper

注:192.168.9.129读者请自行改为本机ip,此文采用的单例,而非集群,如需集群 读者请自行创建.192.168.9.129

第二步 curator依赖加入

在pox.xml中加入curator包:

  1. <dependency>
  2. <groupId>org.apache.curator</groupId>
  3. <artifactId>curator-recipes</artifactId>
  4. <version>2.12.0</version>
  5. </dependency>

第三步 测试crud操作

  1. import org.apache.curator.framework.CuratorFramework;
  2. import org.apache.curator.framework.CuratorFrameworkFactory;
  3. import org.apache.curator.framework.recipes.cache.*;
  4. import org.apache.curator.retry.ExponentialBackoffRetry;
  5. import org.apache.zookeeper.data.Stat;
  6. /** * curator测试crud操作 */
  7. public class CuratorClientTest {
  8. /** * Zookeeper info 集群用,隔开,如192.168.9.127:2181,192.168.9.128:2181,192.168.9.129:2181 */
  9. private static final String ZK_ADDRESS = "192.168.9.129:2181";
  10. /** * 创建路径 */
  11. private static final String ZK_PATH_PARENT = "/new1";
  12. private static final String ZK_PATH = "/new1/mytest";
  13. public static void main(String[] args) throws Exception {
  14. CuratorFramework client = getClient();
  15. /* 开始连接 */
  16. client.start();
  17. // initNodeCache(client);
  18. // initPathChildrenCache(client);
  19. initTreeCache(client);
  20. createNode(client);
  21. /* 普通查询 */
  22. byte[] bytes = client.getData().forPath(ZK_PATH);
  23. System.out.println(new String(bytes));
  24. /* 包含状态查询 */
  25. Stat stat = new Stat();
  26. byte[] bytes1 = client.getData().storingStatIn(stat).forPath(ZK_PATH);
  27. System.out.println(new String(bytes1));
  28. // updateNode(client);
  29. // deleteNode(client);
  30. Thread.sleep(15000);
  31. /* 关闭连接 */
  32. client.close();
  33. }
  34. /** * 删除节点 * * @param client * @throws Exception */
  35. public static void deleteNode(CuratorFramework client) throws Exception {
  36. /* 删除节点 */
  37. // client.delete().forPath(ZK_PATH);
  38. /* 删除节点 并且递归删除子节点 */
  39. // client.delete().deletingChildrenIfNeeded().forPath(ZK_PATH);
  40. client.delete().guaranteed().forPath(ZK_PATH);
  41. }
  42. /** * 更新节点内容 */
  43. public static void updateNode(CuratorFramework client) throws Exception {
  44. /* 更新节点信息 如果未传入version参数,那么更新当前最新版本 */
  45. client.setData().forPath(ZK_PATH, "新内容3".getBytes());
  46. /* 指定版本更新 版本不一直异常信息:org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for */
  47. // client.setData().withVersion(aversion).forPath(ZK_PATH);
  48. }
  49. /** * 创建节点信息以及内容 * * @param client * @throws Exception */
  50. public static void createNode(CuratorFramework client) throws Exception {
  51. /* 创建节点以及对应内容。无法递归创建节点 */
  52. client.create().forPath(ZK_PATH + "/noCursion", "noCursion".getBytes());
  53. // client.create().forPath(ZK_PATH_PARENT,"noCursion".getBytes());
  54. /* 递归创建节点以及对应内容 */
  55. client.create().creatingParentsIfNeeded().forPath(ZK_PATH, "testdata".getBytes());
  56. // client.create().forPath(ZK_PATH_PARENT).
  57. }
  58. /** * 获得客户端连接 * * @return */
  59. public static CuratorFramework getClient() {
  60. /* 重试策略 */
  61. ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
  62. /* CuratorFrameworkFactory工厂创建实例 */
  63. return CuratorFrameworkFactory.newClient(ZK_ADDRESS, exponentialBackoffRetry);
  64. }
  65. /** * 初始化TreeCache的节点监听 * * @param client * @throws Exception */
  66. public static void initTreeCache(CuratorFramework client) throws Exception {
  67. TreeCache treeCache = new TreeCache(client, ZK_PATH_PARENT);
  68. treeCache.start();
  69. treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
  70. switch (treeCacheEvent.getType()) {
  71. case NODE_ADDED:
  72. System.out.println("NODE_ADDED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
  73. + ",状态:" + treeCacheEvent.getData().getStat());
  74. break;
  75. case NODE_UPDATED:
  76. System.out.println("NODE_UPDATED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
  77. + ",状态:" + treeCacheEvent.getData().getStat());
  78. break;
  79. case NODE_REMOVED:
  80. System.out.println("NODE_REMOVED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
  81. + ",状态:" + treeCacheEvent.getData().getStat());
  82. break;
  83. default:
  84. break;
  85. }
  86. });
  87. }
  88. /** * 初始化节点监听 * * @param client * @throws Exception */
  89. public static void initNodeCache(CuratorFramework client) throws Exception {
  90. /* 监听节点的新增、修改操作。 最后一个参数表示是否进行压缩 */
  91. NodeCache nodeCache = new NodeCache(client, ZK_PATH_PARENT, false);
  92. nodeCache.start(true);
  93. /* 会监听父节点的创建和修改,删除不会监听 */
  94. nodeCache.getListenable().addListener(() -> {
  95. System.out.println("nodeCache listen begin");
  96. System.out.println("data:" + nodeCache.getCurrentData().getData().toString());
  97. System.out.println("nodeCache listen end");
  98. });
  99. }
  100. /** * 初始化子节点监听 * * @param client * @throws Exception */
  101. public static void initPathChildrenCache(CuratorFramework client) throws Exception {
  102. /* 监听子节点的新增、修改、删除操作。 */
  103. PathChildrenCache pathChildrenCache = new PathChildrenCache(client, ZK_PATH_PARENT, true);
  104. /** * 如果不填写这个参数,则无法监听到子节点的数据更新 如果参数为PathChildrenCache.StartMode.BUILD_INITIAL_CACHE,则会预先创建之前指定的/super节点 如果参数为PathChildrenCache.StartMode.POST_INITIALIZED_EVENT,效果与BUILD_INITIAL_CACHE相同,只是不会预先创建/super节点 参数为PathChildrenCache.StartMode.NORMAL时,与不填写参数是同样的效果,不会监听子节点的数据更新操作 */
  105. pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
  106. pathChildrenCache.getListenable().addListener((curatorFramework, event) -> {
  107. switch (event.getType()) {
  108. case CHILD_ADDED:
  109. System.out.println("CHILD_ADDED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +
  110. new String(event.getData().getData()) + ",状态:" + event.getData().getStat());
  111. break;
  112. case CHILD_UPDATED:
  113. System.out.println("CHILD_UPDATED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +
  114. new String(event.getData().getData()) + ",状态:" + event.getData().getStat());
  115. break;
  116. case CHILD_REMOVED:
  117. System.out.println("CHILD_REMOVED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +
  118. new String(event.getData().getData()) + ",状态:" + event.getData().getStat());
  119. break;
  120. default:
  121. break;
  122. }
  123. });
  124. }

发表评论

表情:
评论列表 (有 0 条评论,399人围观)

还没有评论,来说两句吧...

相关阅读