分布式锁(2) —— Redisson实现分布式锁

Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。

Redisson 分布式重入锁用法

Redisson 支持单点模式、主从模式、哨兵模式、集群模式,这里以单点模式为例:

public class RedissonDemo { public static RedissonClient getRedissonClient() { // 1. Create config object Config config = new Config(); // use "rediss://" for SSL connection config.useClusterServers().addNodeAddress("redis://127.0.0.1:5379"); // 2. Create RedissonClient RedissonClient redisson = Redisson.create(config); return redisson; } public static void main(String[] args) { RedissonClient redisson = getRedissonClient(); // 3. 获取锁对象实例(无法保证按照线程的顺序获取到) RLock rLock = redisson.getLock("anyLock"); long waitTimeout = 60; long leaseTime = 30; try { /** * 4. 尝试获取锁 * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败 * leaseTime 锁的持有时间,超过这个时间锁会自动失效 (值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完) */ boolean res = rLock.tryLock(waitTimeout, leaseTime, TimeUnit.SECONDS); if (res) { //成功获得锁,在这里处理业务 } } catch (Exception e) { throw new RuntimeException("aquire lock fail"); }finally{ // 无论如何,最后都要解锁 rLock.unlock(); } } } 加锁源码分析 1. getLock方法获取锁对象 // org.redisson.Redisson#getLock() @Override public RLock getLock(String name) { /** * 构造并返回一个 RedissonLock 对象 * commandExecutor: 与 Redis 节点通信并发送指令的真正实现。 * 需要说明一下,CommandExecutor 实现是通过 eval 命令来执行 Lua 脚本 */ return new RedissonLock(connectionManager.getCommandExecutor(), name); } 2. tryLock方法尝试获取锁 // org.redisson.RedissonLock#tryLock() @Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { // 最大等待时间 long time = unit.toMillis(waitTime); // 当前时间 long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); // 1.尝试申请锁,返回还剩余的锁过期时间 Long ttl = tryAcquire(leaseTime, unit, threadId); // 2.如果为空,表示申请锁成功 if (ttl == null) { return true; } // 3.申请锁的耗时如果大于等于最大等待时间,则申请锁失败 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); /** * 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题: * 基于信息量,当锁被其它资源占用时, * 当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争 * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败 * 当 this.await返回true,进入循环尝试获取锁 */ RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); //await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future) if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(threadId); return false; } try { //计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } /** * 5.收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁 * 获取锁成功,则立马返回true, * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回false结束循环 */ while (true) { long currentTime = System.currentTimeMillis(); // 再次尝试申请锁 ttl = tryAcquire(leaseTime, unit, threadId); // 成功获取锁则直接返回true结束循环 if (ttl == null) { return true; } // 超过最大等待时间则返回false结束循环,获取锁失败 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } /** * 6.阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息): */ currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } // 7.更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间) time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } } } finally { // 8.无论是否获得锁,都要取消订阅解锁消息 unsubscribe(subscribeFuture, threadId); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzdzf.html