您好,登录后才能下订单哦!
在分布式系统中,锁机制是保证数据一致性和系统稳定性的重要手段。Redisson基于Redis的Java客户端,提供了丰富的分布式锁实现。本文将深入分析Redisson分布式锁的加锁和解锁源码,帮助读者理解其内部工作原理。
分布式锁是一种在分布式系统中用于控制多个进程对共享资源访问的机制。它通过协调不同节点之间的操作,确保在同一时间只有一个节点能够访问共享资源。
Redisson分布式锁具有以下特点:
Redisson分布式锁的加锁过程主要包括以下几个步骤:
Redisson分布式锁的加锁入口是RLock
接口的lock()
方法。以下是RLock
接口的定义:
public interface RLock extends Lock, RLockAsync {
void lock();
void lock(long leaseTime, TimeUnit unit);
boolean tryLock();
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
void unlock();
void forceUnlock();
boolean isLocked();
boolean isHeldByCurrentThread();
int getHoldCount();
long remainTimeToLive();
}
lock()
方法实现lock()
方法的实现位于RedissonLock
类中。以下是lock()
方法的源码:
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
lock()
方法调用了lockInterruptibly()
方法,该方法允许在等待锁的过程中响应中断。
lockInterruptibly()
方法实现lockInterruptibly()
方法的实现如下:
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
lockInterruptibly()
方法调用了另一个重载方法lockInterruptibly(long leaseTime, TimeUnit unit)
,其中leaseTime
为-1表示不设置锁的过期时间。
lockInterruptibly(long leaseTime, TimeUnit unit)
方法实现lockInterruptibly(long leaseTime, TimeUnit unit)
方法的实现如下:
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
break;
}
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
该方法首先调用tryAcquire()
方法尝试获取锁。如果获取锁成功,则直接返回;否则,订阅锁的释放事件,并在循环中不断尝试获取锁。
tryAcquire()
方法实现tryAcquire()
方法的实现如下:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
tryAcquire()
方法调用了tryAcquireAsync()
方法,并等待其返回结果。
tryAcquireAsync()
方法实现tryAcquireAsync()
方法的实现如下:
private RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
tryAcquireAsync()
方法根据leaseTime
参数的值决定是否设置锁的过期时间。如果leaseTime
为-1,则调用tryLockInnerAsync()
方法尝试获取锁,并在获取成功后启动锁的自动续期。
tryLockInnerAsync()
方法实现tryLockInnerAsync()
方法是加锁的核心逻辑,其实现如下:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
tryLockInnerAsync()
方法通过Lua脚本在Redis中执行加锁操作。Lua脚本的逻辑如下:
nil
。nil
。hincrby
命令实现锁的可重入性,同一个线程可以多次获取同一把锁。Redisson分布式锁的解锁过程主要包括以下几个步骤:
Redisson分布式锁的解锁入口是RLock
接口的unlock()
方法。以下是unlock()
方法的源码:
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
unlock()
方法调用了unlockAsync()
方法,并等待其返回结果。
unlockAsync()
方法实现unlockAsync()
方法的实现如下:
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
if (e != null) {
cancelExpirationRenewal(threadId);
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
cancelExpirationRenewal(threadId);
result.trySuccess(null);
});
return result;
}
unlockAsync()
方法调用了unlockInnerAsync()
方法,并在解锁成功后取消锁的自动续期任务。
unlockInnerAsync()
方法实现unlockInnerAsync()
方法是解锁的核心逻辑,其实现如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
unlockInnerAsync()
方法通过Lua脚本在Redis中执行解锁操作。Lua脚本的逻辑如下:
nil
。0
。1
。publish
命令发布锁的释放事件,通知其他等待锁的客户端。在分布式锁的使用过程中,如果业务逻辑执行时间较长,可能会导致锁过期。为了防止这种情况发生,Redisson提供了自动续期机制。
在加锁成功后,如果未设置锁的过期时间,Redisson会启动一个定时任务,定期续期锁的过期时间。以下是自动续期的启动代码:
private void scheduleExpirationRenewal(long threadId) {
if (expirationRenewalMap.containsKey(threadId)) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
scheduleExpirationRenewal(threadId);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
expirationRenewalMap.put(threadId, task);
}
scheduleExpirationRenewal()
方法启动一个定时任务,每隔internalLockLeaseTime / 3
毫秒执行一次renewExpirationAsync()
方法。
renewExpirationAsync()
方法的实现如下:
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
renewExpirationAsync()
方法通过Lua脚本在Redis中续期锁的过期时间。Lua脚本的逻辑如下:
1
。0
。在解锁成功后,Redisson会取消锁的自动续期任务。以下是自动续期取消的代码:
protected void cancelExpirationRenewal(Long threadId) {
Timeout task = expirationRenewalMap.remove(threadId);
if (task != null) {
task.cancel();
}
}
cancelExpirationRenewal()
方法从expirationRenewalMap
中移除定时任务,并取消该任务。
Redisson分布式锁的高可用性依赖于Redis的高可用性。Redis通过主从复制、哨兵模式和集群模式等方式实现高可用性。
Redisson通过以下方式实现分布式锁的高可用性:
Redisson使用Lua脚本在Redis中执行加锁和解锁操作,确保操作的原子性,减少网络开销。
Redisson的自动续期机制避免了因业务逻辑执行时间过长而导致锁过期的问题,提高了锁的可靠性。
Redisson支持公平锁和非公平锁两种模式,用户可以根据业务需求选择合适的锁模式,提高系统的并发性能。
Redisson分布式锁通过Lua脚本、自动续期机制和高可用性设计,提供了一个高效、可靠的分布式锁解决方案。本文详细分析了Redisson分布式锁的加锁和解锁源码,帮助读者深入理解其内部工作原理。在实际应用中,用户可以根据业务需求选择合适的锁模式,并通过性能优化手段提高系统的并发性能。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。