redis系列:分布式锁 (2)

抽象类RedisLock增加lockValue字段,lockValue字段的默认值为UUID随机值假设当前线程ID。

public abstract class RedisLock implements Lock { //... protected String lockValue; public RedisLock(Jedis jedis,String lockKey) { this(jedis, lockKey, UUID.randomUUID().toString()+Thread.currentThread().getId()); } public RedisLock(Jedis jedis, String lockKey, String lockValue) { this.jedis = jedis; this.lockKey = lockKey; this.lockValue = lockValue; } //... }

加锁代码

public void lock() { while(true){ String result = jedis.set(lockKey, lockValue, NOT_EXIST,SECONDS,30); if(OK.equals(result)){ System.out.println(Thread.currentThread().getId()+"加锁成功!"); break; } } }

解锁代码

public void unlock() { String lockValue = jedis.get(lockKey); if (lockValue.equals(lockValue)){ jedis.del(lockKey); } }

这时看看加锁代码,好像没有什么问题啊。
再来看看解锁的代码,这里的解锁操作包含三步操作:获取值、判断和删除锁。这时你有没有想到在多线程环境下的i++操作?

3.4.1 i++问题

i++操作也可分为三个步骤:读i的值,进行i+1,设置i的值。
如果两个线程同时对i进行i++操作,会出现如下情况

i设置值为0

线程A读到i的值为0

线程B也读到i的值为0

线程A执行了+1操作,将结果值1写入到内存

线程B执行了+1操作,将结果值1写入到内存

此时i进行了两次i++操作,但是结果却为1

在多线程环境下有什么方式可以避免这类情况发生?
解决方式有很多种,例如用AtomicInteger、CAS、synchronized等等。
这些解决方式的目的都是要确保i++ 操作的原子性。那么回过头来看看解锁,同理我们也是要确保解锁的原子性。我们可以利用Redis的lua脚本来实现解锁操作的原子性。

版本3的完整代码:Github地址

3.5 版本4-具有原子性的释放锁

lua脚本内容如下

if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end

这段Lua脚本在执行的时候要把的lockValue作为ARGV[1]的值传进去,把lockKey作为KEYS[1]的值传进去。现在来看看解锁的java代码

public void unlock() { // 使用lua脚本进行原子删除操作 String checkAndDelScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('del', KEYS[1]) " + "else " + "return 0 " + "end"; jedis.eval(checkAndDelScript, 1, lockKey, lockValue); }

好了,解锁操作也确保了原子性了,那么是不是单机Redis环境的分布式锁到此就完成了?
别忘了还有一个,过期时间如何保证大于业务执行时间问题没有解决。

版本4的完整代码:Github地址

3.6 版本5-确保过期时间大于业务执行时间

抽象类RedisLock增加一个boolean类型的属性isOpenExpirationRenewal,用来标识是否开启定时刷新过期时间。
在增加一个scheduleExpirationRenewal方法用于开启刷新过期时间的线程。

public abstract class RedisLock implements Lock { //... protected volatile boolean isOpenExpirationRenewal = true; /** * 开启定时刷新 */ protected void scheduleExpirationRenewal(){ Thread renewalThread = new Thread(new ExpirationRenewal()); renewalThread.start(); } /** * 刷新key的过期时间 */ private class ExpirationRenewal implements Runnable{ @Override public void run() { while (isOpenExpirationRenewal){ System.out.println("执行延迟失效时间中..."); String checkAndExpireScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('expire',KEYS[1],ARGV[2]) " + "else " + "return 0 end"; jedis.eval(checkAndExpireScript, 1, lockKey, lockValue, "30"); //休眠10秒 sleepBySencond(10); } } } }

加锁代码在获取锁成功后将isOpenExpirationRenewal置为true,并且调用scheduleExpirationRenewal方法,开启刷新过期时间的线程。

public void lock() { while (true) { String result = jedis.set(lockKey, lockValue, NOT_EXIST, SECONDS, 30); if (OK.equals(result)) { System.out.println("线程id:"+Thread.currentThread().getId() + "加锁成功!时间:"+LocalTime.now()); //开启定时刷新过期时间 isOpenExpirationRenewal = true; scheduleExpirationRenewal(); break; } System.out.println("线程id:"+Thread.currentThread().getId() + "获取锁失败,休眠10秒!时间:"+LocalTime.now()); //休眠10秒 sleepBySencond(10); } }

解锁代码增加一行代码,将isOpenExpirationRenewal属性置为false,停止刷新过期时间的线程轮询。

public void unlock() { //... isOpenExpirationRenewal = false; }

版本5的完整代码:Github地址

3.7 测试

测试代码如下

public void testLockCase5() { //定义线程池 ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10, 1, TimeUnit.SECONDS, new SynchronousQueue<>()); //添加10个线程获取锁 for (int i = 0; i < 10; i++) { pool.submit(() -> { try { Jedis jedis = new Jedis("localhost"); LockCase5 lock = new LockCase5(jedis, lockName); lock.lock(); //模拟业务执行15秒 lock.sleepBySencond(15); lock.unlock(); } catch (Exception e){ e.printStackTrace(); } }); } //当线程池中的线程数为0时,退出 while (pool.getPoolSize() != 0) {} }

测试结果

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyfjps.html