提取续期的Lua脚本如下:
-- KEYS[1] == getName() --> $KEY --> resource:x -- ARGV[1] == internalLockLeaseTime --> 30000 -- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;到此为止,不带waitTime参数的加锁和续期逻辑基本分析完毕,而带waitTime参数的tryLock(long waitTime, long leaseTime, TimeUnit unit)实现其实和只存在leaseTime参数的lock(long leaseTime, TimeUnit unit, boolean interruptibly)实现底层调用的方法是一致的,最大的区别是会在尝试获取锁操作之后基于前后的System.currentTimeMillis()计算出时间差和waitTime做对比,决定需要阻塞等待还是直接超时获取锁失败返回,处理阻塞等待的逻辑是客户端本身的逻辑,这里就不做详细展开,因为源码实现也不是十分优雅(太多long currentTime = System.currentTimeMillis()的代码段了)。接着花点功夫分析一下解锁的实现,包括一般情况下的解锁unlock()和强制解锁forceUnlockAsync():
// 一般情况下的解锁 @Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { // IllegalMonitorStateException一般是A线程加锁,B线程解锁,内部判断线程状态不一致抛出的 if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } @Override public RFuture<Void> unlockAsync() { // 获取当前调用解锁操作的线程ID long threadId = Thread.currentThread().getId(); return unlockAsync(threadId); } @Override public RFuture<Void> unlockAsync(long threadId) { // 构建一个结果RedissonPromise RPromise<Void> result = new RedissonPromise<Void>(); // 返回的RFuture如果持有的结果为true,说明解锁成功,返回NULL说明线程ID异常,加锁和解锁的客户端线程不是同一个线程 RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 这是内部的异常,说明解锁异常,需要取消看门狗的续期任务 if (e != null) { cancelExpirationRenewal(threadId); result.tryFailure(e); return; } // 这种情况说明线程ID异常,加锁和解锁的客户端线程不是同一个线程,抛出IllegalMonitorStateException异常 if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } // 走到这里说明正常解锁,取消看门狗的续期任务 cancelExpirationRenewal(threadId); result.trySuccess(null); }); return result; } // 真正的内部解锁的方法,执行解锁的Lua脚本 protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); } // 取消续期任务 void cancelExpirationRenewal(Long threadId) { // 这里说明ExpirationEntry已经被移除,一般是基于同一个线程ID多次调用解锁方法导致的(并发解锁) ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null) { return; } // 传入的线程ID不为NULL,从ExpirationEntry中移除线程ID,如果持有的线程ID对应的线程重入计数不为0,会先递减到0,等于0的前提下才会进行删除 if (threadId != null) { task.removeThreadId(threadId); } // 这里threadId == null的情况是为了满足强制解锁的场景,强制解锁需要直接删除锁所在的KEY,不需要理会传入的线程ID(传入的线程ID直接为NULL) // 后者task.hasNoThreads()是为了说明当前的锁没有被任何线程持有,对于单线程也确定在移除线程ID之后重入计数器已经为0,从ExpirationEntry中移除,这个时候获取ExpirationEntry的任务实例进行取消即可 if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null) { timeout.cancel(); } // EntryName -> ExpirationEntry映射中移除当前锁的相关实例ExpirationEntry EXPIRATION_RENEWAL_MAP.remove(getEntryName()); } } // 强制解锁 @Override public boolean forceUnlock() { return get(forceUnlockAsync()); } @Override public RFuture<Boolean> forceUnlockAsync() { // 线程ID传入为NULL,取消当前的EntryName对应的续期任务 cancelExpirationRenewal(null); // 执行Lua脚本强制删除锁所在的KEY并且发布解锁消息 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE); }