分布式锁(2) —— Redisson实现分布式锁 (2)

主体过程就是申请锁 --> 如果获取失败则订阅锁释放事件(通过redis发布订阅功能)

3.加锁流程 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 加锁核心为tryLockInnerAsync RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager(). getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; } // org.redisson.RedissonLock#tryLockInnerAsync() <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); /** * KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key; * ARGV[1]就是internalLockLeaseTime,即锁的租约时间(持有锁的有效时间),默认30s; * ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值 value,即UUID+threadId。 */ return evalWriteAsync(getName(), LongCodec.INSTANCE, command, // 如果不存在键则加锁 "if (redis.call(\'exists\', KEYS[1]) == 0) then " + "redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); " + "redis.call(\'pexpire\', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 如果存在键且是当前线程加锁,则重入该锁,并更新过期时间 "if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then " + "redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); " + "redis.call(\'pexpire\', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 返回锁剩余时间 "return redis.call(\'pttl\', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } 解锁源码分析 // org.redisson.RedissonLock#unlock() @Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } // org.redisson.RedissonLock#unlockAsync() @Override public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); // 核心方法 unlockInnerAsync RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; } // org.redisson.RedissonLock#unlockInnerAsync() protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 锁不存在直接返回 "if (redis.call(\'hexists\', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 锁存在则可重入锁计数减1 "local counter = redis.call(\'hincrby\', KEYS[1], ARGV[3], -1); " + // 计数>0,更新过期时间 "if (counter > 0) then " + "redis.call(\'pexpire\', KEYS[1], ARGV[2]); " + "return 0; " + // 计数==0,删除key,并发送解锁事件 "else " + "redis.call(\'del\', KEYS[1]); " + "redis.call(\'publish\', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); } 解锁消息处理 // org.redisson.pubsub#onMessage() public class LockPubSub extends PublishSubscribe<RedissonLockEntry> { public static final Long UNLOCK_MESSAGE = 0L; public static final Long READ_UNLOCK_MESSAGE = 1L; @Override protected void onMessage(RedissonLockEntry value, Long message) { // 判断是否解锁消息 if (message.equals(UNLOCK_MESSAGE)) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute != null) { runnableToExecute.run(); } // 释放一个信号量,唤醒等待的entry.getLatch().tryAcquire去再次尝试申请锁 value.getLatch().release(); } else if (message.equals(READ_UNLOCK_MESSAGE)) { while (true) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute == null) { break; } runnableToExecute.run(); } value.getLatch().release(value.getLatch().getQueueLength()); } } } 总结

通过 Redisson 实现分布式锁,基本原理和自己通过set key value px milliseconds nx + lua 实现一样,但是效果更好些,主要因为:

RedissonLock是可重入的;

考虑了失败重试;

阻塞的同时可以设置锁的最大等待时间 ;

在实现上也做了一些优化,减少了无效的锁申请,提升了资源的利用率。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzdzf.html