使用 ZooKeeper 实现分布式锁是一种常见的方法,ZooKeeper 提供了强一致性的协调服务,可以用来管理分布式系统中的锁机制。以下是使用 ZooKeeper 实现分布式锁的基本步骤:
首先,你需要创建一个 ZooKeeper 客户端连接到 ZooKeeper 集群。
import org.apache.zookeeper.ZooKeeper;
public class DistributedLock {
private static final String ZK_ADDRESS = "localhost:2181";
private static final int SESSION_TIMEOUT = 3000;
private ZooKeeper zk;
public DistributedLock() throws IOException {
zk = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, event -> {
// 处理连接事件
});
}
}
在 ZooKeeper 中创建一个临时顺序节点来表示锁。
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
public void createLockNode() throws KeeperException, InterruptedException {
String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Created lock node: " + lockPath);
}
获取锁的过程包括检查当前节点是否是最小的节点,如果不是,则监听前一个节点的删除事件。
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
public boolean acquireLock() throws KeeperException, InterruptedException {
String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Created lock node: " + lockPath);
while (true) {
List<String> children = zk.getChildren("/lock", false);
Collections.sort(children);
String currentNode = lockPath.substring(lockPath.lastIndexOf("/") + 1);
if (currentNode.equals(children.get(0))) {
// 当前节点是最小的节点,获取锁成功
return true;
} else {
// 监听前一个节点的删除事件
int previousNodeIndex = Collections.binarySearch(children, currentNode) - 1;
String previousNodePath = "/lock/" + children.get(previousNodeIndex);
Stat stat = zk.exists(previousNodePath, event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
synchronized (this) {
notifyAll();
}
}
});
if (stat != null) {
synchronized (this) {
wait();
}
}
}
}
}
释放锁的过程包括删除当前节点。
public void releaseLock() throws KeeperException, InterruptedException {
String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
zk.delete(lockPath, -1);
System.out.println("Released lock node: " + lockPath);
}
在程序结束时,关闭 ZooKeeper 客户端。
public void close() throws InterruptedException {
zk.close();
}
以下是一个完整的示例代码:
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
public class DistributedLock {
private static final String ZK_ADDRESS = "localhost:2181";
private static final int SESSION_TIMEOUT = 3000;
private ZooKeeper zk;
public DistributedLock() throws IOException {
zk = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, event -> {
// 处理连接事件
});
}
public void createLockNode() throws KeeperException, InterruptedException {
String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Created lock node: " + lockPath);
}
public boolean acquireLock() throws KeeperException, InterruptedException {
String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Created lock node: " + lockPath);
while (true) {
List<String> children = zk.getChildren("/lock", false);
Collections.sort(children);
String currentNode = lockPath.substring(lockPath.lastIndexOf("/") + 1);
if (currentNode.equals(children.get(0))) {
// 当前节点是最小的节点,获取锁成功
return true;
} else {
// 监听前一个节点的删除事件
int previousNodeIndex = Collections.binarySearch(children, currentNode) - 1;
String previousNodePath = "/lock/" + children.get(previousNodeIndex);
Stat stat = zk.exists(previousNodePath, event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
synchronized (this) {
notifyAll();
}
}
});
if (stat != null) {
synchronized (this) {
wait();
}
}
}
}
}
public void releaseLock() throws KeeperException, InterruptedException {
String lockPath = zk.create("/lock", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
zk.delete(lockPath, -1);
System.out.println("Released lock node: " + lockPath);
}
public void close() throws InterruptedException {
zk.close();
}
public static void main(String[] args) {
try {
DistributedLock lock = new DistributedLock();
lock.createLockNode();
if (lock.acquireLock()) {
System.out.println("Lock acquired!");
// 执行业务逻辑
Thread.sleep(5000);
lock.releaseLock();
System.out.println("Lock released!");
}
lock.close();
} catch (IOException | KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}
这个示例展示了如何使用 ZooKeeper 实现一个简单的分布式锁。在实际应用中,你可能需要处理更多的异常情况和边界条件。