Redisson分布式锁之加解锁源码分析

发布时间:2023-03-16 10:53:07 作者:iii
来源:亿速云 阅读:84

Redisson分布式锁之加解锁源码分析

引言

在分布式系统中,锁机制是保证数据一致性和系统稳定性的重要手段。Redisson基于Redis的Java客户端,提供了丰富的分布式锁实现。本文将深入分析Redisson分布式锁的加锁和解锁源码,帮助读者理解其内部工作原理。

1. Redisson分布式锁概述

1.1 分布式锁的基本概念

分布式锁是一种在分布式系统中用于控制多个进程对共享资源访问的机制。它通过协调不同节点之间的操作,确保在同一时间只有一个节点能够访问共享资源。

1.2 Redisson分布式锁的特点

Redisson分布式锁具有以下特点:

2. Redisson分布式锁的加锁过程

2.1 加锁的基本流程

Redisson分布式锁的加锁过程主要包括以下几个步骤:

  1. 尝试获取锁:客户端尝试在Redis中设置一个键值对,如果设置成功,则表示获取锁成功。
  2. 设置锁的过期时间:为了防止锁被长时间占用,设置锁的过期时间。
  3. 自动续期:如果业务逻辑执行时间较长,Redisson会自动续期锁的过期时间。

2.2 加锁源码分析

2.2.1 加锁入口

Redisson分布式锁的加锁入口是RLock接口的lock()方法。以下是RLock接口的定义:

public interface RLock extends Lock, RLockAsync {
    void lock();
    void lock(long leaseTime, TimeUnit unit);
    boolean tryLock();
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    void unlock();
    void forceUnlock();
    boolean isLocked();
    boolean isHeldByCurrentThread();
    int getHoldCount();
    long remainTimeToLive();
}

2.2.2 lock()方法实现

lock()方法的实现位于RedissonLock类中。以下是lock()方法的源码:

@Override
public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

lock()方法调用了lockInterruptibly()方法,该方法允许在等待锁的过程中响应中断。

2.2.3 lockInterruptibly()方法实现

lockInterruptibly()方法的实现如下:

@Override
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}

lockInterruptibly()方法调用了另一个重载方法lockInterruptibly(long leaseTime, TimeUnit unit),其中leaseTime为-1表示不设置锁的过期时间。

2.2.4 lockInterruptibly(long leaseTime, TimeUnit unit)方法实现

lockInterruptibly(long leaseTime, TimeUnit unit)方法的实现如下:

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    if (ttl == null) {
        return;
    }
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);
    try {
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            if (ttl == null) {
                break;
            }
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
}

该方法首先调用tryAcquire()方法尝试获取锁。如果获取锁成功,则直接返回;否则,订阅锁的释放事件,并在循环中不断尝试获取锁。

2.2.5 tryAcquire()方法实现

tryAcquire()方法的实现如下:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

tryAcquire()方法调用了tryAcquireAsync()方法,并等待其返回结果。

2.2.6 tryAcquireAsync()方法实现

tryAcquireAsync()方法的实现如下:

private RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

tryAcquireAsync()方法根据leaseTime参数的值决定是否设置锁的过期时间。如果leaseTime为-1,则调用tryLockInnerAsync()方法尝试获取锁,并在获取成功后启动锁的自动续期。

2.2.7 tryLockInnerAsync()方法实现

tryLockInnerAsync()方法是加锁的核心逻辑,其实现如下:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

tryLockInnerAsync()方法通过Lua脚本在Redis中执行加锁操作。Lua脚本的逻辑如下:

  1. 检查锁是否存在,如果不存在,则设置锁并返回nil
  2. 如果锁已经存在且当前线程已经持有锁,则增加锁的计数并返回nil
  3. 如果锁已经存在且当前线程未持有锁,则返回锁的剩余过期时间。

2.3 加锁过程中的关键点

3. Redisson分布式锁的解锁过程

3.1 解锁的基本流程

Redisson分布式锁的解锁过程主要包括以下几个步骤:

  1. 释放锁:客户端在Redis中删除锁的键值对。
  2. 取消自动续期:如果锁的自动续期任务正在运行,则取消该任务。
  3. 发布锁的释放事件:通知其他等待锁的客户端。

3.2 解锁源码分析

3.2.1 解锁入口

