Python中的多进程、多线程和协程 (2)

对于从别处import来的代码,系统变量__name__在这段代码中会等于来源文件的名字(或模块名,这你不用在意);对于存在于本文件中的代码,__name__会等于__main__。

由于某些原因,在Windows下,如果一个文件的代码中使用了多进程,则这个文件中会隐式地import自己(一次或多次);将所有零级缩进的代码放在if __name__ == '__main__':中,可以避免产生重复执行的问题(注意到如果不这样做的话,import来的副本中还会再次import自身,导致无限递归import并报错)。

暂时可以认为,采取这一措施后就可完全消除“隐式import自身”所产生的效应。

进程复制 from multiprocessing import Process import os pid = os.getpid() def task(): global pid print(pid) pid = 1 print(pid) if __name__ == '__main__': p = Process(target=task) p.start() p.join() print(pid)

在Windows下的输出:

4836 1 2944

在Linux下的输出:

511 1 511

前两个数都是由子进程输出,第三个数由父进程输出。

注意到pid在子进程中被赋为1后,在父进程中并不是1。这说明,子进程的target函数中对运行环境的修改,不影响父进程的运行环境。事实上,反之也是成立的(父不影响子)。也就是说,一旦子进程的运行环境完成创建之后,父进程的运行环境与子进程的运行环境之间就完全独立。

由于这个独立性,子进程的运算结果也无法直接反馈给父进程。稍后会介绍两种解决方式:1. 进程间通信 2. 利用 进程池apply方法的返回值。

注意到一三行的输出在Windows下不同,而在Linux下相同。这说明,子进程中全局变量pid的取值,在Linux下是直接复制父进程中pid的取值而得到的,在Windows下是通过重新运行pid = os.getpid()而得到的。更一般地,有以下这两个事实:

在Windows中,Process(target)创建出的子进程是一张白纸(即运行环境空空如也);当调用start()的时候,它会先通过import语句来将父进程的整个代码文件完整执行一遍(从而创建出一个新的运行环境),然后再开始运行target函数。所以,if __name__ == '__main__':包起来的代码,就只会被父进程执行;而未被包起来的零级缩进代码,则也会被每个子进程(在自己的运行环境里)各自执行一遍。

这就是之前提到的“隐式import自身”的机制。

在Linux中,Process(target)创建出的子进程,会全盘复制父进程的运行环境,而不会自己重新创建。复制出来的子进程运行环境,与父进程的运行环境完全独立。

Linux下的进程复制方式称为fork,Windows下的进程复制方式称为spawn。关于这些,详见 https://stackoverflow.com/questions/64095876/multiprocessing-fork-vs-spawn 。

from multiprocessing import Process import os def task(): pass if __name__ == '__main__': p = Process(target=task) print('son process created') p.start() print('son process starts') p.join() print('son process ends') print('gu?')

在Windows下的输出

son process created son process starts gu? son process ends gu?

由此可见,Windows下子进程(在初始化时)在执行父进程的代码文件时,父进程中son_process.start()以后的内容(比如print('gu?'))也会被执行。

进程池

如果我们有很多的任务要同时进行,为每个任务各开一个进程既低效(创建和销毁进程开销大、无法全部并行、内核难以调度)又可能不被内核允许。

解决方案:使用进程池,池中放着几个进程(一般不会比CPU的核数多很多),有新任务时找一个空闲进程分配给它,没有空闲进程则等待。缺点是没有空闲进程时需要等待,因此不能算是完全的并发。

进程池的基本用法 from multiprocessing import Pool import os, time def task(duration, base_time, task_name): pid = os.getpid() print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}') time.sleep(duration) print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s') if __name__ == '__main__': pid = os.getpid() base_time = time.perf_counter() print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s') pool = Pool(3) # a pool containing 3 subprocesses print('start assigning tasks') for i in range(4): pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1))) # assign task to some process in pool and start running # if all son processes are busy, wait until one is free and then start pool.close() # no longer accepting new tasks, but already applied ones (including those that are waiting) keeps running. print('all tasks assigned; wait for son processes to finish') pool.join() # wait until all tasks are done, and then the pool is dead. `join()` can be called only if `close()` has already been called print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s') print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgdgd.html