通过zookeeper监听器 做到监听数据改变 而去修改更新实时内存数据

素颜马尾好姑娘i 2024-04-17 19:29 94阅读 0赞

1,监听器简单代码,需要测试

  1. package com.coder.flink.core.a_zk;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.cache.ChildData;
  5. import org.apache.curator.framework.recipes.cache.NodeCache;
  6. import org.apache.curator.framework.recipes.cache.NodeCacheListener;
  7. import org.apache.curator.retry.ExponentialBackoffRetry;
  8. import org.apache.curator.utils.EnsurePath;
  9. import java.util.concurrent.CountDownLatch;
  10. public class NodeCacheExample {
  11. private static final String PATH = "/tmp2";
  12. private static CountDownLatch latch = new CountDownLatch(1);
  13. static NodeCache nodeCache;
  14. static CuratorFramework client;
  15. static {
  16. client = CuratorFrameworkFactory.newClient(
  17. "node1:2181", 5000, 5000, new ExponentialBackoffRetry(
  18. 1000, 3));
  19. client.start();
  20. }
  21. public static void initCache() throws Exception {
  22. // client.create().forPath(PATH);
  23. // client.setData().forPath(PATH, "节点的初始值".getBytes());
  24. nodeCache = new NodeCache(client, PATH);
  25. EnsurePath ensurePath = client.newNamespaceAwareEnsurePath(PATH);
  26. ensurePath.ensure(client.getZookeeperClient());
  27. //设置成true,那么nodeCache在第一次启动的时候就会到zookeeper上去获取节点的数据内容,并保存在cache中
  28. nodeCache.start(true);
  29. startCache(nodeCache);
  30. }
  31. private static void startCache(final NodeCache cache) throws Exception {
  32. ChildData data = cache.getCurrentData();
  33. System.out.println("第一次启动获取到的内容:" + new String(data.getData()));
  34. cache.getListenable().addListener(new NodeCacheListener() {
  35. @Override
  36. public void nodeChanged() throws Exception {
  37. System.out.println("监听器获取到的数据: "
  38. + new String(cache.getCurrentData().getData()));
  39. latch.countDown();
  40. }
  41. });
  42. /* Thread.sleep(2000);
  43. if(client.checkExists().forPath(PATH) != null){
  44. System.out.println("node is exist,准备给节点设置新的内容");
  45. client.setData().forPath(PATH, "节点新内容".getBytes());
  46. client.setData().forPath(PATH, "节点新内容2".getBytes());
  47. client.setData().forPath(PATH, "节点新内容3".getBytes());
  48. }*/
  49. Thread.sleep(Long.MAX_VALUE);
  50. }
  51. public static void main(String[] args) throws Exception {
  52. initCache();
  53. latch.await();
  54. }
  55. }
  56. package com.coder.flink.core.a_zk;
  57. import org.apache.curator.framework.CuratorFramework;
  58. import org.apache.curator.framework.CuratorFrameworkFactory;
  59. import org.apache.curator.framework.recipes.cache.ChildData;
  60. import org.apache.curator.framework.recipes.cache.NodeCache;
  61. import org.apache.curator.framework.recipes.cache.NodeCacheListener;
  62. import org.apache.curator.retry.ExponentialBackoffRetry;
  63. import org.apache.curator.utils.EnsurePath;
  64. import java.util.Date;
  65. import java.util.Random;
  66. import java.util.concurrent.CountDownLatch;
  67. public class NodeCacheExample_send {
  68. private static final String PATH = "/tmp2";
  69. private static CountDownLatch latch = new CountDownLatch(1);
  70. static NodeCache nodeCache;
  71. static CuratorFramework client;
  72. static {
  73. client = CuratorFrameworkFactory.newClient(
  74. "node1:2181", 5000, 5000, new ExponentialBackoffRetry(
  75. 1000, 3));
  76. client.start();
  77. }
  78. public static void initCache() throws Exception {
  79. // client.create().forPath(PATH);
  80. // client.setData().forPath(PATH, "节点的初始值".getBytes());
  81. nodeCache = new NodeCache(client, PATH);
  82. EnsurePath ensurePath = client.newNamespaceAwareEnsurePath(PATH);
  83. ensurePath.ensure(client.getZookeeperClient());
  84. //设置成true,那么nodeCache在第一次启动的时候就会到zookeeper上去获取节点的数据内容,并保存在cache中
  85. nodeCache.start(true);
  86. setData(nodeCache);
  87. }
  88. //给节点赋值
  89. private static void setData(final NodeCache cache) throws Exception {
  90. ChildData data = cache.getCurrentData();
  91. System.out.println("第一次启动获取到的内容:" + new String(data.getData()));
  92. Thread.sleep(2000);
  93. if(client.checkExists().forPath(PATH) != null){
  94. for (int i = 0; i < 10 ; i++) {
  95. Random random = new Random();
  96. String json = "{'a':'1','type':'"+random.nextInt(3)+"'}";
  97. System.out.println("========发送的内容======【"+json+"】");
  98. client.setData().forPath(PATH, ("节点新内容:"+json).getBytes());
  99. }
  100. }
  101. }
  102. public static void main(String[] args) throws Exception {
  103. initCache();
  104. latch.await();
  105. }
  106. }

