文章目录
  1. 1.应用开发
    1. 1.1 主要API
    2. 1.2 Zookeeper 开发组件
      1. 1.2.1 pom.xml
      2. 1.2.2 ZookeeperClient.java
      3. 1.2.3 运行结果
    3. 1.3 zkClient 开发组件
      1. 1.3.1 pom.xml
      2. 1.3.2 ZookeeperClient.java
      3. 1.3.3 运行结果
  2. 2.ZooKeeper 在 dubbo 中的应用
    1. 2.1 ServiceProvider.java
    2. 2.2 ServiceConsumer.java

上篇文章 ZooKeeper 原理与服务器集群部署 完成了 ZooKeeper 服务器集群的部署,本文以官方 API 和 zkClient 两种方式,演示了 ZooKeeper 数据的修改和状态监视。并以代码模拟了 ZooKeeper 在 Dubbo 中的作用。

作者:王克锋
出处:https://kefeng.wang/2017/11/10/zookeeper-development/
版权:自由转载-非商用-非衍生-保持署名,转载请标明作者和出处。

1.应用开发

API文档: https://zookeeper.apache.org/doc/current/api/index.html
Java示例: https://zookeeper.apache.org/doc/current/javaExample.html
程序员指南: https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html
通常,ZooKeeper应用程序分为两个单元,一个维护连接,另一个监视数据。

1.1 主要API

  • create: 创建结点;
  • delete: 删除结点;
  • exists: 判断结点是否存在;
  • get data: 读取结点数据;
  • set data: 写入结点数据;
  • get children: 获取结点的子结点;
  • sync: 数据同步;

1.2 Zookeeper 开发组件

Document: http://zookeeper.apache.org/doc/r3.4.11/
ZooKeeper 3.4.11 API: https://zookeeper.apache.org/doc/r3.4.11/api/index.html

是 zookeeper 自带的组件,繁琐且不可靠:

  • 会话超时异常时,重新连接繁琐;
  • watcher 是一次性的,需要额外编码把一次性订阅改为持久订阅;
  • 节点数据是二进制,对象数据都需要转换为二进制保存。

1.2.1 pom.xml

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>

1.2.2 ZookeeperClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ZookeeperClient {
private static Log logger = LogFactory.getLog(ZookeeperClient.class);

public static void main(String args[]) throws IOException, KeeperException, InterruptedException {
// 连接服务器
final String serverUrl = "centos:2181,centos:2182,centos:2183";
ZooKeeper zk = new ZooKeeper(serverUrl, 30000, new Watcher() {
public void process(WatchedEvent event) { // 监控所有被触发的事件(一次性有效,必须再次注册 watcher)
String path = event.getPath(); // 节点路径
Event.EventType type = event.getType(); // 事件类型(节点的创建与删除、数据改变,子节点改变)
logger.info("*** 观察者事件: path=" + path + ", type=" + type);
}
});

// 创建节点
List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; // 访问权限(开放,所有人可访问)
CreateMode createMode = CreateMode.PERSISTENT; // 节点类型(持久节点,客户端连接断开后该节点不会删除,临时节点会被删除)
zk.create("/parentNode", "parentData".getBytes(), acl, createMode);
zk.create("/parentNode/childNode1", "childData1".getBytes(), acl, createMode); // 父节点必须存在
zk.create("/parentNode/childNode2", "childData2".getBytes(), acl, createMode);

// 查询数据
logger.info("子节点列表: " + zk.getChildren("/parentNode", true)); // 获取子节点列表
logger.info("节点存在性: " + (zk.exists("/parentNode/childNode1", true) != null));
logger.info("节点原数据: " + new String(zk.getData("/parentNode/childNode1", true, null))); // 获取节点数据
zk.setData("/parentNode/childNode1", "childData1-X".getBytes(), -1); // 设置节点数据(上限为 1M, 匹配所有版本)
logger.info("节点新数据: " + new String(zk.getData("/parentNode/childNode1", true, null))); // 获取节点数据

// 删除节点(不指定匹配的版本号时,匹配所有版本)
zk.delete("/parentNode/childNode2", -1);
zk.delete("/parentNode/childNode1", -1);
zk.delete("/parentNode", -1);

// 关闭连接
zk.close();
}
}

1.2.3 运行结果

1
2
3
4
5
6
7
8
15:33:59.215  INFO  [ZookeeperClient.java:24] - *** 观察者事件: path=null, type=None
15:33:59.233 INFO [ZookeeperClient.java:36] - 子节点列表: [childNode1, childNode2]
15:33:59.237 INFO [ZookeeperClient.java:37] - 节点存在性: true
15:33:59.240 INFO [ZookeeperClient.java:38] - 节点原数据: childData1
15:33:59.247 INFO [ZookeeperClient.java:24] - *** 观察者事件: path=/parentNode/childNode1, type=NodeDataChanged
15:33:59.247 INFO [ZookeeperClient.java:40] - 节点新数据: childData1-X
15:33:59.252 INFO [ZookeeperClient.java:24] - *** 观察者事件: path=/parentNode, type=NodeChildrenChanged
15:33:59.254 INFO [ZookeeperClient.java:24] - *** 观察者事件: path=/parentNode/childNode1, type=NodeDeleted

1.3 zkClient 开发组件

是对官方 ZooKeeper API 作了封装,避免了官方 API 的缺点。
https://github.com/sgroschupf/zkclient

1.3.1 pom.xml

1
2
3
4
5
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>

