zookeeper的安装与JavaAPI的使用
这一篇主要是介绍 zookeeper 开发环境的搭建以及客户端Java API的使用
1. zookeeper开发环境搭建
(1)首先要去官网下载安装包, 下面给出传送门:
http://mirror.bit.edu.cn/apache/zookeeper/stable/
下载之后解压,这里我是解压在 /opt/目录下:
$tar -zxvf zookeeper-3.4.9.tar,gz
(2)解压之后就是要配置环境变量:
我是子啊Mac 下,所以是 ~/.bash_profile 文件中配置, 如果是Linux下的话,应该是在~/.bashrc 下配置
export ZOOKEEPER_HOME=/opt/zookeeper-3.4.9
export PATH=$PATH:$ZOOKEEPER_HOME/bin
(3)配置了环境变量之后,就可以设置 zookeeper的配置文件了,默认是 zookeeper-3.4.9/conf/zoo.cfg 配置文件在zookeeper服务端在启动时被加载, 但是conf目录下并没有这个文件,所以我们直接把conf目录下的 zoo_sample.cfg 复制成zoo.cfg 就OK,配置内容直接用默认的。
(4) 启动服务端:运行一下命令:
$zkServer.sh start
这时服务端就启动了,通过以下命令可以查看服务端是否启动成功:
¥echo ruok | nc localhost 2181
具体执行过程可以看下图:
2. Java API的使用
对于zookeeper 的客户端连接, zookeeper 提供了一套 Java API来进行连接调用。
具体的详解这里就不给出了,直接给出具体的代码:
客户端连接回调的观察者ConnectionWatcher.java
package com.zookeeper.lyt;
import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;
/** * Created by louyuting on 2017/3/18. * 在 Zookeeper 中新建组 : /zoo */
public class ConnectionWatcher implements Watcher{
private static final int SESSION_TIMEOUT = 5000;
protected ZooKeeper zk;
private CountDownLatch connectedSignal = new CountDownLatch(1);
public void connect(String hosts) throws Exception{
//传入: 主机地址; 会话超时参数; Watcher对象,用于接收来自zookeeper的回调,获得各种事件的通知.
//内部会创建一个线程连接到服务器;
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
//等待连接成功的响应.
connectedSignal.await();
}
// override
// 当客户端与服务端建立成功之后, watcher 的 process()方法会被调用. 参数是一个用于表示该连接的事件.
public void process(WatchedEvent watchedEvent) {
//返回响应, 表示连接已经成功
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
connectedSignal.countDown();
}
}
/** * 创建一个新的 Zookeeper 的 znode, * @param groupName * @throws Exception */
public void create(String groupName) throws Exception{
String path = "/" + groupName;
String createdPath = zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("created " + createdPath);
}
public void close() throws Exception{
zk.close();
}
}
创建组 CreateGroup.java
package com.zookeeper.lyt;
import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;
/** * Created by louyuting on 2017/3/18. * 在 Zookeeper 中新建组 : /zoo */
public class CreateGroup implements Watcher{
private static final int SESSION_TIMEOUT = 5000;
private ZooKeeper zk;
private CountDownLatch connectedSignal = new CountDownLatch(1);
public void connect(String hosts) throws Exception{
//传入: 主机地址; 会话超时参数; Watcher对象,用于接收来自zookeeper的回调,获得各种事件的通知.
//内部会创建一个线程连接到服务器;
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
//等待连接成功的响应.
connectedSignal.await();
}
// override
// 当客户端与服务端建立成功之后, watcher 的 process()方法会被调用. 参数是一个用于表示该连接的事件.
public void process(WatchedEvent watchedEvent) {
//返回响应, 表示连接已经成功
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
connectedSignal.countDown();
}
}
/** * 创建一个新的 Zookeeper 的 znode, * @param groupName * @throws Exception */
public void create(String groupName) throws Exception{
String path = "/" + groupName;
String createdPath = zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("created " + createdPath);
}
public void close() throws Exception{
zk.close();
}
// main
public static void main(String[] args) throws Exception {
CreateGroup createGroup = new CreateGroup();
createGroup.connect("ip:port");
createGroup.create("zoo");
createGroup.close();
}
}
加入组 JoinGroup.java
package com.zookeeper.lyt;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import java.util.concurrent.TimeUnit;
/** * Created by louyuting on 2017/3/18. * 加入组 */
public class JoinGroup extends ConnectionWatcher {
public void join(String groupName, String menberName) throws Exception{
String path = "/" + groupName + "/" + menberName;
String createdPath = zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("created " + createdPath);
}
public static void main(String[] args) throws Exception {
JoinGroup joinGroup = new JoinGroup();
joinGroup.connect("ip:port");
joinGroup.join("zoo", "cow");
TimeUnit.MICROSECONDS.sleep(Long.MAX_VALUE);
}
}
列出组:ListGroup.java
package com.zookeeper.lyt;
import org.apache.zookeeper.KeeperException;
import java.util.List;
/** * Created by louyuting on 2017/3/18. * * 列出组成员 */
public class ListGroup extends ConnectionWatcher {
public void list(String groupName) throws Exception{
String path = "/" + groupName;
try {
List<String> children= zk.getChildren(path, false);
if(children.isEmpty()){
System.out.println( path + " is empty");
System.exit(1);
}else {
for (String child : children){
System.out.println(child);
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
ListGroup listGroup = new ListGroup();
listGroup.connect("ip:port");
listGroup.list("zoo");
listGroup.close();
}
}
删除组 DeleteGroup.java
package com.zookeeper.lyt;
import org.apache.zookeeper.KeeperException;
import java.util.List;
/** * Created by louyuting on 2017/3/18. * * 列出组成员 */
public class DeleteGroup extends ConnectionWatcher {
public void delete(String groupName) throws Exception{
String path = "/" + groupName;
try {
List<String> children= zk.getChildren(path, false);
for (String child : children){
zk.delete(path + "/" + child, -1);
}
zk.delete(path, -1);
} catch (KeeperException.NoNodeException e) {
e.printStackTrace();
System.out.println(groupName + " is not existed");
System.exit(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
DeleteGroup deleteGroup = new DeleteGroup();
deleteGroup.connect("ip:port");
deleteGroup.delete("zoo");
deleteGroup.close();
}
}
还没有评论,来说两句吧...