ZooKeeper 遇到的 BUG
ZooKeeper 遇到的 BUG：不要在回调函数中阻塞事件线程
在使用原生 ZooKeeper API 实现配置中心时，遇到了 watch“失效”的问题：watch 监听到父节点下的子节点发生变更后，在回调中再次调用 getChildren()，但我们预期的回调函数始终没有被执行，程序就此卡住。当然，很少有人会直接使用原生的 ZooKeeper API，改用封装好的 ZooKeeper 客户端自然可以绕开这个问题。
最终的解决方案：在 watch 回调函数中需要再次调用 getData 时，新开一个线程去执行，而不是直接在回调线程里执行。
整体源码（出现问题的版本）：
/**
 * Configuration center on the native ZooKeeper API -- the version that exhibits the bug.
 * Config items are stored as child nodes of {@code path}; getData() reads them back and
 * registers this object as the child watcher, and process() reloads them on change.
 * Bug demonstrated here: process() calls getData() directly on the thread that delivers the
 * watch event, and getData() then blocks on a latch that only the ChildrenCallback releases.
 * ZooKeeper delivers that callback on the same (now blocked) event thread, so the wait never ends.
 */
public class ConfigServer implements AsyncCallback.StatCallback, Watcher, AsyncCallback.ChildrenCallback,
AsyncCallback.Create2Callback {
private String path;// Parent node of the config data nodes: the ZK directory this cluster reads its configuration from
private ZooKeeper zk;
// Name and payload of the item being published; data is reused by the setData retry in the StatCallback.
private String name;
private byte[] data;
// Released by the create/setData callbacks to unblock publish().
private CountDownLatch publishCountDownLatch;
// Released by the ChildrenCallback to unblock getData().
private CountDownLatch getDateCountDownLatch;
// Unused: only referenced from the commented-out code in the ChildrenCallback.
private CountDownLatch count;
// Child name -> data bytes collected by the latest getData() call.
private HashMap<String, byte[]> res;
public ConfigServer(String path, ZooKeeper zk) {
this.path = path;
this.zk = zk;
res=new HashMap<>();
}
// ZooKeeper is binary-safe: it stores the data as raw bytes.
// Creates path/name asynchronously and blocks until a callback counts publishCountDownLatch down.
// NOTE(review): if the create fails, the Create2Callback below throws instead of counting down,
// so this await would never return.
public void publish(String name,byte[] data) throws InterruptedException {
publishCountDownLatch=new CountDownLatch(1);
this.name=name;
this.data=data;
String realPath=path+"/"+name;
zk.create(realPath,data, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,this,"kuailexingqiu");
publishCountDownLatch.await();
}
// Deletes path/name if it exists.
// NOTE(review): ZooKeeper's delete(path, version) expects the node's current version (or -1);
// passing stat.getVersion()+1 looks like it will always fail with BadVersion -- confirm.
public void delete(String name) throws InterruptedException, KeeperException {
Stat stat = zk.exists(path + "/" + name, false);
if(stat!=null){
zk.delete(path+"/"+name,stat.getVersion()+1);
}
}
// Reads every child of path into res, prints each entry and returns the map.
// getChildren is issued from a new thread, but the calling thread still waits on
// getDateCountDownLatch, which only the ChildrenCallback releases. When the caller is the
// watcher thread (see process), that callback cannot be delivered, so this await deadlocks:
// moving getChildren to another thread does not help -- the *waiting* thread is the problem.
public HashMap<String, byte[]> getData() throws InterruptedException {
// Fetch everything under the parent path
System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
getDateCountDownLatch=new CountDownLatch(1);
res=new HashMap<>();
new Thread(()->{
zk.getChildren(path,this,this,"kuailexingqiu");
}).start();
getDateCountDownLatch.await();
System.out.println("------------------进来了-----------------");
// At this point the map should really be written to the config file, and a config refresh triggered.
if(res!=null){
Set<String> keySet = res.keySet();
for (String s : keySet) {
System.out.println("----------------------------------------");
System.out.println(s+"===="+new String(res.get(s)));
System.out.println("----------------------------------------");
}
return res;
}
return null;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
// StatCallback (setData result): retries setData with expected version 0 when stat is null,
// otherwise releases publish().
// NOTE(review): the only setData call in this class is the one below, so this method is only
// reached through its own retry; with a fixed version of 0 a failing retry may loop -- confirm.
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat==null){
zk.setData(path,data,0,this,"kuailexingqiu");
}else{
publishCountDownLatch.countDown();
}
}
// Watcher callback for the child watch registered in getData().
// BUG: NodeDeleted and NodeChildrenChanged call getData() synchronously on the thread delivering
// this event; getData() then blocks waiting for a ChildrenCallback that needs this same thread.
// Wrapping the call in a new thread (as in the commented-out lines) avoids the deadlock.
@Override
public void process(WatchedEvent event) {
Event.EventType type = event.getType();
System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
switch (type) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
try {
getData();
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
// new Thread(()->{
// try {
try {
getData();
} catch (InterruptedException e) {
e.printStackTrace();
}
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }).start();
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}
// ChildrenCallback: reads each child's data with a synchronous getData, stores it in res,
// then releases getData().
// NOTE(review): rc is not checked; if getChildren failed, children is presumably null and
// children.isEmpty() would throw before the latch is released -- confirm.
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
System.out.println("------------------哈哈哈哈------------------------- ");
if(children.isEmpty()){
getDateCountDownLatch.countDown();
return;
}
// count=new CountDownLatch(children.size());
for (String child : children) {
try {
byte[] bytes = zk.getData(path + "/" + child, false, null);
res.put(child,bytes);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// Earlier variant: with an asynchronous getData, the loop would just fire children.size()
// requests and fall straight through to the countDown below, returning before the data arrived.
// The synchronous getData above avoids that, so the commented-out latch is no longer needed.
// try {
// count.await();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
getDateCountDownLatch.countDown();
}
// Create2Callback: releases publish() when the node was created; otherwise throws.
// NOTE(review): the exception is thrown on the callback thread and the latch is not counted
// down, so publish() keeps waiting.
@Override
public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
if(stat==null){
throw new RuntimeException("创建新的配置项失败!");
}else{
publishCountDownLatch.countDown();
}
}
}
/**
 * Watcher callback (the fixed version).
 *
 * <p>ZooKeeper runs watcher and async callbacks on a single event thread. getData() waits for a
 * ChildrenCallback that only that same thread can deliver. So the reload must never run inline
 * here: it is handed to a new thread, for both NodeChildrenChanged and NodeDeleted. The original
 * fix only covered NodeChildrenChanged and still deadlocked on NodeDeleted.
 */
