def car(n):
while 1:
time.sleep(random.randrange(8))
if event.isSet(): # 判断标志位是否设定
print("car [%s] is running.." % n)
else:
print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
event = threading.Event()
Light = threading.Thread(target=light)
Light.start()
for i in range(3):
t = threading.Thread(target=car,args=(i,))
t.start()
第二个例子,员工过门。如果门是开的,员工直接过门;如果们是关的,员工需要刷卡开门再过门。
import threading
import time
import random
def door():
count = 0
while True:
if door_event.is_set():
print("\033[32;1mdoor opening....\033[0m")
count +=1
else:
print("\033[31;1mdoor closed...., swipe to open.\033[0m")
count = 0 #清空计时器
door_event.wait()
if count > 3:#门开了已经3s了,该关了
door_event.clear()
time.sleep(0.5)
def staff(n):
print("staff [%s] is comming..." % n )
while True:
if door_event.is_set():
print("\033[34;1mdoor is opened, passing.....\033[0m")
break
else:
print("staff [%s] sees door got closed, swipping the card....." % n)
door_event.set()
print("door is opening...")
time.sleep(0.5)
if __name__ == "__main__":
door_event = threading.Event() #设置事件
door_thread = threading.Thread(target=door) # 实例化“门”线程并启动
door_thread.start()
for i in range(5):
p = threading.Thread(target=staff,args=(i,))
time.sleep(random.randrange(3))
p.start()
队列queue
线程间的数据通信。需要导入模块queue。
分为三种:
1. queue.Queue(maxsize=0) 先入先出
qsize() 查询队列中数据的个数
put(item,block=Ture,timeout=None) 往队列中放入数据
get(block=True,timeout=None) 取出数据。当队列中没有数据的时候,get将会造成阻塞,直到队列中又存入数据为止。参数block=False不会阻塞,timeout=1表示只卡1s
empty() 当队列为空返回True
full() 当队列已满返回True
def put():
#time.sleep(1) # 将执行此函数的线程滞后,更好地观摩多线程的运行效果
try:
for i in range(50):
cou = i
q.put(i + 1, block=False)
else:
print("存入了{0}个".format(cou+1))
except queue.Full:
print("队列已满,存入了{0}个".format(cou))