手把手教你实现基于Redis的分布式锁

目前,分布式系统已经是各大公司的标配,它具有高可用、可扩展等特点。在分布式系统中,由于存在多台机器上的进程竞争同一份资源的问题,因此需要分布式锁来保证同步访问资源。

一个经典的场景就是淘宝双11秒杀活动,全国人民的客户端访问不同的后端服务器,然后后端服务器再访问数据库,此时数据库就是需要同步访问的资源。

在介绍基于Redis实现的分布式锁之前;以Python语言为例,我们看看根据应用的实现架构,同步锁可能会有以下几种类型

如果处理程序是单进程多线程的,在Python语言中,就可以使用 threading 模块的 Lock 对象来限制对共享资源的同步访问,实现多线程安全。

单机多进程的情况,在Python语言中,可以使用 multiprocessing 的 Lock 对象来保证多进程安全。

多机多进程部署的情况,需要依赖一个第三方组件(存储锁对象)来实现一个分布式的同步锁。

2. 分布式锁的必要条件

本文主要介绍第三种场景下基于Redis如何实现分布式锁。现在我们来看看实现一个分布式锁的必要条件有哪些?

原子性:加锁和释放锁的操作必须满足原子性

无死锁:不会发生死锁(PS:例如已获得锁的线程/进程在释放锁之前突然异常退出,导致其他线程/进程会一直在循环等待锁被释放)

互斥性:同一个时刻只能有一个线程/进程占有锁,其他线程/进程必须等待直到锁被释放

可重入性:当前线程/进程获得锁之后,还可以继续调用获取锁的操作,第二次以及之后的获取锁的操作不会被阻塞等待(PS:释放锁的操作也是一样的,调用多次之后,只有最后一次释放锁的时候才会真正地释放锁)--- 这个条件根据业务来决定是否需要实现

3. 实现过程

根据分布式锁的必要条件,下面将给出几种实现方式,来观察任意一个条件不满足时,会出现什么样的问题?在实现的过程中将使用同一份测试用例。测试用例代码如下:

# test.py \'\'\' 启用多个线程对 redis 中的 test_key 的值进行自增操作,理想情况,test_key 的值应该等于线程的数量,比如开了 10 个线程,test_key的值最终应该是10。 \'\'\' def increase(redis, lock, key): # 获得锁 lock_value = lock.get_lock(key) value = redis.get(key) # 模拟实际情况下进行的某些耗时操作 time.sleep(0.1) value += 1 redis.set(key, value) thread_name = threading.current_thread().name # 打印线程名和最新的值 print thread_name, new_value # 释放锁 lock.del_lock(key, lock_value) # 连接服务端 redis = RedisCli(REDIS_CACHE_HOST_LIST, REDIS_CACHE_MASTER_NAME) lock = RedisLock(redis) key = \'test_key\' thread_count = 10 redis.delete(key) for i in xrange(thread_count): thread = threading.Thread(target=increase, args=(redis, lock, key)) thread.start()

Tips:
下面的代码片段中只展示需要修改的部分,其他部分和test.py保持一致。

3.1 原子性

在这个版本中,当线程 A get(lock_key) 的值为空时,set lock_key 的值为 1,并返回,这表示线程 A 获得了锁,可以继续执行后面的操作,否则需要一直循环去获取锁,直到 key 的值再次为空,重新获得锁,执行任务完成后释放锁。

class RedisLock(object): def __init__(self, rediscli): self.rediscli = rediscli def _get_lock_key(self, key): lock_key = "lock_%s" % key return lock_key def get_lock(self, key): lock_key = self._get_lock_key(key) while True: value = self.rediscli.get(lock_key) if not value: self.rediscli.set(lock_key, \'1\') return True time.sleep(0.01) def del_lock(self, key, new_expire_time): lock_key = self._get_lock_key(key) return self.rediscli.delete(lock_key)

执行test.py测试脚本,得到的结果如下:

Thread-1 1 Thread-5 2 Thread-2 2 Thread-6 3 Thread-7 3 Thread-4 3 Thread-9 4 Thread-8 5 Thread-10 5 Thread-3 5

观察输出结果发现,同时有多个线程输出的结果是一样的。初看上面加锁的代码逻辑似乎没什么问题,但是最终的结果却事与愿违,原因是上面的代码get(lock_key)和set(lock_key, \'1\')并不是原子性的执行,而是分开执行。A 线程在get(lock_key)的时候发现是空值,于是重新set(lock_key, \'1\'),但在get操作之后,set操作之前,B 线程恰好执行了get(lock_key),此时B 线程的get操作得到的还是空值,然后也顺利获得锁,导致数据被两个或多个线程同时修改,最后出现不一致。

3.2 无死锁

由于3.1的版本是因为get_lock方法不是原子性操作,造成两个或多个线程同时获得锁的问题,这个版本改成使用 redis 的 setnx 命令来进行锁的查询和设置操作,setnx 即 set if not exists,顾名思义就是当key不存在的时候才设置 value,并返回 1,如果 key 已经存在,则不进行任何操作,返回 0。

#只展示需要修改的部分,其他部分还是和3.1的代码一样 def get_lock(self, key): lock_key = self._get_lock_key(key) thread_name = threading.current_thread().name while True: value = self.rediscli.setnx(lock_key, 1) if value: return True time.sleep(0.01) print "{} waiting...".format(thread_name)

执行test.py测试脚本,得到的结果如下:

Thread-1 1 Thread-4 2 Thread-2 3 Thread-3 4 Thread-7 5 Thread-6 6 Thread-5 7 Thread-8 8 Thread-9 9 Thread-10 10

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzgdfx.html