In addition, conn.poll() (which returns a bool) tells you whether conn holds an unread message sent from the other end.
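As a minimal sketch of how poll() might be used, the helper below reads everything currently waiting on a connection and stops once nothing is left. The name `drain` and its `timeout` parameter are hypothetical, and both ends of the pipe are kept in one process only to keep the example short.

```python
from multiprocessing import Pipe


def drain(conn, timeout=0.0):
    """Collect the messages readable on conn (hypothetical helper)."""
    messages = []
    # poll(timeout) returns True as soon as there is data to read,
    # or False once `timeout` seconds pass with nothing available.
    while conn.poll(timeout):
        messages.append(conn.recv())
    return messages


if __name__ == '__main__':
    conn_a, conn_b = Pipe()
    print(conn_b.poll())               # nothing has been sent yet
    conn_a.send('hello')
    conn_a.send('world')
    print(drain(conn_b, timeout=0.1))  # the two messages, in sending order
    print(conn_b.poll())               # the pipe is empty again
```

Because poll() never blocks longer than its timeout, a receiver can check for messages between other pieces of work instead of waiting inside recv().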
    from multiprocessing import Process, Pipe
    import time


    def send_through_pipe(conn, pipe_name, sender_name, content, base_time):
        # Log the time before and after send() to show whether it blocks.
        print(sender_name, 'tries to send', content, 'through', pipe_name,
              'at', '%.6f' % (time.perf_counter() - base_time))
        conn.send(content)
        print(sender_name, 'successfully finishes sending at',
              '%.6f' % (time.perf_counter() - base_time))


    def receive_from_pipe(conn, pipe_name, receiver_name, base_time):
        # recv() blocks until the other end has sent something.
        print(receiver_name, 'tries to receive content from', pipe_name,
              'at', '%.6f' % (time.perf_counter() - base_time))
        content = conn.recv()
        print(receiver_name, 'successfully receives', content,
              'at', '%.6f' % (time.perf_counter() - base_time))
        return content


    def task1(conn, pipe_name, process_name, base_time):
        # Receive first, then reply one second later.
        receive_from_pipe(conn, pipe_name, process_name, base_time)
        time.sleep(1)
        send_through_pipe(conn, pipe_name, process_name,
                          'greetings from ' + process_name, base_time)


    def task2(conn, pipe_name, process_name, base_time):
        # Send after one second, then wait two more seconds before receiving.
        time.sleep(1)
        send_through_pipe(conn, pipe_name, process_name,
                          'greetings from ' + process_name, base_time)
        time.sleep(2)
        receive_from_pipe(conn, pipe_name, process_name, base_time)


    if __name__ == '__main__':
        base_time = time.perf_counter()
        conn_A, conn_B = Pipe()
        p1 = Process(target=task1, args=(conn_A, 'pipe', 'son1', base_time))
        p2 = Process(target=task2, args=(conn_B, 'pipe', 'son2', base_time))
        p1.start()
        p2.start()
        p1.join()
        p2.join()

Output:

    son1 tries to receive content from pipe at 0.033372
    son2 tries to send greetings from son2 through pipe at 1.058998
    son2 successfully finishes sending at 1.060660
    son1 successfully receives greetings from son2 at 1.061171
    son1 tries to send greetings from son1 through pipe at 2.062389
    son1 successfully finishes sending at 2.063290
    son2 tries to receive content from pipe at 3.061378
    son2 successfully receives greetings from son1 at 3.061843

From this we can see:
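A Pipe can buffer data, and the buffered data follows the FIFO rule.
However, the buffer a Pipe uses is fairly small (the exact size depends on the OS). Once it is full, send() blocks until the other end frees space with recv().
The two ends of a Pipe can be given to any two processes.
Giving the same end to more than one process is not recommended: the data may be corrupted if two processes read from or write to the same end at the same time. If you really need this, use a Queue instead.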
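The buffering and FIFO behaviour described above can be sketched in a few lines. The function name `send_then_receive` is hypothetical, and both ends are held by one process for brevity. The messages are deliberately small so that send() returns without filling the pipe's buffer.

```python
from multiprocessing import Pipe


def send_then_receive(items):
    """Send every item before receiving any, then read them all back."""
    conn_a, conn_b = Pipe()
    # Every send() returns immediately: the small messages sit in the
    # pipe's buffer until the other end asks for them.
    for item in items:
        conn_a.send(item)
    # recv() hands the messages back in the order they were sent (FIFO).
    received = [conn_b.recv() for _ in items]
    conn_a.close()
    conn_b.close()
    return received


if __name__ == '__main__':
    print(send_then_receive(['first', 'second', 'third']))
```

With much larger or more numerous messages the send loop could block before the receive loop starts, which is the buffer limit mentioned in the list above.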
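A Queue is essentially a queue that works across processes.
Queue operations cost roughly twice as much time as the corresponding Pipe operations (a Queue is built on top of a Pipe plus a few locks, so some extra overhead is expected).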
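One rough way to check the relative cost on your own machine is to time the same number of round trips through each. This is only a sketch under simplifying assumptions: the function names are hypothetical, everything runs inside a single process, and the ratio you observe will vary with the OS, the Python version and the message size.

```python
import time
from multiprocessing import Pipe, Queue


def time_pipe(n):
    """Seconds taken by n send()/recv() round trips through a Pipe."""
    conn_a, conn_b = Pipe()
    start = time.perf_counter()
    for i in range(n):
        conn_a.send(i)
        conn_b.recv()
    elapsed = time.perf_counter() - start
    conn_a.close()
    conn_b.close()
    return elapsed


def time_queue(n):
    """Seconds taken by n put()/get() round trips through a Queue."""
    q = Queue()
    start = time.perf_counter()
    for i in range(n):
        q.put(i)
        q.get()
    elapsed = time.perf_counter() - start
    q.close()
    q.join_thread()  # wait for the queue's background feeder thread
    return elapsed


if __name__ == '__main__':
    n = 10000
    print('Pipe :', '%.6f' % time_pipe(n))
    print('Queue:', '%.6f' % time_queue(n))
```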
    from multiprocessing import Process, Queue
    import time


    def put_into_queue(q, queue_name, putter_name, content, base_time):
        # Log the time before and after put() to show whether it blocks.
        print(putter_name, 'tries to put', content, 'into', queue_name,
              'at', '%.6f' % (time.perf_counter() - base_time))
        q.put(content)
        print(putter_name, 'successfully finishes putting at',
              '%.6f' % (time.perf_counter() - base_time))


    def get_from_queue(q, queue_name, getter_name, base_time):
        # get() blocks until an item is available.
        print(getter_name, 'tries to receive content from', queue_name,
              'at', '%.6f' % (time.perf_counter() - base_time))
        content = q.get()
        print(getter_name, 'successfully gets', content,
              'at', '%.6f' % (time.perf_counter() - base_time))
        return content


    def task1(q, delay, queue_name, process_name, base_time):
        # Put first, then get five seconds later.
        time.sleep(delay)
        put_into_queue(q, queue_name, process_name,
                       'christmas card from ' + process_name, base_time)
        time.sleep(5)
        get_from_queue(q, queue_name, process_name, base_time)


    def task2(q, delay, queue_name, process_name, base_time):
        # Get first, then put five seconds later.
        time.sleep(delay)
        get_from_queue(q, queue_name, process_name, base_time)
        time.sleep(5)
        put_into_queue(q, queue_name, process_name,
                       'christmas card from ' + process_name, base_time)


    if __name__ == '__main__':
        base_time = time.perf_counter()
        q = Queue()
        put_and_get_1 = Process(target=task1, args=(q, 0, 'queue', 'putAndGet_No.1', base_time))
        get_and_put_1 = Process(target=task2, args=(q, 1, 'queue', 'getAndPut_No.1', base_time))
        get_and_put_2 = Process(target=task2, args=(q, 2, 'queue', 'getAndPut_No.2', base_time))
        put_and_get_1.start()
        get_and_put_1.start()
        get_and_put_2.start()
        put_and_get_1.join()
        get_and_put_1.join()
        get_and_put_2.join()

Output:

    putAndGet_No.1 tries to put christmas card from putAndGet_No.1 into queue at 0.077883
    putAndGet_No.1 successfully finishes putting at 0.079291
    getAndPut_No.1 tries to receive content from queue at 1.104196
    getAndPut_No.1 successfully gets christmas card from putAndGet_No.1 at 1.105489
    getAndPut_No.2 tries to receive content from queue at 2.126434
    putAndGet_No.1 tries to receive content from queue at 5.081044
    getAndPut_No.1 tries to put christmas card from getAndPut_No.1 into queue at 6.106381
    getAndPut_No.1 successfully finishes putting at 6.107820
    getAndPut_No.2 successfully gets christmas card from getAndPut_No.1 at 6.108565
    getAndPut_No.2 tries to put christmas card from getAndPut_No.2 into queue at 11.109579
    getAndPut_No.2 successfully finishes putting at 11.112493
    putAndGet_No.1 successfully gets christmas card from getAndPut_No.2 at 11.113546

In addition, if a Queue grows so large that it reaches its upper limit (which can be set with the maxsize argument), put() will also block. With the default limit, it should be hard to make the queue that large in practice.
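The upper limit is easier to observe when it is set deliberately small through the maxsize argument of Queue. The sketch below uses a hypothetical helper, `fill_until_full`, and passes a timeout to put() so that the blocked call gives up with queue.Full instead of waiting forever.

```python
import queue
from multiprocessing import Queue


def fill_until_full(maxsize, timeout=0.1):
    """Put items until put() would block; return (count accepted, items)."""
    q = Queue(maxsize=maxsize)
    accepted = 0
    try:
        while True:
            # Once the queue holds `maxsize` items this call blocks, then
            # raises queue.Full after `timeout` seconds.
            q.put(accepted, timeout=timeout)
            accepted += 1
    except queue.Full:
        pass
    # Take the items out again; each get() frees one slot.
    drained = [q.get() for _ in range(accepted)]
    q.close()
    q.join_thread()
    return accepted, drained


if __name__ == '__main__':
    print(fill_until_full(3))
```

Without the timeout, the put() that finds the queue full would wait until some other process calls get().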
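The basic syntax of multithreading is very similar to that of multiprocessing, but the underlying mechanism differs in important ways. Because of the global interpreter lock (GIL), Python threads cannot execute Python code in parallel, so multithreading is of limited practical use for CPU-bound work; only a brief introduction is given here.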
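As the figure below shows, the basic multithreading code is identical to the multiprocessing code. The code in the figure takes about 3 s to run under the CPython interpreter.
Also, multithreading does not actually need the if __name__ == '__main__': check.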
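To illustrate how closely the thread API mirrors the process API, here is a minimal sketch. It is not the code from the figure, and the names `task` and `run_threads` are hypothetical. The tasks only sleep, and sleeping releases the GIL, so the threads overlap; a CPU-bound task would not speed up this way.

```python
import threading
import time


def task(seconds, results, name):
    time.sleep(seconds)    # the GIL is released while sleeping
    results.append(name)   # threads share memory, so a plain list works


def run_threads(n, seconds):
    """Run n sleeping threads at once; return (elapsed seconds, results)."""
    results = []
    # Thread takes the same target/args arguments as Process.
    threads = [
        threading.Thread(target=task, args=(seconds, results, 'thread-%d' % i))
        for i in range(n)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start, results


# No if __name__ == '__main__': guard is used here, because starting a
# thread does not re-import the main module.
elapsed, names = run_threads(3, 0.2)
print('%.6f' % elapsed, sorted(names))
```

The elapsed time should be close to a single sleep rather than the sum of all three, since the threads wait concurrently.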