Python爬虫之协程
为什么要用协程
协程声明
await
aiohttp
aiofiles
案例修改
案例完整代码
为什么要用协程
- 轻量级:协程是轻量级的执行单元,可以在同一个线程中并发执行。相比于多线程或多进程,创建和切换协程的开销更小。
- 高效利用资源:由于协程可以在同一个线程中并发执行,因此不会涉及多个线程或进程之间的上下文切换,从而减少了额外的开销。这使得协程能够高效地利用计算资源。
- 简化并发编程:协程采用显式的挂起和恢复机制,程序员可以明确控制协程的执行流程。相比于多线程或多进程的共享内存模型,协程通过显式的消息传递(如使用
await
、asyncio.Queue
等)来实现数据交换,简化了并发编程的复杂性。 - 异步非阻塞:协程通常与异步编程一起使用,可以在 I/O 密集型任务中实现非阻塞的操作。通过使用
await
关键字等待 I/O 操作完成时,可以在等待期间释放 CPU,执行其他协程任务,提高整体的并发性能。
协程声明
创建协程函数:async def func()
运行协程函数:asyncio.run(func())
注意:当调用协程函数func()
后,内部代码是不会执行的,只是得到了一个协程对象,如果想要运行协程对象,则必须将其交给事件循环来处理
import asyncioasync def func():print("Hello, coroutine!")# 调用协程
asyncio.run(func())
也可以这么写
import asyncioasync def func():print("Hello, coroutine!")# 创建事件循环对象
loop = asyncio.get_event_loop()# 将事件封装为任务
task = loop.create_task(func())# 运行事件直到任务完成
loop.run_until_complete(task)
执行时间循环:.wait()
和.gather
-
await asyncio.wait(tasks)
:接受一个任务集合作为参数,并等待所有任务完成。返回两个集合(Sets):已完成的任务集合和仍在进行中的任务集合。 -
await asyncio.gather(*tasks)
:接受一个任务集合作为参数,并等待所有任务完成。返回每个任务的实际返回值 -
await task
:执行单个任务,返回每个任务的实际返回值
await
await
关键字后面可以定义可等待对象,例如协程对象,Future,Task对象
此处的可等待对象其实就是I/O阻塞,当await
包裹的协程任务遇到阻塞时会自动从当前任务切换到另一个任务中,以节省时间和内存
result = await
表示result就是await后面的指令运行完毕后得到的结果
import asyncioasync def fun1():print('1')await asyncio.sleep(2)return '结束'async def main():# 创建任务task1 = asyncio.create_task(fun1())task2 = asyncio.create_task(fun1())# 创建事件循环res1 = await task1res2 = await task2print(res1, res2)asyncio.run(main())
也可以这么写
import asyncioasync def fun1():print('1')await asyncio.sleep(2)return '结束'async def main():# 创建任务task = [asyncio.create_task(fun1()) for i in range(10)]# 创建事件循环res = await asyncio.gather(task)print(res)asyncio.run(main())
数量太少看不出效率,但是可以证明await
会等待所有任务返回结果后再继续往下运行
aiohttp
aiohttp
与requests
相比最大的区别就是aiohttp
支持异步操作,因此用协程编写爬虫时aiohttp
是相当重要的一个模块
aiohttp.ClientSession()
:
- 用于创建异步的HTTP客户端会话对象
- 通过该对象发送异步请求并处理响应
session.get(url)
和 session.post(url)
:
- 在
ClientSeesion
对象上调用这些方法可以发送GET/POST请求 url
作为参数传递,一般就是访问的主网址
response.status
和 response.text()
:
- 这俩响应
ClientResponse
对象的属性和方法 response.status
返回响应状态码(如200、404等)response.status
返回响应内容的文本字符串
response.json()
:
- 当服务器返回JSON格式的响应是,可以用该方法将响应内容解析为Python对象(字典、列表)
async with session.get(url) as response
和 async with session.post(url) as response
:
- 使用
async with
语法结构,可以在异步上下文管理器中发送请求和处理响应 response
是一个异步上下文管理器返回的响应对象,可以执行response.status
之类的操作,并且使其能够被await
包裹
利用aiohttp
模块获取一个简单的浏览器响应
import asyncio
import aiohttp
from lxml import etreeurl = 'https://www.baidu.com'async def main():connector = aiohttp.TCPConnector(ssl=False)async with aiohttp.ClientSession(connector=connector) as session:async with session.get(url) as response:res = await response.text()et = etree.HTML(res)print(et)if __name__ == '__main__':asyncio.run(main())
aiofiles
aiofiles
与python中常用的with open操作类似,并且支持异步操作,且与asyncio
配合良好
具体操作也和with open类似
读
async def read_file():async with aiofiles.open('file.txt', mode='r') as file:contents = await file.read()print(contents)if __name__ == '__main__':asyncio.run(read_file())
写
async def write_file():async with aiofiles.open('file.txt', mode='w') as file:await file.write('Hello, World!')if __name__ == '__main__':asyncio.run(write_file())
案例修改
掏出上次我们写的线程池爬虫案例:
from concurrent.futures import ThreadPoolExecutorimport requests
from lxml import etreeurl = 'https://loryx.wiki/%E6%B5%8F%E8%A7%88/%E7%89%8C%E5%BA%93'def download(name, src):with open(name, 'wb') as f:f.write(requests.get(src).content)print(f'{name}已下载')def main():res = requests.get(url=url)res.encoding = 'utf-8'et = etree.HTML(res.text)src = et.xpath("//td[@class='col15 leftalign']/a/@href")name = et.xpath("//td[@class='col0 leftalign']/text()")for i, index in enumerate(name):name[i] = index.strip()with ThreadPoolExecutor(64) as t:for i in range(len(src)):file_name = f"img/{name[i]}.png"t.submit(download, file_name, src[i])if __name__ == '__main__':main()
现在开始改写
首先跟requests
相关的可以全部删了换成aiohttp
,比如
res = requests.get(url=url)res.encoding = 'utf-8'et = etree.HTML(res.text)
替换为
async with aiohttp.ClientSession(connector=connector) as session:async with session.get(url) as response:res = await response.text()
et = etree.HTML(res)
然后ThreadPoolExecutor
相关的也可以全部用asyncio
替换
for i, index in enumerate(name):name[i] = index.strip()with ThreadPoolExecutor(64) as t:for i in range(len(src)):file_name = f"img/{name[i]}.png"t.submit(download, file_name, src[i])
替换为
tasks = []
for i in range(len(src)):file_name = f"img/{name[i]}.png"if not os.path.exists('img'):os.makedirs('img')task = asyncio.create_task(spider(file_name, src[i]))tasks.append(task)
await asyncio.gather(*tasks)
文件读写的部分也可以用aiofiles
重写
def download(name, src):with open(name, 'wb') as f:f.write(requests.get(src).content)print(f'{name}已下载')
替换成
async def spider(name, src):connector = aiohttp.TCPConnector(ssl=False)async with aiohttp.ClientSession(connector=connector) as session:async with session.get(src) as response:count = await response.read()async with aiofiles.open(name, 'wb') as f:await f.write(count)print(f'{name}已下载')
案例完整代码
import asyncio
import os.pathimport aiofiles
import aiohttp
from lxml import etreeurl = 'https://loryx.wiki/%E6%B5%8F%E8%A7%88/%E7%89%8C%E5%BA%93'async def spider(name, src):# 关闭SSL证书验证connector = aiohttp.TCPConnector(ssl=False)# 创建图片链接对象async with aiohttp.ClientSession(connector=connector) as session:async with session.get(src) as response:# 读取图片信息 准备写入本地count = await response.read()# 写入本地 下载时遇到io阻塞自动跳转其他任务async with aiofiles.open(name, 'wb') as f:await f.write(count)print(f'{name}已下载')async def main():# 关闭SSL证书验证connector = aiohttp.TCPConnector(ssl=False)# 创建异步HTTP客户端对象async with aiohttp.ClientSession(connector=connector) as session:# 发送get请求async with session.get(url) as response:# 返回响应内容的字符串res = await response.text()et = etree.HTML(res)src = et.xpath("//td[@class='col15 leftalign']/a/@href") # 图片链接temp_name = et.xpath("//td[@class='col0 leftalign']/text()") # 图片名称for i, index in enumerate(temp_name):temp_name[i] = index.strip()# 任务列表tasks = []for i in range(len(src)):# 下载到本地的名称file_name = f"img/{temp_name[i]}.png"if not os.path.exists('img'):os.makedirs('img')# 批量创建asyncio异步任务 执行spider函数task = asyncio.create_task(spider(file_name, src[i]))tasks.append(task)# 启动await asyncio.wait(tasks)if __name__ == '__main__':asyncio.run(main())(src)):# 下载到本地的名称file_name = f"img/{temp_name[i]}.png"if not os.path.exists('img'):os.makedirs('img')# 批量创建asyncio异步任务 执行spider函数task = asyncio.create_task(spider(file_name, src[i]))tasks.append(task)# 启动await asyncio.wait(tasks)if __name__ == '__main__':asyncio.run(main())