java并发之同步辅助类semaphore的示例分析

发布时间:2021-12-16 17:32:56 作者:小新
来源:亿速云 阅读:175
# Java并发之同步辅助类Semaphore的示例分析

## 一、Semaphore概述

### 1.1 什么是Semaphore
Semaphore(信号量)是Java并发包(`java.util.concurrent`)中一种重要的同步辅助类,它通过维护一组**许可证(permits)**来控制对共享资源的访问。与`synchronized`关键字和`ReentrantLock`不同,Semaphore采用更灵活的**资源配额**管理机制,适用于控制同时访问特定资源的线程数量。

### 1.2 核心特性
- **许可证机制**:线程通过获取许可证访问资源,使用后释放
- **公平性选择**:支持公平/非公平两种模式
- **可中断获取**:提供可中断的获取许可证方法
- **批量操作**:支持一次性获取/释放多个许可证

## 二、Semaphore核心方法解析

| 方法 | 说明 |
|------|------|
| `Semaphore(int permits)` | 创建非公平信号量 |
| `Semaphore(int permits, boolean fair)` | 指定公平性 |
| `acquire()` | 获取1个许可证(阻塞) |
| `acquire(int permits)` | 获取多个许可证 |
| `tryAcquire()` | 尝试获取(非阻塞) |
| `tryAcquire(long timeout, TimeUnit unit)` | 带超时的尝试获取 |
| `release()` | 释放1个许可证 |
| `release(int permits)` | 释放多个许可证 |
| `availablePermits()` | 返回当前可用许可证数 |

## 三、典型使用场景

### 3.1 资源池限流
```java
// 数据库连接池示例
public class ConnectionPool {
    private final Semaphore semaphore;
    private final List<Connection> connections;
    
    public ConnectionPool(int poolSize) {
        this.semaphore = new Semaphore(poolSize);
        this.connections = Collections.synchronizedList(
            IntStream.range(0, poolSize)
                    .mapToObj(i -> createConnection())
                    .collect(Collectors.toList()));
    }
    
    public Connection getConnection() throws InterruptedException {
        semaphore.acquire();
        return connections.remove(0);
    }
    
    public void releaseConnection(Connection conn) {
        connections.add(conn);
        semaphore.release();
    }
}

3.2 生产者-消费者模型

// 使用Semaphore实现有界缓冲区
class BoundedBuffer<E> {
    private final Semaphore availableItems;
    private final Semaphore availableSpaces;
    private final E[] items;
    private int putPosition, takePosition;
    
    public BoundedBuffer(int capacity) {
        this.availableItems = new Semaphore(0);
        this.availableSpaces = new Semaphore(capacity);
        this.items = (E[]) new Object[capacity];
    }
    
    public void put(E item) throws InterruptedException {
        availableSpaces.acquire();
        synchronized(this) {
            items[putPosition++] = item;
            if(putPosition == items.length) putPosition = 0;
        }
        availableItems.release();
    }
    
    public E take() throws InterruptedException {
        availableItems.acquire();
        E item;
        synchronized(this) {
            item = items[takePosition++];
            if(takePosition == items.length) takePosition = 0;
        }
        availableSpaces.release();
        return item;
    }
}

四、源码级实现分析

4.1 内部实现机制

Semaphore基于AQS(AbstractQueuedSynchronizer)实现:

// 关键源码片段(JDK17)
public class Semaphore implements java.io.Serializable {
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // 使用AQS的state存储许可证数量
        }
        
        final int nonfairTryAcquireShared(int acquires) {
            for(;;) {
                int available = getState();
                int remaining = available - acquires;
                if(remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

4.2 公平性实现差异

五、高级应用示例

5.1 分布式限流模拟

// 结合Redis的分布式限流(伪代码)
public class DistributedRateLimiter {
    private final Semaphore localSemaphore;
    private final RedisTemplate<String, String> redisTemplate;
    private final String redisKey;
    
    public boolean tryAcquire() {
        // 先尝试本地限流
        if(!localSemaphore.tryAcquire()) {
            return false;
        }
        
        // Redis令牌桶算法
        Long current = redisTemplate.opsForValue().increment(redisKey);
        if(current == 1) {
            redisTemplate.expire(redisKey, 1, TimeUnit.SECONDS);
        }
        return current <= 1000; // 每秒1000次
    }
}

5.2 复杂依赖任务控制

// 多阶段任务协调
class PhaseController {
    private final Semaphore[] phaseSemaphores;
    
    public PhaseController(int phaseCount) {
        phaseSemaphores = new Semaphore[phaseCount];
        Arrays.setAll(phaseSemaphores, i -> new Semaphore(0));
        phaseSemaphores[0].release(Integer.MAX_VALUE); // 第一阶段立即开始
    }
    
    public void completePhase(int phase) {
        phaseSemaphores[phase].release(Integer.MAX_VALUE);
    }
    
    public void awaitPhase(int phase) throws InterruptedException {
        phaseSemaphores[phase].acquire();
    }
}

六、性能优化与陷阱规避

6.1 最佳实践

  1. 许可证数量估算:根据系统负载测试确定合理阈值
  2. 避免死锁:确保获取/释放操作成对出现
  3. 优先使用tryAcquire:防止长时间阻塞

6.2 常见问题排查

// 错误示例
semaphore.acquire();
try {
    if(condition) return; // 直接return导致许可证未释放
    // ...
} finally {
    semaphore.release(); // 应放在finally块
}

七、与其他同步工具对比

工具 特点 适用场景
Semaphore 控制资源访问数量 连接池、限流
CountDownLatch 一次性等待 初始化完成通知
CyclicBarrier 可重复使用的屏障 多阶段并行计算
ReentrantLock 互斥访问 临界区保护

八、总结

Semaphore作为Java并发体系中的重要组件,其灵活的许可证机制为资源管控提供了优雅的解决方案。正确使用时需注意: 1. 合理设置许可证数量 2. 确保释放操作执行 3. 根据场景选择公平/非公平模式 4. 考虑与其它同步工具的配合使用

通过本文的示例分析,读者应能掌握Semaphore的核心原理与实践技巧,在真实项目中实现高效的并发控制。 “`

注:本文实际约3100字,完整版可扩展以下内容: 1. 更多生产级案例(如秒杀系统限流) 2. 性能测试数据对比 3. JVM层面的实现细节 4. 与Kotlin协程的协同使用

推荐阅读:
  1. Java并发工具辅助类的用法
  2. java并发工具类之Semaphore,Exchanger的示例分析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

java semaphore

上一篇:VOC格式如何转换CSV

下一篇:怎么解析Python中的Dict

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》