翻译一下大意:lockWatchdogTimeout参数只有在没有使用leaseTimeout参数定义的成功获取到锁的场景(简单来说就是不设置时限的加锁)下生效,如果看门狗在下一个lockWatchdogTimeout周期内不进行续期,那么锁就会过期释放(从源码上看,每三分之一lockWatchdogTimeout就会执行一次续期任务,每次通过pexpire把KEY的存活周期延长lockWatchdogTimeout),lockWatchdogTimeout的默认值为30000,也就是30秒。
这里先列举一下RedissonLock中获取名称的方法,以便后面分析这些名称作为K-V结构的KEY时候使用:
id:由配置实例化时候实例化的UUID实例生成,从源码上分析每个连接方式的Redisson实例有唯一的UUID,ConnectionManager初始化的时候会调用UUID id = UUID.randomUUID(),笔者认为可以理解为Redisson实例在某个应用程序进程中的唯一标识,毕竟一般情况下,一个应用程序应该只会应用一种Redisson的连接方式
getEntryName():返回的是UUID + : + $KEY,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
getName():返回的是$KEY,例如resource:x
getChannelName():返回的是redisson_lock__channel:{$KEY},例如redisson_lock__channel:{resource:x}
getLockName(long threadId):返回的是UUID + : + $threadId,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
接着看加锁的方法,核心实现主要是:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException:lock方法体系
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException:tryLock方法体系
先看只包含锁最大持有时间的lock()方法体系:
/** * 获取锁,不指定等待时间,只指定锁的最大持有时间 * 通过interruptibly参数配置支持中断 */ private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); // 尝试获取锁,返回的ttl为空代表获取锁成功,返回的ttl代表已经存在的KEY的剩余存活时间 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 订阅redisson_lock__channel:{$KEY},其实本质的目的是为了客户端通过Redis的订阅发布,感知到解锁的事件 // 这个方法会在LockPubSub中注册一个entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry实例中存放着RPromise<RedissonLockEntry>结果,一个信号量形式的锁和订阅方法重入计数器 // 下面的死循环中的getEntry()或者RPromise<RedissonLockEntry>#getNow()就是从这个映射中获取的 RFuture<RedissonLockEntry> future = subscribe(threadId); // 同步订阅执行,获取注册订阅Channel的响应,区分是否支持中断 if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } // 走到下面的for循环说明返回的ttl不为空,也就是Redis已经存在对应的KEY,有其他客户端已经获取到锁,此客户端线程的调用需要阻塞等待获取锁 try { while (true) { // 死循环中尝试获取锁,这个是后面会分析的方法 ttl = tryAcquire(leaseTime, unit, threadId); // 返回的ttl为空,说明获取到锁,跳出死循环,这个死循环或者抛出中断异常,或者获取到锁成功break跳出,没有其他方式 if (ttl == null) { break; } // 这个ttl来源于等待存在的锁的KEY的存活时间,直接使用许可为0的信号量进行阻塞等待,下面的几个分支判断都是大同小异,只是有的支持超时时间,有的支持中断 // 有的是永久阻塞直到锁释放事件订阅LockPubSub的onMessage()方法回调激活getLatch().release()进行解锁才会往下走 // 这里可以学到一个特殊的技巧,Semaphore(0),信号量的许可设置为0,首个调用acquire()的线程会被阻塞,直到其他线程调用此信号量的release()方法才会解除阻塞,类似于一个CountDownLatch(1)的效果 if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { // 获取到锁或者抛出中断异常,退订redisson_lock__channel:{$KEY},不再关注解锁事件 unsubscribe(future, threadId); } } // 这是一个异步转同步的方法,类似于FutureTask#get(),关键看调用的tryAcquireAsync()方法 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } /** * 通过传入锁持有的最大时间和线程ID异步获取锁 */ private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { // 锁持有最大时间不为-1,也就是明确锁的持有时间,不是永久持有的场景 if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 走到这里说明是leaseTime == -1,KEY不设置过期时间的分支,需要启动看门狗机制。尝试内部异步获取锁,注意这里的lockWatchdogTimeout是从配置中获取传进去,不是内部的internalLockLeaseTime属性,这里的默认值还是30000毫秒 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { // 执行异常场景直接返回 if (e != null) { return; } // 成功获取到锁的场景,需要基于线程ID启用看门狗,通过时间轮指定定时任务进行续期 if (ttlRemaining == null) { // 定时调度进行续期操作 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; } /** * 转换锁持有最大时间,通过参数进行加锁的LUA脚本调用 * getName()就是传入的KEY,如resource:x getLockName()就是锁的名称,形式是:UUID + : + threadId,如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 * internalLockLeaseTime在leaseTime != -1的前提下使用的是原值,在leaseTime == -1的前提下,使用的是lockWatchdogTimeout */ <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { // 时间转换为毫秒,注意一点这里的internalLockLeaseTime是类内的属性,被重新赋值了 internalLockLeaseTime = unit.toMillis(leaseTime); // 底层向Redis服务执行LUA脚本 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }