分布式锁Redission (2)

没有指定过期时间调用getLockWatchdogTimeout()方法,获取锁的默认看门狗时间,30秒

public long getLockWatchdogTimeout() { return this.lockWatchdogTimeout; } this.lockWatchdogTimeout = 30000L;

还是调用tryLockInnerAsync给redis发送命令,占锁成功返回一个以不变异步编排的RFuture<Long>对象,来进行监听,里面有两个参数ttlRemaining, e

ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining == null) { this.scheduleExpirationRenewal(threadId); } } });

里面有个scheduleExpirationRenewal方法

private void scheduleExpirationRenewal(long threadId) { RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry(); RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); //重新设置过期时间 this.renewExpiration(); } }

里面的关键方法renewExpiration执行定时任务,

private void renewExpiration() { RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName()); if (ee != null) { //里面会执行一个定时任务 Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() { public void run(Timeout timeout) throws Exception { RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName()); if (ent != null) { Long threadId = ent.getFirstThreadId(); if (threadId != null) { RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e); } else { if (res) { RedissonLock.this.renewExpiration(); } } }); } } } //看门狗时间/3 10秒钟重试一次 }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); ee.setTimeout(task); } }

主要是来运行renewExpirationAsync这个方法

protected RFuture<Boolean> renewExpirationAsync(long threadId) { return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); }

里面传入了一个internalLockLeaseTime时间参数

image-20211006163310509

又是获取看门狗时间

总结

如果传了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间

如果未指定锁的超时时间,就是使用lockWatchdogTimeout的默认时间30秒,只要占锁成功就会启动一个定时任务【重新给所设置时间,新的过期时间就是lockWatchdogTimeout的默认时间】

最佳实践使用自定义过期时间,省掉了自动续期时间,自动加锁

读写锁测试 @GetMapping("/write") @ResponseBody public String writeValue() { RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock"); String s=""; RLock rLock=readWriteLock.writeLock(); try{ //加写锁 rLock.lock(); s= UUID.randomUUID().toString(); Thread.sleep(30000); redisTemplate.opsForValue().set("writeValue",s); }catch (Exception e){ e.printStackTrace(); } finally { rLock.unlock(); } return s; } @GetMapping("/read") @ResponseBody public String readValue() { RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock"); String s=""; //加读锁 RLock rLock=readWriteLock.readLock(); rLock.lock(); try{ s=redisTemplate.opsForValue().get("writeValue"); }catch (Exception e){ e.printStackTrace(); } finally { rLock.unlock(); } return s; }

写锁没释放读锁就必须等待,没有写锁读锁都可以读

保证数据的一致性,写锁是一个排他锁、互斥锁,读锁是共享锁。

读读共享、读写互斥、写写互斥、写读互斥,只要有写的存在都必须等待

信号量测试

像车库停车,每进来一辆车,车库减少一个车位,只有当车库还有车位才可以停车

@GetMapping("/park") @ResponseBody public String park() throws InterruptedException { RSemaphore park = redisson.getSemaphore("park"); //获取一个信号 占一个值 park.acquire(); return "ok"; } @GetMapping("/go") @ResponseBody public String go(){ RSemaphore park = redisson.getSemaphore("park"); //释放一个车位 park.release(); return "ok"; }

访问:

gulimall.com/park

gulimall.com/go

信号量可以用作分布式的限流

闭锁

只有等待所有活动都完成才发生,例如当所有班级放学走完才关闭学校大门

@GetMapping("/lockdoor") @ResponseBody public String lockDoor() throws InterruptedException { RCountDownLatch door = redisson.getCountDownLatch("door"); door.trySetCount(5); door.await();//等待闭锁都完成 return "放假啦...."; } @GetMapping("/gogo/{id}") @ResponseBody public String gogogo(@PathVariable("id") Long id) throws InterruptedException { RCountDownLatch door = redisson.getCountDownLatch("door"); door.countDown(); return id+"班都走了"; } 缓存一致性解决

在我们读缓存的时候可能会有数据被修改过,为了让我们能够读到最新的数据,有两种处理方法:

双写模式

在把数据写入数据库的时候,同时写入到缓存中

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwyxsz.html