死磕 java同步系列之redis分布式锁进化史

发布时间:2020-06-27 11:47:36 作者:彤哥读源码
来源:网络 阅读:238

问题

(1)redis如何实现分布式锁?

(2)redis分布式锁有哪些优点?

(3)redis分布式锁有哪些缺点?

(4)redis实现分布式锁有没有现成的轮子可以使用?

简介

Redis(全称:Remote Dictionary Server 远程字典服务)是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。

本章我们将介绍如何基于redis实现分布式锁,并把其实现的进化史从头到尾讲明白,以便大家在面试的时候能讲清楚redis分布式锁的来(忽)龙(悠)去(考)脉(官)。

实现锁的条件

基于前面关于锁(分布式锁)的学习,我们知道实现锁的条件有三个:

(1)状态(共享)变量,它是有状态的,这个状态的值标识了是否已经被加锁,在ReentrantLock中是通过控制state的值实现的,在ZookeeperLock中是通过控制子节点来实现的;

(2)队列,它是用来存放排队的线程,在ReentrantLock中是通过AQS的队列实现的,在ZookeeperLock中是通过子节点的有序性实现的;

(3)唤醒,上一个线程释放锁之后唤醒下一个等待的线程,在ReentrantLock中结合AQS的队列释放时自动唤醒下一个线程,在ZookeeperLock中是通过其监听机制来实现的;

那么上面三个条件是不是必要的呢?

其实不然,实现锁的必要条件只有第一个,对共享变量的控制,如果共享变量的值为null就给他设置个值(java中可以使用CAS操作进程内共享变量),如果共享变量有值则不断重复检查其是否有值(重试),待锁内逻辑执行完毕再把共享变量的值设置回null。

说白了,只要有个地方存这个共享变量就行了,而且要保证整个系统(多个进程)内只有这一份即可。

这也是redis实现分布式锁的关键,本文由公从号“彤哥读源码”原创。

redis分布式锁进化史

进化史一——set

既然上面说了实现分布式锁只需要对共享变量控制到位即可,那么redis我们怎么控制这个共享变量呢?

首先,我们知道redis的基础命令有get/set/del,通过这三个命令可以实现分布式锁吗?当然可以。

死磕 java同步系列之redis分布式锁进化史

在获取锁之前先get lock_user_1看这个锁存不存在,如果不存在则再set lock_user_1 value,如果存在则等待一段时间后再重试,最后使用完成了再删除这个锁del lock_user_1即可。

死磕 java同步系列之redis分布式锁进化史

但是,这种方案有个问题,如果一开始这个锁是不存在的,两个线程去同时get,这个时候返回的都是null(nil),然后这两个线程都去set,这时候就出问题了,两个线程都可以set成功,相当于两个线程都获取到同一个锁了。

所以,这种方案不可行!

进化史二——setnx

上面的方案不可行的主要原因是多个线程同时set都是可以成功的,所以后来有了setnx这个命令,它是set if not exist的缩写,也就是如果不存在就set。

死磕 java同步系列之redis分布式锁进化史

可以看到,当重复对同一个key进行setnx的时候,只有第一次是可以成功的。

因此,方案二就是先使用setnx lock_user_1 value命令,如果返回1则表示加锁成功,如果返回0则表示其它线程先执行成功了,那就等待一段时间后重试,最后一样使用del lock_user_1释放锁。

死磕 java同步系列之redis分布式锁进化史

但是,这种方案也有个问题,如果获取锁的这个客户端断线了怎么办?这个锁不是一直都不会释放吗?是的,是这样的。

所以,这种方案也不可行!

进化史三——setnx + setex

上面的方案不可行的主要原因是获取锁之后客户端断线了无法释放锁的问题,那么,我在setnx之后立马再执行setex可以吗?

答案是可以的,2.6.12之前的版本使用redis实现分布式锁大家都是这么玩的。

死磕 java同步系列之redis分布式锁进化史

因此,方案三就是先使用setnx lock_user_1 value命令拿到锁,再立即使用setex lock_user_1 30 value设置过期时间,最后使用del lock_user_1释放锁。

在setnx获取到锁之后再执行setex设置过期时间,这样就很大概率地解决了获取锁之后客户端断线不会释放锁的问题。

但是,这种方案依然有问题,如果setnx之后setex之前这个客户端就断线了呢?嗯~,似乎无解,不过这种概率实在是非常小,所以2.6.12之前的版本大家也都这么用,几乎没出现过什么问题。

所以,这种方案基本可用,只是不太好!

进化史四——set nx ex