修改zookeeper的节点数据

  1. package com.coder.flink.core.a_zk;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.cache.ChildData;
  5. import org.apache.curator.framework.recipes.cache.NodeCache;
  6. import org.apache.curator.framework.recipes.cache.NodeCacheListener;
  7. import org.apache.curator.retry.ExponentialBackoffRetry;
  8. import org.apache.curator.utils.EnsurePath;
  9. import org.apache.zookeeper.*;
  10. import org.apache.zookeeper.data.Stat;
  11. import org.junit.Before;
  12. import org.junit.Test;
  13. import java.io.IOException;
  14. import java.util.List;
  15. import java.util.Random;
  16. public class zkSendDataExample {
  17. private static final String connectString = "node1:2181";
  18. private static final int sessionTimeout = 6000;
  19. static ZooKeeper zkClient = null;
  20. /*
  21. *connectString -- host:port[,host:port][basePath] 指定的服务器列表,多个host:port之间用英文逗号分隔。
  22. *sessionTimeOut -- 会话超时时间。以毫秒为单位。客户端和服务器端之间的连接通过心跳包进行维系,如果心跳包超过这个指定时间则认为会话超时失效。
  23. *watcher -- 监视器。如果为null表示不需要观察者。
  24. */
  25. public static void init() throws IOException, KeeperException, InterruptedException {
  26. zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  27. @Override
  28. public void process(WatchedEvent event) {
  29. //事件处理逻辑
  30. System.out.println("正在监听");
  31. }
  32. });
  33. byte[] data = zkClient.getData("/tmp2", new Watcher() {
  34. //监听的具体内容
  35. @Override
  36. public void process(WatchedEvent event) {
  37. System.out.println("监听路径为:" + event.getPath());
  38. System.out.println("监听的类型为:" + event.getType());
  39. System.out.println("数据被2货修改了!!!");
  40. }
  41. }, null);
  42. System.out.println(new String(data));
  43. }
  44. public void testCreateNode() throws KeeperException, InterruptedException {
  45. //进行增删改查
  46. /*
  47. * 第一个参数:节点的路径
  48. * 第二个参数:节点存放的数据
  49. * 第三个参数:节点的权限
  50. * 第四个参数:节点的类型(是暂时的还是持久的)
  51. */
  52. String newNodeString = zkClient.create("/eclipse", "111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  53. System.out.println("----------\n");
  54. }
  55. //判断该节点是否存在
  56. public void existNode() throws KeeperException, InterruptedException {
  57. /*
  58. * 参数1:节点路径
  59. * 参数2:是否需要监视器,若不需要填写“false”
  60. */
  61. Stat exists = zkClient.exists("/a0000000004", false);
  62. System.out.println(exists == null ? "not exists" : "exists");
  63. System.out.println("----------\n");
  64. }
  65. //获取子节点
  66. public void getChildren() throws KeeperException, InterruptedException {
  67. /*
  68. * 参数1:节点路径
  69. * 参数2:是否需要监视器,若不需要填写“false”
  70. */
  71. List<String> children = zkClient.getChildren("/a0000000004", true);
  72. for (String child : children) {
  73. System.out.println(child);
  74. }
  75. System.out.println("----------\n");
  76. }
  77. //获取节点数据
  78. public void nodeData() throws KeeperException, InterruptedException {
  79. /*
  80. * 参数1:节点路径
  81. * 参数2:是否需要监视器,若不需要填写“false”
  82. * 参数3:指定数据节点的状态信息:一般填写“null”或者“new stat”
  83. */
  84. byte[] data = zkClient.getData("/a0000000004", false, null);
  85. System.out.println(new String(data));//因为存放的是byte()类型数据
  86. System.out.println("----------\n");
  87. }
  88. //删除节点
  89. public void deleteNode() throws InterruptedException, KeeperException {
  90. /*
  91. * 参数2:version,可以传入-1,表明要基于最新版本进行更新操作
  92. */
  93. zkClient.delete("/eclipse", -1);
  94. System.out.println("----------\n");
  95. }
  96. //修改节点数据
  97. public static void setNodeData(String log) throws KeeperException, InterruptedException {
  98. byte[] data = zkClient.getData("/tmp2", false, null);
  99. System.out.println(new String(data));
  100. zkClient.setData("/tmp2", log.getBytes(), -1);
  101. byte[] data1 = zkClient.getData("/tmp2", false, null);
  102. System.out.println(new String(data1));
  103. }
  104. public static void main(String[] args) {
  105. Random random = new Random();
  106. String json = "{'aaa':'1','type':'"+random.nextInt(3)+"'}";
  107. System.out.println("========发送的内容======【"+json+"】");
  108. try {
  109. init();
  110. setNodeData(json);
  111. } catch (KeeperException e) {
  112. e.printStackTrace();
  113. } catch (InterruptedException e) {
  114. e.printStackTrace();
  115. } catch (IOException e) {
  116. e.printStackTrace();
  117. }
  118. }
  119. }

