【分布式锁】01-使用Redisson实现可重入分布式锁原理

主流的分布式锁一般有三种实现方式:

数据库乐观锁

基于Redis的分布式锁

基于ZooKeeper的分布式锁

之前我在博客上写过关于mysql和redis实现分布式锁的具体方案:
https://www.cnblogs.com/wang-meng/p/10226618.html
里面主要是从实现原理出发。

这次【分布式锁】系列文章主要是深入redis客户端reddision源码和zk 这两种分布式锁的实现原理。

可靠性

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

互斥性。在任意时刻,只有一个客户端能持有锁。

不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。

解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

Redisson加锁原理

redisson是一个非常强大的开源的redis客户端框架, 官方地址:
https://redisson.org/

使用起来很简单,配置好maven和连接信息,这里直接看代码实现:

1RLock lock = redisson.getLock("anyLock");
2
3lock.lock();
4lock.unlock();

redisson具体的执行加锁逻辑都是通过lua脚本来完成的,lua脚本能够保证原子性。

先看下RLock初始化的代码:

1public class Redisson implements RedissonClient {
2
3    @Override
4    public RLock getLock(String name) {
5        return new RedissonLock(connectionManager.getCommandExecutor(), name);
6    }
7}
8
9public class RedissonLock extends RedissonExpirable implements RLock {
10    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
11    super(commandExecutor, name);
12    this.commandExecutor = commandExecutor;
13    this.id = commandExecutor.getConnectionManager().getId();
14    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
15    this.entryName = id + ":" + name;
16}

首先看下RedissonLock 的id返回的是一个UUID对象,每个机器都对应一个自己的id属性,id 值就类似于:"8743c9c0-0795-4907-87fd-6c719a6b4586"

接着往后看lock()的代码实现:

1public class RedissonLock extends RedissonExpirable implements RLock {
2    @Override
3    public void lock() {
4        try {
5            lockInterruptibly();
6        } catch (InterruptedException e) {
7            Thread.currentThread().interrupt();
8        }
9    }
10
11    @Override
12    public void lockInterruptibly() throws InterruptedException {
13        lockInterruptibly(-1, null);
14    }
15
16    @Override
17    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
18        // 获取当前线程id
19        long threadId = Thread.currentThread().getId();
20        Long ttl = tryAcquire(leaseTime, unit, threadId);
21        // lock acquired
22        if (ttl == null) {
23            return;
24        }
25
26        RFuture<RedissonLockEntry> future = subscribe(threadId);
27        commandExecutor.syncSubscription(future);
28
29        try {
30            while (true) {
31                ttl = tryAcquire(leaseTime, unit, threadId);
32                // lock acquired
33                if (ttl == null) {
34                    break;
35                }
36
37                // waiting for message
38                if (ttl >= 0) {
39                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
40                } else {
41                    getEntry(threadId).getLatch().acquire();
42                }
43            }
44        } finally {
45            unsubscribe(future, threadId);
46        }
47    }
48
49    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
50        internalLockLeaseTime = unit.toMillis(leaseTime);
51
52        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
53                  "if (redis.call('exists', KEYS[1]) == 0) then " +
54                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
55                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
56                      "return nil; " +
57                  "end; " +
58                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
59                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
60                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
61                      "return nil; " +
62                  "end; " +
63                  "return redis.call('pttl', KEYS[1]);",
64                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
65    }
66}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwgpdw.html