Redisson中怎么实现分布式锁

发布时间:2021-08-03 15:42:39 作者:Leah
来源:亿速云 阅读:171

本篇文章为大家展示了Redisson中怎么实现分布式锁,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

Redisson是具备多种内存数据网格特性的基于Java编写的Redis客户端框架(Redis Java Client with features of In-Memory Data Grid),基于Redis的基本数据类型扩展出很多种实现的高级数据结构,具体见其官方的简介图:

Redisson中怎么实现分布式锁  

本文要分析的R(ed)Lock实现,只是其中一个很小的模块,其他高级特性可以按需选用。下面会从基本原理、源码分析和基于Jedis仿实现等内容进行展开。本文分析的Redisson源码是2020-01左右Redisson项目的main分支源码,对应版本是3.14.1

 

基本原理

red lock的基本原理其实就"光明正大地"展示在Redis官网的首页文档中(具体链接是https://redis.io/topics/distlock):

Redisson中怎么实现分布式锁  

摘录一下简介进行翻译:在许多环境中不同进程必须以互斥方式使用共享资源进行操作时,分布式锁是一个非常有用的原语。此试图提供一种更规范的算法来实现Redis的分布式锁。我们提出了一种称为Redlock的算法,它实现了DLM(猜测是Distributed Lock Manager的缩写,分布式锁管理器),我们认为它比普通的单实例方法更安全。

算法的三个核心特征(三大最低保证):

文档中还指出了目前算法对于故障转移的实现还存在明显的竞态条件问题(描述的应该是Redis主从架构下的问题):

算法的实现很简单,单个Redis实例下加锁命令如下:

SET $resource_name $random_value NX PX $ttl
 

这里的NxPXSET命令的增强参数,自从Redis2.6.12版本起,SET命令已经提供了可选的复合操作符:

单个Redis实例下解锁命令如下:

# KEYS[1] = $resource_name
# ARGV[1] = $random_value
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
   

使用Redisson中的RLock

使用RLock要先实例化RedissonRedisson已经适配了Redis的哨兵、集群、普通主从和单机模式,因为笔者本地只安装了单机Redis,所以这里使用单机模式配置进行演示。实例化RedissonClient

static RedissonClient REDISSON;

@BeforeClass
public static void beforeClass() throws Exception {
    Config config = new Config();
    // 单机
    config.useSingleServer()
            .setTimeout(10000)
            .setAddress("redis://127.0.0.1:6379");
    REDISSON = Redisson.create(config);
//        // 主从
//        config.useMasterSlaveServers()
//                .setMasterAddress("主节点连接地址")
//                .setSlaveAddresses(Sets.newHashSet("从节点连接地址"));
//        REDISSON = Redisson.create(config);
//        // 哨兵
//        config.useSentinelServers()
//                .setMasterName("Master名称")
//                .addSentinelAddress(new String[]{"哨兵连接地址"});
//        REDISSON = Redisson.create(config);
//        // 集群
//        config.useClusterServers()
//                .addNodeAddress(new String[]{"集群节点连接地址"});
//        REDISSON = Redisson.create(config);
}
 

加锁和解锁:

@Test
public void testLockAndUnLock() throws Exception {
    String resourceName = "resource:x";
    RLock lock = REDISSON.getLock(resourceName);
    Thread threadA = new Thread(() -> {
        try {
            lock.lock();
            process(resourceName);
        } finally {
            lock.unlock();
            System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
        }
    }, "threadA");
    Thread threadB = new Thread(() -> {
        try {
            lock.lock();
            process(resourceName);
        } finally {
            lock.unlock();
            System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
        }
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}

private void process(String resourceName) {
    String threadName = Thread.currentThread().getName();
    System.out.println(String.format("线程%s获取到资源%s的锁", threadName, resourceName));
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ignore) {
    }
}

// 某次执行的输出结果
线程threadB获取到资源resource:x的锁
线程threadB释放资源resource:x的锁
线程threadA获取到资源resource:x的锁
线程threadA释放资源resource:x的锁
 

更多的时候,我们会选用带等待时间周期和锁最大持有时间的API

@Test
public void testTryLockAndUnLock() throws Exception {
    String resourceName = "resource:x";
    int waitTime = 500;
    int leaseTime = 1000;
    Thread threadA = new Thread(() -> {
        process(resourceName, waitTime, leaseTime);
    }, "threadA");
    Thread threadB = new Thread(() -> {
        process(resourceName, waitTime, leaseTime);
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}

private void process(String resourceName, int waitTime, int leaseTime) {
    RLock lock = REDISSON.getLock(resourceName);
    try {
        String threadName = Thread.currentThread().getName();
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                System.out.println(String.format("线程%s获取到资源%s的锁", threadName, resourceName));
                Thread.sleep(800);
            } finally {
                lock.unlock();
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        } else {
            System.out.println(String.format("线程%s获取资源%s的锁失败,等待时间:%d ms", threadName, resourceName, waitTime));
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
// 某次执行的输出结果
线程threadA获取到资源resource:x的锁
线程threadB获取资源resource:x的锁失败,等待时间:500 ms
线程threadA释放资源resource:x的锁
 

为了使用的时候更加简单,可以参考spring-tx中的编程式事务那样进行轻度封装:

@RequiredArgsConstructor
private static class RedissonLockProvider {

    private final RedissonClient redissonClient;

    public <T> T executeInLock(String resourceName, LockAction lockAction) {
        RLock lock = redissonClient.getLock(resourceName);
        try {
            lock.lock();
            lockAction.onAcquire(resourceName);
            return lockAction.doInLock(resourceName);
        } finally {
            lock.unlock();
            lockAction.onExit(resourceName);
        }
    }

    public <T> T executeInLock(String resourceName, int waitTime, int leaseTime, LockAction lockAction) throws InterruptedException {
        RLock lock = redissonClient.getLock(resourceName);
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                lockAction.onAcquire(resourceName);
                return lockAction.doInLock(resourceName);
            } finally {
                lock.unlock();
                lockAction.onExit(resourceName);
            }
        }
        return null;
    }

    public void executeInLockWithoutResult(String resourceName, int waitTime, int leaseTime, LockActionWithoutResult lockAction) throws InterruptedException {
        RLock lock = redissonClient.getLock(resourceName);
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                lockAction.onAcquire(resourceName);
                lockAction.doInLock(resourceName);
            } finally {
                lock.unlock();
                lockAction.onExit(resourceName);
            }
        }
    }

    public void executeInLockWithoutResult(String resourceName, LockActionWithoutResult lockAction) {
        RLock lock = redissonClient.getLock(resourceName);
        try {
            lock.lock();
            lockAction.onAcquire(resourceName);
            lockAction.doInLock(resourceName);
        } finally {
            lock.unlock();
            lockAction.onExit(resourceName);
        }
    }
}

@FunctionalInterface
interface LockAction {

    default void onAcquire(String resourceName) {

    }

    <T> T doInLock(String resourceName);

    default void onExit(String resourceName) {

    }
}

@FunctionalInterface
interface LockActionWithoutResult {

    default void onAcquire(String resourceName) {

    }

    void doInLock(String resourceName);

    default void onExit(String resourceName) {

    }
}
 

使用RedissonLockProvider(仅供参考):

@Test
public void testRedissonLockProvider() throws Exception {
    RedissonLockProvider provider = new RedissonLockProvider(REDISSON);
    String resourceName = "resource:x";
    Thread threadA = new Thread(() -> {
        provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {

            @Override
            public void onAcquire(String resourceName) {
                System.out.println(String.format("线程%s获取到资源%s的锁", Thread.currentThread().getName(), resourceName));
            }

            @Override
            public void doInLock(String resourceName) {
                try {
                    Thread.sleep(800);
                } catch (InterruptedException ignore) {

                }
            }

            @Override
            public void onExit(String resourceName) {
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        });
    }, "threadA");
    Thread threadB = new Thread(() -> {
        provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {

            @Override
            public void onAcquire(String resourceName) {
                System.out.println(String.format("线程%s获取到资源%s的锁", Thread.currentThread().getName(), resourceName));
            }

            @Override
            public void doInLock(String resourceName) {
                try {
                    Thread.sleep(800);
                } catch (InterruptedException ignore) {

                }
            }

            @Override
            public void onExit(String resourceName) {
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        });
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}
// 某次执行结果
线程threadA获取到资源resource:x的锁
线程threadA释放资源resource:x的锁
线程threadB获取到资源resource:x的锁
线程threadB释放资源resource:x的锁
   

Redisson中RLock的实现原理

RedissonRLock的实现是基本参照了Redisred lock算法进行实现,不过在原始的red lock算法下进行了改良,主要包括下面的特性:

续期或者说延长KEY的过期时间在Redisson使用watch dog实现,理解为用于续期的守护线程,底层依赖于Netty的时间轮HashedWheelTimer和任务io.netty.util.Timeout实现,「俗称看门狗」,下面会详细分析。

先看RLock的类图:

Redisson中怎么实现分布式锁  
❝  

这里有一个疑惑点,RedissonRedLock(RedissonMultiLock的子类)的注释中提到RedLock locking algorithm implementation for multiple locks. It manages all locks as one. 但从直观上看,RedissonLock才是整个锁体系的核心,里面的实现思路也是遵从red lock算法的。

❞  

RedissonLock就是RLock的直接实现,也是分布式锁实现的核心类,从源码中看到Redisson#getLock()就是直接实例化RedissonLock

public class Redisson implements RedissonClient {
    
    // ...... 省略其他代码

    @Override
    public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }

    // ...... 省略其他代码
}
 

因此只需要围绕RedissonLock的源码进行分析即可。RedissonLock的类继承图如下:

Redisson中怎么实现分布式锁  

这里需要有几点认知:

接着先看RedissonLock的构造函数和核心属性:

// 存放entryName -> ExpirationEntry,用于获取当前entryName的线程重入计数器和续期任务
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();

// 内部的锁持有的最大时间,来源于参数Config#lockWatchdogTimeout,用于控制续期的周期
protected long internalLockLeaseTime;

// ID,唯一标识,是一个UUID
final String id;

// 
final String entryName;

// 锁释放事件订阅发布相关
protected final LockPubSub pubSub;

// 命令异步执行器实例
final CommandAsyncExecutor commandExecutor;

/**
 * CommandAsyncExecutor是命令的异步执行器,里面的方法是相对底层的面向通讯框架的方法,包括异步写、异步读和同步结果获取等
 * name参数就是getLock()时候传入的参数,其实就是最终同步到Redis中的KEY
 */
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    // 这里的ID为外部初始化的UUID实例,调用toString()
    this.id = commandExecutor.getConnectionManager().getId();
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    // 这里的entryName = uuid值 + : + 外部传进来的name(KEY),如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
    this.entryName = id + ":" + name;
    // 初始化LockPubSub实例,用于订阅和发布锁释放的事件
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

// RedissonLock内部类ExpirationEntry,存放着线程重入的计数器和续期的Timeout任务
public static class ExpirationEntry {
    
    // 线程ID -> 线程重入的次数
    private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
    private volatile Timeout timeout;
    
    public ExpirationEntry() {
        super();
    }
    
    // 这个方法主要记录线程重入的计数
    public void addThreadId(long threadId) {
        Integer counter = threadIds.get(threadId);
        if (counter == null) {
            counter = 1;
        } else {
            counter++;
        }
        threadIds.put(threadId, counter);
    }

    public boolean hasNoThreads() {
        return threadIds.isEmpty();
    }

    public Long getFirstThreadId() {
        if (threadIds.isEmpty()) {
            return null;
        }
        return threadIds.keySet().iterator().next();
    }

    public void removeThreadId(long threadId) {
        Integer counter = threadIds.get(threadId);
        if (counter == null) {
            return;
        }
        counter--;
        if (counter == 0) {
            threadIds.remove(threadId);
        } else {
            threadIds.put(threadId, counter);
        }
    }
    
    public void setTimeout(Timeout timeout) {
        this.timeout = timeout;
    }
    public Timeout getTimeout() {
        return timeout;
    }
}
 

这里需要关注一下Config中的lockWatchdogTimeout参数:

Redisson中怎么实现分布式锁  

翻译一下大意:lockWatchdogTimeout参数只有在没有使用leaseTimeout参数定义的成功获取到锁的场景(简单来说就是不设置时限的加锁)下生效,如果看门狗在下一个lockWatchdogTimeout周期内不进行续期,那么锁就会过期释放(「从源码上看,每三分之一lockWatchdogTimeout就会执行一次续期任务,每次通过pexpireKEY的存活周期延长lockWatchdogTimeout),lockWatchdogTimeout的默认值为30000,也就是30秒。

这里先列举一下RedissonLock中获取名称的方法,以便后面分析这些名称作为K-V结构的KEY时候使用:

接着看加锁的方法,核心实现主要是:

先看只包含锁最大持有时间的lock()方法体系:

/**
 * 获取锁,不指定等待时间,只指定锁的最大持有时间
 * 通过interruptibly参数配置支持中断
 */
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁,返回的ttl为空代表获取锁成功,返回的ttl代表已经存在的KEY的剩余存活时间
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    // 订阅redisson_lock__channel:{$KEY},其实本质的目的是为了客户端通过Redis的订阅发布,感知到解锁的事件
    // 这个方法会在LockPubSub中注册一个entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry实例中存放着RPromise<RedissonLockEntry>结果,一个信号量形式的锁和订阅方法重入计数器
    // 下面的死循环中的getEntry()或者RPromise<RedissonLockEntry>#getNow()就是从这个映射中获取的
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    // 同步订阅执行,获取注册订阅Channel的响应,区分是否支持中断
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }
    // 走到下面的for循环说明返回的ttl不为空,也就是Redis已经存在对应的KEY,有其他客户端已经获取到锁,此客户端线程的调用需要阻塞等待获取锁
    try {
        while (true) {
            // 死循环中尝试获取锁,这个是后面会分析的方法
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 返回的ttl为空,说明获取到锁,跳出死循环,这个死循环或者抛出中断异常,或者获取到锁成功break跳出,没有其他方式
            if (ttl == null) {
                break;
            }

            // 这个ttl来源于等待存在的锁的KEY的存活时间,直接使用许可为0的信号量进行阻塞等待,下面的几个分支判断都是大同小异,只是有的支持超时时间,有的支持中断
            // 有的是永久阻塞直到锁释放事件订阅LockPubSub的onMessage()方法回调激活getLatch().release()进行解锁才会往下走
            // 这里可以学到一个特殊的技巧,Semaphore(0),信号量的许可设置为0,首个调用acquire()的线程会被阻塞,直到其他线程调用此信号量的release()方法才会解除阻塞,类似于一个CountDownLatch(1)的效果
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // 获取到锁或者抛出中断异常,退订redisson_lock__channel:{$KEY},不再关注解锁事件
        unsubscribe(future, threadId);
    }
}

// 这是一个异步转同步的方法,类似于FutureTask#get(),关键看调用的tryAcquireAsync()方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

/**
 * 通过传入锁持有的最大时间和线程ID异步获取锁
 */
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    // 锁持有最大时间不为-1,也就是明确锁的持有时间,不是永久持有的场景
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 走到这里说明是leaseTime == -1,KEY不设置过期时间的分支,需要启动看门狗机制。尝试内部异步获取锁,注意这里的lockWatchdogTimeout是从配置中获取传进去,不是内部的internalLockLeaseTime属性,这里的默认值还是30000毫秒
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        // 执行异常场景直接返回
        if (e != null) {
            return;
        }

        // 成功获取到锁的场景,需要基于线程ID启用看门狗,通过时间轮指定定时任务进行续期
        if (ttlRemaining == null) {
            // 定时调度进行续期操作
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

/**
 * 转换锁持有最大时间,通过参数进行加锁的LUA脚本调用 
 * getName()就是传入的KEY,如resource:x getLockName()就是锁的名称,形式是:UUID + : + threadId,如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
 * internalLockLeaseTime在leaseTime != -1的前提下使用的是原值,在leaseTime == -1的前提下,使用的是lockWatchdogTimeout
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    // 时间转换为毫秒,注意一点这里的internalLockLeaseTime是类内的属性,被重新赋值了
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 底层向Redis服务执行LUA脚本
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
 

「先留意一下属性internalLockLeaseTime,它在tryLockInnerAsync()方法内被重新赋值,在leaseTime == -1L的前提下,它被赋值为lockWatchdogTimeout,这个细节很重要,决定了后面续期方法(看门口)的调度频率。另外,leaseTime != -1L不会进行续期,也就是不会启动看门狗机制。」

接着需要仔细分析一下tryLockInnerAsync()中执行的LUA脚本,笔者把它提取出来通过注释进行描述:

-- KEYS[1] == getName() --> $KEY --> resource:x
-- ARGV[1] == internalLockLeaseTime --> 30000
-- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
-- 第一段代码是判断锁定的资源KEY不存在的时候进行相应值的设置,代表资源没有被锁定,首次获取锁成功
if (redis.call('exists', KEYS[1]) == 0) then
    -- 这里是设置调用次数,可以理解为延长KEY过期时间的调用次数
    redis.call('hset', KEYS[1], ARGV[2], 1);
    -- 设置KEY的过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- 第二段代码是判断HASH的field是否存在,如果存在说明是同一个线程重入的情况,这个时候需要延长KEY的TTL,并且HASH的field对应的value加1,记录延长ttl的次数
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    -- 这里是增加调用次数,可以理解为增加延长KEY过期时间的调用次数
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- 延长KEY的过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- 第三段代码是兜底的,走到这里说明当前线程获取锁失败,锁已经被其他(进程中的)线程占有,返回当前KEY被占用资源的ttl,用来确定需要休眠的最大时间
return redis.call('pttl', KEYS[1]);
 

这里画一个图演示一下这个Lua脚本中三段代码出现的逻辑:

Redisson中怎么实现分布式锁  

剩下一个scheduleExpirationRenewal(threadId)方法还没有分析,里面的逻辑就是看门狗的定期续期逻辑:

// 基于线程ID定时调度和续期
private void scheduleExpirationRenewal(long threadId) {
    // 如果需要的话新建一个ExpirationEntry记录线程重入计数,同时把续期的任务Timeout对象保存在属性中
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // 当前进行的当前线程重入加锁
        oldEntry.addThreadId(threadId);
    } else {
        // 当前进行的当前线程首次加锁
        entry.addThreadId(threadId);
        // 首次新建ExpirationEntry需要触发续期方法,记录续期的任务句柄
        renewExpiration();
    }
}

// 处理续期
private void renewExpiration() {
    // 根据entryName获取ExpirationEntry实例,如果为空,说明在cancelExpirationRenewal()方法已经被移除,一般是解锁的时候触发
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 新建一个定时任务,这个就是看门狗的实现,io.netty.util.Timeout是Netty结合时间轮使用的定时任务实例
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // 这里是重复外面的那个逻辑,
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            // 获取ExpirationEntry中首个线程ID,如果为空说明调用过cancelExpirationRenewal()方法清空持有的线程重入计数,一般是锁已经释放的场景
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // 向Redis异步发送续期的命令
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                // 抛出异常,续期失败,只打印日志和直接终止任务
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                // 返回true证明续期成功,则递归调用续期方法(重新调度自己),续期失败说明对应的锁已经不存在,直接返回,不再递归
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, 
    // 这里的执行频率为leaseTime转换为ms单位下的三分之一,由于leaseTime初始值为-1的情况下才会进入续期逻辑,那么这里的执行频率为lockWatchdogTimeout的三分之一
    internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); 
    
    // ExpirationEntry实例持有调度任务实例
    ee.setTimeout(task);
}

// 调用Redis,执行Lua脚本,进行异步续期
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()), 
        //  这里根据前面的分析,internalLockLeaseTime在leaseTime的值为-1的前提下,对应值为lockWatchdogTimeout
        internalLockLeaseTime, getLockName(threadId));  
}
 

基于源码推断出续期的机制由入参leaseTime决定:

提取续期的Lua脚本如下:

-- KEYS[1] == getName() --> $KEY --> resource:x
-- ARGV[1] == internalLockLeaseTime --> 30000
-- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;
 

到此为止,不带waitTime参数的加锁和续期逻辑基本分析完毕,而带waitTime参数的tryLock(long waitTime, long leaseTime, TimeUnit unit)实现其实和只存在leaseTime参数的lock(long leaseTime, TimeUnit unit, boolean interruptibly)实现底层调用的方法是一致的,最大的区别是会在尝试获取锁操作之后基于前后的System.currentTimeMillis()计算出时间差和waitTime做对比,决定需要阻塞等待还是直接超时获取锁失败返回,处理阻塞等待的逻辑是客户端本身的逻辑,这里就不做详细展开,因为源码实现也不是十分优雅(太多long currentTime = System.currentTimeMillis()的代码段了)。接着花点功夫分析一下解锁的实现,包括一般情况下的解锁unlock()和强制解锁forceUnlockAsync()

//  一般情况下的解锁
@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        // IllegalMonitorStateException一般是A线程加锁,B线程解锁,内部判断线程状态不一致抛出的
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

@Override
public RFuture<Void> unlockAsync() {
    // 获取当前调用解锁操作的线程ID
    long threadId = Thread.currentThread().getId();
    return unlockAsync(threadId);
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
    // 构建一个结果RedissonPromise
    RPromise<Void> result = new RedissonPromise<Void>();
    // 返回的RFuture如果持有的结果为true,说明解锁成功,返回NULL说明线程ID异常,加锁和解锁的客户端线程不是同一个线程
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        // 这是内部的异常,说明解锁异常,需要取消看门狗的续期任务
        if (e != null) {
            cancelExpirationRenewal(threadId);
            result.tryFailure(e);
            return;
        }
        // 这种情况说明线程ID异常,加锁和解锁的客户端线程不是同一个线程,抛出IllegalMonitorStateException异常
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
        // 走到这里说明正常解锁,取消看门狗的续期任务
        cancelExpirationRenewal(threadId);
        result.trySuccess(null);
    });
    return result;
}

// 真正的内部解锁的方法,执行解锁的Lua脚本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

// 取消续期任务
void cancelExpirationRenewal(Long threadId) {
    // 这里说明ExpirationEntry已经被移除,一般是基于同一个线程ID多次调用解锁方法导致的(并发解锁)
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    // 传入的线程ID不为NULL,从ExpirationEntry中移除线程ID,如果持有的线程ID对应的线程重入计数不为0,会先递减到0,等于0的前提下才会进行删除
    if (threadId != null) {
        task.removeThreadId(threadId);
    }
    // 这里threadId == null的情况是为了满足强制解锁的场景,强制解锁需要直接删除锁所在的KEY,不需要理会传入的线程ID(传入的线程ID直接为NULL)
    // 后者task.hasNoThreads()是为了说明当前的锁没有被任何线程持有,对于单线程也确定在移除线程ID之后重入计数器已经为0,从ExpirationEntry中移除,这个时候获取ExpirationEntry的任务实例进行取消即可
    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            timeout.cancel();
        }
        // EntryName -> ExpirationEntry映射中移除当前锁的相关实例ExpirationEntry
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

// 强制解锁
@Override
public boolean forceUnlock() {
    return get(forceUnlockAsync());
}

@Override
public RFuture<Boolean> forceUnlockAsync() {
    // 线程ID传入为NULL,取消当前的EntryName对应的续期任务
    cancelExpirationRenewal(null);
    // 执行Lua脚本强制删除锁所在的KEY并且发布解锁消息
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('del', KEYS[1]) == 1) then "
            + "redis.call('publish', KEYS[2], ARGV[1]); "
            + "return 1 "
            + "else "
            + "return 0 "
            + "end",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
}
 

这里列出一般情况下解锁和强制解锁的Lua脚本,分析如下:

-- unlockInnerAsync方法的lua脚本
-- KEYS[1] == getName() --> $KEY --> resource:x
-- KEYS[2] == getChannelName() --> 订阅锁的Channel --> redisson_lock__channel:{resource:x}
-- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量数值0
-- ARGV[2] == internalLockLeaseTime --> 30000或者具体的锁最大持有时间
-- ARGV[3] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
-- 第一个IF分支判断如果锁所在的哈希的field不存在,说明当前线程ID未曾获取过对应的锁,返回NULL表示解锁失败
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;
-- 走到这里通过hincrby进行线程重入计数-1,返回计数值
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 计数值大于0,说明线程重入加锁,这个时候基于internalLockLeaseTime对锁所在KEY进行续期
if (counter > 0) then
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    -- 计数值小于或等于0,说明可以解锁,删除锁所在的KEY,并且向redisson_lock__channel:{$KEY}发布消息,内容是0(常量数值)
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
-- 最后的return nil;在IDEA中提示是不会到达的语句,估计这里是开发者笔误写上去的,前面的if-else都有返回语句,这里应该是不可达的
return nil;

-------------------------------------------------- 不怎么华丽的分割线 -------------------------------------------------

-- forceUnlockAsync方法的lua脚本
-- KEYS[1] == getName() --> $KEY --> resource:x
-- KEYS[2] == getChannelName() --> 订阅锁的Channel --> redisson_lock__channel:{resource:x}
-- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量数值0
-- 强制删除锁所在的KEY,如果删除成功向redisson_lock__channel:{$KEY}发布消息,内容是0(常量数值)
if (redis.call('del', KEYS[1]) == 1) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1
else
    return 0
end
 

其他辅助方法都相对简单,这里弄个简单的"流水账"记录一番:

「订阅和发布」部分设计到大量Netty组件使用相关的源码,这里不详细展开,这部分的逻辑简单附加到后面这个流程图中。最后,通过一个比较详细的图分析一下Redisson的加锁和解锁流程。

Redisson中怎么实现分布式锁  
Redisson中怎么实现分布式锁  
Redisson中怎么实现分布式锁  

假设不同进程的两个不同的线程XY去竞争资源RESOURCE的锁,那么可能的流程如下:

Redisson中怎么实现分布式锁  

最后再概括一下Redisson中实现red lock算法使用的HASH数据类型:

 

基于Jedis实现类似Redisson的分布式锁功能

前面的章节已经比较详细分析了Redisson中分布式锁的实现原理,这里使用Jedis和多线程技巧做一个类似的实现。为了简单起见,这里只实现一个无入参的lock()方法(类似于RedissonleaseTime == -1的场景)和unlock()方法。定义接口RedLock

public interface RedLock {

    void lock(String resource) throws InterruptedException;

    void unlock(String resource);
}
 

为了简单起见,笔者把所有实现逻辑都写在实现类RedisRedLock中:

@RequiredArgsConstructor
public class RedisRedLock implements RedLock {

    private final JedisPool jedisPool;
    private final String uuid;

    private static final String WATCH_DOG_TIMEOUT_STRING = "30000";
    private static final long WATCH_DOG_TASK_DURATION = 10000L;
    private static final String CHANNEL_PREFIX = "__red__lock:";
    private static final String UNLOCK_STATUS_STRING = "0";

    private static final String LOCK_LUA = "if (redis.call('exists', KEYS[1]) == 0) then\n" +
            "    redis.call('hset', KEYS[1], ARGV[2], 1);\n" +
            "    redis.call('pexpire', KEYS[1], ARGV[1]);\n" +
            "    return nil;\n" +
            "end;\n" +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then\n" +
            "    redis.call('hincrby', KEYS[1], ARGV[2], 1);\n" +
            "    redis.call('pexpire', KEYS[1], ARGV[1]);\n" +
            "    return nil;\n" +
            "end;\n" +
            "return redis.call('pttl', KEYS[1]);";

    private static final String UNLOCK_LUA = "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then\n" +
            "    return nil;\n" +
            "end;\n" +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);\n" +
            "if (counter > 0) then\n" +
            "    redis.call('pexpire', KEYS[1], ARGV[2]);\n" +
            "    return 0;\n" +
            "else\n" +
            "    redis.call('del', KEYS[1]);\n" +
            "    redis.call('publish', KEYS[2], ARGV[1]);\n" +
            "    return 1;\n" +
            "end;";

    private static final String RENEW_LUA = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return 1; " +
            "end; " +
            "return 0;";

    private static final ExecutorService SUB_PUB_POOL = Executors.newCachedThreadPool();
    private static final ScheduledExecutorService WATCH_DOG_POOL = new ScheduledThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors() * 2
    );

    private static class ThreadEntry {

        private final ConcurrentMap<Long, Integer> threadCounter = Maps.newConcurrentMap();

        private volatile WatchDogTask watchDogTask;

        public synchronized void addThreadId(long threadId) {
            Integer counter = threadCounter.get(threadId);
            if (counter == null) {
                counter = 1;
            } else {
                counter++;
            }
            threadCounter.put(threadId, counter);
        }

        public synchronized boolean hasNoThreads() {
            return threadCounter.isEmpty();
        }

        public synchronized Long getFirstThreadId() {
            if (threadCounter.isEmpty()) {
                return null;
            }
            return threadCounter.keySet().iterator().next();
        }

        public synchronized void removeThreadId(long threadId) {
            Integer counter = threadCounter.get(threadId);
            if (counter == null) {
                return;
            }
            counter--;
            if (counter == 0) {
                threadCounter.remove(threadId);
            } else {
                threadCounter.put(threadId, counter);
            }
        }

        public void setWatchDogTask(WatchDogTask watchDogTask) {
            this.watchDogTask = watchDogTask;
        }

        public WatchDogTask getWatchDogTask() {
            return watchDogTask;
        }
    }

    @Getter
    private static class SubPubEntry {

        private final String key;
        private final Semaphore latch;
        private final SubscribeListener subscribeListener;

        public SubPubEntry(String key) {
            this.key = key;
            this.latch = new Semaphore(0);
            this.subscribeListener = new SubscribeListener(key, latch);
        }
    }

    private static final ConcurrentMap<String, ThreadEntry> THREAD_ENTRY_MAP = Maps.newConcurrentMap();

    @Override
    public void lock(String resource) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        String lockName = uuid + ":" + threadId;
        String entryName = uuid + ":" + resource;
        // 获取锁
        Long ttl = acquire(resource, lockName, threadId, entryName);
        // 加锁成功直接返回
        if (Objects.isNull(ttl)) {
            return;
        }
        // 订阅
        SubPubEntry subPubEntry = subscribeAsync(resource);
        try {
            for (; ; ) {
                ttl = acquire(resource, lockName, threadId, entryName);
                // 加锁成功直接返回
                if (Objects.isNull(ttl)) {
                    return;
                }
                if (ttl > 0L) {
                    subPubEntry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            }
        } finally {
            unsubscribeSync(subPubEntry);
        }
    }

    private Long acquire(String key, String lockName, long threadId, String entryName) {
        Object result = execute0(jedis -> jedis.eval(LOCK_LUA, Lists.newArrayList(key),
                Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName)));
        if (Objects.nonNull(result)) {
            return Long.parseLong(String.valueOf(result));
        }
        // 启动看门狗
        ThreadEntry entry = new ThreadEntry();
        ThreadEntry oldEntry = THREAD_ENTRY_MAP.putIfAbsent(entryName, entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            Runnable renewAction = () -> executeWithoutResult(jedis -> jedis.eval(RENEW_LUA, Lists.newArrayList(key),
                    Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName)));
            WatchDogTask watchDogTask = new WatchDogTask(new AtomicReference<>(renewAction));
            entry.setWatchDogTask(watchDogTask);
            WATCH_DOG_POOL.scheduleWithFixedDelay(watchDogTask, 0, WATCH_DOG_TASK_DURATION, TimeUnit.MILLISECONDS);
        }
        return null;
    }

    private SubPubEntry subscribeAsync(String key) {
        SubPubEntry subPubEntry = new SubPubEntry(key);
        SUB_PUB_POOL.submit(() -> {
            SubscribeListener subscribeListener = subPubEntry.getSubscribeListener();
            executeWithoutResult(jedis -> jedis.subscribe(subscribeListener, subscribeListener.getChannelName()));
            return null;
        });
        return subPubEntry;
    }

    private void unsubscribeSync(SubPubEntry subPubEntry) {
        SubscribeListener subscribeListener = subPubEntry.getSubscribeListener();
        subscribeListener.unsubscribe(subscribeListener.getChannelName());
    }

    @Override
    public void unlock(String resource) {
        long threadId = Thread.currentThread().getId();
        String entryName = uuid + ":" + resource;
        String lockName = uuid + ":" + threadId;
        String channelName = CHANNEL_PREFIX + resource;
        Object result = execute0(jedis -> jedis.eval(UNLOCK_LUA, Lists.newArrayList(resource, channelName),
                Lists.newArrayList(UNLOCK_STATUS_STRING, WATCH_DOG_TIMEOUT_STRING, lockName)));
        ThreadEntry threadEntry = THREAD_ENTRY_MAP.get(entryName);
        if (Objects.nonNull(threadEntry)) {
            threadEntry.removeThreadId(threadId);
            if (threadEntry.hasNoThreads() && Objects.nonNull(threadEntry.getWatchDogTask())) {
                threadEntry.getWatchDogTask().cancel();
            }
        }
        if (Objects.isNull(result)) {
            throw new IllegalMonitorStateException();
        }
    }

    private static class SubscribeListener extends JedisPubSub {

        @Getter
        private final String key;
        @Getter
        private final String channelName;
        @Getter
        private final Semaphore latch;

        public SubscribeListener(String key, Semaphore latch) {
            this.key = key;
            this.channelName = CHANNEL_PREFIX + key;
            this.latch = latch;
        }

        @Override
        public void onMessage(String channel, String message) {
            if (Objects.equals(channelName, channel) && Objects.equals(UNLOCK_STATUS_STRING, message)) {
                latch.release();
            }
        }
    }

    @RequiredArgsConstructor
    private static class WatchDogTask implements Runnable {

        private final AtomicBoolean running = new AtomicBoolean(true);
        private final AtomicReference<Runnable> actionReference;

        @Override
        public void run() {
            if (running.get() && Objects.nonNull(actionReference.get())) {
                actionReference.get().run();
            } else {
                throw new WatchDogTaskStopException("watch dog cancel");
            }
        }

        public void cancel() {
            actionReference.set(null);
            running.set(false);
        }
    }

    private <T> T execute0(Function<Jedis, T> function) {
        try (Jedis jedis = jedisPool.getResource()) {
            return function.apply(jedis);
        }
    }

    interface Action {

        void apply(Jedis jedis);
    }

    private void executeWithoutResult(Action action) {
        try (Jedis jedis = jedisPool.getResource()) {
            action.apply(jedis);
        }
    }

    private static class WatchDogTaskStopException extends RuntimeException {

        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }

    public static void main(String[] args) throws Exception {
        String resourceName = "resource:x";
        RedLock redLock = new RedisRedLock(new JedisPool(new GenericObjectPoolConfig()), UUID.randomUUID().toString());
        Thread threadA = new Thread(() -> {
            try {
                redLock.lock(resourceName);
                process(resourceName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                redLock.unlock(resourceName);
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        }, "threadA");
        Thread threadB = new Thread(() -> {
            try {
                redLock.lock(resourceName);
                process(resourceName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                redLock.unlock(resourceName);
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        }, "threadB");
        threadA.start();
        threadB.start();
        Thread.sleep(Long.MAX_VALUE);
    }

    private static void process(String resourceName) {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("线程%s获取到资源%s的锁", threadName, resourceName));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignore) {
        }
    }
}
 

上面的实现短时间内编写完,没有做详细的DEBUG,可能会有纰漏。某次执行结果如下:

线程threadB获取到资源resource:x的锁
线程threadB释放资源resource:x的锁
线程threadA获取到资源resource:x的锁
线程threadA释放资源resource:x的锁
   

小结

Redisson中的red lock实现,应用到下面的核心技术:

上面的核心技术相对合理地应用,才能实现一个高效而且容错能力相对比较高的分布式锁方案,但是从目前来看,Redisson仍未解决red lock算法中的故障转移缺陷,笔者认为这个有可能是Redis实现分布式锁方案的一个底层缺陷,「此方案在Redis单实例中是相对完善」,一旦应用在Redis集群(普通主从、哨兵或者Cluster),有几率会出现前文提到的节点角色切换导致多个不同客户端获取到同一个资源对应的锁的问题。暂时无解。


上述内容就是Redisson中怎么实现分布式锁,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。

推荐阅读:
  1. Java redisson实现分布式锁原理详解
  2. 怎么在SpringBoot中利用Redisson实现一个分布式锁

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

redisson

上一篇:Nginx中怎么实现负载均衡和缓存

下一篇:如何解决某些HTML字符打不出来的问题

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》