Zookeeper遇到的BUG

Myth丶恋晨 2024-04-06 12:53 148阅读 0赞

Zookeeper遇到的BUG-不要在回调函数中阻塞线程

在使用原生 ZooKeeper API 实现配置中心时，遇到了 watch 失效的问题：watch 监听到父节点下的子节点发生变更后，在 watch 回调中再次调用 getChildren()，但我们预期的回调函数始终没有被执行，程序因此卡住。当然，很少有人直接使用原生 ZooKeeper API，改用封装好的 ZooKeeper 客户端（例如 Curator）通常就能避免这个问题。

最终的解决方案是：在 watch 回调中需要再次调用 getData() 时，新开一个线程去执行，而不是在回调线程中直接执行。

整体源码:

  1. public class ConfigServer implements AsyncCallback.StatCallback, Watcher, AsyncCallback.ChildrenCallback,
  2. AsyncCallback.Create2Callback {
  3. private String path;//path是配置数据节点的父级目录,表示当前集群需要从zk集群的哪个目录下取配置信息
  4. private ZooKeeper zk;
  5. private String name;
  6. private byte[] data;
  7. private CountDownLatch publishCountDownLatch;
  8. private CountDownLatch getDateCountDownLatch;
  9. private CountDownLatch count;
  10. private HashMap<String, byte[]> res;
  11. public ConfigServer(String path, ZooKeeper zk) {
  12. this.path = path;
  13. this.zk = zk;
  14. res=new HashMap<>();
  15. }
  16. //zk集群是二进制安全的,实际上内部存储的是字节流数据
  17. public void publish(String name,byte[] data) throws InterruptedException {
  18. publishCountDownLatch=new CountDownLatch(1);
  19. this.name=name;
  20. this.data=data;
  21. String realPath=path+"/"+name;
  22. zk.create(realPath,data, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,this,"kuailexingqiu");
  23. publishCountDownLatch.await();
  24. }
  25. public void delete(String name) throws InterruptedException, KeeperException {
  26. Stat stat = zk.exists(path + "/" + name, false);
  27. if(stat!=null){
  28. zk.delete(path+"/"+name,stat.getVersion()+1);
  29. }
  30. }
  31. public HashMap<String, byte[]> getData() throws InterruptedException {
  32. //把path父级目录下所有内容查出来
  33. System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
  34. getDateCountDownLatch=new CountDownLatch(1);
  35. res=new HashMap<>();
  36. new Thread(()->{
  37. zk.getChildren(path,this,this,"kuailexingqiu");
  38. }).start();
  39. getDateCountDownLatch.await();
  40. System.out.println("------------------进来了-----------------");
  41. //其实在这步要将map中的数据写入配置文件中,在主动触发配置文件自动刷新
  42. if(res!=null){
  43. Set<String> keySet = res.keySet();
  44. for (String s : keySet) {
  45. System.out.println("----------------------------------------");
  46. System.out.println(s+"===="+new String(res.get(s)));
  47. System.out.println("----------------------------------------");
  48. }
  49. return res;
  50. }
  51. return null;
  52. }
  53. public String getPath() {
  54. return path;
  55. }
  56. public void setPath(String path) {
  57. this.path = path;
  58. }
  59. @Override
  60. public void processResult(int rc, String path, Object ctx, Stat stat) {
  61. if(stat==null){
  62. zk.setData(path,data,0,this,"kuailexingqiu");
  63. }else{
  64. publishCountDownLatch.countDown();
  65. }
  66. }
  67. @Override
  68. public void process(WatchedEvent event) {
  69. Event.EventType type = event.getType();
  70. System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
  71. switch (type) {
  72. case None:
  73. break;
  74. case NodeCreated:
  75. break;
  76. case NodeDeleted:
  77. try {
  78. getData();
  79. } catch (InterruptedException e) {
  80. e.printStackTrace();
  81. }
  82. break;
  83. case NodeDataChanged:
  84. break;
  85. case NodeChildrenChanged:
  86. // new Thread(()->{
  87. // try {
  88. try {
  89. getData();
  90. } catch (InterruptedException e) {
  91. e.printStackTrace();
  92. }
  93. // } catch (InterruptedException e) {
  94. // e.printStackTrace();
  95. // }
  96. // }).start();
  97. break;
  98. case DataWatchRemoved:
  99. break;
  100. case ChildWatchRemoved:
  101. break;
  102. case PersistentWatchRemoved:
  103. break;
  104. }
  105. }
  106. @Override
  107. public void processResult(int rc, String path, Object ctx, List<String> children) {
  108. System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
  109. System.out.println("------------------哈哈哈哈------------------------- ");
  110. if(children.isEmpty()){
  111. getDateCountDownLatch.countDown();
  112. return;
  113. }
  114. // count=new CountDownLatch(children.size());
  115. for (String child : children) {
  116. try {
  117. byte[] bytes = zk.getData(path + "/" + child, false, null);
  118. res.put(child,bytes);
  119. } catch (KeeperException e) {
  120. e.printStackTrace();
  121. } catch (InterruptedException e) {
  122. e.printStackTrace();
  123. }
  124. }
  125. //上面的getData是异步的很可能发出children.len个请求后,就直接执行后面代码,countDown导致程序返回结果
  126. // try {
  127. // count.await();
  128. // } catch (InterruptedException e) {
  129. // e.printStackTrace();
  130. // }
  131. getDateCountDownLatch.countDown();
  132. }
  133. @Override
  134. public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
  135. if(stat==null){
  136. throw new RuntimeException("创建新的配置项失败!");
  137. }else{
  138. publishCountDownLatch.countDown();
  139. }
  140. }
  141. }
  142. @Override
  143. public void process(WatchedEvent event) {
  144. Event.EventType type = event.getType();
  145. System.out.println(Thread.currentThread().getName());
  146. switch (type) {
  147. case None:
  148. break;
  149. case NodeCreated:
  150. break;
  151. case NodeDeleted:
  152. try {
  153. getData();
  154. } catch (InterruptedException e) {
  155. e.printStackTrace();
  156. }
  157. break;
  158. case NodeDataChanged:
  159. break;
  160. case NodeChildrenChanged:
  161. new Thread(()->{
  162. try {
  163. getData();
  164. } catch (InterruptedException e) {
  165. e.printStackTrace();
  166. }
  167. }).start();
  168. break;
  169. case DataWatchRemoved:
  170. break;
  171. case ChildWatchRemoved:
  172. break;
  173. case PersistentWatchRemoved:
  174. break;
  175. }
  176. }

问题原因：使用 ZooKeeper 的异步（回调式）API 时，需要注册回调方法，而客户端的所有 watcher 通知和异步回调都由同一个 EventThread 线程（线程名形如 main-EventThread）执行。如果在回调方法中用 CountDownLatch.await() 阻塞住 main-EventThread，而解除阻塞（count 减到 0）又必须等 main-EventThread 去执行另一个回调，那么那个回调永远得不到执行，main-EventThread 就会死锁。

而且，如果同时有多个回调需要执行，它们会先被放入队列，再由这一个线程按顺序逐个执行，始终是单线程执行，不会并发。

所以在使用 ZooKeeper 的异步回调时，一定不要在回调（即 main-EventThread）中用 CountDownLatch 等方式阻塞等待另一个回调的结果；如果确实需要同步等待，应切换到其他线程中去做。

使用 ZooKeeper 实现配置中心的最终代码：

  1. public class ConfigServer implements AsyncCallback.StatCallback, Watcher, AsyncCallback.ChildrenCallback,
  2. AsyncCallback.Create2Callback {
  3. private String path;//path是配置数据节点的父级目录,表示当前集群需要从zk集群的哪个目录下取配置信息
  4. private ZooKeeper zk;
  5. private String name;
  6. private byte[] data;
  7. private CountDownLatch publishCountDownLatch;
  8. private CountDownLatch getDateCountDownLatch;
  9. private CountDownLatch count;
  10. private HashMap<String, byte[]> res;
  11. public ConfigServer(String path, ZooKeeper zk) {
  12. this.path = path;
  13. this.zk = zk;
  14. res=new HashMap<>();
  15. }
  16. //zk集群是二进制安全的,实际上内部存储的是字节流数据
  17. public void publish(String name,byte[] data) throws InterruptedException {
  18. publishCountDownLatch=new CountDownLatch(1);
  19. this.name=name;
  20. this.data=data;
  21. String realPath=path+"/"+name;
  22. zk.create(realPath,data, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,this,"kuailexingqiu");
  23. publishCountDownLatch.await();
  24. }
  25. public void delete(String name) throws InterruptedException, KeeperException {
  26. Stat stat = zk.exists(path + "/" + name, false);
  27. if(stat!=null){
  28. zk.delete(path+"/"+name,stat.getVersion()+1);
  29. }
  30. }
  31. public HashMap<String, byte[]> getData() throws InterruptedException {
  32. //把path父级目录下所有内容查出来
  33. System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
  34. res=new HashMap<>();
  35. getDateCountDownLatch=new CountDownLatch(1);
  36. zk.getChildren(path,this,this,"kuailexingqiu");
  37. getDateCountDownLatch.await();
  38. System.out.println("------------------进来了-----------------");
  39. //其实在这步要将map中的数据写入配置文件中,在主动触发配置文件自动刷新
  40. if(res!=null){
  41. Set<String> keySet = res.keySet();
  42. for (String s : keySet) {
  43. System.out.println("----------------------------------------");
  44. System.out.println(s+"===="+new String(res.get(s)));
  45. System.out.println("----------------------------------------");
  46. }
  47. return res;
  48. }
  49. return null;
  50. }
  51. public String getPath() {
  52. return path;
  53. }
  54. public void setPath(String path) {
  55. this.path = path;
  56. }
  57. @Override
  58. public void processResult(int rc, String path, Object ctx, Stat stat) {
  59. if(stat==null){
  60. zk.setData(path,data,0,this,"kuailexingqiu");
  61. }else{
  62. publishCountDownLatch.countDown();
  63. }
  64. }
  65. @Override
  66. public void process(WatchedEvent event) {
  67. Event.EventType type = event.getType();
  68. System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
  69. switch (type) {
  70. case None:
  71. break;
  72. case NodeCreated:
  73. break;
  74. case NodeDeleted:
  75. try {
  76. getData();
  77. } catch (InterruptedException e) {
  78. e.printStackTrace();
  79. }
  80. break;
  81. case NodeDataChanged:
  82. break;
  83. case NodeChildrenChanged:
  84. new Thread(()->{
  85. // try {
  86. try {
  87. getData();
  88. } catch (InterruptedException e) {
  89. e.printStackTrace();
  90. }
  91. // } catch (InterruptedException e) {
  92. // e.printStackTrace();
  93. // }
  94. }).start();
  95. break;
  96. case DataWatchRemoved:
  97. break;
  98. case ChildWatchRemoved:
  99. break;
  100. case PersistentWatchRemoved:
  101. break;
  102. }
  103. }
  104. @Override
  105. public void processResult(int rc, String path, Object ctx, List<String> children) {
  106. System.out.println("--------------------"+Thread.currentThread().getName()+"-----------------------");
  107. System.out.println("------------------哈哈哈哈------------------------- ");
  108. if(children.isEmpty()){
  109. getDateCountDownLatch.countDown();
  110. return;
  111. }
  112. // count=new CountDownLatch(children.size());
  113. for (String child : children) {
  114. try {
  115. byte[] bytes = zk.getData(path + "/" + child, false, null);
  116. res.put(child,bytes);
  117. } catch (KeeperException e) {
  118. e.printStackTrace();
  119. } catch (InterruptedException e) {
  120. e.printStackTrace();
  121. }
  122. }
  123. //上面的getData是异步的很可能发出children.len个请求后,就直接执行后面代码,countDown导致程序返回结果
  124. // try {
  125. // count.await();
  126. // } catch (InterruptedException e) {
  127. // e.printStackTrace();
  128. // }
  129. getDateCountDownLatch.countDown();
  130. }
  131. @Override
  132. public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
  133. if(stat==null){
  134. throw new RuntimeException("创建新的配置项失败!");
  135. }else{
  136. publishCountDownLatch.countDown();
  137. }
  138. }
  139. }

发表评论

表情:
评论列表 (有 0 条评论,148人围观)

还没有评论,来说两句吧...

相关阅读

    相关 程序员遇到bug怎么面对?

            这几天项目在做性能测试,做渗透测试的时候,给我提了两个安全方面的bug,之前写的时候没有考虑这个问题,bug提出来以后找了个有经验的人问了问怎么做,然后就开始动

    相关 遇到bug怎么办?

    感觉作为一名程序员,如果用二分之一的时间写代码,那另外二分之一的时间就是在改bug。 作为一名前端,我记录一下自己遇到bug时候的解决情况。让自己日后不要犯下重复的错误。