上面的方案不太好的主要原因是setnx/setex是两条独立的命令,无法解决前者成功之后客户端断线的问题,那么,把两条命令合在一起不就行了吗?

是的,redis官方也意识到这个问题了,所以2.6.12版本给set命令加了一些参数:

SET key value [EX seconds] [PX milliseconds] [NX|XX]

EX,过期时间,单位秒

PX,过期时间,单位毫秒

NX,not exist,如果不存在才设置成功

XX,exist exist?如果存在才设置成功

通过这个命令我们就再也不怕客户端无故断线了,本文由公从号“彤哥读源码”原创。

死磕 java同步系列之redis分布式锁进化史

因此,方案四就是先使用set lock_user_1 value nx ex 30获取锁,获取锁之后使用,使用完成了最后del lock_user_1释放锁。

然而,这种方案就没有问题吗?

当然有问题,其实这里的释放锁只要简单地执行del lock_user_1即可,并不会检查这个锁是不是当前客户端获取到的。

所以,这种方案还不是很完美。

进化史五——random value + lua script

上面的方案不完美的主要原因是释放锁这里控制的还不是很到位,那么有没有其它方法可以控制释放锁的线程和加锁的线程一定是同一个客户端呢?

redis官方给出的方案是这样的:

 // 加锁
 SET resource_name my_random_value NX PX 30000

 // 释放锁
 if redis.call("get",KEYS[1]) == ARGV[1] then
     return redis.call("del",KEYS[1])
 else
     return 0
 end

加锁的时候,设置随机值,保证这个随机值只有当前客户端自己知道。

释放锁的时候,执行一段lua脚本,把这段lua脚本当成一个完整的命令,先检查这个锁对应的值是不是上面设置的随机值,如果是再执行del释放锁,否则直接返回释放锁失败。

我们知道,redis是单线程的,所以这段lua脚本中的get和del不会存在并发问题,但是不能在java中先get再del,这样会当成两个命令,会有并发问题,lua脚本相当于是一个命令一起传输给redis的。

这种方案算是比较完美了,但是还有一点小缺陷,就是这个过期时间设置成多少合适呢?

设置的过小,有可能上一个线程还没执行完锁内逻辑,锁就自动释放了,导致另一个线程可以获取锁了,就出现并发问题了;

设置的过大,就要考虑客户端断线了,这个锁要等待很长一段时间。

所以,这里又衍生出一个新的问题,过期时间我设置小一点,但是快到期了它能自动续期就好了。

进化史六——redisson(redis2.8+)

上面方案的缺陷是过期时间不好把握,虽然也可以自己启一个监听线程来处理续期,但是代码实在不太好写,好在现成的轮子redisson已经帮我们把这个逻辑都实现好了,我们拿过来直接用就可以了。

而且,redisson充分考虑了redis演化过程中留下的各种问题,单机模式、哨兵模式、集群模式,它统统都处理好了,不管是从单机进化到集群还是从哨兵进化到集群,都只需要简单地修改下配置就可以了,不用改动任何代码,可以说是非(业)常(界)方(良)便(心)。

redisson实现的分布式锁内部使用的是Redlock算法,这是官方推荐的一种算法。

另外,redisson还提供了很多分布式对象(分布式的原子类)、分布式集合(分布式的Map/List/Set/Queue等)、分布式同步器(分布式的CountDownLatch/Semaphore等)、分布式锁(分布式的公平锁/非公平锁/读写锁等),有兴趣的可以去看看,下面贴出链接:

死磕 java同步系列之redis分布式锁进化史

Redlock介绍:https://redis.io/topics/distlock

redisson介绍:https://github.com/redisson/redisson/wiki

代码实现

因为前面五种方案都已经过时,所以彤哥这里偷个懒,就不去一一实现的,我们直接看最后一种redisson的实现方式。

pom.xml文件

添加spring redis及redisson的依赖,我这里使用的是springboot 2.1.6版本,springboot 1.x版本的自己注意下,查看上面的github可以找到方法。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-data-21</artifactId>
    <version>3.11.0</version>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.11.0</version>
</dependency>

application.yml文件

配置redis的连接信息,彤哥这里给出了三种方式。

spring:
  redis:
    # 单机模式
    #host: 192.168.1.102
    #port: 6379
    # password: <your passowrd>
    timeout: 6000ms  # 连接超时时长(毫秒)
    # 哨兵模式 ,本文由公从号“彤哥读源码”原创
