怎么使用RedisTemplat实现简单的分布式锁

发布时间:2021-11-19 16:25:49 作者:iii
来源:亿速云 阅读:123
# 如何使用RedisTemplate实现简单的分布式锁

## 一、分布式锁概述

### 1.1 什么是分布式锁
分布式锁是在分布式系统环境下,控制多个进程/线程对共享资源进行有序访问的同步机制。与单机锁不同,分布式锁需要解决网络延迟、节点故障等分布式环境特有的问题。

### 1.2 分布式锁的应用场景
- 秒杀系统中的库存扣减
- 分布式任务调度
- 防止重复订单提交
- 全局配置更新

### 1.3 分布式锁的基本要求
1. **互斥性**:同一时刻只有一个客户端能持有锁
2. **防死锁**:即使客户端崩溃,锁也能自动释放
3. **高可用**:锁服务需要具备高可用性
4. **可重入**(可选):同一客户端可多次获取同一把锁

## 二、Redis实现分布式锁的原理

### 2.1 基于SETNX的实现
Redis的`SETNX`(SET if Not eXists)命令是早期常用的实现方式:
```java
SETNX lock_key unique_value

当key不存在时设置成功返回1,否则返回0。

2.2 Redis 2.6.12+的改进方案

Redis 2.6.12后扩展了SET命令参数:

SET lock_key unique_value NX PX 30000

2.3 Redlock算法

Redis官方推荐的分布式锁算法,需要多个独立的Redis实例协同工作: 1. 获取当前时间 2. 依次向N个Redis实例获取锁 3. 计算获取锁总耗时 4. 当在多数节点上获取成功且耗时小于锁有效期时,认为获取成功

三、Spring RedisTemplate基础

3.1 RedisTemplate简介

RedisTemplate是Spring Data Redis提供的核心类,封装了各种Redis操作,支持自动序列化/反序列化。

3.2 基本配置示例

@Configuration
public class RedisConfig {
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}

3.3 常用操作方法

方法 说明
opsForValue() 操作String类型
opsForHash() 操作Hash类型
execute(SessionCallback) 执行多个命令

四、基于RedisTemplate的分布式锁实现

4.1 基础实现方案

public class RedisLock {
    
    private final RedisTemplate<String, String> redisTemplate;
    private final String lockKey;
    private String lockValue;
    private final long expireTime;
    
    public RedisLock(RedisTemplate<String, String> redisTemplate, 
                   String lockKey, long expireTime) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey;
        this.expireTime = expireTime;
    }
    
    public boolean tryLock() {
        lockValue = UUID.randomUUID().toString();
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);
        return Boolean.TRUE.equals(success);
    }
    
    public void unlock() {
        // 确保只删除自己加的锁
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                       "return redis.call('del', KEYS[1]) " +
                       "else return 0 end";
        redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(lockKey),
            lockValue);
    }
}

4.2 关键点解析

  1. 唯一标识:使用UUID作为value,确保只能释放自己的锁
  2. 原子操作:加锁和设置过期时间必须原子性完成
  3. Lua脚本:解锁时使用Lua保证原子性

4.3 重试机制实现

public boolean tryLock(long waitTime) throws InterruptedException {
    long end = System.currentTimeMillis() + waitTime;
    while (System.currentTimeMillis() < end) {
        if (tryLock()) {
            return true;
        }
        Thread.sleep(100); // 避免CPU空转
    }
    return false;
}

五、生产环境优化方案

5.1 锁续期机制

private final ScheduledExecutorService scheduler = 
    Executors.newScheduledThreadPool(1);

public boolean tryLockWithRenewal() {
    if (tryLock()) {
        // 启动定时任务续期
        scheduler.scheduleAtFixedRate(() -> {
            redisTemplate.expire(lockKey, expireTime, TimeUnit.MILLISECONDS);
        }, expireTime / 3, expireTime / 3, TimeUnit.MILLISECONDS);
        return true;
    }
    return false;
}

5.2 可重入锁实现

public class ReentrantRedisLock {
    
    private final ThreadLocal<Map<String, Integer>> lockCount = 
        ThreadLocal.withInitial(HashMap::new);
    
    public boolean tryLock() {
        Map<String, Integer> counts = lockCount.get();
        Integer count = counts.get(lockKey);
        if (count != null) {
            counts.put(lockKey, count + 1);
            return true;
        }
        if (innerTryLock()) {
            counts.put(lockKey, 1);
            return true;
        }
        return false;
    }
    
    // ...其他方法实现
}

5.3 锁等待队列优化

使用Redis的List结构实现简单等待队列:

public void waitInQueue() {
    String queueNode = UUID.randomUUID().toString();
    redisTemplate.opsForList().rightPush(waitQueueKey, queueNode);
    
    // 轮询检查自己是否到达队列头部
    while (true) {
        String head = redisTemplate.opsForList().index(waitQueueKey, 0);
        if (queueNode.equals(head)) {
            return;
        }
        Thread.sleep(100);
    }
}

六、常见问题与解决方案

6.1 锁过期时间设置

6.2 客户端长时间阻塞

6.3 Redis主从切换问题

七、完整工具类实现

/**
 * 基于RedisTemplate的分布式锁工具类
 */
public class RedisDistributedLock implements AutoCloseable {
    
    private static final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);
    private static final String UNLOCK_SCRIPT =
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "return redis.call('del', KEYS[1]) " +
        "else return 0 end";
    
    private final RedisTemplate<String, String> redisTemplate;
    private final String lockKey;
    private final String lockValue;
    private final long expireTime;
    private volatile boolean locked = false;
    private ScheduledFuture<?> renewalTask;
    
    public RedisDistributedLock(RedisTemplate<String, String> redisTemplate,
                              String lockKey, long expireTime) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey;
        this.lockValue = UUID.randomUUID().toString();
        this.expireTime = expireTime;
    }
    
    public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
        long end = System.nanoTime() + unit.toNanos(waitTime);
        while (System.nanoTime() < end) {
            if (tryLock()) {
                return true;
            }
            Thread.sleep(100);
        }
        return false;
    }
    
    public boolean tryLock() {
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);
        locked = Boolean.TRUE.equals(success);
        if (locked) {
            scheduleRenewal();
        }
        return locked;
    }
    
    private void scheduleRenewal() {
        renewalTask = Executors.newSingleThreadScheduledExecutor()
            .scheduleAtFixedRate(() -> {
                try {
                    redisTemplate.expire(lockKey, expireTime, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    logger.error("锁续期失败", e);
                }
            }, expireTime / 3, expireTime / 3, TimeUnit.MILLISECONDS);
    }
    
    public void unlock() {
        if (!locked) {
            return;
        }
        try {
            redisTemplate.execute(new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class),
                Collections.singletonList(lockKey), lockValue);
            if (renewalTask != null) {
                renewalTask.cancel(true);
            }
        } finally {
            locked = false;
        }
    }
    
    @Override
    public void close() {
        unlock();
    }
}

八、测试用例

8.1 单元测试

@SpringBootTest
class RedisDistributedLockTest {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Test
    void testLockAndUnlock() throws Exception {
        RedisDistributedLock lock = new RedisDistributedLock(
            redisTemplate, "test_lock", 30000);
        
        assertTrue(lock.tryLock());
        try {
            // 模拟业务操作
            Thread.sleep(1000);
        } finally {
            lock.unlock();
        }
        
        // 验证锁已释放
        String value = redisTemplate.opsForValue().get("test_lock");
        assertNull(value);
    }
    
    @Test
    void testConcurrentLock() throws Exception {
        int threadCount = 5;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicInteger successCount = new AtomicInteger();
        
        for (int i = 0; i < threadCount; i++) {
            executor.execute(() -> {
                try (RedisDistributedLock lock = new RedisDistributedLock(
                    redisTemplate, "concurrent_lock", 1000)) {
                    if (lock.tryLock(2, TimeUnit.SECONDS)) {
                        successCount.incrementAndGet();
                        Thread.sleep(300); // 模拟业务操作
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        assertEquals(1, successCount.get());
    }
}

九、与其他方案的对比

9.1 与Zookeeper对比

特性 Redis Zookeeper
性能
一致性 最终一致 强一致
实现复杂度 简单 中等
适用场景 高性能要求 强一致性要求

9.2 与数据库锁对比

Redis锁相比数据库锁: - 性能高数个数量级 - 不增加数据库负担 - 实现更简单

十、总结与最佳实践

10.1 实施建议

  1. 根据业务特点选择合适的过期时间
  2. 必须实现锁的自动释放
  3. 生产环境建议使用Redlock算法
  4. 添加完善的监控和告警

10.2 注意事项

10.3 未来演进方向

  1. 与Spring AOP集成实现注解式锁
  2. 结合Redisson等成熟框架
  3. 实现多级锁(本地锁+分布式锁)

通过本文的实现方案,可以在Spring项目中快速构建基于Redis的分布式锁功能,满足大多数分布式场景下的同步需求。实际生产环境中,建议根据具体业务特点进行适当调整和优化。 “`

推荐阅读:
  1. 单实例redis分布式锁的简单实现
  2. 分布式锁简单入门以及三种实现方式介绍

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

redis

上一篇:如何理解虚拟化及云计算的相关知识

下一篇:Python怎么实现位图分割的效果

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》