Redisson 实现分布式锁的原理分析

在了解分布式锁具体实现方案之前,我们应该先思考一下使用分布式锁必须要考虑的一些问题。​

互斥性:在任意时刻,只能有一个进程持有锁。

防死锁:即使有一个进程在持有锁的期间崩溃而未能主动释放锁,要有其他方式去释放锁从而保证其他进程能获取到锁。

加锁和解锁的必须是同一个进程。

锁的续期问题。

常见的分布式锁实现方案

基于 Redis 实现分布式锁

基于 Zookeeper 实现分布式锁

本文采用第一种方案,也就是基于 Redis 的分布式锁实现方案。

Redis 实现分布式锁主要步骤

指定一个 key 作为锁标记,存入 Redis 中,指定一个 唯一的用户标识 作为 value。

当 key 不存在时才能设置值,确保同一时间只有一个客户端进程获得锁,满足 互斥性 特性。

设置一个过期时间,防止因系统异常导致没能删除这个 key,满足 防死锁 特性。

当处理完业务之后需要清除这个 key 来释放锁,清除 key 时需要校验 value 值,需要满足 只有加锁的人才能释放锁

特别注意:以上实现步骤考虑到了使用分布式锁需要考虑的互斥性、防死锁、加锁和解锁必须为同一个进程等问题,但是锁的续期无法实现。所以,博主采用 Redisson 实现 Redis 的分布式锁,借助 Redisson 的 WatchDog 机制 能够很好的解决锁续期的问题,同样 Redisson 也是 Redis 官方推荐分布式锁实现方案,实现起来较为简单。

Redisson 实现分布式锁

具体实现代码已经上传到博主的仓库,需要的朋友可以在公众号内回复 【分布式锁代码】 获取码云或 GitHub 项目下载地址。

下面从加锁机制、锁互斥机制、Watch dog 机制、可重入加锁机制、锁释放机制、等五个方面对 Redisson 实现分布式锁的底层原理进行分析

加锁原理

加锁其实是通过一段 lua 脚本实现的,如下:

Redisson 实现分布式锁的原理分析

我们把这一段 lua 脚本抽出来看:

if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);"

这里 KEYS[1] 代表的是你加锁的 key,比如你自己设置了加锁的那个锁 key 就是 “myLock”。

// create a lock RLock lock = redisson.getLock("myLock");

这里 ARGV[1] 代表的是锁 key 的默认生存时间,默认 30 秒。ARGV[2] 代表的是加锁的客户端的 ID,类似于下面这样:285475da-9152-4c83-822a-67ee2f116a79:52。至于最后面的一个 1 是为了后面可重入做的计数统计,后面会有讲解到。

我们来看一下在 Redis 中的存储结构:

127.0.0.1:6379> HGETALL myLock 1) "285475da-9152-4c83-822a-67ee2f116a79:52" 2) "1"

上面这一段加锁的 lua 脚本的作用是:第一段 if 判断语句,就是用 exists myLock 命令判断一下,如果你要加锁的那个锁 key 不存在的话,你就进行加锁。如何加锁呢?使用 hincrby 命令设置一个 hash 结构,类似于在 Redis 中使用下面的操作:

127.0.0.1:6379> HINCRBY myLock 285475da-9152-4c83-822a-67ee2f116a79:52 1 (integer) 1

接着会执行 pexpire myLock 30000 命令,设置 myLock 这个锁 key 的生存时间是 30 秒。到此为止,加锁完成。

有的小伙伴可能此时就有疑问了,如果此时有第二个客户端请求加锁呢? 这就是下面要说的锁互斥机制。

锁互斥机制

此时,如果客户端 2 来尝试加锁,会如何呢?首先,第一个 if 判断会执行 exists myLock,发现 myLock 这个锁 key 已经存在了。接着第二个 if 判断,判断一下,myLock 锁 key 的 hash 数据结构中,是否包含客户端 2 的 ID,这里明显不是,因为那里包含的是客户端 1 的 ID。所以,客户端 2 会执行:

return redis.call('pttl', KEYS[1]);

返回的一个数字,这个数字代表了 myLock 这个锁 key 的剩余生存时间。

接下来我们看一下 Redissson tryLock 的主流程:

@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); // 1.尝试获取锁 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败. time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); /** * 2.订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题: * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争. * * 当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败. * 当 this.await 返回 true,进入循环尝试获取锁. */ RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果(应用了 Netty 的 Future) if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(threadId); return false; } try { // 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败. time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } /** * 3.收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁 * 获取锁成功,则立马返回 true, * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环 */ while (true) { long currentTime = System.currentTimeMillis(); // 再次尝试获取锁 ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 超过最大等待时间则返回 false 结束循环,获取锁失败 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } /** * 6.阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息): */ currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { //如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { //则就在wait time 时间范围内等待可以通过信号量 getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } // 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间) time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } } } finally { // 7.无论是否获得锁,都要取消订阅解锁消息 unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }

流程分析

尝试获取锁,返回 null 则说明加锁成功,返回一个数值,则说明已经存在该锁,ttl 为锁的剩余存活时间。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssfsd.html