通过zookeeper监听器 做到监听数据改变 而去修改更新实时内存数据
1,监听器简单代码,需要测试
package com.coder.flink.core.a_zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import java.util.concurrent.CountDownLatch;
public class NodeCacheExample {
private static final String PATH = "/tmp2";
private static CountDownLatch latch = new CountDownLatch(1);
static NodeCache nodeCache;
static CuratorFramework client;
static {
client = CuratorFrameworkFactory.newClient(
"node1:2181", 5000, 5000, new ExponentialBackoffRetry(
1000, 3));
client.start();
}
public static void initCache() throws Exception {
// client.create().forPath(PATH);
// client.setData().forPath(PATH, "节点的初始值".getBytes());
nodeCache = new NodeCache(client, PATH);
EnsurePath ensurePath = client.newNamespaceAwareEnsurePath(PATH);
ensurePath.ensure(client.getZookeeperClient());
//设置成true,那么nodeCache在第一次启动的时候就会到zookeeper上去获取节点的数据内容,并保存在cache中
nodeCache.start(true);
startCache(nodeCache);
}
private static void startCache(final NodeCache cache) throws Exception {
ChildData data = cache.getCurrentData();
System.out.println("第一次启动获取到的内容:" + new String(data.getData()));
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("监听器获取到的数据: "
+ new String(cache.getCurrentData().getData()));
latch.countDown();
}
});
/* Thread.sleep(2000);
if(client.checkExists().forPath(PATH) != null){
System.out.println("node is exist,准备给节点设置新的内容");
client.setData().forPath(PATH, "节点新内容".getBytes());
client.setData().forPath(PATH, "节点新内容2".getBytes());
client.setData().forPath(PATH, "节点新内容3".getBytes());
}*/
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
initCache();
latch.await();
}
}
package com.coder.flink.core.a_zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class NodeCacheExample_send {
private static final String PATH = "/tmp2";
private static CountDownLatch latch = new CountDownLatch(1);
static NodeCache nodeCache;
static CuratorFramework client;
static {
client = CuratorFrameworkFactory.newClient(
"node1:2181", 5000, 5000, new ExponentialBackoffRetry(
1000, 3));
client.start();
}
public static void initCache() throws Exception {
// client.create().forPath(PATH);
// client.setData().forPath(PATH, "节点的初始值".getBytes());
nodeCache = new NodeCache(client, PATH);
EnsurePath ensurePath = client.newNamespaceAwareEnsurePath(PATH);
ensurePath.ensure(client.getZookeeperClient());
//设置成true,那么nodeCache在第一次启动的时候就会到zookeeper上去获取节点的数据内容,并保存在cache中
nodeCache.start(true);
setData(nodeCache);
}
//给节点赋值
private static void setData(final NodeCache cache) throws Exception {
ChildData data = cache.getCurrentData();
System.out.println("第一次启动获取到的内容:" + new String(data.getData()));
Thread.sleep(2000);
if(client.checkExists().forPath(PATH) != null){
for (int i = 0; i < 10 ; i++) {
Random random = new Random();
String json = "{'a':'1','type':'"+random.nextInt(3)+"'}";
System.out.println("========发送的内容======【"+json+"】");
client.setData().forPath(PATH, ("节点新内容:"+json).getBytes());
}
}
}
public static void main(String[] args) throws Exception {
initCache();
latch.await();
}
}
修改zookeeper的节点数据
package com.coder.flink.core.a_zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.Random;
public class zkSendDataExample {
private static final String connectString = "node1:2181";
private static final int sessionTimeout = 6000;
static ZooKeeper zkClient = null;
/*
*connectString -- host:port[,host:port][basePath] 指定的服务器列表,多个host:port之间用英文逗号分隔。
*sessionTimeOut -- 会话超时时间。以毫秒为单位。客户端和服务器端之间的连接通过心跳包进行维系,如果心跳包超过这个指定时间则认为会话超时失效。
*watcher -- 监视器。如果为null表示不需要观察者。
*/
public static void init() throws IOException, KeeperException, InterruptedException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
//事件处理逻辑
System.out.println("正在监听");
}
});
byte[] data = zkClient.getData("/tmp2", new Watcher() {
//监听的具体内容
@Override
public void process(WatchedEvent event) {
System.out.println("监听路径为:" + event.getPath());
System.out.println("监听的类型为:" + event.getType());
System.out.println("数据被2货修改了!!!");
}
}, null);
System.out.println(new String(data));
}
public void testCreateNode() throws KeeperException, InterruptedException {
//进行增删改查
/*
* 第一个参数:节点的路径
* 第二个参数:节点存放的数据
* 第三个参数:节点的权限
* 第四个参数:节点的类型(是暂时的还是持久的)
*/
String newNodeString = zkClient.create("/eclipse", "111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("----------\n");
}
//判断该节点是否存在
public void existNode() throws KeeperException, InterruptedException {
/*
* 参数1:节点路径
* 参数2:是否需要监视器,若不需要填写“false”
*/
Stat exists = zkClient.exists("/a0000000004", false);
System.out.println(exists == null ? "not exists" : "exists");
System.out.println("----------\n");
}
//获取子节点
public void getChildren() throws KeeperException, InterruptedException {
/*
* 参数1:节点路径
* 参数2:是否需要监视器,若不需要填写“false”
*/
List<String> children = zkClient.getChildren("/a0000000004", true);
for (String child : children) {
System.out.println(child);
}
System.out.println("----------\n");
}
//获取节点数据
public void nodeData() throws KeeperException, InterruptedException {
/*
* 参数1:节点路径
* 参数2:是否需要监视器,若不需要填写“false”
* 参数3:指定数据节点的状态信息:一般填写“null”或者“new stat”
*/
byte[] data = zkClient.getData("/a0000000004", false, null);
System.out.println(new String(data));//因为存放的是byte()类型数据
System.out.println("----------\n");
}
//删除节点
public void deleteNode() throws InterruptedException, KeeperException {
/*
* 参数2:version,可以传入-1,表明要基于最新版本进行更新操作
*/
zkClient.delete("/eclipse", -1);
System.out.println("----------\n");
}
//修改节点数据
public static void setNodeData(String log) throws KeeperException, InterruptedException {
byte[] data = zkClient.getData("/tmp2", false, null);
System.out.println(new String(data));
zkClient.setData("/tmp2", log.getBytes(), -1);
byte[] data1 = zkClient.getData("/tmp2", false, null);
System.out.println(new String(data1));
}
public static void main(String[] args) {
Random random = new Random();
String json = "{'aaa':'1','type':'"+random.nextInt(3)+"'}";
System.out.println("========发送的内容======【"+json+"】");
try {
init();
setNodeData(json);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
最简单的案例:
package com.coder.flink.core.a_zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class WatchDemo {
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
ZooKeeper zkCli = new ZooKeeper("node1:2181", 3000, new Watcher() {
//监听回调
@Override
public void process(WatchedEvent event) {
}
});
byte[] data = zkCli.getData("/tmp2", new Watcher() {
//监听的具体内容
@Override
public void process(WatchedEvent event) {
System.out.println("监听路径为:" + event.getPath());
System.out.println("监听的类型为:" + event.getType());
System.out.println("数据被2货修改了!!!");
}
}, null);
System.out.println(new String(data));
Thread.sleep(Long.MAX_VALUE);
}
}
2,修改实时内存数据也很简单 等完善了再写
还没有评论,来说两句吧...