#    sentinel:
#      master: <your master>
#      nodes: 192.168.1.101:6379,192.168.1.102:6379,192.168.1.103:6379
    # 集群模式(三主三从伪集群)
    cluster:
      nodes:
        - 192.168.1.102:30001
        - 192.168.1.102:30002
        - 192.168.1.102:30003
        - 192.168.1.102:30004
        - 192.168.1.102:30005
        - 192.168.1.102:30006

Locker接口

定义Locker接口。

public interface Locker {
    void lock(String key, Runnable command);
}

RedisLocker实现类

直接使用RedissonClient获取锁,注意这里不需要再单独配置RedissonClient这个bean,redisson框架会根据配置自动生成RedissonClient的实例,我们后面说它是怎么实现的。

@Component
public class RedisLocker implements Locker {

    @Autowired
    private RedissonClient redissonClient;

    @Override
    public void lock(String key, Runnable command) {
        RLock lock = redissonClient.getLock(key);
        try {
            // 本文由公从号“彤哥读源码”原创
            lock.lock();
            command.run();
        } finally {
            lock.unlock();
        }
    }
}

测试类

启动1000个线程,每个线程内部打印一句话,然后睡眠1秒。


@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RedisLockerTest {

    @Autowired
    private Locker locker;

    @Test
    public void testRedisLocker() throws IOException {
        for (int i = 0; i < 1000; i++) {
            new Thread(()->{
                locker.lock("lock", ()-> {
                    // 可重入锁测试
                    locker.lock("lock", ()-> {
                        System.out.println(String.format("time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                });
            }, "Thread-"+i).start();
        }

        System.in.read();
    }
}

运行结果:

可以看到稳定在1000ms左右打印一句话,说明这个锁是可用的,而且是可重入的。

time: 1570100167046, threadName: Thread-756
time: 1570100168067, threadName: Thread-670
time: 1570100169080, threadName: Thread-949
time: 1570100170093, threadName: Thread-721
time: 1570100171106, threadName: Thread-937
time: 1570100172124, threadName: Thread-796
time: 1570100173134, threadName: Thread-944
time: 1570100174142, threadName: Thread-974
time: 1570100175167, threadName: Thread-462
time: 1570100176180, threadName: Thread-407
time: 1570100177194, threadName: Thread-983
time: 1570100178206, threadName: Thread-982
...

RedissonAutoConfiguration

刚才说RedissonClient不需要配置,其实它是在RedissonAutoConfiguration中自动配置的,我们简单看下它的源码,主要看redisson()这个方法:


@Configuration
@ConditionalOnClass({Redisson.class, RedisOperations.class})
@AutoConfigureBefore(RedisAutoConfiguration.class)
@EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class})
public class RedissonAutoConfiguration {

    @Autowired
    private RedissonProperties redissonProperties;

    @Autowired
    private RedisProperties redisProperties;

    @Autowired
    private ApplicationContext ctx;

    @Bean
    @ConditionalOnMissingBean(name = "redisTemplate")
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> template = new RedisTemplate<Object, Object>();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean(StringRedisTemplate.class)
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean(RedisConnectionFactory.class)
    public RedissonConnectionFactory redissonConnectionFactory(RedissonClient redisson) {
        return new RedissonConnectionFactory(redisson);
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(RedissonClient.class)
    public RedissonClient redisson() throws IOException {
        Config config = null;
        Method clusterMethod = ReflectionUtils.findMethod(RedisProperties.class, "getCluster");
        Method timeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getTimeout");
        Object timeoutValue = ReflectionUtils.invokeMethod(timeoutMethod, redisProperties);
        int timeout;
        if(null == timeoutValue){
            // 超时未设置则为0
            timeout = 0;
        }else if (!(timeoutValue instanceof Integer)) {
            // 转毫秒
            Method millisMethod = ReflectionUtils.findMethod(timeoutValue.getClass(), "toMillis");
            timeout = ((Long) ReflectionUtils.invokeMethod(millisMethod, timeoutValue)).intValue();
        } else {
            timeout = (Integer)timeoutValue;
        }

        // 看下是否给redisson单独写了一个配置文件
        if (redissonProperties.getConfig() != null) {
            try {
                InputStream is = getConfigStream();
                config = Config.fromJSON(is);
            } catch (IOException e) {
                // trying next format
                try {
                    InputStream is = getConfigStream();
                    config = Config.fromYAML(is);
                } catch (IOException e1) {
                    throw new IllegalArgumentException("Can't parse config", e1);
                }
            }
        } else if (redisProperties.getSentinel() != null) {
            // 如果是哨兵模式
            Method nodesMethod = ReflectionUtils.findMethod(Sentinel.class, "getNodes");
            Object nodesValue = ReflectionUtils.invokeMethod(nodesMethod, redisProperties.getSentinel());

            String[] nodes;
            // 看sentinel.nodes这个节点是列表配置还是逗号隔开的配置
            if (nodesValue instanceof String) {
                nodes = convert(Arrays.asList(((String)nodesValue).split(",")));
            } else {
                nodes = convert((List<String>)nodesValue);
            }

            // 生成哨兵模式的配置
            config = new Config();
            config.useSentinelServers()
                .setMasterName(redisProperties.getSentinel().getMaster())
                .addSentinelAddress(nodes)
                .setDatabase(redisProperties.getDatabase())
                .setConnectTimeout(timeout)
                .setPassword(redisProperties.getPassword());
        } else if (clusterMethod != null && ReflectionUtils.invokeMethod(clusterMethod, redisProperties) != null) {
            // 如果是集群模式
            Object clusterObject = ReflectionUtils.invokeMethod(clusterMethod, redisProperties);
            Method nodesMethod = ReflectionUtils.findMethod(clusterObject.getClass(), "getNodes");
            // 集群模式的cluster.nodes是列表配置
            List<String> nodesObject = (List) ReflectionUtils.invokeMethod(nodesMethod, clusterObject);

            String[] nodes = convert(nodesObject);

            // 生成集群模式的配置
            config = new Config();
            config.useClusterServers()
                .addNodeAddress(nodes)
                .setConnectTimeout(timeout)
                .setPassword(redisProperties.getPassword());
        } else {
            // 单机模式的配置
            config = new Config();
            String prefix = "redis://";
            Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl");
            // 判断是否走ssl
            if (method != null && (Boolean)ReflectionUtils.invokeMethod(method, redisProperties)) {
                prefix = "rediss://";
            }

            // 生成单机模式的配置
            config.useSingleServer()
                .setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort())
                .setConnectTimeout(timeout)
                .setDatabase(redisProperties.getDatabase())
                .setPassword(redisProperties.getPassword());
        }

        return Redisson.create(config);
    }

    private String[] convert(List<String> nodesObject) {
        // 将哨兵或集群模式的nodes转换成标准配置
        List<String> nodes = new ArrayList<String>(nodesObject.size());
        for (String node : nodesObject) {
            if (!node.startsWith("redis://") && !node.startsWith("rediss://")) {
                nodes.add("redis://" + node);
            } else {
                nodes.add(node);
            }
        }
        return nodes.toArray(new String[nodes.size()]);
    }

    private InputStream getConfigStream() throws IOException {
        // 读取redisson配置文件
        Resource resource = ctx.getResource(redissonProperties.getConfig());
        InputStream is = resource.getInputStream();
        return is;
    }

}

网上查到的资料中很多配置都是多余的(可能是版本问题),看下源码很清楚,这也是看源码的一个好处。

总结

(1)redis由于历史原因导致有三种模式:单机、哨兵、集群;

(2)redis实现分布式锁的进化史:set -> setnx -> setnx + setex -> set nx ex(或px) -> set nx ex(或px) + lua script -> redisson;

(3)redis分布式锁有现成的轮子redisson可以使用;

(4)redisson还提供了很多有用的组件,比如分布式集合、分布式同步器、分布式对象;

彩蛋

redis分布式锁有哪些优点?

答:1)大部分系统都依赖于redis做缓存,不需要额外依赖其它组件(相对于zookeeper来说);

2)redis可以集群部署,相对于mysql的单点更可靠;

3)不会占用mysql的连接数,不会增加mysql的压力;

4)redis社区相对活跃,redisson的实现更是稳定可靠;

5)利用过期机制解决客户端断线的问题,虽然不太及时;

6)有现成的轮子redisson可以使用,锁的种类比较齐全;

redis分布式锁有哪些缺点?

答:1)集群模式下会在所有master节点执行加锁命令,大部分(2N+1)成功了则获得锁,节点越多,加锁的过程越慢;

2)高并发情况下,未获得锁的线程会睡眠重试,如果同一把锁竞争非常激烈,会占用非常多的系统资源;

3)历史原因导致的坑挺多的,自己很难实现出来健壮的redis分布式锁;

总之,redis分布式锁的优点是大于缺点的,而且社区活跃,这也是我们大部分系统使用redis作为分布式锁的原因。

推荐阅读:
  1. 死磕 java同步系列之AQS终篇(面试)
  2. 死磕 java同步系列之终结篇

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

redis 分布式锁 aqs

上一篇:快速创建个人论坛

下一篇:redis介绍及保持session会话

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》