1.3.2 ZookeeperClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ZookeeperClient {
private static Log logger = LogFactory.getLog(ZookeeperClient.class);

public static void main(String args[]) {
// 连接服务器
final String serverUrl = "centos:2181,centos:2182,centos:2183";
ZkClient zk = new ZkClient(serverUrl);

// 创建节点、并订阅节点事件
zk.createPersistent("/parentNode", "parentData"); // 持久节点
zk.createEphemeral("/parentNode/childNode1", "childData1"); // 临时节点
zk.createEphemeral("/parentNode/childNode2", "childData2"); // 临时节点
zk.subscribeChildChanges("/parentNode", new IZkChildListener() { // 子节点改变事件
public void handleChildChange(String parentPath, List<String> childList) throws Exception {
logger.info("*** 子节点事件: " + parentPath + ", " + childList);
}
});
zk.subscribeDataChanges("/parentNode/childNode1", new IZkDataListener() { // 数据改变(或删除)事件
public void handleDataChange(String path, Object data) throws Exception {
logger.info("*** 数据改变事件: " + path + ", " + data);
}

public void handleDataDeleted(String path) throws Exception {
logger.info("*** 数据删除事件: " + path);
}
});

// 查询数据
logger.info("子节点列表: " + zk.getChildren("/parentNode"));
logger.info("节点存在性: " + zk.exists("/parentNode/childNode1"));
logger.info("节点原数据: " + zk.readData("/parentNode/childNode1"));
zk.writeData("/parentNode/childNode1", "childData1-X"); // 设置节点数据(上限为 1M, 匹配所有版本)
logger.info("节点新数据: " + zk.readData("/parentNode/childNode1"));

// 删除节点(不指定匹配的版本号时,匹配所有版本)
zk.delete("/parentNode/childNode2");
zk.delete("/parentNode/childNode1");
zk.delete("/parentNode");

// 关闭连接
zk.close();
}
}

1.3.3 运行结果

1
2
3
4
5
6
7
16:17:21.904  INFO  [ZookeeperClient.java:42] - 子节点列表: [childNode1, childNode2]
16:17:21.904 INFO [ZookeeperClient.java:43] - 节点存在性: true
16:17:21.909 INFO [ZookeeperClient.java:44] - 节点原数据: childData1
16:17:21.919 INFO [ZookeeperClient.java:46] - 节点新数据: childData1-X
16:17:21.921 INFO [ZookeeperClient.java:33] - *** 数据改变事件: /parentNode/childNode1, childData1-X
16:17:21.931 INFO [ZookeeperClient.java:28] - *** 子节点事件: /parentNode, []
16:17:21.951 INFO [ZookeeperClient.java:37] - *** 数据删除事件: /parentNode/childNode1

2.ZooKeeper 在 dubbo 中的应用

ZooKeeper 作为配置中心,提供服务地址的登记和查询;
consumer 订阅服务(订阅事件,动态感知服务地址变化);
provider 注册服务(创建 zk 节点);

Dubbo 在 ZooKeeper 中的存储结构如下(增加 consumer 分支是为了便于统计服务消费情况):

  • 第1级:根节点(configcenter),持久节点;
  • 第2级: 各个服务名称(serviceName),持久节点;
  • 第3级(扩充): 用于对节点分类(nodeType),区分 provider/consumer,持久节点;
  • 第4级[provider]: 特定 serviceName 的提供者地址列表(provideAddress),非持久节点,provider 下线时该节点会自动删除,并自动通知 consumer;
  • 第4级[consumer]: 特定 serviceName 的消费者地址列表(consumerAddress),非持久节点,consumer 下线时该节点会自动删除。

2.1 ServiceProvider.java

ServiceProvider 向注册中心(ZooKeeper集群)注册服务的关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ServiceProvider {
private static Log logger = LogFactory.getLog(ServiceProvider.class);
private static final String rootPath = "/configcenter";
private static final String servicePath = rootPath + "/serviceName";
private static final String zookeeperList = "centos:2181,centos:2182,centos:2183";

public static void main(String args[]) throws UnknownHostException {
ZkClient zk = new ZkClient(zookeeperList);

// 确保根节点存在
if (!zk.exists(rootPath)) {
zk.createPersistent(rootPath);
}

// 确保服务节点存在
if (!zk.exists(servicePath)) {
zk.createPersistent(servicePath);
}

// 向服务节点注册自身IP
String serviceIp = InetAddress.getLocalHost().getHostAddress().toString();
zk.createEphemeral(servicePath + "/" + serviceIp);

// 关闭连接
zk.close();
}
}

2.2 ServiceConsumer.java

ServiceConsumer 从注册中心(ZooKeeper集群)获取服务提供者地址列表的关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ServiceConsumer {
private static Log logger = LogFactory.getLog(ServiceConsumer.class);
private static final String servicePath = "/configcenter/serviceName";
private static final String zookeeperList = "centos:2181,centos:2182,centos:2183";
private static List<String> providerList = null; // 本地缓存的服务提供者地址列表

public static void main(String args[]) throws UnknownHostException {
ZkClient zk = new ZkClient(zookeeperList);

// 检查服务节点存在性
if (zk.exists(servicePath)) {
providerList = zk.getChildren(servicePath);
} else {
throw new RuntimeException("[" + servicePath + "] not exist.");
}

// 订阅子节点改变事件
zk.subscribeChildChanges(servicePath, new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> childList) throws Exception {
providerList = childList;
}
});

// 关闭连接
zk.close();
}
}

文章目录
  1. 1.应用开发
    1. 1.1 主要API
    2. 1.2 Zookeeper 开发组件
      1. 1.2.1 pom.xml
      2. 1.2.2 ZookeeperClient.java
      3. 1.2.3 运行结果
    3. 1.3 zkClient 开发组件
      1. 1.3.1 pom.xml
      2. 1.3.2 ZookeeperClient.java
      3. 1.3.3 运行结果
  2. 2.ZooKeeper 在 dubbo 中的应用
    1. 2.1 ServiceProvider.java
    2. 2.2 ServiceConsumer.java