ZooKeeper 分布式集群中使用 Curator 的 PathChildrenCache 监测子节点的添加、数据更新与删除

野性酷女 2024-04-18 03:53 132阅读 0赞

ZooKeeper 分布式集群中使用 Curator 的 PathChildrenCache 监测子节点的添加、数据更新与删除

  1. import org.apache.curator.RetryPolicy;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.cache.ChildData;
  5. import org.apache.curator.framework.recipes.cache.PathChildrenCache;
  6. import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
  7. import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
  8. import org.apache.curator.retry.ExponentialBackoffRetry;
  9. import org.apache.zookeeper.CreateMode;
  10. import org.apache.zookeeper.data.Stat;
  11. import java.util.List;
  12. import java.util.concurrent.TimeUnit;
  13. public class Main {
  14. private final String TAG = "调试# ";
  15. public static void main(String[] args) {
  16. //初始化log4j,zookeeper否则报错。
  17. //org.apache.log4j.BasicConfigurator.configure();
  18. try {
  19. Main m = new Main();
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. private String getAddress() {
  25. String ip = "127.0.0.1";
  26. return ip + ":2181," + ip + ":2182," + ip + ":2183";
  27. }
  28. public Main() throws Exception {
  29. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
  30. CuratorFramework client = CuratorFrameworkFactory.builder()
  31. .connectString(getAddress())
  32. .sessionTimeoutMs(10 * 1000)
  33. .connectionTimeoutMs(20 * 1000)
  34. .retryPolicy(retryPolicy)
  35. .build();
  36. client.start();
  37. String path = "/test_data/my_path";
  38. //检测是否存在该路径。
  39. Stat stat = client.checkExists().forPath(path);
  40. //如果不存在这个路径,stat为null,创建新的节点路径。
  41. if (stat == null) {
  42. String s = client.create()
  43. .creatingParentsIfNeeded()
  44. .withMode(CreateMode.PERSISTENT)
  45. .forPath(path);
  46. System.out.println(TAG + "已创建:" + s);
  47. } else {
  48. System.out.println(TAG + "已存在:" + path);
  49. }
  50. PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);
  51. pathChildrenCache.start();
  52. PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
  53. @Override
  54. public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
  55. System.out.println(TAG + "事件:" + event.getType());
  56. System.out.println(TAG + "数据:" + event.getData().getPath() + "," + new String(event.getData().getData()));
  57. List<ChildData> childData = event.getInitialData();
  58. if (childData != null) {
  59. for (ChildData data : childData) {
  60. System.out.println(TAG + "子路径:" + data.getPath());
  61. System.out.println(TAG + "子数据:" + data.getData().toString());
  62. }
  63. }
  64. }
  65. };
  66. pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
  67. //创建一个子节点。
  68. String tempPath = path + "/test_01";
  69. if (client.checkExists().forPath(tempPath) == null) {
  70. System.out.println(TAG + "不存在:" + tempPath);
  71. client.create().creatingParentsIfNeeded().forPath(tempPath, "hello,world!".getBytes("UTF-8"));
  72. TimeUnit.SECONDS.sleep(5);
  73. } else {
  74. System.out.println(TAG + "存在:" + tempPath);
  75. }
  76. update(client, tempPath);
  77. TimeUnit.SECONDS.sleep(5);
  78. delete(client, tempPath);
  79. TimeUnit.SECONDS.sleep(5);
  80. pathChildrenCache.close();
  81. client.close();
  82. synchronized (Main.class) {
  83. Main.class.wait();
  84. }
  85. }
  86. //更新操作。
  87. private void update(CuratorFramework client, String path) {
  88. //更新数据。
  89. try {
  90. client.setData()
  91. //.withVersion(0)
  92. .forPath(path, "zhang phil".getBytes("UTF-8"));
  93. } catch (Exception e) {
  94. e.printStackTrace();
  95. }
  96. }
  97. //删除节点。
  98. private void delete(CuratorFramework client, String path) {
  99. try {
  100. client.delete().deletingChildrenIfNeeded().forPath(path);
  101. } catch (Exception e) {
  102. e.printStackTrace();
  103. }
  104. }
  105. }

输出:

  1. 调试# 已创建:/test_data/my_path
  2. 调试# 不存在:/test_data/my_path/test_01
  3. 调试# 事件:CHILD_ADDED
  4. 调试# 数据:/test_data/my_path/test_01,hello,world!
  5. 调试# 事件:CHILD_UPDATED
  6. 调试# 数据:/test_data/my_path/test_01,zhang phil
  7. 调试# 事件:CHILD_REMOVED
  8. 调试# 数据:/test_data/my_path/test_01,zhang phil

发表评论

表情:
评论列表 (有 0 条评论,132人围观)

还没有评论,来说两句吧...

相关阅读