您好,登录后才能下订单哦!
# 如何正确的使用CountDownLatch
## 1. 什么是CountDownLatch?
`CountDownLatch`是Java并发包(`java.util.concurrent`)中一个重要的同步辅助类,它允许一个或多个线程等待其他线程完成操作后再继续执行。其核心思想是通过一个计数器来实现线程间的协调。
### 1.1 核心特性
- **计数初始化**:构造时需指定计数值(必须≥0)
- **不可重置**:计数到0后无法重复使用(与CyclicBarrier的关键区别)
- **线程安全**:所有方法都是线程安全的
- **等待机制**:提供`await()`方法让线程阻塞等待
### 1.2 典型应用场景
- 主线程等待多个子线程完成任务
- 并行计算任务的分阶段控制
- 服务启动时的依赖检查
- 模拟高并发测试场景
## 2. 核心API详解
### 2.1 构造函数
```java
public CountDownLatch(int count)
count
:初始计数值IllegalArgumentException
// 使当前线程等待直到计数器归零
public void await() throws InterruptedException
// 带超时的等待
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException
// 计数器减1(不会阻塞调用线程)
public void countDown()
CountDownLatch latch = new CountDownLatch(N);
// 工作线程
class Worker implements Runnable {
public void run() {
try {
// 执行任务
} finally {
latch.countDown(); // 必须放在finally块
}
}
}
// 启动多个线程
for (int i = 0; i < N; i++) {
new Thread(new Worker()).start();
}
// 主线程等待
latch.await();
System.out.println("所有工作完成");
countDown()
放在finally
块中InterruptedException
的两种方式:
“`java
// 方式1:恢复中断状态
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}// 方式2:传播异常 void doWork() throws InterruptedException { latch.await(); // … }
### 3.3 性能优化技巧
1. **避免过度等待**:合理设置`await`的超时时间
```java
if (!latch.await(30, TimeUnit.SECONDS)) {
handleTimeout();
}
批量计数:对可分组任务使用批量计数
// 每完成10个任务计数一次
if (tasksCompleted % 10 == 0) {
latch.countDown();
}
结合ExecutorService:
“`java
ExecutorService executor = Executors.newFixedThreadPool(5);
List
for (int i = 0; i < 10; i++) { futures.add(executor.submit(() -> { try { // 任务逻辑 } finally { latch.countDown(); } })); }
latch.await();
## 4. 典型应用场景实现
### 4.1 服务启动检查
```java
public class ServiceHealthChecker {
private final CountDownLatch latch;
private final List<Service> services;
public ServiceHealthChecker(List<Service> services) {
this.services = services;
this.latch = new CountDownLatch(services.size());
}
public boolean checkAll() throws InterruptedException {
for (Service service : services) {
new Thread(() -> {
if (service.checkHealth()) {
latch.countDown();
}
}).start();
}
return latch.await(10, TimeUnit.SECONDS);
}
}
public class ParallelCalculator {
public Result compute(List<Data> data) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(data.size());
ConcurrentMap<Key, Value> partialResults = new ConcurrentHashMap<>();
for (Data segment : data) {
new Thread(() -> {
try {
partialResults.put(processSegment(segment));
} finally {
latch.countDown();
}
}).start();
}
latch.await();
return aggregate(partialResults);
}
}
问题场景:
- 工作线程中又调用了await()
- 计数永远无法归零
解决方案:
// 错误示例
new Thread(() -> {
doPhase1();
latch.countDown();
latch.await(); // 这里会造成死锁
doPhase2();
}).start();
// 正确做法:使用两个不同的Latch
CountDownLatch phase1Latch = new CountDownLatch(N);
CountDownLatch phase2Latch = new CountDownLatch(N);
问题表现:
- 实际调用countDown()
次数少于初始计数
- 导致等待线程永久阻塞
防御性编程:
// 使用带超时的await
if (!latch.await(5, TimeUnit.MINUTES)) {
log.warn("任务执行超时,未完成计数: {}", latch.getCount());
// 执行回滚或补偿逻辑
}
// 或者使用监控线程
new Thread(() -> {
try {
if (!latch.await(5, TimeUnit.MINUTES)) {
system.exit(1); // 强制终止
}
} catch (InterruptedException ignored) {}
}).start();
public class PhasedExecution {
private final CountDownLatch initializationLatch;
private final CountDownLatch processingLatch;
public PhasedExecution(int workers) {
this.initializationLatch = new CountDownLatch(1);
this.processingLatch = new CountDownLatch(workers);
}
public void execute() {
// 阶段1:初始化
new Thread(() -> {
initSystem();
initializationLatch.countDown();
}).start();
// 阶段2:并行处理
for (int i = 0; i < 5; i++) {
new Thread(() -> {
initializationLatch.await();
processData();
processingLatch.countDown();
}).start();
}
// 阶段3:最终处理
processingLatch.await();
finish();
}
}
public CompletableFuture<Void> executeConcurrently(List<Runnable> tasks) {
CountDownLatch latch = new CountDownLatch(tasks.size());
return CompletableFuture.runAsync(() -> {
tasks.forEach(task ->
CompletableFuture.runAsync(() -> {
try {
task.run();
} finally {
latch.countDown();
}
})
);
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
特性 | CountDownLatch | CyclicBarrier | Phaser |
---|---|---|---|
可重用性 | 不可重用 | 可重用 | 可重用 |
计数方向 | 递减 | 递增 | 可控制 |
参与者调整 | 固定 | 固定 | 动态 |
阶段支持 | 单阶段 | 多阶段 | 多阶段 |
异常处理 | 简单 | 复杂 | 中等 |
适用场景 | 启动停止控制 | 多线程同步点 | 复杂分阶段任务 |
// await()实现核心逻辑
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// countDown()实现
public void countDown() {
sync.releaseShared(1);
}
// 同步器实现
private static final class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// CAS循环减少计数
for (;;) {
int c = getState();
if (c == 0) return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
public class StressTester {
public void test(int concurrentUsers, Runnable task)
throws InterruptedException {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch finishLatch = new CountDownLatch(concurrentUsers);
for (int i = 0; i < concurrentUsers; i++) {
new Thread(() -> {
try {
startLatch.await();
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
finishLatch.countDown();
}
}).start();
}
long startTime = System.nanoTime();
startLatch.countDown(); // 所有线程同时开始
finishLatch.await();
long duration = System.nanoTime() - startTime;
System.out.printf("并发数: %d, 耗时: %.2fms%n",
concurrentUsers, duration/1_000_000.0);
}
}
CountDownLatch是Java并发编程中的重要工具,正确使用时需要注意: 1. 明确生命周期:单次使用,不能重置 2. 确保计数匹配:countDown()调用次数必须足够 3. 合理处理中断:避免静默吞没中断异常 4. 考虑超时机制:防止永久阻塞 5. 资源清理:结合try-finally保证计数递减
通过本文介绍的各种模式和实践经验,开发者可以安全高效地在多线程环境中使用CountDownLatch来解决复杂的线程同步问题。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。