Python中的await

Python中的await

在 Python 中,await 是用于异步编程的关键字,它只能在 async def 定义的协程函数中使用,用于“等待”一个异步操作完成。虽然语法上只有一种 await 形式,但在实际使用中,它可以应用于多种异步对象和场景。下面是对各种 await 使用方式的系统总结:


等待一个协程对象(最基本用法)

1
2
3
4
5
async def fetch_data():
return "data"

async def main():
result = await fetch_data()
  • fetch_data() 是一个协程函数,返回一个协程对象
  • await 会挂起当前协程,直到 fetch_data() 完成

等待一个 asyncio.FutureTask

asyncio.Future 就是“未来的结果盒子”,你可以 await 它,等别人把结果放进去。是一个用来表示“未来某个时间点会有结果”的容器,类似于一个承诺(Promise),但它本身不执行任务,只是一个用来保存结果等待结果的对象,在概念上和 JavaScript Promise 很像

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio

async def main():
future = asyncio.Future() # pending状态
# 模拟异步完成
asyncio.get_event_loop().call_later(1, future.set_result, "done") # 模拟 1 秒后设置结果
print("等待结果中...")
result = await future # 会挂起,直到future完成
print("得到结果:", result)

# 创建Future对象 -> Future状态:pending -> await future -> 事件循环/回调准备好结果 -> ser_resut(value)/set_exception(exc)
# -> Future状态:已完成 -> 事件循环恢复等待的协程 -> 协程获取结果或异常
  • Future 是底层的异步原语

  • Task 是包装协程的调度单元,也可以 awaitasyncio.Task 本质上是 Future 的子类,表示一个正在运行的协程任务

  • Future的3种状态

    • 未完成(pending):刚创建,还没有结果。
    • 已完成(finished):任务完成,有返回值。
    • 已失败(exception):任务完成,但抛出了异常。
  • Future的方法

    • future.set_result(value):手动给它一个结果,状态变为“已完成”。
    • future.set_exception(exc):让它以异常结束。
    • future.result():获取结果(如果还没完成,会抛 InvalidStateError)。
    • future.done():判断是否完成。
    • await future:异步等待结果(推荐方式)。
  • Future & Task

    • Future:只是一个结果容器,本身不会执行代码。

    • Task:是 Future 的子类,会调度执行一个协程,并把结果放到自己内部(本质上 Task 也是一个特殊的 Future)。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      import asyncio

      async def coro():
      await asyncio.sleep(1)
      return "done" # return value会调用 set_result 存入 Future 部分;raise exception会调用 set_exception 存入 Future 部分

      async def main():
      fut = asyncio.Future() # 手动控制
      task = asyncio.create_task(coro()) # 自动运行

      print(isinstance(task, asyncio.Future)) # True

      asyncio.run(main())

等待多个任务:asyncio.gather

gather 就像一个调度员,把多个异步任务打包发射,让它们“同时”跑,然后等全部跑完,把所有结果按顺序放进一个列表(或元组)返回。

1
2
3
4
5
6
7
8
9
async def task1(): ...
async def task2(): ...

results = await asyncio.gather(task1(), task2())

# 任务几乎同时开始,耗时取决于最长的那个。
# 返回结果顺序与传入顺序一致,不是完成的先后顺序。

# 调用asyncio.gather -> 为每个可等待对象创建Task -> 事件循环并发调度所有任务 -> 等待所有任务完成 -> 按传入顺序收集结果 -> 返回结果列表给调用方
  • 并发执行多个协程。并发运行多个可等待对象(coroutine / Task / Future)并收集所有结果的高层 API

  • 返回所有结果组成的列表

  • 参数,asyncio.gather(*aws, return_exceptions=False)

    • *aws:任意数量的可等待对象(协程、Task、Future)。
    • return_exceptions
      • False(默认):只要一个任务抛异常,gather 就立刻抛出那个异常,其他任务可能会被取消。
      • True:不会抛异常,而是把异常对象当作结果放到返回列表中。
  • wait 区别

    • asyncio.gather:收集结果,返回值是按传入顺序排列的结果列表
    • asyncio.wait:返回已完成和未完成任务的集合,不会自动收集结果,需要自己 .result() 取值。

等待第一个完成:asyncio.wait

并发运行多个可等待对象(coroutine / Task / Future),并返回“已完成”和“未完成”的集合,而不帮你自动收集结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 一、
done, pending = await asyncio.wait([task1(), task2()], return_when=asyncio.FIRST_COMPLETED)

# 二、
import asyncio

async def task(name, delay):
await asyncio.sleep(delay)
return f"{name} done"

async def main():
tasks = [
asyncio.create_task(task("A", 2)),
asyncio.create_task(task("B", 1)),
asyncio.create_task(task("C", 3))
]

done, pending = await asyncio.wait(tasks)
print("已完成:", done)
print("未完成:", pending)

for d in done:
print("结果:", d.result())

asyncio.run(main())

