Zookeeper 的核心价值在于提供高可用、强一致性的分布式协调服务,适用于需要统一配置、状态同步、服务发现、分布式锁、主节点选举等场景,是构建可靠分布式系统的关键基础设施。
基于上述流程,ZooKeeper 可以作为配置中心、注册中心
1. 分布式锁
Zookeeper 可通过创建临时顺序节点实现分布式锁,确保多个进程或服务对共享资源的互斥访问,避免并发冲突。典型应用包括:
- 电商库存扣减,防止超卖;
- 分布式任务调度,确保任务只被一个节点执行;
- 金融交易系统中对关键资源的并发控制。
2. 服务注册与发现
在微服务架构中,服务实例启动时向 Zookeeper 注册自己的地址,客户端通过监听 Zookeeper 获取可用服务列表,实现动态服务发现和负载均衡。例如:
支付系统中多个服务实例注册后,客户端只调用健康实例;
Flink 集群中 JobManager 地址注册与发现
。
3. 配置中心
Zookeeper 可作为集中式配置管理中心,支持配置的动态更新与热加载。应用监听配置节点变化,实时获取最新配置,避免重启服务。适用于:
系统参数动态调整(如线程池大小、限流阈值);
Flink 作业并行度、资源配置热更新
。
4. 主节点选举(Leader Election)
Zookeeper 提供强一致性保障,常用于主从架构中的 Leader 选举。典型应用包括:
Hadoop/HBase 中 NameNode、HMaster 的故障自动切换;
Flink 中 JobManager 的高可用 Leader 选举
。
5. 分布式协调与同步
Zookeeper 可用于多节点之间的状态同步与协调,如:
屏障(Barrier):多个节点需同步到达某一状态后才能继续执行;
计数器:全局共享计数器,用于统计在线用户数、商品热度等;
作业状态同步:Flink 作业执行图、Checkpoint 状态协调
。
ZooKeeper 应用
ZooKeeper 可以用于发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能 。
#5.1. 命名服务
在分布式系统中,通常需要一个全局唯一的名字,如生成全局唯一的订单号等,ZooKeeper 可以通过顺序节点的特性来生成全局唯一 ID,从而可以对分布式系统提供命名服务。
#5.2. 配置管理
利用 ZooKeeper 的观察机制,可以将其作为一个高可用的配置存储器,允许分布式应用的参与者检索和更新配置文件。
#5.3. 分布式锁
可以通过 ZooKeeper 的临时节点和 Watcher 机制来实现分布式排它锁。
举例来说,有一个分布式系统,有三个节点 A、B、C,试图通过 ZooKeeper 获取分布式锁。
(1)访问 /lock (这个目录路径由程序自己决定),创建 带序列号的临时节点(EPHEMERAL) 。
(2)每个节点尝试获取锁时,拿到 /locks节点下的所有子节点(id_0000,id_0001,id_0002),判断自己创建的节点是不是序列号最小的
- 如果序列号是最小的,则成功获取到锁。
- 释放锁:执行完操作后,把创建的节点给删掉。
- 如果不是,则监听比自己要小 1 的节点变化。
(3)释放锁,即删除自己创建的节点。
图中,NodeA 删除自己创建的节点 id_0000,NodeB 监听到变化,发现自己的节点已经是最小节点,即可获取到锁。
#5.4. 集群管理
ZooKeeper 还能解决大多数分布式系统中的问题:
- 如可以通过创建临时节点来建立心跳检测机制。如果分布式系统的某个服务节点宕机了,则其持有的会话会超时,此时该临时节点会被删除,相应的监听事件就会被触发。
- 分布式系统的每个服务节点还可以将自己的节点状态写入临时节点,从而完成状态报告或节点工作进度汇报。
- 通过数据的订阅和发布功能,ZooKeeper 还能对分布式系统进行模块的解耦和任务的调度。
- 通过监听机制,还能对分布式系统的服务节点进行动态上下线,从而实现服务的动态扩容。
#5.5. 选举 Leader 节点
分布式系统一个重要的模式就是主从模式 (Master/Salves),ZooKeeper 可以用于该模式下的 Matser 选举。可以让所有服务节点去竞争性地创建同一个 ZNode,由于 ZooKeeper 不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,这样该服务节点就可以成为 Master 节点。
16.创建ZooKeeper客户端
在 Eclipse 环境搭建
- 创建一个 Maven 工程
- 为 pom.xml 添加关键依赖
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
- 需要在项目的
src/main/resources目录下,新建一个文件,命名为log4j.properties,在文件中填入如下内容:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
创建 ZooKeeper 客户端
public class ZooKeeperTest {
private static String connectString = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
private static int sessionTimeout = 2000;
private ZooKeeper zkClient = null;
// @Test
@Before
public void init() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
// 收到事件通知后的回调函数(用户的业务逻辑)
public void process(WatchedEvent event) {
System.out.println(event.getType() + "--" + event.getPath());
// 再次启动监听
try {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
17.创建一个节点
// 创建子节点
@Test
public void create() throws Exception {
// 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
String nodeCreated = zkClient.create("/root", "root".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(nodeCreated);
}
18.获取子节点并监听节点变化
// 获取子节点
@Test
public void getChildren() throws Exception {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
// 延时阻塞
Thread.sleep(Long.MAX_VALUE);
}
19.判断节点是否存在
// 判断 znode 是否存在
@Test
public void exist() throws Exception {
Stat stat = zkClient.exists("/eclipse", false);
System.out.println(stat == null ? "not exist" : "exist");
}
20.服务器节点动态上下线案例分析
需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
需求分析

21.服务器节点动态上下线案例注册代码
先在集群上创建/servers节点
[zk: 127.0.0.1:2182(CONNECTED) 6] create /servers "servers"
Created /servers
服务器端向 Zookeeper 注册代码
public class DistributeServer {
private static String connectString = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
private static int sessionTimeout = 2000;
private ZooKeeper zkClient = null;
private String parentNode = "/servers";
// 创建到 zk 的客户端连接
public void getConnect() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
// 注册服务器
public void registServer(String hostname) throws Exception {
String create = zkClient.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + " is online " + create);
}
// 业务功能
public void business(String hostname) throws Exception {
System.out.println(hostname + " is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributeServer server = new DistributeServer();
server.getConnect();
// 2 利用 zk 连接注册服务器信息
server.registServer(args[0]);
// 3 启动业务功能
server.business(args[0]);
}
}
22.服务器节点动态上下线案例全部代码实现
public class DistributeClient {
private static String connectString = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
private static int sessionTimeout = 2000;
private ZooKeeper zkClient = null;
private String parentNode = "/servers";
// 创建到 zkClient 的客户端连接
public void getConnect() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 再次启动监听
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
// 获取服务器列表信息
public void getServerList() throws Exception {
// 1 获取服务器子节点信息,并且对父节点进行监听
List<String> children = zkClient.getChildren(parentNode, true);
// 2 存储服务器信息列表
ArrayList<String> servers = new ArrayList<>();
// 3 遍历所有节点,获取节点中的主机名称信息
for (String child : children) {
byte[] data = zkClient.getData(parentNode + "/" + child, false, null);
servers.add(new String(data));
}
// 4 打印服务器列表信息
System.out.println(servers);
}
// 业务功能
public void business() throws Exception {
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributeClient client = new DistributeClient();
client.getConnect();
// 2 获取 servers 的子节点信息,从中获取服务器信息列表
client.getServerList();
// 3 业务进程启动
client.business();
}
}
11