您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# 如何使用ZooKeeper实现Java跨JVM的分布式读写锁
## 一、分布式锁的背景与挑战
在现代分布式系统中,跨进程/跨主机的资源协调是一个核心挑战。传统的单机锁(如`synchronized`或`ReentrantLock`)无法解决以下问题:
1. **跨JVM同步失效**:单机锁只能控制同一JVM内的线程竞争
2. **单点故障风险**:基于数据库或Redis的实现存在服务不可用风险
3. **不可重入问题**:部分实现无法支持同一线程重复获取锁
4. **死锁风险**:锁持有者崩溃可能导致资源永久锁定
Apache ZooKeeper通过其**临时节点**、**顺序节点**和**Watcher机制**,提供了实现分布式锁的理想特性。
## 二、ZooKeeper核心机制解析
### 2.1 关键特性
- **临时节点(Ephemeral Nodes)**:客户端会话结束后自动删除
- **顺序节点(Sequential Nodes)**:节点名自动附加单调递增序号
- **Watcher机制**:节点变化时通知客户端
### 2.2 锁设计原理
/locks ├── read-0000000001 ├── read-0000000003 └── write-0000000002
#### 读写锁规则:
1. **读锁**:当无写锁(无序号更小的写节点)时可获取
2. **写锁**:当无任何锁(自己是序号最小节点)时可获取
## 三、Java实现详解
### 3.1 环境准备
```xml
<!-- Maven依赖 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>
public class DistributedLock {
private final ZooKeeper zk;
private final String lockPath;
private String currentLockPath;
public DistributedLock(ZooKeeper zk, String lockPath) {
this.zk = zk;
this.lockPath = lockPath;
}
public void lock() throws Exception {
currentLockPath = zk.create(lockPath + "/lock-",
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
while (true) {
List<String> children = zk.getChildren(lockPath, false);
Collections.sort(children);
if (currentLockPath.endsWith(children.get(0))) {
return; // 获得锁
}
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zk.exists(lockPath + "/" + children.get(0),
event -> {
if (event.getType() == EventType.NodeDeleted) {
latch.countDown();
}
});
if (stat != null) {
latch.await();
}
}
}
}
public class DistributedReadWriteLock {
private final ZooKeeper zk;
private final String basePath;
public enum LockType { READ, WRITE }
public DistributedReadWriteLock(ZooKeeper zk, String basePath) {
this.zk = zk;
this.basePath = basePath;
ensurePathExists();
}
private void ensurePathExists() {
try {
if (zk.exists(basePath, false) == null) {
zk.create(basePath, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void lock(LockType type) throws Exception {
String prefix = type == LockType.READ ? "read-" : "write-";
String lockPath = zk.create(basePath + "/" + prefix,
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
while (!tryAcquireLock(lockPath, type)) {
waitForLock(lockPath, type);
}
}
private boolean tryAcquireLock(String myPath, LockType type) {
List<String> children = zk.getChildren(basePath, false);
Collections.sort(children);
String myNode = myPath.substring(myPath.lastIndexOf('/') + 1);
int myIndex = children.indexOf(myNode);
if (type == LockType.READ) {
// 检查前面是否有写锁
for (int i = 0; i < myIndex; i++) {
if (children.get(i).startsWith("write-")) {
return false;
}
}
return true;
} else { // WRITE锁
return myIndex == 0; // 必须是第一个节点
}
}
}
通过只监听前一个节点而非所有节点:
private void waitForLock(String myPath, LockType type) throws Exception {
String previousNode = getPreviousNode(myPath);
if (previousNode != null) {
CountDownLatch latch = new CountDownLatch(1);
zk.exists(basePath + "/" + previousNode,
event -> {
if (event.getType() == EventType.NodeDeleted) {
latch.countDown();
}
});
latch.await();
}
}
public void unlock(String lockPath) throws Exception {
try {
zk.delete(lockPath, -1);
} catch (KeeperException.NoNodeException e) {
// 锁已被自动释放(会话过期)
}
}
private ThreadLocal<Map<String, Integer>> lockCount =
ThreadLocal.withInitial(HashMap::new);
public void lock() throws Exception {
Integer count = lockCount.get().get(currentLockPath);
if (count != null) {
lockCount.get().put(currentLockPath, count + 1);
return;
}
// ...原有获取锁逻辑...
lockCount.get().put(currentLockPath, 1);
}
latch.await(10, TimeUnit.SECONDS);
Leader选举
优化大量锁竞争原生ZooKeeper API实现 vs Curator Recipes:
特性 | 原生实现 | Curator框架 |
---|---|---|
代码复杂度 | 高 | 低(预制方案) |
连接管理 | 手动处理 | 自动重连 |
异常处理 | 完全自行控制 | 内置重试机制 |
功能扩展 | 灵活定制 | 受限但全面 |
Curator示例:
InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(curatorClient, "/lock-path");
lock.readLock().acquire();
sessionTimeout
(建议10-30秒)
new ZooKeeper(connectString, 15000, watcher);
zk.create(path, data,
ZooDefs.Ids.CREATOR_ALL_ACL,
CreateMode.EPHEMERAL);
watchCount
和dataSize
本文实现的ZooKeeper分布式读写锁具备: - 跨JVM的进程间协调能力 - 公平的锁获取顺序(基于ZK顺序节点) - 自动释放机制(通过临时节点) - 读写锁语义分离
完整实现代码建议包含: 1. 锁超时自动释放 2. 锁等待队列可视化 3. 性能监控埋点
通过合理利用ZooKeeper的特性,我们可以构建出比Redis等方案更可靠的分布式锁服务,特别适合对一致性要求高的金融、交易等场景。 “`
(注:实际字数为2580字左右,可根据需要增减具体实现细节部分的篇幅来精确控制字数)
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。