zookeeper 事件监听

末蓝、 2022-12-10 01:14 273阅读 0赞

zookeeper 事件监听

官网:https://github.com/apache/curator/tree/master/curator-examples/src/main/java

**********************

相关类与接口

CuratorCache

  1. public interface CuratorCache extends Closeable, CuratorCacheAccessor {
  2. static CuratorCache build(CuratorFramework client, String path, CuratorCache.Options... options) {
  3. return builder(client, path).withOptions(options).build();
  4. }
  5. static CuratorCacheBuilder builder(CuratorFramework client, String path) {
  6. return new CuratorCacheBuilderImpl(client, path);
  7. }
  8. static CuratorCacheBridgeBuilder bridgeBuilder(CuratorFramework client, String path) {
  9. return new CuratorCacheBridgeBuilderImpl(client, path);
  10. }
  11. void start();
  12. void close();
  13. Listenable<CuratorCacheListener> listenable();
  14. Optional<ChildData> get(String var1);
  15. int size();
  16. Stream<ChildData> stream();
  17. public static enum Options {
  18. SINGLE_NODE_CACHE,
  19. COMPRESSED_DATA,
  20. DO_NOT_CLEAR_ON_CLOSE;
  21. private Options() {
  22. }
  23. }
  24. }

Listeneable:添加、删除监听接口

  1. public interface Listenable<T> {
  2. void addListener(T var1);
  3. void addListener(T var1, Executor var2); //添加监听接口
  4. void removeListener(T var1); //删除监听接口
  5. }

CuratorCacheListener:监听接口

  1. @FunctionalInterface
  2. public interface CuratorCacheListener {
  3. void event(CuratorCacheListener.Type var1, ChildData var2, ChildData var3);
  4. default void initialized() {
  5. }
  6. static CuratorCacheListenerBuilder builder() {
  7. return new CuratorCacheListenerBuilderImpl();
  8. }
  9. public static enum Type {
  10. NODE_CREATED,
  11. NODE_CHANGED,
  12. NODE_DELETED;
  13. private Type() {
  14. }
  15. }
  16. }

CuratorCacheListenerBuilder:创建CuratorCacheListener

  1. public interface CuratorCacheListenerBuilder {
  2. CuratorCacheListenerBuilder forAll(CuratorCacheListener var1); //所有操作件
  3. CuratorCacheListenerBuilder forCreates(Consumer<ChildData> var1); //创建操作
  4. CuratorCacheListenerBuilder forChanges(CuratorCacheListenerBuilder.ChangeListener var1); //修改操作
  5. CuratorCacheListenerBuilder forCreatesAndChanges(CuratorCacheListenerBuilder.ChangeListener var1); //创建修改操作
  6. CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> var1); //删除操作
  7. CuratorCacheListenerBuilder forInitialized(Runnable var1); //初始化节点
  8. CuratorCacheListenerBuilder forPathChildrenCache(String var1, CuratorFramework var2, PathChildrenCacheListener var3); //子节点
  9. CuratorCacheListenerBuilder forTreeCache(CuratorFramework var1, TreeCacheListener var2); //监控整个树节点状态
  10. CuratorCacheListenerBuilder forNodeCache(NodeCacheListener var1); //监控当前节点
  11. CuratorCacheListenerBuilder afterInitialized();
  12. CuratorCacheListener build();
  13. @FunctionalInterface
  14. public interface ChangeListener {
  15. void event(ChildData var1, ChildData var2);
  16. }
  17. }

NodeCacheListener:监听当前节点

  1. public interface NodeCacheListener {
  2. void nodeChanged() throws Exception;
  3. }

说明:创建当前节点、更新数据会触发通知删除当前节点不会触发通知添加、修改、删除子节点会触发通知

PathChildrenCacheListener:监听当前节点子节点

  1. public interface PathChildrenCacheListener {
  2. void childEvent(CuratorFramework var1, PathChildrenCacheEvent var2) throws Exception;
  3. }

PathChildrenCacheEvent:子节点创建、数据更新、删除

  1. public class PathChildrenCacheEvent {
  2. private final PathChildrenCacheEvent.Type type;
  3. private final ChildData data;
  4. public PathChildrenCacheEvent(PathChildrenCacheEvent.Type type, ChildData data) {
  5. this.type = type;
  6. this.data = data;
  7. }
  8. public PathChildrenCacheEvent.Type getType() {
  9. return this.type;
  10. }
  11. public ChildData getData() {
  12. return this.data;
  13. }
  14. public List<ChildData> getInitialData() {
  15. return null;
  16. }
  17. public String toString() {
  18. return "PathChildrenCacheEvent{type=" + this.type + ", data=" + this.data + '}';
  19. }
  20. public static enum Type {
  21. CHILD_ADDED,
  22. CHILD_UPDATED,
  23. CHILD_REMOVED,
  24. CONNECTION_SUSPENDED,
  25. CONNECTION_RECONNECTED,
  26. CONNECTION_LOST,
  27. INITIALIZED;
  28. private Type() {
  29. }
  30. }
  31. }

TreeCacheListener:监听整个目录

  1. public interface TreeCacheListener {
  2. void childEvent(CuratorFramework var1, TreeCacheEvent var2) throws Exception;
  3. }

