您好,登录后才能下订单哦!
密码登录
            
            
            
            
        登录注册
            
            
            
        点击 登录注册 即表示同意《亿速云用户服务条款》
        # 如何使用AQS共享锁:Semaphore、CountDownLatch深度解析
## 一、AQS核心机制回顾
### 1.1 AQS架构设计
AbstractQueuedSynchronizer(AQS)作为Java并发包的基石,采用模板方法模式实现同步器框架。其核心组成包括:
- volatile int state:同步状态标志
- CLH变体队列:采用双向链表实现的等待队列
- ConditionObject:条件变量实现类
```java
// AQS简化结构
public abstract class AbstractQueuedSynchronizer {
    private volatile int state;
    private transient volatile Node head;
    private transient volatile Node tail;
    
    protected final boolean compareAndSetState(int expect, int update) {
        // CAS操作实现
    }
    
    // 需要子类实现的模板方法
    protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
    protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
}
| 特性 | 独占模式 | 共享模式 | 
|---|---|---|
| 资源获取 | 排他性占用 | 多个线程可同时获取 | 
| 典型实现 | ReentrantLock | Semaphore/CountDownLatch | 
| 唤醒策略 | 只唤醒一个后继节点 | 传播式唤醒后续所有共享节点 | 
| 适用场景 | 互斥访问场景 | 资源控制/多线程协同 | 
Semaphore通过AQS的state表示可用许可数,其工作流程:
// Semaphore同步器实现
static final class NonfairSync extends Sync {
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || 
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
public class ConnectionPool {
    private final Semaphore semaphore;
    private final BlockingQueue<Connection> pool;
    public ConnectionPool(int size) {
        semaphore = new Semaphore(size);
        pool = new ArrayBlockingQueue<>(size);
        // 初始化连接...
    }
    public Connection getConnection() throws InterruptedException {
        semaphore.acquire();
        return pool.take();
    }
    public void release(Connection conn) {
        pool.offer(conn);
        semaphore.release();
    }
}
class RateLimiter {
    private final Semaphore semaphore;
    private final ScheduledExecutorService scheduler;
    public RateLimiter(int permitsPerSecond) {
        semaphore = new Semaphore(permitsPerSecond);
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> 
            semaphore.release(permitsPerSecond - semaphore.availablePermits()),
            0, 1, TimeUnit.SECONDS);
    }
    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }
}
acquire(int permits)减少CAS次数tryAcquire(long timeout, TimeUnit unit)避免死锁CountDownLatch.Sync重写AQS方法:
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0) return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
public class ServiceInitializer {
    private final CountDownLatch latch = new CountDownLatch(3);
    public void init() throws InterruptedException {
        new Thread(() -> {
            initDatabase();
            latch.countDown();
        }).start();
        
        new Thread(() -> {
            initCache();
            latch.countDown();
        }).start();
        
        new Thread(() -> {
            loadConfig();
            latch.countDown();
        }).start();
        
        latch.await();
        System.out.println("All services initialized");
    }
}
public class ParallelCalculator {
    public long calculate(int[] data) throws InterruptedException {
        int threadCount = Runtime.getRuntime().availableProcessors();
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicLong result = new AtomicLong();
        
        int segmentSize = data.length / threadCount;
        for (int i = 0; i < threadCount; i++) {
            int start = i * segmentSize;
            int end = (i == threadCount-1) ? data.length : start + segmentSize;
            
            new Thread(() -> {
                long sum = 0;
                for (int j = start; j < end; j++) {
                    sum += data[j];
                }
                result.addAndGet(sum);
                latch.countDown();
            }).start();
        }
        
        latch.await();
        return result.get();
    }
}
| 特性 | CountDownLatch | CyclicBarrier | 
|---|---|---|
| 重置能力 | 不可重置 | 可循环使用 | 
| 触发条件 | 计数减到0 | 线程数达到屏障值 | 
| 等待行为 | 主线程等待工作线程 | 所有线程互相等待 | 
| 典型场景 | 启动协调/结束检测 | 并行计算分阶段同步 | 
@startuml
title 共享锁获取流程
participant ThreadA
participant AQS
participant CLH队列
ThreadA -> AQS: acquireShared()
alt 快速路径成功
    AQS --> ThreadA: 直接返回
else 需要排队
    ThreadA -> AQS: addWaiter(Node.SHARED)
    AQS -> CLH队列: 插入尾节点
    loop 自旋检查
        ThreadA -> AQS: shouldParkAfterFailedAcquire()
        AQS -> ThreadA: 是否需要park
        ThreadA -> AQS: parkAndCheckInterrupt()
    end
    AQS --> ThreadA: 被前驱节点唤醒
end
@enduml
关键代码段:
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h); // 唤醒后继节点
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head) // 检查头节点是否变化
            break;
    }
}
Semaphore许可数计算:
CountDownLatch防死锁:
// 建议增加超时控制
if (!latch.await(30, TimeUnit.SECONDS)) {
    log.warn("Initialization timeout");
    // 触发补偿机制
}
问题现象:线程在await()处永久阻塞
排查步骤:
1. 检查countDown()调用次数是否匹配
2. 使用jstack查看线程栈:
jstack <pid> | grep -A 10 'java.util.concurrent.CountDownLatch'
countDown()未执行// 基于Redis的分布式CountDownLatch
public class RedisCountDownLatch {
    private final Jedis jedis;
    private final String latchKey;
    
    public void countDown() {
        jedis.decr(latchKey);
    }
    
    public void await() throws InterruptedException {
        while (true) {
            String val = jedis.get(latchKey);
            if ("0".equals(val)) return;
            Thread.sleep(100);
        }
    }
}
public CompletableFuture<Void> parallelTasks(List<Runnable> tasks) {
    CountDownLatch latch = new CountDownLatch(tasks.size());
    return CompletableFuture.runAsync(() -> {
        tasks.forEach(task -> 
            CompletableFuture.runAsync(() -> {
                try {
                    task.run();
                } finally {
                    latch.countDown();
                }
            }));
        latch.await();
    });
}
| 方法签名 | 说明 | 
|---|---|
void acquire() | 
阻塞获取许可 | 
boolean tryAcquire() | 
非阻塞尝试获取 | 
void release() | 
释放许可 | 
int availablePermits() | 
查询当前可用许可数 | 
| 方法签名 | 说明 | 
|---|---|
void await() | 
阻塞直到计数归零 | 
boolean await(long timeout, TimeUnit unit) | 
带超时的等待 | 
void countDown() | 
计数减一 | 
long getCount() | 
获取当前计数值 | 
本文共包含代码示例15个,流程图2幅,对比表格3个,完整演示了AQS共享锁的实现原理和实战用法。实际应用中请根据具体场景调整参数和异常处理逻辑。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。