ZooKeeper分布式集群中使用Curator的PathChildrenCache监测子节点的添加、数据更新与删除
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Demo of watching the children of a ZooKeeper node with Curator's
 * {@link PathChildrenCache}.
 *
 * <p>Constructing an instance connects to the ensemble returned by
 * {@link #getAddress()}, makes sure the watched parent node exists, registers a
 * listener on its children, and then creates, updates and deletes one child so
 * that the listener prints the corresponding events. All Curator resources are
 * released before the constructor returns, whether it succeeds or fails.
 */
public class Main {
    /** Prefix prepended to every line printed by this demo. */
    private static final String TAG = "调试# ";

    /**
     * Runs the demo once and prints any failure to standard error.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Original note: initialize log4j first, otherwise ZooKeeper reports an error.
        //org.apache.log4j.BasicConfigurator.configure();
        try {
            new Main();
        } catch (InterruptedException e) {
            // One of the demo's sleeps was interrupted: keep the flag set for the caller.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the ZooKeeper connect string for the demo.
     *
     * @return three comma-separated {@code host:port} entries on 127.0.0.1
     *         (ports 2181, 2182 and 2183)
     */
    private String getAddress() {
        String ip = "127.0.0.1";
        return ip + ":2181," + ip + ":2182," + ip + ":2183";
    }

    /**
     * Runs the whole demo: connect, ensure the parent node, watch its children,
     * then create/update/delete a child node with pauses in between.
     *
     * @throws Exception if a ZooKeeper operation fails or a pause is interrupted
     */
    public Main() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
        // try-with-resources closes the cache first and the client second,
        // also when an operation in between throws.
        try (CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(getAddress())
                .sessionTimeoutMs(10 * 1000)
                .connectionTimeoutMs(20 * 1000)
                .retryPolicy(retryPolicy)
                .build()) {
            client.start();

            String path = "/test_data/my_path";
            ensureNodeExists(client, path);

            try (PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true)) {
                // Register the listener before starting the cache so that no
                // event produced while the cache primes itself is missed.
                pathChildrenCache.getListenable().addListener(newChildListener());
                pathChildrenCache.start();

                exerciseChild(client, path + "/test_01");
            }
        }
        // The demo is finished and every resource is closed, so simply return
        // instead of blocking forever and keeping the JVM alive.
    }

    /**
     * Creates {@code path} (and missing parents) as a persistent node if it does
     * not exist yet, and prints which of the two cases applied.
     */
    private void ensureNodeExists(CuratorFramework client, String path) throws Exception {
        // checkExists() yields null when the node is absent.
        Stat stat = client.checkExists().forPath(path);
        if (stat == null) {
            String created = client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path);
            System.out.println(TAG + "已创建:" + created);
        } else {
            System.out.println(TAG + "已存在:" + path);
        }
    }

    /**
     * Creates the child node if needed, then updates and deletes it, pausing
     * five seconds after each step so the listener output can be observed.
     */
    private void exerciseChild(CuratorFramework client, String childPath) throws Exception {
        if (client.checkExists().forPath(childPath) == null) {
            System.out.println(TAG + "不存在:" + childPath);
            client.create()
                    .creatingParentsIfNeeded()
                    .forPath(childPath, "hello,world!".getBytes(StandardCharsets.UTF_8));
            TimeUnit.SECONDS.sleep(5);
        } else {
            System.out.println(TAG + "存在:" + childPath);
        }
        update(client, childPath);
        TimeUnit.SECONDS.sleep(5);
        delete(client, childPath);
        TimeUnit.SECONDS.sleep(5);
    }

    /**
     * Builds the listener that prints every cache event: its type, the affected
     * child's path and data when the event carries one, and the initial child
     * list when the event provides it.
     */
    private PathChildrenCacheListener newChildListener() {
        return new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                System.out.println(TAG + "事件:" + event.getType());

                // Events that are not about a single child (e.g. connection
                // state changes) carry no ChildData, so guard against null.
                ChildData changed = event.getData();
                if (changed != null) {
                    System.out.println(TAG + "数据:" + changed.getPath() + "," + decode(changed.getData()));
                }

                List<ChildData> childData = event.getInitialData();
                if (childData != null) {
                    for (ChildData data : childData) {
                        System.out.println(TAG + "子路径:" + data.getPath());
                        // Decode the payload; byte[].toString() would only print an identity hash.
                        System.out.println(TAG + "子数据:" + decode(data.getData()));
                    }
                }
            }
        };
    }

    /**
     * Decodes node data as UTF-8 text, matching the charset used when writing.
     *
     * @param bytes raw node data, possibly {@code null}
     * @return the decoded text, or the string {@code "null"} when there is no data
     */
    private static String decode(byte[] bytes) {
        return bytes == null ? "null" : new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Overwrites the data of the node at {@code path} with a fixed UTF-8 string.
     * Failures are reported on standard error and do not abort the demo.
     */
    private void update(CuratorFramework client, String path) {
        try {
            client.setData()
                    //.withVersion(0)
                    .forPath(path, "zhang phil".getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes the node at {@code path} together with any children.
     * Failures are reported on standard error and do not abort the demo.
     */
    private void delete(CuratorFramework client, String path) {
        try {
            client.delete().deletingChildrenIfNeeded().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
输出:
调试# 已创建:/test_data/my_path
调试# 不存在:/test_data/my_path/test_01
调试# 事件:CHILD_ADDED
调试# 数据:/test_data/my_path/test_01,hello,world!
调试# 事件:CHILD_UPDATED
调试# 数据:/test_data/my_path/test_01,zhang phil
调试# 事件:CHILD_REMOVED
调试# 数据:/test_data/my_path/test_01,zhang phil
还没有评论,来说两句吧...