linux

如何使用Zookeeper进行分布式锁管理

小樊
39
2025-06-23 11:18:33
栏目: 大数据

使用 ZooKeeper 进行分布式锁管理是一种常见的做法,因为 ZooKeeper 提供了高可用性和一致性保证。以下是使用 ZooKeeper 实现分布式锁的基本步骤:

1. 创建 ZooKeeper 客户端

首先,你需要创建一个 ZooKeeper 客户端连接到 ZooKeeper 集群。

import org.apache.zookeeper.ZooKeeper;

public class DistributedLock {
    private ZooKeeper zk;
    private String lockPath;

    public DistributedLock(String zkAddress, int sessionTimeout, String lockPath) throws IOException {
        this.zk = new ZooKeeper(zkAddress, sessionTimeout, event -> {
            // 处理连接事件
        });
        this.lockPath = lockPath;
    }
}

2. 创建临时顺序节点

在 ZooKeeper 中,你可以创建临时顺序节点来实现锁。临时节点会在客户端断开连接时自动删除,顺序节点可以确保锁的公平性。

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;

public void acquireLock() throws KeeperException, InterruptedException {
    String lockNode = zk.create(lockPath + "/lock_", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("Created lock node: " + lockNode);

    while (true) {
        List<String> children = zk.getChildren(lockPath, false);
        Collections.sort(children);
        if (lockNode.endsWith(children.get(0))) {
            // 获取到锁
            System.out.println("Lock acquired by: " + lockNode);
            return;
        } else {
            // 监听前一个节点的删除事件
            String previousNode = getPreviousNode(children, lockNode);
            Stat stat = zk.exists(lockPath + "/" + previousNode, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    synchronized (this) {
                        notifyAll();
                    }
                }
            });
            if (stat != null) {
                synchronized (this) {
                    wait();
                }
            }
        }
    }
}

private String getPreviousNode(List<String> children, String currentNode) {
    int index = children.indexOf(currentNode.substring(lockPath.length() + 1));
    return children.get(index - 1);
}

3. 释放锁

释放锁时,只需删除创建的临时节点即可。

public void releaseLock() throws KeeperException, InterruptedException {
    zk.delete(lockPath, -1);
    System.out.println("Lock released");
}

4. 关闭 ZooKeeper 客户端

在应用程序结束时,记得关闭 ZooKeeper 客户端。

public void close() throws InterruptedException {
    zk.close();
}

完整示例

以下是一个完整的示例代码:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class DistributedLock {
    private ZooKeeper zk;
    private String lockPath;

    public DistributedLock(String zkAddress, int sessionTimeout, String lockPath) throws IOException {
        this.zk = new ZooKeeper(zkAddress, sessionTimeout, event -> {
            // 处理连接事件
        });
        this.lockPath = lockPath;
    }

    public void acquireLock() throws KeeperException, InterruptedException {
        String lockNode = zk.create(lockPath + "/lock_", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Created lock node: " + lockNode);

        while (true) {
            List<String> children = zk.getChildren(lockPath, false);
            Collections.sort(children);
            if (lockNode.endsWith(children.get(0))) {
                // 获取到锁
                System.out.println("Lock acquired by: " + lockNode);
                return;
            } else {
                // 监听前一个节点的删除事件
                String previousNode = getPreviousNode(children, lockNode);
                Stat stat = zk.exists(lockPath + "/" + previousNode, event -> {
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        synchronized (this) {
                            notifyAll();
                        }
                    }
                });
                if (stat != null) {
                    synchronized (this) {
                        wait();
                    }
                }
            }
        }
    }

    public void releaseLock() throws KeeperException, InterruptedException {
        zk.delete(lockPath, -1);
        System.out.println("Lock released");
    }

    public void close() throws InterruptedException {
        zk.close();
    }

    private String getPreviousNode(List<String> children, String currentNode) {
        int index = children.indexOf(currentNode.substring(lockPath.length() + 1));
        return children.get(index - 1);
    }

    public static void main(String[] args) {
        try {
            DistributedLock lock = new DistributedLock("localhost:2181", 3000, "/locks");
            lock.acquireLock();
            // 执行业务逻辑
            lock.releaseLock();
            lock.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注意事项

  1. 异常处理:在实际应用中,需要更完善的异常处理机制。
  2. 性能考虑:ZooKeeper 的写操作相对较慢,因此在高并发场景下需要进行性能测试和优化。
  3. 连接管理:确保 ZooKeeper 客户端的连接是稳定的,可以考虑使用连接池或重试机制。

通过以上步骤,你可以使用 ZooKeeper 实现一个基本的分布式锁管理机制。

0
看了该问题的人还看了