linux

如何使用Zookeeper实现分布式锁

小樊
50
2025-08-03 12:55:06
栏目: 大数据

使用 ZooKeeper 实现分布式锁是一种常见的方法,ZooKeeper 提供了强一致性的协调服务,可以用来管理分布式系统中的锁机制。以下是使用 ZooKeeper 实现分布式锁的基本步骤:

1. 创建 ZooKeeper 客户端

首先,你需要创建一个 ZooKeeper 客户端连接到 ZooKeeper 集群。

import org.apache.zookeeper.ZooKeeper;

public class DistributedLock {
    private static final String ZK_ADDRESS = "localhost:2181";
    private static final int SESSION_TIMEOUT = 3000;
    private ZooKeeper zk;

    public DistributedLock() throws IOException {
        zk = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, event -> {
            // 处理连接事件
        });
    }
}

2. 创建锁节点

在 ZooKeeper 中创建一个临时顺序节点来表示锁。

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

public void createLockNode() throws KeeperException, InterruptedException {
    String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("Created lock node: " + lockPath);
}

3. 获取锁

获取锁的过程包括检查当前节点是否是最小的节点,如果不是,则监听前一个节点的删除事件。

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;

public boolean acquireLock() throws KeeperException, InterruptedException {
    String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("Created lock node: " + lockPath);

    while (true) {
        List<String> children = zk.getChildren("/lock", false);
        Collections.sort(children);
        String currentNode = lockPath.substring(lockPath.lastIndexOf("/") + 1);

        if (currentNode.equals(children.get(0))) {
            // 当前节点是最小的节点,获取锁成功
            return true;
        } else {
            // 监听前一个节点的删除事件
            int previousNodeIndex = Collections.binarySearch(children, currentNode) - 1;
            String previousNodePath = "/lock/" + children.get(previousNodeIndex);
            Stat stat = zk.exists(previousNodePath, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    synchronized (this) {
                        notifyAll();
                    }
                }
            });

            if (stat != null) {
                synchronized (this) {
                    wait();
                }
            }
        }
    }
}

4. 释放锁

释放锁的过程包括删除当前节点。

public void releaseLock() throws KeeperException, InterruptedException {
    String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    zk.delete(lockPath, -1);
    System.out.println("Released lock node: " + lockPath);
}

5. 关闭 ZooKeeper 客户端

在程序结束时,关闭 ZooKeeper 客户端。

public void close() throws InterruptedException {
    zk.close();
}

完整示例

以下是一个完整的示例代码:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class DistributedLock {
    private static final String ZK_ADDRESS = "localhost:2181";
    private static final int SESSION_TIMEOUT = 3000;
    private ZooKeeper zk;

    public DistributedLock() throws IOException {
        zk = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, event -> {
            // 处理连接事件
        });
    }

    public void createLockNode() throws KeeperException, InterruptedException {
        String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Created lock node: " + lockPath);
    }

    public boolean acquireLock() throws KeeperException, InterruptedException {
        String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Created lock node: " + lockPath);

        while (true) {
            List<String> children = zk.getChildren("/lock", false);
            Collections.sort(children);
            String currentNode = lockPath.substring(lockPath.lastIndexOf("/") + 1);

            if (currentNode.equals(children.get(0))) {
                // 当前节点是最小的节点,获取锁成功
                return true;
            } else {
                // 监听前一个节点的删除事件
                int previousNodeIndex = Collections.binarySearch(children, currentNode) - 1;
                String previousNodePath = "/lock/" + children.get(previousNodeIndex);
                Stat stat = zk.exists(previousNodePath, event -> {
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        synchronized (this) {
                            notifyAll();
                        }
                    }
                });

                if (stat != null) {
                    synchronized (this) {
                        wait();
                    }
                }
            }
        }
    }

    public void releaseLock() throws KeeperException, InterruptedException {
        String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        zk.delete(lockPath, -1);
        System.out.println("Released lock node: " + lockPath);
    }

    public void close() throws InterruptedException {
        zk.close();
    }

    public static void main(String[] args) {
        try {
            DistributedLock lock = new DistributedLock();
            lock.createLockNode();
            if (lock.acquireLock()) {
                System.out.println("Lock acquired!");
                // 执行业务逻辑
                Thread.sleep(5000);
                lock.releaseLock();
                System.out.println("Lock released!");
            }
            lock.close();
        } catch (IOException | KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这个示例展示了如何使用 ZooKeeper 实现一个简单的分布式锁。在实际应用中,你可能需要处理更多的异常情况和边界条件。

0
看了该问题的人还看了