应用场景

予早 2025-10-09 21:14:44
Categories: Tags:

Zookeeper 的核心价值在于提供高可用、强一致性的分布式协调服务,适用于需要统一配置、状态同步、服务发现、分布式锁、主节点选举等场景,是构建可靠分布式系统的关键基础设施。

基于上述流程,ZooKeeper 可以作为配置中心、注册中心

1. 分布式锁

Zookeeper 可通过创建临时顺序节点实现分布式锁,确保多个进程或服务对共享资源的互斥访问,避免并发冲突。典型应用包括:


2. 服务注册与发现

在微服务架构中,服务实例启动时向 Zookeeper 注册自己的地址,客户端通过监听 Zookeeper 获取可用服务列表,实现动态服务发现负载均衡。例如:


3. 配置中心

Zookeeper 可作为集中式配置管理中心,支持配置的动态更新与热加载。应用监听配置节点变化,实时获取最新配置,避免重启服务。适用于:


4. 主节点选举(Leader Election)

Zookeeper 提供强一致性保障,常用于主从架构中的 Leader 选举。典型应用包括:


5. 分布式协调与同步

Zookeeper 可用于多节点之间的状态同步与协调,如:

ZooKeeper 应用

ZooKeeper 可以用于发布/订阅、负载均衡、命令服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能

#5.1. 命名服务

在分布式系统中,通常需要一个全局唯一的名字,如生成全局唯一的订单号等,ZooKeeper 可以通过顺序节点的特性来生成全局唯一 ID,从而可以对分布式系统提供命名服务。

#5.2. 配置管理

利用 ZooKeeper 的观察机制,可以将其作为一个高可用的配置存储器,允许分布式应用的参与者检索和更新配置文件。

#5.3. 分布式锁

可以通过 ZooKeeper 的临时节点和 Watcher 机制来实现分布式排它锁。

举例来说,有一个分布式系统,有三个节点 A、B、C,试图通过 ZooKeeper 获取分布式锁。

(1)访问 /lock (这个目录路径由程序自己决定),创建 带序列号的临时节点(EPHEMERAL)

(2)每个节点尝试获取锁时,拿到 /locks节点下的所有子节点(id_0000,id_0001,id_0002),判断自己创建的节点是不是序列号最小的

(3)释放锁,即删除自己创建的节点。

图中,NodeA 删除自己创建的节点 id_0000,NodeB 监听到变化,发现自己的节点已经是最小节点,即可获取到锁。

#5.4. 集群管理

ZooKeeper 还能解决大多数分布式系统中的问题:

#5.5. 选举 Leader 节点

分布式系统一个重要的模式就是主从模式 (Master/Salves),ZooKeeper 可以用于该模式下的 Matser 选举。可以让所有服务节点去竞争性地创建同一个 ZNode,由于 ZooKeeper 不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,这样该服务节点就可以成为 Master 节点。

16.创建ZooKeeper客户端

在 Eclipse 环境搭建

<dependencies>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
    </dependency>
</dependencies>
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

创建 ZooKeeper 客户端

ZooKeeperTest源码

public class ZooKeeperTest {
    private static String connectString = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;

    // @Test
    @Before
    public void init() throws Exception {

        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            // 收到事件通知后的回调函数(用户的业务逻辑)
            public void process(WatchedEvent event) {
                System.out.println(event.getType() + "--" + event.getPath());

                // 再次启动监听
                try {
                    List<String> children = zkClient.getChildren("/", true);
                    for (String child : children) {
                        System.out.println(child);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

17.创建一个节点

// 创建子节点
@Test
public void create() throws Exception {
    // 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
    String nodeCreated = zkClient.create("/root", "root".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    System.out.println(nodeCreated);
}

18.获取子节点并监听节点变化

// 获取子节点
@Test
public void getChildren() throws Exception {
    List<String> children = zkClient.getChildren("/", true);
    for (String child : children) {
        System.out.println(child);
    }

    // 延时阻塞
    Thread.sleep(Long.MAX_VALUE);
}

19.判断节点是否存在

// 判断 znode 是否存在
@Test
public void exist() throws Exception {
    Stat stat = zkClient.exists("/eclipse", false);
    System.out.println(stat == null ? "not exist" : "exist");
}

20.服务器节点动态上下线案例分析

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

需求分析

![](../../../../../../工作/BigData/notebook backup/zookeeper/image/01.png)

21.服务器节点动态上下线案例注册代码

先在集群上创建/servers节点

[zk: 127.0.0.1:2182(CONNECTED) 6] create /servers "servers"
Created /servers

服务器端向 Zookeeper 注册代码

DistributeServer源码

public class DistributeServer {
    private static String connectString = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;

    private String parentNode = "/servers";

    // 创建到 zk 的客户端连接
    public void getConnect() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
            }
        });
    }

    // 注册服务器
    public void registServer(String hostname) throws Exception {
        String create = zkClient.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + " is online " + create);
    }

    // 业务功能
    public void business(String hostname) throws Exception {
        System.out.println(hostname + " is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        // 1 获取 zk 连接
        DistributeServer server = new DistributeServer();
        server.getConnect();
        // 2 利用 zk 连接注册服务器信息
        server.registServer(args[0]);
        // 3 启动业务功能
        server.business(args[0]);
    }

}

22.服务器节点动态上下线案例全部代码实现

DistributeClient源码

public class DistributeClient {
    private static String connectString = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;

    private String parentNode = "/servers";

    // 创建到 zkClient 的客户端连接
    public void getConnect() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 再次启动监听
                try {
                    getServerList();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    // 获取服务器列表信息
    public void getServerList() throws Exception {
        // 1 获取服务器子节点信息,并且对父节点进行监听
        List<String> children = zkClient.getChildren(parentNode, true);
        // 2 存储服务器信息列表
        ArrayList<String> servers = new ArrayList<>();
        // 3 遍历所有节点,获取节点中的主机名称信息
        for (String child : children) {
            byte[] data = zkClient.getData(parentNode + "/" + child, false, null);
            servers.add(new String(data));
        }
        // 4 打印服务器列表信息
        System.out.println(servers);
    }

    // 业务功能
    public void business() throws Exception {
        System.out.println("client is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        // 1 获取 zk 连接
        DistributeClient client = new DistributeClient();
        client.getConnect();
        // 2 获取 servers 的子节点信息,从中获取服务器信息列表
        client.getServerList();
        // 3 业务进程启动
        client.business();
    }

}

11