Redisson分布式锁的解锁入口是RLock接口的unlock()方法。以下是unlock()方法的源码:

@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

unlock()方法调用了unlockAsync()方法,并等待其返回结果。

3.2.2 unlockAsync()方法实现

unlockAsync()方法的实现如下:

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        if (e != null) {
            cancelExpirationRenewal(threadId);
            result.tryFailure(e);
            return;
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
        cancelExpirationRenewal(threadId);
        result.trySuccess(null);
    });
    return result;
}

unlockAsync()方法调用了unlockInnerAsync()方法,并在解锁成功后取消锁的自动续期任务。

3.2.3 unlockInnerAsync()方法实现

unlockInnerAsync()方法是解锁的核心逻辑,其实现如下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

unlockInnerAsync()方法通过Lua脚本在Redis中执行解锁操作。Lua脚本的逻辑如下:

  1. 检查当前线程是否持有锁,如果不持有,则返回nil
  2. 如果当前线程持有锁,则减少锁的计数。
  3. 如果锁的计数大于0,则更新锁的过期时间并返回0
  4. 如果锁的计数为0,则删除锁并发布锁的释放事件,返回1

3.3 解锁过程中的关键点

4. Redisson分布式锁的自动续期机制

4.1 自动续期的必要性

在分布式锁的使用过程中,如果业务逻辑执行时间较长,可能会导致锁过期。为了防止这种情况发生,Redisson提供了自动续期机制。

4.2 自动续期的实现

4.2.1 自动续期的启动

在加锁成功后,如果未设置锁的过期时间,Redisson会启动一个定时任务,定期续期锁的过期时间。以下是自动续期的启动代码:

private void scheduleExpirationRenewal(long threadId) {
    if (expirationRenewalMap.containsKey(threadId)) {
        return;
    }
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                if (res) {
                    scheduleExpirationRenewal(threadId);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    expirationRenewalMap.put(threadId, task);
}

scheduleExpirationRenewal()方法启动一个定时任务,每隔internalLockLeaseTime / 3毫秒执行一次renewExpirationAsync()方法。

4.2.2 自动续期的执行

renewExpirationAsync()方法的实现如下:

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

renewExpirationAsync()方法通过Lua脚本在Redis中续期锁的过期时间。Lua脚本的逻辑如下:

  1. 检查当前线程是否持有锁,如果持有,则续期锁的过期时间并返回1
  2. 如果当前线程未持有锁,则返回0

4.3 自动续期的取消

在解锁成功后,Redisson会取消锁的自动续期任务。以下是自动续期取消的代码:

protected void cancelExpirationRenewal(Long threadId) {
    Timeout task = expirationRenewalMap.remove(threadId);
    if (task != null) {
        task.cancel();
    }
}

cancelExpirationRenewal()方法从expirationRenewalMap中移除定时任务,并取消该任务。

5. Redisson分布式锁的高可用性

5.1 Redis的高可用性

Redisson分布式锁的高可用性依赖于Redis的高可用性。Redis通过主从复制、哨兵模式和集群模式等方式实现高可用性。

5.2 Redisson的高可用性实现

Redisson通过以下方式实现分布式锁的高可用性:

6. Redisson分布式锁的性能优化

6.1 Lua脚本的使用

Redisson使用Lua脚本在Redis中执行加锁和解锁操作,确保操作的原子性,减少网络开销。

6.2 自动续期机制

Redisson的自动续期机制避免了因业务逻辑执行时间过长而导致锁过期的问题,提高了锁的可靠性。

6.3 锁的公平性

Redisson支持公平锁和非公平锁两种模式,用户可以根据业务需求选择合适的锁模式,提高系统的并发性能。

7. 总结

Redisson分布式锁通过Lua脚本、自动续期机制和高可用性设计,提供了一个高效、可靠的分布式锁解决方案。本文详细分析了Redisson分布式锁的加锁和解锁源码,帮助读者深入理解其内部工作原理。在实际应用中,用户可以根据业务需求选择合适的锁模式,并通过性能优化手段提高系统的并发性能。

推荐阅读:
  1. SpringBoot整合Redisson如何实现分布式锁
  2. 使用Redisson订阅数问题怎么解决

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

redisson

上一篇:MyBatis在SQL语句中怎么获取list的大小

下一篇:Vue父子组件之间事件通信怎么实现

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》