case是一个典型的对数据库“存在则更新,不存在则插入的”并发操作(这里忽略数据库层面的锁),通过对比是否通过Redis分布式锁控制来看效果。
#!/usr/bin/env Python3
import redis
import sys
import time
import uuid
import threading
from time import ctime,sleep
from redis import StrictRedis
from redlock import Redlock
from multiprocessing import Pool
import pymssql
import random
class RedLockTest:
_connection_list = None
_lock_resource = None
_ttl = 10 #ttl
def __init__(self, *args, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
def get_conn(self):
try:
#如果当前线程获取不到锁,重试次数以及重试等待时间
conn = Redlock(self._connection_list,retry_count=100, retry_delay=10 )
except:
raise
return conn
def execute_under_lock(self,thread_id):
conn = self.get_conn()
lock = conn.lock(self._lock_resource, self._ttl)
if lock :
self.business_method(thread_id)
conn.unlock(lock)
else:
print("try later")
'''
模拟一个经典的不存在则插入,存在则更新,起多线程并发操作
实际中可能是一个非常复杂的需要独占性的原子性操作
'''
def business_method(self,thread_id):
print(" thread -----{0}------ execute business method begin".format(thread_id))
conn = pymssql.connect(host="127.0.0.1",server="SQL2014", port=50503, database="DB01")
cursor = conn.cursor()
id = random.randint(0, 100)
sql_script = ''' select 1 from TestTable where Id = {0} '''.format(id)
cursor.execute(sql_script)
if not(cursor.fetchone()):
sql_script = ''' insert into TestTable values ({0},{1},{1},getdate(),getdate()) '''.format(id,thread_id)
else:
sql_script = ''' update TestTable set LastUpdateThreadId ={0} ,LastUpdate = getdate() where Id = {1} '''.format(thread_id,id)
cursor.execute(sql_script)
conn.commit()
cursor.close()
conn.close()
print(" thread -----{0}------ execute business method finish".format(thread_id))
if __name__ == "__main__":
redis_servers = [{"host": "*.*.*.*","port": 9000,"db": 0},
{"host": "*.*.*.*","port": 9001,"db": 0},
{"host": "*.*.*.*","port": 9002,"db": 0},]
lock_resource = "mylock"
ttl = 2000 #毫秒
redlock_test = RedLockTest(_connection_list = redis_servers,_lock_resource=lock_resource, _ttl=ttl)
#redlock_test.execute_under_lock(redlock_test.business_method)
threads = []
for i in range(50):
#普通的并发模式调用业务逻辑的方法,会产生大量的主键冲突
#t = threading.Thread(target=redlock_test.business_method,args=(i,))
#Redis分布式锁控制下的多线程
t = threading.Thread(target=redlock_test.execute_under_lock,args=(i,))
threads.append(t)
begin_time = ctime()
for t in threads:
t.setDaemon(True)
t.start()
for t in threads:
t.join()
测试 1,简单多线程并发