续期,这个是一个比较前卫解决思路,也就是如果一个客户端对资源X永久锁定,那么并不是直接对KEY生存周期设置为-1,而是通过一个守护线程每隔固定周期延长KEY的过期时间,这样就能实现在守护线程不被杀掉的前提下,避免客户端崩溃导致锁无法释放长期占用资源的问题
锁状态变更订阅,依赖于org.redisson.pubsub.LockPubSub,用于订阅和通知锁释放事件
不是完全参考red lock算法的实现,数据类型选用了HASH,配合Lua脚本完成多个命令的原子性
续期或者说延长KEY的过期时间在Redisson使用watch dog实现,理解为用于续期的守护线程,底层依赖于Netty的时间轮HashedWheelTimer和任务io.netty.util.Timeout实现,俗称看门狗,下面会详细分析。
先看RLock的类图:
这里有一个疑惑点,RedissonRedLock(RedissonMultiLock的子类)的注释中提到RedLock locking algorithm implementation for multiple locks. It manages all locks as one. 但从直观上看,RedissonLock才是整个锁体系的核心,里面的实现思路也是遵从red lock算法的。
RedissonLock就是RLock的直接实现,也是分布式锁实现的核心类,从源码中看到Redisson#getLock()就是直接实例化RedissonLock
public class Redisson implements RedissonClient { // ...... 省略其他代码 @Override public RLock getLock(String name) { return new RedissonLock(connectionManager.getCommandExecutor(), name); } // ...... 省略其他代码 }因此只需要围绕RedissonLock的源码进行分析即可。RedissonLock的类继承图如下:
这里需要有几点认知:
RedissonLock实现了java.util.concurrent.locks.Lock接口中除了newCondition()方法外的所有方法,也就是可以基本无缝适配Lock接口,对于习惯Lock接口的API的使用者来说是一个福音
RedissonLock基本所有同步API都依赖于异步API的实现,也就是RLock的实现依赖于RLockAsync的实现,底层依赖的是Netty的io.netty.util.concurrent.Promise,具体见RedissonPromise,如果用过JUC中的Future的开发者应该比较熟悉Future#get(),这里的做法类似
右边的几个父类的简单功能描述如下:
RObjectAsync:所有Redisson对象的基础接口,提供一些内存测量、对象拷贝、移动等的异步方法
RObject:RObjectAsync的同步版本
RExpirableAsync:提供对象TTL相关的异步方法
RExpirable:RExpirableAsync的同步版本
RedissonObject:直接实现类RObject接口中的方法
RedissonExpirable:主要是实现了RExpirable接口中的方法
接着先看RedissonLock的构造函数和核心属性:
// 存放entryName -> ExpirationEntry,用于获取当前entryName的线程重入计数器和续期任务 private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>(); // 内部的锁持有的最大时间,来源于参数Config#lockWatchdogTimeout,用于控制续期的周期 protected long internalLockLeaseTime; // ID,唯一标识,是一个UUID final String id; // final String entryName; // 锁释放事件订阅发布相关 protected final LockPubSub pubSub; // 命令异步执行器实例 final CommandAsyncExecutor commandExecutor; /** * CommandAsyncExecutor是命令的异步执行器,里面的方法是相对底层的面向通讯框架的方法,包括异步写、异步读和同步结果获取等 * name参数就是getLock()时候传入的参数,其实就是最终同步到Redis中的KEY */ public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; // 这里的ID为外部初始化的UUID实例,调用toString() this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); // 这里的entryName = uuid值 + : + 外部传进来的name(KEY),如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x this.entryName = id + ":" + name; // 初始化LockPubSub实例,用于订阅和发布锁释放的事件 this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } // RedissonLock内部类ExpirationEntry,存放着线程重入的计数器和续期的Timeout任务 public static class ExpirationEntry { // 线程ID -> 线程重入的次数 private final Map<Long, Integer> threadIds = new LinkedHashMap<>(); private volatile Timeout timeout; public ExpirationEntry() { super(); } // 这个方法主要记录线程重入的计数 public void addThreadId(long threadId) { Integer counter = threadIds.get(threadId); if (counter == null) { counter = 1; } else { counter++; } threadIds.put(threadId, counter); } public boolean hasNoThreads() { return threadIds.isEmpty(); } public Long getFirstThreadId() { if (threadIds.isEmpty()) { return null; } return threadIds.keySet().iterator().next(); } public void removeThreadId(long threadId) { Integer counter = threadIds.get(threadId); if (counter == null) { return; } counter--; if (counter == 0) { threadIds.remove(threadId); } else { threadIds.put(threadId, counter); } } public void setTimeout(Timeout timeout) { this.timeout = timeout; } public Timeout getTimeout() { return timeout; } }这里需要关注一下Config中的lockWatchdogTimeout参数: