Zookeeper:实现“通知协调”的 Demo

小鱼儿 2023-09-23 15:57 148阅读 0赞

应用配置集中到节点上,应用启动时主动获取,并在节点上注册一个 watcher,每次配置更新都会通知到应用。数据发布/订阅(Publish/Subscribe)系统,即所谓的配置中心,顾名思义就是发布者将数据发布到 ZooKeeper 的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新。

本篇内容包括:Demo 概述、代码实现、测试结果


文章目录

    • 一、Demo 概述
        • 1、关于 zookeeper “通知协调”
        • 2、Demo 设计
        • 3、Demo 前提
    • 二、代码实现
        • 1、引用 Maven 依赖
        • 2、ConnectionWatcher 类创建 Zookeeper 连接
        • 3、ActiveKeyValueStore 类读写 Zookeeper 数据
        • 4、ConfigUpdater 类发布数据信息
        • 5、ConfigWatcher 类订阅数据信息
    • 三、测试结果
        • 1、ConfigUpdater 打印内容
        • 2、ConfigWatcher 打印内容

一、Demo 概述

1、关于 zookeeper “通知协调”

应用配置集中到节点上,应用启动时主动获取,并在节点上注册一个 watcher,每次配置更新都会通知到应用。

数据发布/订阅(Publish/Subscribe)系统,即所谓的配置中心,顾名思义就是发布者将数据发布到 ZooKeeper 的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新。

2、Demo 设计

采用发布/订阅模式将配置信息发布到 Zookeeper 节点上,供订阅者动态获取数据:

Zookeeper订阅发布Demo

  1. 首先需要启动 Zookeeper 服务,规划集群配置信息存放的节点 /config;
  2. 然后通过 ConfigWatcher 类更新 /config 节点注册监视器 watcher,监控集群配置信息变化;
  3. 最后通过 ConfigUpdater 类不断更新 /config 节点配置信息,从而模拟实现集群配置信息订阅发布效果。

3、Demo 前提

参考:Mac通过Docker安装Zookeeper集群


二、代码实现

1、引用 Maven 依赖

  1. <!-- 选择对应的Zookeeper版本 -->
  2. <dependency>
  3. <groupId>org.apache.zookeeper</groupId>
  4. <artifactId>zookeeper</artifactId>
  5. <version>3.7.0</version>
  6. </dependency>

2、ConnectionWatcher 类创建 Zookeeper 连接

  1. import java.io.IOException;
  2. import java.util.concurrent.CountDownLatch;
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.ZooKeeper;
  6. public class ConnectionWatcher implements Watcher {
  7. private final CountDownLatch connectedSignal = new CountDownLatch(1);
  8. private static final int SESSION_TIMEOUT = 5000;
  9. protected ZooKeeper zk;
  10. public void connect(String hosts) throws IOException, InterruptedException {
  11. zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
  12. connectedSignal.await();
  13. }
  14. @Override
  15. public void process(WatchedEvent event) {
  16. if (event.getState() == Event.KeeperState.SyncConnected) {
  17. connectedSignal.countDown();
  18. }
  19. }
  20. public void close() throws InterruptedException {
  21. zk.close();
  22. }
  23. }