# 创建Task/Future集合 -> 调用asyncio.wait -> 事件循环并发调度任务 -> 根据return_when条件出发返回 -> 返回done,pending两个集合 -> 手动从done中提取结果或异常
  • 可控制等待策略(全部完成、任意完成等)

  • 特点

    • 返回值
      • 返回一个 二元组 (done, pending)
        • done:已完成任务的 set
        • pending:未完成任务的 set
    • 不保证顺序
      • done 集合中的任务顺序与完成先后无关(集合是无序的)。
    • 不会自动取结果
      • 你需要自己用 .result().exception()done 中获取结果。
    • 需要 Task/Future
      • 传入的协程会被包装成 Task,否则它不会被调度运行(官方推荐先 create_task)。
  • 重要参数,asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

    • aws:可等待对象集合(Task/Future)。
    • timeout:等待的秒数(超时后返回当前完成/未完成任务)。
    • return_when:控制何时返回,取值:
      • ALL_COMPLETED(默认):全部完成才返回。
      • FIRST_COMPLETED:任意一个完成就返回。
      • FIRST_EXCEPTION:任意一个抛异常就返回。

等待带超时的任务:asyncio.wait_for

wait_for 就像在异步任务外面套了一个“倒计时炸弹”,如果任务在规定时间内没完成,就会抛 asyncio.TimeoutError,并(可选地)取消这个任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 一、
result = await asyncio.wait_for(task(), timeout=5.0)

# 二、
import asyncio

async def slow():
await asyncio.sleep(3)
return "done"

async def main():
try:
result = await asyncio.wait_for(slow(), timeout=2)
print(result)
except asyncio.TimeoutError:
print("任务超时了!")

asyncio.run(main())

# 启动wait_for -> 事件循环运行aw -> aw在timeout秒内完成? -> 返回aw结果/取消aw并抛出TimeoutError
  • 超时会抛出 asyncio.TimeoutError

  • 参数,asyncio.wait_for(aw, timeout)

    • aw

      • 可等待对象(协程、Task、Future)。
    • timeout

      • 超时时间(秒,浮点数或整数)。
      • None 表示无限等待(等价于 await aw)。
  • 执行行为

    • 任务取消

      • 默认:一旦超时,会调用 aw.cancel() 取消任务。
      • 如果任务忽略取消(try: except asyncio.CancelledError:),它可能会继续运行(后台执行)。
    • 异常处理

      • 超时时会抛 asyncio.TimeoutError
    • 配合 gather/wait 使用

      • 可以对多个任务的整体执行时间加总超时限制。

等待异步生成器(async for

async def 协程函数中,按异步方式遍历一个异步可迭代对象(asynchronous iterable),在每次取下一个元素时都可以执行异步等待(await),不会阻塞事件循环。换句话说,它是 for 循环在异步世界的版本。async for 是“可等待的 for 循环”,专门用来遍历异步数据源,让每次取值的过程都不阻塞事件循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 普通的 for 循环要求迭代器是同步的,取下一个元素是立刻完成的。
# 但在异步场景中,有时“取下一个元素”需要等待,比如:
# 从网络流分块读取数据
# 从异步队列里取任务
# 从数据库异步游标取记录
# 如果用同步 for,会阻塞事件循环,导致其他协程无法运行。
# async for 解决了这个问题,让迭代过程也是异步的。

# 一、
async def gen():
for i in range(3):
yield i

async def main():
async for item in gen():
print(item)

# 二、
import asyncio

class AsyncCounter:
def __init__(self, start, end):
self.current = start
self.end = end

def __aiter__(self):
return self

async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(1) # 模拟异步等待
self.current += 1
return self.current

async def main():
async for num in AsyncCounter(0, 3):
print(num)

asyncio.run(main())
  • async for 内部隐式使用 await 来获取下一个值


等待异步上下文管理器(async with

在异步环境(async def 协程)中,安全、优雅地管理需要 await 的资源获取和释放过程,就像 with 是同步版本的资源管理器一样。async with协程世界里的 with,专门用来管理需要异步初始化和清理的资源,保证即使出错也会正确释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 在异步场景中,有时进入/退出都需要等待,比如:
# 异步打开/关闭网络连接
# 异步锁的获取/释放
# 异步启动/停止会话(数据库、WebSocket)
# 异步文件操作
# 如果用普通 with,就没法 await 这些过程,必须用 async with。

# 一、
async with aiofiles.open('file.txt', mode='r') as f:
content = await f.read()

# 二、
import asyncio

class AsyncResource:
async def __aenter__(self):
print("异步进入资源")
await asyncio.sleep(1) # 模拟耗时初始化
return "资源内容"

async def __aexit__(self, exc_type, exc, tb):
print("异步释放资源")
await asyncio.sleep(1) # 模拟耗时清理

async def main():
async with AsyncResource() as res:
print("使用资源:", res)

asyncio.run(main())
  • async with 用于异步资源管理,如文件、连接池等

  • # 进入上下文:调用 __aenter__
    # 退出上下文:调用 __aexit__
    

注意事项

  • await 只能用于 async def 中,不能在普通函数或模块级别使用
  • await 的对象必须是 awaitable(协程、Future、Task、异步生成器等)
  • await 会让出事件循环控制权,避免阻塞