@Override
public void process(WatchedEvent event) {
    Event.EventType type = event.getType();
    System.out.println(Thread.currentThread().getName());
    switch (type) {
        case NodeDeleted:
        case NodeChildrenChanged:
            new Thread(() -> {
                try {
                    getData();
                } catch (InterruptedException e) {
                    // Preserve the interrupt status instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }).start();
            break;
        default:
            // None, NodeCreated, NodeDataChanged and the watch-removal events need no action.
            break;
    }
}
问题原因：ZooKeeper 的异步 API 采用回调方式，客户端通过一个专门的事件线程（线程名形如 main-EventThread）来执行所有 Watcher 回调和异步回调，而且只有这一个线程。这意味着：如果在回调里用 CountDownLatch 把 main-EventThread 阻塞住，而解除阻塞又依赖 main-EventThread 去执行另一个回调、把计数减到 0，那么 main-EventThread 就会永远等待它自己，形成死锁。
而且，如果同时有多个回调需要执行，它们会被放入一个队列，由这个事件线程逐个串行执行，不会并发。
所以今后使用 ZooKeeper 的异步回调时，一定不要用 CountDownLatch（或其他同步等待手段）阻塞 main-EventThread；需要等待结果的工作，应交给其他线程去执行。
使用 ZooKeeper 实现配置中心的最终代码：
public class ConfigServer implements AsyncCallback.StatCallback, Watcher, AsyncCallback.ChildrenCallback,
AsyncCallback.Create2Callback {
private String path;//path是配置数据节点的父级目录,表示当前集群需要从zk集群的哪个目录下取配置信息
private ZooKeeper zk;
private String name;
private byte[] data;
private CountDownLatch publishCountDownLatch;
private CountDownLatch getDateCountDownLatch;
private CountDownLatch count;
private HashMap<String, byte[]> res;
public ConfigServer(String path, ZooKeeper zk) {
this.path = path;
this.zk = zk;
res=new HashMap<>();
}
//zk集群是二进制安全的,实际上内部存储的是字节流数据
public void publish(String name,byte[] data) throws InterruptedException {
publishCountDownLatch=new CountDownLatch(1);
this.name=name;
this.data=data;
String realPath=path+"/"+name;
zk.create(realPath,data, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,this,"kuailexingqiu");
publishCountDownLatch.await();
}
public void delete(String name) throws InterruptedException, KeeperException {
Stat stat = zk.exists(path + "/" + name, false);
if(stat!=null){
zk.delete(path+"/"+name,stat.getVersion()+1);
}
}
public HashMap<String, byte[]> getData() throws InterruptedException {
//把path父级目录下所有内容查出来
System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
res=new HashMap<>();
getDateCountDownLatch=new CountDownLatch(1);
zk.getChildren(path,this,this,"kuailexingqiu");
getDateCountDownLatch.await();
System.out.println("------------------进来了-----------------");
//其实在这步要将map中的数据写入配置文件中,在主动触发配置文件自动刷新
if(res!=null){
Set<String> keySet = res.keySet();
for (String s : keySet) {
System.out.println("----------------------------------------");
System.out.println(s+"===="+new String(res.get(s)));
System.out.println("----------------------------------------");
}
return res;
}
return null;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat==null){
zk.setData(path,data,0,this,"kuailexingqiu");
}else{
publishCountDownLatch.countDown();
}
}
@Override
public void process(WatchedEvent event) {
Event.EventType type = event.getType();
System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
switch (type) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
try {
getData();
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
new Thread(()->{
// try {
try {
getData();
} catch (InterruptedException e) {
e.printStackTrace();
}
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}).start();
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
System.out.println("------------------哈哈哈哈------------------------- ");
if(children.isEmpty()){
getDateCountDownLatch.countDown();
return;
}
// count=new CountDownLatch(children.size());
for (String child : children) {
try {
byte[] bytes = zk.getData(path + "/" + child, false, null);
res.put(child,bytes);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//上面的getData是异步的很可能发出children.len个请求后,就直接执行后面代码,countDown导致程序返回结果
// try {
// count.await();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
getDateCountDownLatch.countDown();
}
@Override
public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
if(stat==null){
throw new RuntimeException("创建新的配置项失败!");
}else{
publishCountDownLatch.countDown();
}
}
}
还没有评论,来说两句吧...