第37天并发编程之线程篇 (4)

阻塞和非阻塞
阻塞和非阻塞描述的是程序的一种运行状态
阻塞(阻塞态)
遇到I/0之后,程序在原地等待I/0,并释放cpu资源
非阻塞(就绪态或者运行态):
没有遇到I/0,或者通过某种方式即便是程序遇到I/0也不会停在原地,而是去执行其他的操作,尽可能多的使用cpu的资源。

同步调用和异步调用
同步和异步描述的是程序执行的方式
同步调用
提交完任务之后就在原地进行等待,直到任务执行完毕并拿到了返回值,才会去执行下一行代码。
异步调用
提交完任务之后不会在原地进行等待,直接运行下一行代码,结果可以通过异步回调得到。

同步调用简单的实现过程
from concurrent.futures import ThreadPoolExecutor
import time
import random

def task(x): print('%s is running' % x) time.sleep(random.randint(1, 3)) return x ** 2 thread_pool = ThreadPoolExecutor(4) obj_l = [] for i in range(20): # 此处可以返回一个对象,对象有一个result属性可以让我们获得返回值 obj = thread_pool.submit(task, i) # 因为有了result,所以此时的程序就是一个同步调用的方式,我们必须等待任务一个一个完成并且拿到返回值之后才会继续执行循环 print(obj.result())

异步调用简单的实现过程
from concurrent.futures import ThreadPoolExecutor
import time
import random

def task(x): print('%s is running' % x) time.sleep(random.randint(1, 3)) return x ** 2 thread_pool = ThreadPoolExecutor(4) obj_l = [] for i in range(20): # 此处可以返回一个对象,对象有一个result属性可以让我们获得返回值 # 此时是异步调用方式,因为在所有的任务创建的时候,我们并不需要去等待结果,就可以直接去运行下一次循环 obj = thread_pool.submit(task, i) obj_l.append(obj) # 就相当于之前的的join等待线程结束 # 因为现在是一个线程池,所以我们不能单纯的使用join去等待线程结束,我们还需要close将目前的任务先锁定住 # 其实shutdown内部就是实现了先锁定任务,然后等待所有任务执行完毕的过程 thread_pool.shutdown() # 当所有的任务结束之后,我们就可以通过返回的对象列表中随便查看其中的值 print(obj_l[0].result())

五. 异步+回调机制

案例:写一个简单的爬虫案例,来详细的分析一下异步和回调机制

首先,我们先以多进程的方式写一个同步的程序,用来爬取网页上的信息

import requests import time import os import random from concurrent.futures import ProcessPoolExecutor def get(url): """获取网页的信息""" print('%s get %s' % (os.getpid(), url)) response = requests.get(url) time.sleep(0.5) # 模拟下载时间 if response.status_code == 200: return response.text def parse(data): """解析数据""" time.sleep(0.2) # 模拟解析时间 print('%s 解析长度为%s' % (os.getpid(), len(data))) if __name__ == '__main__': # 创建一个进程池,设置进程池的数量为4 pool = ProcessPoolExecutor(4) # 这是我们需要爬取的url urls = [ 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', ] for url in urls: # 每执行一个任务,都通过result去获得相应的数据,然后通过parse去解析数据 data = pool.submit(get, url).result() parse(data)

问题:我们发现虽然说能够实现基本的功能,但是太慢了,每次都要等待一个任务全部完成获得返回值之后才会去执行下面的代码,为了提升效率,我们考虑可以把同步的方式转换成异步。因此我们需要将name里面的内容转换成下面的样子。

if __name__ == '__main__': # 创建一个进程池,设置进程池的数量为4 pool = ProcessPoolExecutor(4) # 这是我们需要爬取的url urls = [ 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', ] obj_l = [] for url in urls: obj = pool.submit(get, url) obj_l.append(obj) # 等待进程池内的任务全部完成之后才去执行下面的代码 pool.shutdown() # 此时url的内容都已经下载完成并且保存在对象obj_l列表中,我们通过parse就可以解析了 for obj in obj_l: parse(obj.result())

问题: 虽然说效率有所提升但是依然存在一些问题

解析过程要等待所有的下载任务执行完成之后才能进行解析

解析数据是串行的,如果一个解析过程很慢,就会大大的降低整个程序的效率

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyfxjj.html