您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Zookeeper中怎么实现分布式排它锁
## 1. 分布式锁概述
### 1.1 分布式锁的定义
分布式锁是控制分布式系统之间同步访问共享资源的一种机制。在单机系统中,我们可以使用Java的synchronized或ReentrantLock等机制实现线程间的互斥,但在分布式环境下,这些本地锁机制无法跨JVM工作。
### 1.2 分布式锁的应用场景
- 避免重复处理(如定时任务)
- 防止超卖(电商库存扣减)
- 实现幂等性操作
- 系统间资源协调
### 1.3 分布式锁的实现方式
1. 基于数据库(唯一索引)
2. 基于Redis(SETNX命令)
3. 基于Zookeeper(临时顺序节点)
4. 基于Etcd等分布式协调服务
## 2. Zookeeper实现分布式锁的原理
### 2.1 Zookeeper的数据模型
Zookeeper采用类似文件系统的树形结构(ZNode),每个节点可以存储少量数据(默认1MB),并提供以下特性:
- 持久节点(PERSISTENT)
- 临时节点(EPHEMERAL)
- 顺序节点(SEQUENTIAL)
### 2.2 排它锁的核心机制
Zookeeper实现排它锁主要依赖以下特性:
1. **临时节点**:客户端会话结束时自动删除
2. **节点唯一性**:同一路径下节点名称不能重复
3. **Watch机制**:监听节点变化事件
### 2.3 实现流程示意图
```mermaid
graph TD
A[客户端1创建/lock节点] --> B[创建成功获取锁]
C[客户端2创建/lock节点] --> D[创建失败]
D --> E[监听/lock节点删除事件]
B --> F[业务处理]
F --> G[删除/lock节点]
G --> H[触发客户端2的Watch]
H --> I[客户端2重新尝试获取锁]
public boolean tryLock() {
try {
// 尝试创建临时节点
zk.create("/exclusive_lock",
new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
return true;
} catch (KeeperException.NodeExistsException e) {
return false;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void unlock() {
try {
zk.delete("/exclusive_lock", -1);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void lock() throws InterruptedException {
while (!tryLock()) {
synchronized(this) {
// 设置Watch监听节点删除事件
Stat stat = zk.exists("/exclusive_lock", true);
if (stat == null) {
continue; // 节点已不存在,重试
}
wait(); // 等待通知
}
}
}
// Watch回调处理
Watcher watcher = event -> {
if (event.getType() == EventType.NodeDeleted) {
synchronized(this) {
notifyAll(); // 唤醒等待线程
}
}
};
当多个客户端同时监听同一个节点时,节点删除会导致所有客户端同时唤醒,产生惊群效应。
public void lock() {
while (true) {
try {
// 创建临时顺序节点
String myNode = zk.create("/lock/seq-",
new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取所有子节点并排序
List<String> children = zk.getChildren("/lock", false);
Collections.sort(children);
// 判断自己是否是最小节点
String smallest = children.get(0);
if (myNode.equals("/lock/" + smallest)) {
return; // 获取锁成功
}
// 监听前一个节点
String watchNode = children.get(Collections.binarySearch(children,
myNode.substring("/lock/".length())) - 1);
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zk.exists("/lock/" + watchNode,
event -> {
if (event.getType() == EventType.NodeDeleted) {
latch.countDown();
}
});
if (stat != null) {
latch.await(); // 等待前驱节点释放
}
} catch (Exception e) {
// 处理异常
}
}
}
public class ReentrantZkLock {
private ThreadLocal<Integer> lockCount = new ThreadLocal<>();
public boolean tryLock() {
if (lockCount.get() != null) {
lockCount.set(lockCount.get() + 1);
return true;
}
if (/* 正常获取锁逻辑 */) {
lockCount.set(1);
return true;
}
return false;
}
public void unlock() {
if (lockCount.get() == null) return;
int count = lockCount.get() - 1;
if (count == 0) {
// 实际释放锁
lockCount.remove();
} else {
lockCount.set(count);
}
}
}
public boolean tryLock(long timeout, TimeUnit unit) {
long start = System.currentTimeMillis();
long end = start + unit.toMillis(timeout);
while (System.currentTimeMillis() < end) {
if (tryLock()) {
return true;
}
try {
Thread.sleep(100); // 避免CPU空转
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}
// 创建Zookeeper客户端时设置会话超时
ZooKeeper zk = new ZooKeeper(connectString,
sessionTimeout,
watcher);
// 添加会话状态监听
watcher = event -> {
if (event.getState() == KeeperState.Expired) {
// 会话过期,需要重新连接
reconnect();
}
};
特性 | Zookeeper | Redis |
---|---|---|
一致性 | 强一致性(ZAB协议) | 最终一致性(主从异步复制) |
性能 | 相对较低(需要磁盘写入) | 较高(内存操作) |
锁释放机制 | 自动(会话结束) | 需要设置超时 |
实现复杂度 | 较复杂 | 较简单 |
适用场景 | 对一致性要求高的场景 | 高性能要求的场景 |
特性 | Zookeeper | 数据库 |
---|---|---|
性能 | 较高 | 较低(磁盘IO) |
可靠性 | 高(集群部署) | 依赖数据库稳定性 |
阻塞能力 | 原生支持 | 需要轮询 |
实现复杂度 | 中等 | 简单 |
public boolean deductStock(String itemId, int num) {
String lockPath = "/locks/stock_" + itemId;
try {
// 获取商品库存锁
if (zkLock.lock(lockPath, 3000)) {
// 查询库存
int stock = stockDao.query(itemId);
if (stock >= num) {
// 扣减库存
return stockDao.update(itemId, stock - num) > 0;
}
return false;
}
} finally {
zkLock.unlock(lockPath);
}
return false;
}
@Scheduled(fixedRate = 60000)
public void scheduledTask() {
if (zkLock.tryLock("/locks/scheduled_task")) {
try {
// 执行任务逻辑
executeTask();
} finally {
zkLock.unlock("/locks/scheduled_task");
}
}
}
问题现象:网络分区导致多个客户端同时持有锁
解决方案:
1. 使用Zookeeper的fencing token机制
2. 增加锁的版本号校验
3. 设置合理的会话超时时间
Zookeeper通过其临时节点和Watch机制,为分布式锁提供了可靠的实现方案。虽然性能上不如基于Redis的实现,但在需要强一致性的场景下仍然是首选方案。
未来发展趋势: 1. 与Service Mesh集成 2. 支持更细粒度的锁(如读写锁) 3. 自动调节的锁超时机制 4. 与云原生生态的深度整合
(此处可添加GitHub仓库链接或完整实现类代码)
”`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。