输出结果是正确的,但是还有潜在的问题。比如假设 A 线程获得了锁后,由于某种异常原因导致线程crash了,这个时候锁将无法被释放。稍微修改一下测试用例的 increase 函数,模拟某个线程在释放锁之前因为异常退出。
# test-3-2.py def increase(redis, lock, key): thread_name = threading.current_thread().name lock_value = lock.get_lock(key) value = redis.get(key) if not value: value = 0 # 模拟实际情况下进行的某些耗时操作 time.sleep(0.1) value = int(value) + 1 redis.set(key, value) print thread_name, value # 模拟线程2异常退出 if thread_name == \'Thread-2\': print \'{} crash...\'.format(thread_name) import sys sys.exit(1) lock.del_lock(key, lock_value)执行test-3-2.py测试脚本,得到的结果如下:
Thread-2 3 Thread-2 crash... Thread-7 waiting... Thread-3 waiting... Thread-5 waiting... Thread-4 waiting... Thread-9 waiting... Thread-6 waiting... Thread-10 waiting...此时就会出现问题,当线程2 crash 之后,后续获取锁的线程一直获取不了锁,一直处于等待锁的状态,于是产生了死锁。如果请求是多线程处理的,比如每来一个请求就开一个线程去处理,那么堆积的线程会逐渐增多,最终可能会导致系统崩溃。
当获得锁的线程异常退出后,无法主动释放锁,因此需要找到一种方式即使线程异常退出,线程占用的锁也能够被释放,显然我们需要一种被动释放锁的机制。从 redis 2.6.12 版本开始,set 命令就已经支持了 nx 和 expire 功能。改进代码如下:
def get_lock(self, key, timeout=3): lock_key = self._get_lock_key(key) while True: value = self.rediscli.set(lock_key, \'1\', nx=True, ex=timeout) if value: return True time.sleep(0.01)执行test.py测试脚本,得到的结果如下:
Thread-1 1 Thread-9 2 Thread-6 3 Thread-2 4 Thread-4 5 Thread-5 6 Thread-8 7 Thread-3 8 Thread-7 9 Thread-10 10执行test-3-2.py测试脚本,模拟 线程2 crash,得到的结果如下:
Thread-1 1 Thread-2 2 Thread-2 crash... Thread-10 3 Thread-7 4 Thread-4 5 Thread-8 6 Thread-3 7 Thread-9 8 Thread-6 9 Thread-5 10从上面的运行结果来看,似乎已经解决了原子性和无死锁的问题。那第三个条件互斥性是否满足呢?正常情况下,3.2节的实现方式是满足互斥性的,但是还有一种场景需要我们考虑:比如假设 A 线程的逻辑还没处理完,但是锁由于过期时间到了,导致锁自动被释放掉,这时 B 线程获得了锁,开始处理 B 的逻辑,然后 A 进程的逻辑处理完了,B 线程还在处理中,就把 B 线程的锁给删除了。通过修改一下测试用例,模拟一下这种场景。
def increase(redis, lock, key): thread_name = threading.current_thread().name # 设置锁的过期时间为2s lock_value = lock.get_lock(key, thread_name, timeout=2) value = redis.get(key) if not value: value = 0 # 模拟实际情况下进行的某些耗时操作, 且执行时间大于锁过期的时间 time.sleep(2.5) value = int(value) + 1 print thread_name, value redis.set(key, value) lock.del_lock(key, lock_value)我们让线程的执行时间大于锁的过期时间,导致锁到期自动释放。执行上面的测试脚本,得到的结果如下:
Thread-1 1 Thread-3 1 Thread-2 2 Thread-9 2 Thread-5 3 Thread-7 3 Thread-6 4 Thread-4 4 Thread-8 5 Thread-10 5既然这种现象是由于锁过期导致误删其他线程的锁引发的,那我们就顺着这个思路,强制线程只能删除自己设置的锁。如果是这样,就需要为每个线程的锁添加一个唯一标识。在我们的分布式锁实现机制中,我们每次添加锁的时候,都是给 lock_key 设为 1,无论是 key 还是 value,都不具备唯一性,如果把 key 设为唯一的,那么在分布式系统中需要产生 N (等于总线程数)个 key 了 ,从直观性和维护性上来说,这都是不可取的。因此只能将 value 设置为每个线程的唯一标识。这个唯一标识由线程 ID + 进程的 PID + 机器的 IP + 时间戳 + 集群名称组成,这样就构成了一个线程锁的唯一标识。
3.3 互斥性根据上一节最后的分析,我们设计出了基于Redis实现分布式锁的最终版。
# 最终版 class RedisLock(object): def __init__(self, rediscli): self.rediscli = rediscli.master # ip 在实例化的时候就获取,避免过多访问DNS self.ip = socket.gethostbyname(socket.gethostname()) self.pid = os.getpid() self.cluster = "hna" def _gen_lock_key(self, key): lock_key = "lock_%s" % key return lock_key def _gen_unique_value(self): thread_name = threading.current_thread().name time_now = time.time() unique_value = "{0}-{1}-{2}-{3}-{4}".format(self.ip, self.pid, thread_name, self.cluster, time_now) return unique_value def get_lock(self, key, timeout=3): lock_key = self._gen_lock_key(key) unique_value = self._gen_unique_value() logger.info("unique value %s" % unique_value) while True: value = self.rediscli.set(lock_key, unique_value, nx=True, ex=timeout) if value: # 注意,我们返回了唯一标识,用于后面的delete时检查是否是当前线程的锁 return unique_value # 进入阻塞状态,避免一直消耗CPU time.sleep(0.1) def del_lock(self, key, value): lock_key = self._gen_lock_key(key) old_value = self.rediscli.get(lock_key) # 检查是否是当前线程持有的锁 if old_value == value: return self.rediscli.delete(lock_key)执行test.py测试脚本,得到的结果如下:
Thread-1 1 Thread-2 2 Thread-4 3 Thread-5 4 Thread-10 5 Thread-3 6 Thread-9 7 Thread-6 8 Thread-8 9 Thread-7 10