修改test.py测试脚本,测试一下锁过期。测试脚本如下:
# test-3-3.py def increase(redis, lock, key): thread_name = threading.current_thread().name lock_value = lock.get_lock(key, timeout=1) value = redis.get(key) if not value: value = 0 # 模拟实际情况下进行的某些耗时操作, 且执行时间大于锁过期的时间 time.sleep(3) value = int(value) + 1 print thread_name, value redis.set(key, value) lock.del_lock(key, lock_value)执行test-3-3.py测试脚本,得到的结果如下:
Thread-1 1 Thread-2 1 Thread-5 1 Thread-6 2 Thread-8 2 Thread-10 2 Thread-9 3 Thread-3 3 Thread-4 3 Thread-7 4从运行test-3-3.py测试脚本结果来看,问题没有得到解决。这是为什么呢?因为我们设置value的唯一性只能确保线程不会误删其他线程产生的锁,不会出现一连串的误删锁的情况,比如 A 删了 B 的锁,B 执行完删了 C 的锁。使用 redis 的过期机制,只要业务的处理时间大于锁的过期时间,就没有一个很好的方式来避免由于锁过期导致其他线程同时占有锁的问题,所以需要熟悉业务的执行时间,来合理地设置锁的过期时间。(PS:对于这种情况,一般的处理方式是获得锁的线程开启一个守护线程,用来给快要过期的锁"续航"。比如过去了29秒,线程A还没执行完,这时候守护线程会执行expire指令,为这把锁"续航"20秒。守护线程从第29秒开始执行,每20秒执行一次检查。当线程A执行完任务,会显式关掉守护线程。线程A的进程或者守护进程异常退出,这把锁将自动超时释放,从而不会导致死锁。)
另外,需要注意的一点是:3.3节的实现方式中,删除锁(del_lock)的操作不是原子性的,先是拿到锁,再判断锁的值是否相等,相等的话最后再删除锁,既然不是原子性的,就有可能存在这样一种极端情况:在判断的那一时刻,锁正好过期了,被其他线程占有了锁,那最后一步的删除,就可能会造成误删其他线程的锁。因此推荐使用官方提供的 Lua 脚本来确保原子性:
def del_lock(self, key, value): if redis.call("get",key) == value then return redis.call("del",key) else return 0 4. 总结以上就是我们使用 Redis 来实现一个分布式同步锁的方式,其特点是:
加锁和释放锁是原子性的
满足互斥性,同一个时刻只能有一个线程可以获取锁和释放锁
利用 Redis 的 ttl机制和守护进程的方式来保证不会出现死锁
以上的方案中,我们是假设 Redis 服务端是单集群且高可用的,忽视了以下的问题:
如果某一时刻 Redis master 节点发生了故障,集群中的某个 slave 节点变成 master 节点,在故障迁移(failover)过程中可能出现原 master 节点上的锁没有及时同步到 slave 节点,导致其他线程同时获得锁。对于这个问题,可以参考 Redis 官方推出的 redlock 算法,但是比较遗憾的是,该算法也没有很好地解决锁过期的问题。(PS:不过这种不安全也仅仅是在主从发生 failover 的情况下才会产生,而且持续时间极短,业务系统多数情况下可以容忍。)
5. 参考资料漫画:什么是分布式锁?
基于 redis 的分布式锁实现
redis分布式锁深度剖析(超时情况)
SET key value
Distributed locks with Redis