TreeCacheEvent:节点添加、更新、删除触发

  1. public class TreeCacheEvent {
  2. private final TreeCacheEvent.Type type;
  3. private final ChildData data;
  4. private final ChildData oldData;
  5. public TreeCacheEvent(TreeCacheEvent.Type type, ChildData data) {
  6. this(type, data, (ChildData)null);
  7. }
  8. public TreeCacheEvent(TreeCacheEvent.Type type, ChildData data, ChildData oldData) {
  9. this.type = type;
  10. this.data = data;
  11. this.oldData = oldData;
  12. }
  13. public TreeCacheEvent.Type getType() {
  14. return this.type;
  15. }
  16. public ChildData getData() {
  17. return this.data;
  18. }
  19. public ChildData getOldData() {
  20. return this.oldData;
  21. }
  22. public String toString() {
  23. return TreeCacheEvent.class.getSimpleName() + "{type=" + this.type + ", data=" + this.data + '}';
  24. }
  25. public static enum Type {
  26. NODE_ADDED,
  27. NODE_UPDATED,
  28. NODE_REMOVED,
  29. CONNECTION_SUSPENDED,
  30. CONNECTION_RECONNECTED,
  31. CONNECTION_LOST,
  32. INITIALIZED;
  33. private Type() {
  34. }
  35. }
  36. }

**********************

示例

*****************

config 层

WebConfig

  1. @Configuration
  2. public class WebConfig {
  3. @Value("${zookeeper.connect-string}")
  4. private String connectString;
  5. @Bean
  6. public CuratorFramework initCuratorFramework(){
  7. CuratorFramework curatorFramework= CuratorFrameworkFactory.builder()
  8. .connectString(connectString)
  9. .sessionTimeoutMs(5000)
  10. .retryPolicy(new ExponentialBackoffRetry(1000,3,3000))
  11. .namespace("test")
  12. .build();
  13. curatorFramework.start();
  14. return curatorFramework;
  15. }
  16. }

*****************

controller 层

HelloController

  1. @RestController
  2. public class HelloController {
  3. @Resource
  4. private CuratorFramework curatorFramework;
  5. private final String path="/cache";
  6. private final String childPath="/cache/child";
  7. private final String path2="/cache2";
  8. private final String childPath2="/cache2/child";
  9. @RequestMapping("/hello")
  10. public void hello(){
  11. CuratorCache cache=CuratorCache.build(curatorFramework,path);
  12. CuratorCacheListener curatorCacheListener=CuratorCacheListener.builder()
  13. .forNodeCache(() -> {
  14. if (cache.get(path).isPresent()){
  15. System.out.println("当前数据节点变更,新数据:"+new String(cache.get(path).get().getData()));
  16. }
  17. }).build();
  18. cache.listenable().addListener(curatorCacheListener);
  19. cache.start();
  20. try {
  21. curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(path,"瓜田李下".getBytes());
  22. curatorFramework.setData().forPath(path,"海贼王".getBytes());
  23. curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(childPath,"海贼王".getBytes());
  24. curatorFramework.setData().forPath(childPath,"瓜田李下".getBytes());
  25. curatorFramework.delete().deletingChildrenIfNeeded().forPath(childPath);
  26. curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
  27. }catch (Exception e){
  28. System.out.println(e.getMessage());
  29. }
  30. }
  31. @RequestMapping("/hello2")
  32. public void hello2(){
  33. CuratorCache cache=CuratorCache.build(curatorFramework,path2);
  34. CuratorCacheListener curatorCacheListener=CuratorCacheListener.builder()
  35. .forPathChildrenCache(path2, curatorFramework, (curatorFramework, pathChildrenCacheEvent) -> {
  36. switch (pathChildrenCacheEvent.getType()){
  37. case CHILD_ADDED -> {
  38. System.out.println("添加子节点:"+pathChildrenCacheEvent.getData().getPath());
  39. }
  40. case CHILD_UPDATED -> {
  41. System.out.println("更新子节点:"+pathChildrenCacheEvent.getData().getPath());
  42. }
  43. case CHILD_REMOVED -> {
  44. System.out.println("删除子节点:"+pathChildrenCacheEvent.getData().getPath());
  45. }
  46. }
  47. }).build();
  48. cache.listenable().addListener(curatorCacheListener);
  49. cache.start();
  50. try {
  51. curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(path2,"瓜田李下".getBytes());
  52. curatorFramework.setData().forPath(path2,"海贼王".getBytes());
  53. curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(childPath2,"海贼王".getBytes());
  54. curatorFramework.setData().forPath(childPath2,"瓜田李下".getBytes());
  55. curatorFramework.delete().deletingChildrenIfNeeded().forPath(childPath2);
  56. curatorFramework.delete().deletingChildrenIfNeeded().forPath(path2);
  57. }catch (Exception e){
  58. System.out.println(e.getMessage());
  59. }
  60. }
  61. }

**********************

使用测试

localhost:8080/hello

  1. 当前数据节点变更,新数据:瓜田李下
  2. 当前数据节点变更,新数据:海贼王
  3. 当前数据节点变更,新数据:海贼王
  4. 当前数据节点变更,新数据:海贼王
  5. 当前数据节点变更,新数据:海贼王

触发通知:创建、修改当前节点,创建、修改、删除子节点

不触发通知:删除当前节点

localhost:8080/hello2

  1. 添加子节点:/cache2/child
  2. 更新子节点:/cache2/child
  3. 删除子节点:/cache2/child

发表评论

表情:
评论列表 (有 0 条评论,273人围观)

还没有评论,来说两句吧...

相关阅读