第37天并发编程之线程篇 (5)

基于上面的问题,我们可以将解析过程放在get函数里面,在一个任务下载结束之后就会立马的进行解析数据,可以解决问题1,而且对于解析数据而言都是通过下载数据相同的进程进行解析的,可以解决第二个问题。因此我们的代码可以修改成下面这个样子。

import requests import time import os from concurrent.futures import ProcessPoolExecutor def get(url): """获取网页的信息""" print('%s get %s' % (os.getpid(), url)) response = requests.get(url) time.sleep(0.5) # 模拟下载时间 if response.status_code == 200: # 返回值我们也不需要,直接去调用parse解析就可以了 parse(response.text) def parse(data): """解析数据""" time.sleep(0.2) print('%s 解析长度为%s' % (os.getpid(), len(data))) if __name__ == '__main__': # 创建一个进程池,设置进程池的数量为4 pool = ProcessPoolExecutor(4) # 这是我们需要爬取的url urls = [ 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', ] for url in urls: pool.submit(get, url)

问题: 这样写的话我们会发现下载内容和解析内容的代码被写在一块了,而且还是被同一个进程执行,这就和我们之前所讲的生产者和消费者模型相悖了,如果我们想让两个函数解耦合,我们可以在get函数中将结果返回回来,然后在主进程中接收,并执行解析函数。此时我们就需要用到回调函数了。

import requests import time import os from concurrent.futures import ProcessPoolExecutor def get(url): """获取网页的信息""" print('%s get %s' % (os.getpid(), url)) response = requests.get(url) time.sleep(0.5) # 模拟下载时间 if response.status_code == 200: # 返回值我们也不需要,直接去调用parse解析就可以了 return response.text def parse(data): """解析数据""" time.sleep(0.2) print('%s 解析长度为%s' % (os.getpid(), len(data))) if __name__ == '__main__': # 创建一个进程池,设置进程池的数量为4 pool = ProcessPoolExecutor(4) # 这是我们需要爬取的url urls = [ 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', ] for url in urls: # 通过submit将对象obj扔到进程池中 obj = pool.submit(get, url) # 通过函数add_done_callback函数将传递的函数parse绑定到对象上obj # 当对象obj所执行的任务一旦完成并获得一个返回值的时候就会自动的去调用parse函数,并将对象当做参数传递进去 # 这种方式和get方法中直接解析数据所实现的效果都是差不多的,只是说解决了耦合的问题,从某种意义上讲,有时候解析的时间还要更长一点 # 注意: parse函数是全部都是通过主进程进行调用的,这也就解释了回调的意思,我来调用你,结果给我之后就应该就由我来解析 obj.add_done_callback(parse)

多线程和多进程的方式都是一样的,唯一不同的地方就在于回调函数是哪个线程空闲哪个线程去执行回调函数

六. 线程queue, Event

Queue和进程的队列一样,先进先出
# 这种队列是先进先出
q1 = queue.Queue(3)
q1.put(1)
q1.put('2')
q1.put([234])
# q1.put({5: 6}) # 此时会阻塞,等到取出去一个之后才能添加到队列中
print(q1.get())
# 结果1

LifoQueue 堆栈,先进后出
# 先进后出,堆栈
q1 = queue.LifoQueue(3)
q1.put(1)
q1.put('2')
q1.put([234])
# q1.put({5: 6}) # 此时会阻塞,等到取出去一个之后才能添加到队列中
print(q1.get())
# 结果是[234]

PriorityQueue 优先级队列,数字越小优先级越大
# 优先级队列,传入的参数是一个元组,第一个值为优先级,必须是int,第二个是要压入队列中的值
# 优先级越小越优先,如果优先级相等,比较后面的值,越小越优先
q1 = queue.PriorityQueue(3)
q1.put((1, '123'))
q1.put((1, '456'))
q1.put((2, '789'))
# q1.put((2, {5: 6})) # 此时会阻塞,等到取出去一个之后才能添加到队列中
print(q1.get())
# 结果(1, '123')

Event 用来进程之间协同工作的
简单的event的使用,用来提前连接服务器判断服务器是否可连
from threading import Thread, Event, current_thread
import time

# 创建一个event对象 event = Event() def check(): """首先启动一个线程尝试连接请求""" print('%s 尝试服务器是否可以连接' % current_thread().name) time.sleep(3) # 模拟连接请求持续了秒 event.set() # 当连接请求成功之后设置事件 def connection(): """当check检测通过之后再开始连接""" print('%s 尝试连接' % current_thread().name) event.wait() # 当event.set之后才会执行下面的代码,否则阻塞 print('%s 连接成功' % current_thread().name) t1 = Thread(target=connection) t2 = Thread(target=connection) t3 = Thread(target=connection) t4 = Thread(target=check) t1.start() t2.start() t3.start() t4.start()

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyfxjj.html