zookeeper 事件监听
zookeeper 事件监听
官网:https://github.com/apache/curator/tree/master/curator-examples/src/main/java
**********************
相关类与接口
CuratorCache
public interface CuratorCache extends Closeable, CuratorCacheAccessor {
static CuratorCache build(CuratorFramework client, String path, CuratorCache.Options... options) {
return builder(client, path).withOptions(options).build();
}
static CuratorCacheBuilder builder(CuratorFramework client, String path) {
return new CuratorCacheBuilderImpl(client, path);
}
static CuratorCacheBridgeBuilder bridgeBuilder(CuratorFramework client, String path) {
return new CuratorCacheBridgeBuilderImpl(client, path);
}
void start();
void close();
Listenable<CuratorCacheListener> listenable();
Optional<ChildData> get(String var1);
int size();
Stream<ChildData> stream();
public static enum Options {
SINGLE_NODE_CACHE,
COMPRESSED_DATA,
DO_NOT_CLEAR_ON_CLOSE;
private Options() {
}
}
}
Listeneable:添加、删除监听接口
public interface Listenable<T> {
void addListener(T var1);
void addListener(T var1, Executor var2); //添加监听接口
void removeListener(T var1); //删除监听接口
}
CuratorCacheListener:监听接口
@FunctionalInterface
public interface CuratorCacheListener {
void event(CuratorCacheListener.Type var1, ChildData var2, ChildData var3);
default void initialized() {
}
static CuratorCacheListenerBuilder builder() {
return new CuratorCacheListenerBuilderImpl();
}
public static enum Type {
NODE_CREATED,
NODE_CHANGED,
NODE_DELETED;
private Type() {
}
}
}
CuratorCacheListenerBuilder:创建CuratorCacheListener
public interface CuratorCacheListenerBuilder {
CuratorCacheListenerBuilder forAll(CuratorCacheListener var1); //所有操作件
CuratorCacheListenerBuilder forCreates(Consumer<ChildData> var1); //创建操作
CuratorCacheListenerBuilder forChanges(CuratorCacheListenerBuilder.ChangeListener var1); //修改操作
CuratorCacheListenerBuilder forCreatesAndChanges(CuratorCacheListenerBuilder.ChangeListener var1); //创建修改操作
CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> var1); //删除操作
CuratorCacheListenerBuilder forInitialized(Runnable var1); //初始化节点
CuratorCacheListenerBuilder forPathChildrenCache(String var1, CuratorFramework var2, PathChildrenCacheListener var3); //子节点
CuratorCacheListenerBuilder forTreeCache(CuratorFramework var1, TreeCacheListener var2); //监控整个树节点状态
CuratorCacheListenerBuilder forNodeCache(NodeCacheListener var1); //监控当前节点
CuratorCacheListenerBuilder afterInitialized();
CuratorCacheListener build();
@FunctionalInterface
public interface ChangeListener {
void event(ChildData var1, ChildData var2);
}
}
NodeCacheListener:监听当前节点
public interface NodeCacheListener {
void nodeChanged() throws Exception;
}
说明:创建当前节点、更新数据会触发通知,删除当前节点不会触发通知;添加、修改、删除子节点会触发通知
PathChildrenCacheListener:监听当前节点子节点
public interface PathChildrenCacheListener {
void childEvent(CuratorFramework var1, PathChildrenCacheEvent var2) throws Exception;
}
PathChildrenCacheEvent:子节点创建、数据更新、删除
public class PathChildrenCacheEvent {
private final PathChildrenCacheEvent.Type type;
private final ChildData data;
public PathChildrenCacheEvent(PathChildrenCacheEvent.Type type, ChildData data) {
this.type = type;
this.data = data;
}
public PathChildrenCacheEvent.Type getType() {
return this.type;
}
public ChildData getData() {
return this.data;
}
public List<ChildData> getInitialData() {
return null;
}
public String toString() {
return "PathChildrenCacheEvent{type=" + this.type + ", data=" + this.data + '}';
}
public static enum Type {
CHILD_ADDED,
CHILD_UPDATED,
CHILD_REMOVED,
CONNECTION_SUSPENDED,
CONNECTION_RECONNECTED,
CONNECTION_LOST,
INITIALIZED;
private Type() {
}
}
}
TreeCacheListener:监听整个目录
public interface TreeCacheListener {
void childEvent(CuratorFramework var1, TreeCacheEvent var2) throws Exception;
}
TreeCacheEvent:节点添加、更新、删除触发
public class TreeCacheEvent {
private final TreeCacheEvent.Type type;
private final ChildData data;
private final ChildData oldData;
public TreeCacheEvent(TreeCacheEvent.Type type, ChildData data) {
this(type, data, (ChildData)null);
}
public TreeCacheEvent(TreeCacheEvent.Type type, ChildData data, ChildData oldData) {
this.type = type;
this.data = data;
this.oldData = oldData;
}
public TreeCacheEvent.Type getType() {
return this.type;
}
public ChildData getData() {
return this.data;
}
public ChildData getOldData() {
return this.oldData;
}
public String toString() {
return TreeCacheEvent.class.getSimpleName() + "{type=" + this.type + ", data=" + this.data + '}';
}
public static enum Type {
NODE_ADDED,
NODE_UPDATED,
NODE_REMOVED,
CONNECTION_SUSPENDED,
CONNECTION_RECONNECTED,
CONNECTION_LOST,
INITIALIZED;
private Type() {
}
}
}
**********************
示例
*****************
config 层
WebConfig
@Configuration
public class WebConfig {
@Value("${zookeeper.connect-string}")
private String connectString;
@Bean
public CuratorFramework initCuratorFramework(){
CuratorFramework curatorFramework= CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000,3,3000))
.namespace("test")
.build();
curatorFramework.start();
return curatorFramework;
}
}
*****************
controller 层
HelloController
@RestController
public class HelloController {
@Resource
private CuratorFramework curatorFramework;
private final String path="/cache";
private final String childPath="/cache/child";
private final String path2="/cache2";
private final String childPath2="/cache2/child";
@RequestMapping("/hello")
public void hello(){
CuratorCache cache=CuratorCache.build(curatorFramework,path);
CuratorCacheListener curatorCacheListener=CuratorCacheListener.builder()
.forNodeCache(() -> {
if (cache.get(path).isPresent()){
System.out.println("当前数据节点变更,新数据:"+new String(cache.get(path).get().getData()));
}
}).build();
cache.listenable().addListener(curatorCacheListener);
cache.start();
try {
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(path,"瓜田李下".getBytes());
curatorFramework.setData().forPath(path,"海贼王".getBytes());
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(childPath,"海贼王".getBytes());
curatorFramework.setData().forPath(childPath,"瓜田李下".getBytes());
curatorFramework.delete().deletingChildrenIfNeeded().forPath(childPath);
curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
}catch (Exception e){
System.out.println(e.getMessage());
}
}
@RequestMapping("/hello2")
public void hello2(){
CuratorCache cache=CuratorCache.build(curatorFramework,path2);
CuratorCacheListener curatorCacheListener=CuratorCacheListener.builder()
.forPathChildrenCache(path2, curatorFramework, (curatorFramework, pathChildrenCacheEvent) -> {
switch (pathChildrenCacheEvent.getType()){
case CHILD_ADDED -> {
System.out.println("添加子节点:"+pathChildrenCacheEvent.getData().getPath());
}
case CHILD_UPDATED -> {
System.out.println("更新子节点:"+pathChildrenCacheEvent.getData().getPath());
}
case CHILD_REMOVED -> {
System.out.println("删除子节点:"+pathChildrenCacheEvent.getData().getPath());
}
}
}).build();
cache.listenable().addListener(curatorCacheListener);
cache.start();
try {
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(path2,"瓜田李下".getBytes());
curatorFramework.setData().forPath(path2,"海贼王".getBytes());
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(childPath2,"海贼王".getBytes());
curatorFramework.setData().forPath(childPath2,"瓜田李下".getBytes());
curatorFramework.delete().deletingChildrenIfNeeded().forPath(childPath2);
curatorFramework.delete().deletingChildrenIfNeeded().forPath(path2);
}catch (Exception e){
System.out.println(e.getMessage());
}
}
}
**********************
使用测试
localhost:8080/hello
当前数据节点变更,新数据:瓜田李下
当前数据节点变更,新数据:海贼王
当前数据节点变更,新数据:海贼王
当前数据节点变更,新数据:海贼王
当前数据节点变更,新数据:海贼王
触发通知:创建、修改当前节点,创建、修改、删除子节点
不触发通知:删除当前节点
localhost:8080/hello2
添加子节点:/cache2/child
更新子节点:/cache2/child
删除子节点:/cache2/child
还没有评论,来说两句吧...