3、ActiveKeyValueStore 类读写 Zookeeper 数据

  1. import java.nio.charset.Charset;
  2. import java.nio.charset.StandardCharsets;
  3. import java.util.List;
  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.Watcher;
  7. import org.apache.zookeeper.ZooDefs;
  8. import org.apache.zookeeper.data.Stat;
  9. public class ActiveKeyValueStore extends ConnectionWatcher {
  10. private static final Charset CHARSET = StandardCharsets.UTF_8;
  11. /**
  12. * 读取节点数据
  13. *
  14. * @param path 节点地址
  15. * @param value 数据值
  16. * @throws InterruptedException 中断异常
  17. * @throws KeeperException ZooKeeper异常
  18. */
  19. public void write(String path, String value) throws InterruptedException, KeeperException {
  20. Stat stat = zk.exists(path, false);
  21. if (stat == null) {
  22. if (value == null) {
  23. zk.create(path, null,
  24. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  25. } else {
  26. zk.create(path, value.getBytes(CHARSET),
  27. ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  28. }
  29. } else {
  30. if (value == null) {
  31. zk.setData(path, null, -1);
  32. } else {
  33. zk.setData(path, value.getBytes(CHARSET), -1);
  34. }
  35. }
  36. }
  37. /**
  38. * 读取节点数据
  39. *
  40. * @param path 节点地址
  41. * @param watcher watcher
  42. * @return 数据值
  43. * @throws InterruptedException 中断异常
  44. * @throws KeeperException ZooKeeper异常
  45. */
  46. public String read(String path, Watcher watcher) throws InterruptedException, KeeperException {
  47. /* stat */
  48. byte[] data = zk.getData(path, watcher, null);
  49. return new String(data, CHARSET);
  50. }
  51. }

4、ConfigUpdater 类发布数据信息

  1. import java.io.IOException;
  2. import java.util.Random;
  3. import java.util.concurrent.TimeUnit;
  4. import org.apache.zookeeper.KeeperException;
  5. public class ConfigUpdater {
  6. public static final String PATH = "/configuration";
  7. private final ActiveKeyValueStore store;
  8. private final Random random = new Random();
  9. public ConfigUpdater(String hosts) throws IOException, InterruptedException {
  10. //定义一个类
  11. store = new ActiveKeyValueStore();
  12. //连接Zookeeper
  13. store.connect(hosts);
  14. }
  15. public void run() throws InterruptedException, KeeperException {
  16. // noinspection InfiniteLoopStatement
  17. while (true) {
  18. String value = random.nextInt(100) + "";
  19. //向 ZNode 写数据(也可以将xml文件写进去)
  20. store.write(PATH, value);
  21. System.out.printf("Set %s to %s\n", PATH, value);
  22. TimeUnit.SECONDS.sleep(random.nextInt(10));
  23. }
  24. }
  25. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
  26. String hosts = "localhost:2181";
  27. ConfigUpdater updater = new ConfigUpdater(hosts);
  28. updater.run();
  29. }
  30. }

5、ConfigWatcher 类订阅数据信息

  1. import java.io.IOException;
  2. import org.apache.zookeeper.KeeperException;
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. public class ConfigWatcher implements Watcher {
  6. private final ActiveKeyValueStore store;
  7. public ConfigWatcher(String hosts) throws InterruptedException, IOException {
  8. store = new ActiveKeyValueStore();
  9. //连接Zookeeper
  10. store.connect(hosts);
  11. }
  12. public void displayConfig() throws InterruptedException, KeeperException {
  13. String value = store.read(ConfigUpdater.PATH, this);
  14. System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value);
  15. }
  16. @Override
  17. public void process(WatchedEvent event) {
  18. System.out.printf("Process incoming event: %s\n", event.toString());
  19. if (event.getType() == Event.EventType.NodeDataChanged) {
  20. try {
  21. displayConfig();
  22. } catch (InterruptedException e) {
  23. System.err.println("Interrupted. Exiting");
  24. Thread.currentThread().interrupt();
  25. } catch (KeeperException e) {
  26. System.err.printf("KeeperException: %s. Exiting.\n", e);
  27. }
  28. }
  29. }
  30. public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
  31. String hosts = "localhost:2181";
  32. //创建 watcher
  33. ConfigWatcher watcher = new ConfigWatcher(hosts);
  34. //调用 display 方法
  35. watcher.displayConfig();
  36. //然后一直处于监控状态
  37. Thread.sleep(Long.MAX_VALUE);
  38. }
  39. }

三、测试结果

1、ConfigUpdater 打印内容

  1. Set /configuration to 76
  2. Set /configuration to 55
  3. Set /configuration to 13
  4. ...

2、ConfigWatcher 打印内容

  1. Read /configuration as 76
  2. Read /configuration as 55
  3. Read /configuration as 13
  4. ...

通过 ConfigUpdater 发布的信息以及 ConfigWatcher 监控得到的信息可以看出,已经成功模拟实现集群配置信息的订阅发布

发表评论

表情:
评论列表 (有 0 条评论,148人围观)

还没有评论,来说两句吧...

相关阅读

    相关 分布式协调服务Zookeeper

    分布式系统介绍 分布式系统的定义 《分布式系统原理和范型》一书中定义:分布式系统是若干独立计算机的集合,这些计算机对于用户来说就像是单个相关系统。 从进程角度看,两