【高并发】高并发分布式锁架构解密,不是所有的锁都是分布式锁!! (6)

结合以上分析,我们将提交订单的接口方法代码改造成如下所示。

public class RedisLockImpl implements RedisLock{ @Autowired private StringRedisTemplate stringRedisTemplate; private ThreadLocal<String> threadLocal = new ThreadLocal<String>(); @Override public boolean tryLock(String key, long timeout, TimeUnit unit){ Boolean isLocked = false; if(threadLocal.get() == null){ String uuid = UUID.randomUUID().toString(); threadLocal.set(uuid); isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit); }else{ isLocked = true; } return isLocked; } @Override public void releaseLock(String key){ //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作 if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){ stringRedisTemplate.delete(key); } } }

这样写看似没有啥问题,但是大家细想一下,这样写就真的OK了吗?

可重入性的问题分析

既然上面分布式锁的可重入性是存在问题的,那我们就来分析下问题的根源在哪里!

假设我们提交订单的方法中,首先使用RedisLock接口对代码块添加了分布式锁,在加锁后的代码中调用了服务A,而服务A中也存在调用RedisLock接口的加锁和解锁操作。而多次调用RedisLock接口的加锁操作时,只要之前的锁没有失效,则会直接返回true,表示成功获取锁。也就是说,无论调用加锁操作多少次,最终只会成功加锁一次。而执行完服务A中的逻辑后,在服务A中调用RedisLock接口的解锁方法,此时,会将当前线程所有的加锁操作获得的锁全部释放掉。

我们可以使用下图来简单的表示这个过程。

那么问题来了,如何解决可重入性的问题呢?

解决可重入性问题

相信很多小伙伴都能够想出使用计数器的方式来解决上面可重入性的问题,没错,就是使用计数器来解决。 整体流程如下所示。

那么,体现在程序代码上是什么样子呢?我们来修改RedisLockImpl类的代码,如下所示。

public class RedisLockImpl implements RedisLock{ @Autowired private StringRedisTemplate stringRedisTemplate; private ThreadLocal<String> threadLocal = new ThreadLocal<String>(); private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>(); @Override public boolean tryLock(String key, long timeout, TimeUnit unit){ Boolean isLocked = false; if(threadLocal.get() == null){ String uuid = UUID.randomUUID().toString(); threadLocal.set(uuid); isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit); }else{ isLocked = true; } //加锁成功后将计数器加1 if(isLocked){ Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get(); threadLocalInteger.set(count++); } return isLocked; } @Override public void releaseLock(String key){ //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作 if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){ Integer count = threadLocalInteger.get(); //计数器减为0时释放锁 if(count == null || --count <= 0){ stringRedisTemplate.delete(key); } } } }

至此,我们基本上解决了分布式锁的可重入性问题。

说到这里,我还要问大家一句,上面的解决问题的方案真的没问题了吗?

阻塞与非阻塞锁

在提交订单的方法中,当获取Redis分布式锁失败时,我们直接返回了failure来表示当前请求下单的操作失败了。试想,在高并发环境下,一旦某个请求获得了分布式锁,那么,在这个请求释放锁之前,其他的请求调用下单方法时,都会返回下单失败的信息。在真实场景中,这是非常不友好的。我们可以将后续的请求进行阻塞,直到当前请求释放锁后,再唤醒阻塞的请求获得分布式锁来执行方法。

所以,我们设计的分布式锁需要支持 阻塞和非阻塞 的特性。

那么,如何实现阻塞呢?我们可以使用自旋来实现,继续修改RedisLockImpl的代码如下所示。

public class RedisLockImpl implements RedisLock{ @Autowired private StringRedisTemplate stringRedisTemplate; private ThreadLocal<String> threadLocal = new ThreadLocal<String>(); private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>(); @Override public boolean tryLock(String key, long timeout, TimeUnit unit){ Boolean isLocked = false; if(threadLocal.get() == null){ String uuid = UUID.randomUUID().toString(); threadLocal.set(uuid); isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit); //如果获取锁失败,则自旋获取锁,直到成功 if(!isLocked){ for(;;){ isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit); if(isLocked){ break; } } } }else{ isLocked = true; } //加锁成功后将计数器加1 if(isLocked){ Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get(); threadLocalInteger.set(count++); } return isLocked; } @Override public void releaseLock(String key){ //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作 if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){ Integer count = threadLocalInteger.get(); //计数器减为0时释放锁 if(count == null || --count <= 0){ stringRedisTemplate.delete(key); } } } }

在分布式锁的设计中,阻塞锁和非阻塞锁 是非常重要的概念,大家一定要记住这个知识点。

锁失效问题

尽管我们实现了分布式锁的阻塞特性,但是还有一个问题是我们不得不考虑的。那就是 锁失效 的问题。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssxwg.html