常见的并发模型
- 多进程/多线程
- 异步
- Actor
- Pub/Sub
Python 异步的基石:协程
协程简介
概念:协作式多任务的子程序,用户态线程或微线程(Coroutine)。
特点:子程序执行可以中断,恢复后不会丢失之前的上下文状态。
区别:与线程不同,协程是用户级的非抢占式调度,比线程更轻量(纳秒级的 CPU 时间),无资源竞争,无需加锁。
Python 使用协程 + IO 多路复用实现异步编程。
Python 协程的发展历程
- PEP 255 (Python 2.2):引入生成器,实现可迭代对象的惰性求值。
- PEP 342:引入
send
和throw
方法,允许在try/finally
中使用yield
。 - PEP 380 (Python 3.3):引入
yield from
,简化生成器/协程的实现和串联。 - PEP 3156 (Python 3.4):试验性引入异步 I/O 框架 asyncio。
- PEP 492 (Python 3.5):通过
async/await
语法明确支持协程。 - Python 3.6:
asyncio
成为标准库的一部分。
Python 第三方异步 I/O 方案
- Greenlet
- Gevent
- Eventlet
- Twisted
官方异步模块 Asyncio
认识几个概念
- Task:协程的高层抽象。
- Future:沟通协程和事件循环。
- Coroutine:协程与
await
搭配使用的语法糖。 - Event Loop:事件循环协程的执行调度。
从一个爬虫例子着手
阻塞方式写法
import socket
def blocking_run():sock = socket.socket()sock.connect(("example.com", 80))request = "GET / HTTP/1.0\r\nHost: example.com\r\n\r\n"sock.send(request.encode())response = sock.recv(2048)return response
for _ in range(10):print(blocking_run())
非阻塞方式 - 事件循环 + 回调
import socket
from selectors import EVENT_READ, EVENT_WRITE, DefaultSelector
selector = DefaultSelector()
def no_blocking_run():def on_connect(key, mask):selector.unregister(key.fd)request = "GET / HTTP/1.0\r\nHost: example.com\r\n\r\n"sock.send(request.encode())selector.register(key.fd, EVENT_READ, on_response)return "on_connect success"def on_response(key, mask):response = sock.recv(2048)selector.unregister(key.fd)return "on_response success"sock = socket.socket()sock.setblocking(False)try:sock.connect(("example.com", 80))except BlockingIOError:passselector.register(sock.fileno(), EVENT_WRITE, on_connect)
def event_loop():for _ in range(10):no_blocking_run()while True:events = selector.select(timeout=3)if not events:breakfor event_key, event_mask in events:callback = event_key.dataprint(callback(event_key, event_mask))
执行过程说明
- 启动执行函数,创建 socket 连接并在 selector 上注册可写事件,无阻塞操作,函数立即返回。
- 遍历 10 个不同的下载任务,注册连接事件。
- 启动事件循环,阻塞在事件监听上。
- 当某个下载任务 EVENT_WRITE 被触发,回调其 on_connect 方法。
- 进入下一轮事件循环,处理事件。
回调地狱问题
- 回调函数执行不正常时,错误处理困难。
- 嵌套回调导致代码复杂。
- 状态共享和管理困难。
callback_a(callback_b(callback_c(callback_d(...))))
对比基于协程的解决方案
Future 对象
作为数据接收代理,和系统事件沟通的桥梁
class Future:def __init__(self):self.result = Noneself._callbacks = []def add_done_callback(self, func):self._callbacks.append(func)def set_result(self, result):self.result = resultfor func in self._callbacks:func(self)
基于协程重构
def no_blocking_v2():sock = socket.socket()sock.setblocking(False)try:sock.connect(("example.com", 80))except BlockingIOError:passf = Future()selector.register(sock.fileno(), EVENT_WRITE, lambda: f.set_result(None))yield fselector.unregister(sock.fileno())request = "GET / HTTP/1.0\r\nHost: example.com\r\n\r\n"sock.send(request.encode())f = Future()selector.register(sock.fileno(), EVENT_READ, lambda: f.set_result(sock.recv(2048)))yield fselector.unregister(sock.fileno())
Task 对象
协程高层抽象,能管理和控制协程
class Task:def __init__(self, coro):self.coro = corof = Future()f.set_result(None)self.next(f)def next(self, future):try:next_future = self.coro.send(future.result)except StopIteration:returnnext_future.add_done_callback(self.next)
加入 Event Loop 驱动协程运行
def event_loop_v2():for _ in range(10):Task(no_blocking_v2())while True:events = selector.select(timeout=3)if not events:breakfor event_key, event_mask in events:callback = event_key.datacallback()
整个过程如图所示
协程风格与回调风格对比
- 回调风格:
- 存在链式回调,破坏同步代码结构。
- 需要在回调之间维护状态。
- 协程风格:
- 无链式调用。
- Selector 的回调只需设置 future 的值。
- 事件循环只管调度,结构更接近同步代码。
- 无需在多个协程之间维护状态。
结语
上述只是原型的简单介绍,实际 Asyncio 远比上述原型复杂,需要实现零拷贝、公平调度、异常处理、任务状态管理等。理解原理有助于在后续开发中更好地处理异步问题。
如果你对异步编程感兴趣,欢迎体验 AppBoot 项目,它基于FastAPI提供了一个类似 Django 的开发体验,完全异步,并内置 SQLAlchemy 2.0 支持。使用 AppBoot,你可以轻松快速地开发企业级的异步 Web 服务,感受异步编程的强大与乐趣。