最简单的案例:

  1. package com.coder.flink.core.a_zk;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.cache.ChildData;
  5. import org.apache.curator.framework.recipes.cache.NodeCache;
  6. import org.apache.curator.framework.recipes.cache.NodeCacheListener;
  7. import org.apache.curator.retry.ExponentialBackoffRetry;
  8. import org.apache.curator.utils.EnsurePath;
  9. import org.apache.zookeeper.*;
  10. import org.apache.zookeeper.data.Stat;
  11. import org.junit.Before;
  12. import org.junit.Test;
  13. import java.io.IOException;
  14. public class WatchDemo {
  15. public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
  16. ZooKeeper zkCli = new ZooKeeper("node1:2181", 3000, new Watcher() {
  17. //监听回调
  18. @Override
  19. public void process(WatchedEvent event) {
  20. }
  21. });
  22. byte[] data = zkCli.getData("/tmp2", new Watcher() {
  23. //监听的具体内容
  24. @Override
  25. public void process(WatchedEvent event) {
  26. System.out.println("监听路径为:" + event.getPath());
  27. System.out.println("监听的类型为:" + event.getType());
  28. System.out.println("数据被2货修改了!!!");
  29. }
  30. }, null);
  31. System.out.println(new String(data));
  32. Thread.sleep(Long.MAX_VALUE);
  33. }
  34. }

2,修改实时内存数据也很简单 等完善了再写

发表评论

表情:
评论列表 (有 0 条评论,94人围观)

还没有评论,来说两句吧...

相关阅读