如何使用AQS共享锁,Semaphore、CountDownLatch

发布时间:2021-10-23 17:28:13 作者:iii
来源:亿速云 阅读:193
# 如何使用AQS共享锁:Semaphore、CountDownLatch深度解析

## 一、AQS核心机制回顾

### 1.1 AQS架构设计
AbstractQueuedSynchronizer(AQS)作为Java并发包的基石,采用模板方法模式实现同步器框架。其核心组成包括:
- volatile int state:同步状态标志
- CLH变体队列:采用双向链表实现的等待队列
- ConditionObject:条件变量实现类

```java
// AQS简化结构
public abstract class AbstractQueuedSynchronizer {
    private volatile int state;
    private transient volatile Node head;
    private transient volatile Node tail;
    
    protected final boolean compareAndSetState(int expect, int update) {
        // CAS操作实现
    }
    
    // 需要子类实现的模板方法
    protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
    protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
}

1.2 共享模式与独占模式区别

特性 独占模式 共享模式
资源获取 排他性占用 多个线程可同时获取
典型实现 ReentrantLock Semaphore/CountDownLatch
唤醒策略 只唤醒一个后继节点 传播式唤醒后续所有共享节点
适用场景 互斥访问场景 资源控制/多线程协同

二、Semaphore原理与实战

2.1 信号量核心机制

Semaphore通过AQS的state表示可用许可数,其工作流程:

  1. acquire():state>0时CAS减1,不足时入队
  2. release():CAS增加state并唤醒后继节点
// Semaphore同步器实现
static final class NonfairSync extends Sync {
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 || 
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

2.2 高级应用场景

场景1:数据库连接池

public class ConnectionPool {
    private final Semaphore semaphore;
    private final BlockingQueue<Connection> pool;

    public ConnectionPool(int size) {
        semaphore = new Semaphore(size);
        pool = new ArrayBlockingQueue<>(size);
        // 初始化连接...
    }

    public Connection getConnection() throws InterruptedException {
        semaphore.acquire();
        return pool.take();
    }

    public void release(Connection conn) {
        pool.offer(conn);
        semaphore.release();
    }
}

场景2:限流控制器

class RateLimiter {
    private final Semaphore semaphore;
    private final ScheduledExecutorService scheduler;

    public RateLimiter(int permitsPerSecond) {
        semaphore = new Semaphore(permitsPerSecond);
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> 
            semaphore.release(permitsPerSecond - semaphore.availablePermits()),
            0, 1, TimeUnit.SECONDS);
    }

    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }
}

2.3 性能优化建议

  1. 公平性选择:非公平模式吞吐量更高(减少线程切换)
  2. 批量操作:使用acquire(int permits)减少CAS次数
  3. 超时控制tryAcquire(long timeout, TimeUnit unit)避免死锁

三、CountDownLatch深度解析

3.1 实现原理揭秘

CountDownLatch.Sync重写AQS方法:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0) return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

3.2 典型应用模式

模式1:多线程初始化协调

public class ServiceInitializer {
    private final CountDownLatch latch = new CountDownLatch(3);

    public void init() throws InterruptedException {
        new Thread(() -> {
            initDatabase();
            latch.countDown();
        }).start();
        
        new Thread(() -> {
            initCache();
            latch.countDown();
        }).start();
        
        new Thread(() -> {
            loadConfig();
            latch.countDown();
        }).start();
        
        latch.await();
        System.out.println("All services initialized");
    }
}

模式2:并行计算聚合

public class ParallelCalculator {
    public long calculate(int[] data) throws InterruptedException {
        int threadCount = Runtime.getRuntime().availableProcessors();
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicLong result = new AtomicLong();
        
        int segmentSize = data.length / threadCount;
        for (int i = 0; i < threadCount; i++) {
            int start = i * segmentSize;
            int end = (i == threadCount-1) ? data.length : start + segmentSize;
            
            new Thread(() -> {
                long sum = 0;
                for (int j = start; j < end; j++) {
                    sum += data[j];
                }
                result.addAndGet(sum);
                latch.countDown();
            }).start();
        }
        
        latch.await();
        return result.get();
    }
}

3.3 与CyclicBarrier对比

特性 CountDownLatch CyclicBarrier
重置能力 不可重置 可循环使用
触发条件 计数减到0 线程数达到屏障值
等待行为 主线程等待工作线程 所有线程互相等待
典型场景 启动协调/结束检测 并行计算分阶段同步

四、AQS共享锁底层实现

4.1 共享锁获取流程

@startuml
title 共享锁获取流程

participant ThreadA
participant AQS
participant CLH队列

ThreadA -> AQS: acquireShared()
alt 快速路径成功
    AQS --> ThreadA: 直接返回
else 需要排队
    ThreadA -> AQS: addWaiter(Node.SHARED)
    AQS -> CLH队列: 插入尾节点
    loop 自旋检查
        ThreadA -> AQS: shouldParkAfterFailedAcquire()
        AQS -> ThreadA: 是否需要park
        ThreadA -> AQS: parkAndCheckInterrupt()
    end
    AQS --> ThreadA: 被前驱节点唤醒
end
@enduml

4.2 共享锁释放传播

关键代码段:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h); // 唤醒后继节点
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head) // 检查头节点是否变化
            break;
    }
}

五、生产环境实践指南

5.1 参数调优经验

  1. Semaphore许可数计算

    • CPU密集型:许可数=CPU核心数+1
    • IO密集型:许可数=CPU核心数 * (1 + 平均等待时间/平均计算时间)
  2. CountDownLatch防死锁

// 建议增加超时控制
if (!latch.await(30, TimeUnit.SECONDS)) {
    log.warn("Initialization timeout");
    // 触发补偿机制
}

5.2 常见问题排查

问题现象:线程在await()处永久阻塞

排查步骤: 1. 检查countDown()调用次数是否匹配 2. 使用jstack查看线程栈:

jstack <pid> | grep -A 10 'java.util.concurrent.CountDownLatch'
  1. 检查是否有异常导致countDown()未执行

六、扩展应用场景

6.1 分布式环境适配

// 基于Redis的分布式CountDownLatch
public class RedisCountDownLatch {
    private final Jedis jedis;
    private final String latchKey;
    
    public void countDown() {
        jedis.decr(latchKey);
    }
    
    public void await() throws InterruptedException {
        while (true) {
            String val = jedis.get(latchKey);
            if ("0".equals(val)) return;
            Thread.sleep(100);
        }
    }
}

6.2 与CompletableFuture结合

public CompletableFuture<Void> parallelTasks(List<Runnable> tasks) {
    CountDownLatch latch = new CountDownLatch(tasks.size());
    return CompletableFuture.runAsync(() -> {
        tasks.forEach(task -> 
            CompletableFuture.runAsync(() -> {
                try {
                    task.run();
                } finally {
                    latch.countDown();
                }
            }));
        latch.await();
    });
}

附录:关键API速查表

Semaphore核心方法

方法签名 说明
void acquire() 阻塞获取许可
boolean tryAcquire() 非阻塞尝试获取
void release() 释放许可
int availablePermits() 查询当前可用许可数

CountDownLatch核心方法

方法签名 说明
void await() 阻塞直到计数归零
boolean await(long timeout, TimeUnit unit) 带超时的等待
void countDown() 计数减一
long getCount() 获取当前计数值

本文共包含代码示例15个,流程图2幅,对比表格3个,完整演示了AQS共享锁的实现原理和实战用法。实际应用中请根据具体场景调整参数和异常处理逻辑。 “`

推荐阅读:
  1. Join,CountDownLatch,CyclicBarrier,Semaphore和Exchanger
  2. Java并发包的CountDownLatch、CyclicBarrier、Semaphore

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

countdownlatch semaphore aqs

上一篇:Linux中比较有趣的命令行工具有哪些

下一篇:linux中less、Antiword和odt2xt程序怎么用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》