您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# 如何使用AQS共享锁:Semaphore、CountDownLatch深度解析
## 一、AQS核心机制回顾
### 1.1 AQS架构设计
AbstractQueuedSynchronizer(AQS)作为Java并发包的基石,采用模板方法模式实现同步器框架。其核心组成包括:
- volatile int state:同步状态标志
- CLH变体队列:采用双向链表实现的等待队列
- ConditionObject:条件变量实现类
```java
// AQS简化结构
public abstract class AbstractQueuedSynchronizer {
private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;
protected final boolean compareAndSetState(int expect, int update) {
// CAS操作实现
}
// 需要子类实现的模板方法
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
}
特性 | 独占模式 | 共享模式 |
---|---|---|
资源获取 | 排他性占用 | 多个线程可同时获取 |
典型实现 | ReentrantLock | Semaphore/CountDownLatch |
唤醒策略 | 只唤醒一个后继节点 | 传播式唤醒后续所有共享节点 |
适用场景 | 互斥访问场景 | 资源控制/多线程协同 |
Semaphore通过AQS的state表示可用许可数,其工作流程:
// Semaphore同步器实现
static final class NonfairSync extends Sync {
protected int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
public class ConnectionPool {
private final Semaphore semaphore;
private final BlockingQueue<Connection> pool;
public ConnectionPool(int size) {
semaphore = new Semaphore(size);
pool = new ArrayBlockingQueue<>(size);
// 初始化连接...
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire();
return pool.take();
}
public void release(Connection conn) {
pool.offer(conn);
semaphore.release();
}
}
class RateLimiter {
private final Semaphore semaphore;
private final ScheduledExecutorService scheduler;
public RateLimiter(int permitsPerSecond) {
semaphore = new Semaphore(permitsPerSecond);
scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() ->
semaphore.release(permitsPerSecond - semaphore.availablePermits()),
0, 1, TimeUnit.SECONDS);
}
public void acquire() throws InterruptedException {
semaphore.acquire();
}
}
acquire(int permits)
减少CAS次数tryAcquire(long timeout, TimeUnit unit)
避免死锁CountDownLatch.Sync重写AQS方法:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0) return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
public class ServiceInitializer {
private final CountDownLatch latch = new CountDownLatch(3);
public void init() throws InterruptedException {
new Thread(() -> {
initDatabase();
latch.countDown();
}).start();
new Thread(() -> {
initCache();
latch.countDown();
}).start();
new Thread(() -> {
loadConfig();
latch.countDown();
}).start();
latch.await();
System.out.println("All services initialized");
}
}
public class ParallelCalculator {
public long calculate(int[] data) throws InterruptedException {
int threadCount = Runtime.getRuntime().availableProcessors();
CountDownLatch latch = new CountDownLatch(threadCount);
AtomicLong result = new AtomicLong();
int segmentSize = data.length / threadCount;
for (int i = 0; i < threadCount; i++) {
int start = i * segmentSize;
int end = (i == threadCount-1) ? data.length : start + segmentSize;
new Thread(() -> {
long sum = 0;
for (int j = start; j < end; j++) {
sum += data[j];
}
result.addAndGet(sum);
latch.countDown();
}).start();
}
latch.await();
return result.get();
}
}
特性 | CountDownLatch | CyclicBarrier |
---|---|---|
重置能力 | 不可重置 | 可循环使用 |
触发条件 | 计数减到0 | 线程数达到屏障值 |
等待行为 | 主线程等待工作线程 | 所有线程互相等待 |
典型场景 | 启动协调/结束检测 | 并行计算分阶段同步 |
@startuml
title 共享锁获取流程
participant ThreadA
participant AQS
participant CLH队列
ThreadA -> AQS: acquireShared()
alt 快速路径成功
AQS --> ThreadA: 直接返回
else 需要排队
ThreadA -> AQS: addWaiter(Node.SHARED)
AQS -> CLH队列: 插入尾节点
loop 自旋检查
ThreadA -> AQS: shouldParkAfterFailedAcquire()
AQS -> ThreadA: 是否需要park
ThreadA -> AQS: parkAndCheckInterrupt()
end
AQS --> ThreadA: 被前驱节点唤醒
end
@enduml
关键代码段:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // 唤醒后继节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) // 检查头节点是否变化
break;
}
}
Semaphore许可数计算:
CountDownLatch防死锁:
// 建议增加超时控制
if (!latch.await(30, TimeUnit.SECONDS)) {
log.warn("Initialization timeout");
// 触发补偿机制
}
问题现象:线程在await()
处永久阻塞
排查步骤:
1. 检查countDown()
调用次数是否匹配
2. 使用jstack查看线程栈:
jstack <pid> | grep -A 10 'java.util.concurrent.CountDownLatch'
countDown()
未执行// 基于Redis的分布式CountDownLatch
public class RedisCountDownLatch {
private final Jedis jedis;
private final String latchKey;
public void countDown() {
jedis.decr(latchKey);
}
public void await() throws InterruptedException {
while (true) {
String val = jedis.get(latchKey);
if ("0".equals(val)) return;
Thread.sleep(100);
}
}
}
public CompletableFuture<Void> parallelTasks(List<Runnable> tasks) {
CountDownLatch latch = new CountDownLatch(tasks.size());
return CompletableFuture.runAsync(() -> {
tasks.forEach(task ->
CompletableFuture.runAsync(() -> {
try {
task.run();
} finally {
latch.countDown();
}
}));
latch.await();
});
}
方法签名 | 说明 |
---|---|
void acquire() |
阻塞获取许可 |
boolean tryAcquire() |
非阻塞尝试获取 |
void release() |
释放许可 |
int availablePermits() |
查询当前可用许可数 |
方法签名 | 说明 |
---|---|
void await() |
阻塞直到计数归零 |
boolean await(long timeout, TimeUnit unit) |
带超时的等待 |
void countDown() |
计数减一 |
long getCount() |
获取当前计数值 |
本文共包含代码示例15个,流程图2幅,对比表格3个,完整演示了AQS共享锁的实现原理和实战用法。实际应用中请根据具体场景调整参数和异常处理逻辑。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。