您好,登录后才能下订单哦!
# 如何正确的使用CountDownLatch
## 1. 什么是CountDownLatch?
`CountDownLatch`是Java并发包(`java.util.concurrent`)中一个重要的同步辅助类,它允许一个或多个线程等待其他线程完成操作后再继续执行。其核心思想是通过一个计数器来实现线程间的协调。
### 1.1 核心特性
- **计数初始化**:构造时需指定计数值(必须≥0)
- **不可重置**:计数到0后无法重复使用(与CyclicBarrier的关键区别)
- **线程安全**:所有方法都是线程安全的
- **等待机制**:提供`await()`方法让线程阻塞等待
### 1.2 典型应用场景
- 主线程等待多个子线程完成任务
- 并行计算任务的分阶段控制
- 服务启动时的依赖检查
- 模拟高并发测试场景
## 2. 核心API详解
### 2.1 构造函数
```java
public CountDownLatch(int count)
count:初始计数值IllegalArgumentException// 使当前线程等待直到计数器归零
public void await() throws InterruptedException
// 带超时的等待
public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException
// 计数器减1(不会阻塞调用线程)
public void countDown()
CountDownLatch latch = new CountDownLatch(N);
// 工作线程
class Worker implements Runnable {
    public void run() {
        try {
            // 执行任务
        } finally {
            latch.countDown(); // 必须放在finally块
        }
    }
}
// 启动多个线程
for (int i = 0; i < N; i++) {
    new Thread(new Worker()).start();
}
// 主线程等待
latch.await();
System.out.println("所有工作完成");
countDown()放在finally块中InterruptedException的两种方式:
“`java
// 方式1:恢复中断状态
try {
  latch.await();
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
}// 方式2:传播异常 void doWork() throws InterruptedException { latch.await(); // … }
### 3.3 性能优化技巧
1. **避免过度等待**:合理设置`await`的超时时间
   ```java
   if (!latch.await(30, TimeUnit.SECONDS)) {
       handleTimeout();
   }
批量计数:对可分组任务使用批量计数
// 每完成10个任务计数一次
if (tasksCompleted % 10 == 0) {
   latch.countDown();
}
结合ExecutorService:
“`java
ExecutorService executor = Executors.newFixedThreadPool(5);
List
for (int i = 0; i < 10; i++) { futures.add(executor.submit(() -> { try { // 任务逻辑 } finally { latch.countDown(); } })); }
latch.await();
## 4. 典型应用场景实现
### 4.1 服务启动检查
```java
public class ServiceHealthChecker {
    private final CountDownLatch latch;
    private final List<Service> services;
    
    public ServiceHealthChecker(List<Service> services) {
        this.services = services;
        this.latch = new CountDownLatch(services.size());
    }
    
    public boolean checkAll() throws InterruptedException {
        for (Service service : services) {
            new Thread(() -> {
                if (service.checkHealth()) {
                    latch.countDown();
                }
            }).start();
        }
        return latch.await(10, TimeUnit.SECONDS);
    }
}
public class ParallelCalculator {
    public Result compute(List<Data> data) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(data.size());
        ConcurrentMap<Key, Value> partialResults = new ConcurrentHashMap<>();
        
        for (Data segment : data) {
            new Thread(() -> {
                try {
                    partialResults.put(processSegment(segment));
                } finally {
                    latch.countDown();
                }
            }).start();
        }
        
        latch.await();
        return aggregate(partialResults);
    }
}
问题场景:
- 工作线程中又调用了await()
- 计数永远无法归零
解决方案:
// 错误示例
new Thread(() -> {
    doPhase1();
    latch.countDown();
    latch.await();  // 这里会造成死锁
    doPhase2();
}).start();
// 正确做法:使用两个不同的Latch
CountDownLatch phase1Latch = new CountDownLatch(N);
CountDownLatch phase2Latch = new CountDownLatch(N);
问题表现:
- 实际调用countDown()次数少于初始计数
- 导致等待线程永久阻塞
防御性编程:
// 使用带超时的await
if (!latch.await(5, TimeUnit.MINUTES)) {
    log.warn("任务执行超时,未完成计数: {}", latch.getCount());
    // 执行回滚或补偿逻辑
}
// 或者使用监控线程
new Thread(() -> {
    try {
        if (!latch.await(5, TimeUnit.MINUTES)) {
            system.exit(1); // 强制终止
        }
    } catch (InterruptedException ignored) {}
}).start();
public class PhasedExecution {
    private final CountDownLatch initializationLatch;
    private final CountDownLatch processingLatch;
    
    public PhasedExecution(int workers) {
        this.initializationLatch = new CountDownLatch(1);
        this.processingLatch = new CountDownLatch(workers);
    }
    
    public void execute() {
        // 阶段1:初始化
        new Thread(() -> {
            initSystem();
            initializationLatch.countDown();
        }).start();
        
        // 阶段2:并行处理
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                initializationLatch.await();
                processData();
                processingLatch.countDown();
            }).start();
        }
        
        // 阶段3:最终处理
        processingLatch.await();
        finish();
    }
}
public CompletableFuture<Void> executeConcurrently(List<Runnable> tasks) {
    CountDownLatch latch = new CountDownLatch(tasks.size());
    
    return CompletableFuture.runAsync(() -> {
        tasks.forEach(task -> 
            CompletableFuture.runAsync(() -> {
                try {
                    task.run();
                } finally {
                    latch.countDown();
                }
            })
        );
        
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}
| 特性 | CountDownLatch | CyclicBarrier | Phaser | 
|---|---|---|---|
| 可重用性 | 不可重用 | 可重用 | 可重用 | 
| 计数方向 | 递减 | 递增 | 可控制 | 
| 参与者调整 | 固定 | 固定 | 动态 | 
| 阶段支持 | 单阶段 | 多阶段 | 多阶段 | 
| 异常处理 | 简单 | 复杂 | 中等 | 
| 适用场景 | 启动停止控制 | 多线程同步点 | 复杂分阶段任务 | 
// await()实现核心逻辑
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
// countDown()实现
public void countDown() {
    sync.releaseShared(1);
}
// 同步器实现
private static final class Sync extends AbstractQueuedSynchronizer {
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    protected boolean tryReleaseShared(int releases) {
        // CAS循环减少计数
        for (;;) {
            int c = getState();
            if (c == 0) return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
public class StressTester {
    public void test(int concurrentUsers, Runnable task) 
            throws InterruptedException {
        
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch finishLatch = new CountDownLatch(concurrentUsers);
        
        for (int i = 0; i < concurrentUsers; i++) {
            new Thread(() -> {
                try {
                    startLatch.await();
                    task.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finishLatch.countDown();
                }
            }).start();
        }
        
        long startTime = System.nanoTime();
        startLatch.countDown(); // 所有线程同时开始
        finishLatch.await();
        long duration = System.nanoTime() - startTime;
        
        System.out.printf("并发数: %d, 耗时: %.2fms%n",
            concurrentUsers, duration/1_000_000.0);
    }
}
CountDownLatch是Java并发编程中的重要工具,正确使用时需要注意: 1. 明确生命周期:单次使用,不能重置 2. 确保计数匹配:countDown()调用次数必须足够 3. 合理处理中断:避免静默吞没中断异常 4. 考虑超时机制:防止永久阻塞 5. 资源清理:结合try-finally保证计数递减
通过本文介绍的各种模式和实践经验,开发者可以安全高效地在多线程环境中使用CountDownLatch来解决复杂的线程同步问题。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。