Python Asyncio异步深入浅出
Python Asyncio异步深入浅出
Kimbleex1. Asyncio
异步处理基本概念
async
:写在函数开头,将指定函数转换为协程。
event_loop
:事件循环,将协程注册到事件循环上协程才可以被调用。
coroutine
:协程,类似于平常所说的函数,用于实现某些操作,只有被转为任务或者注册到事件循环上后才可以执行,否则返回一个协程对象。
await
:写在协程、任务前,用于挂起阻塞的异步调用 。
task
:任务,进一步封装好的协程,可以直接被调用。
future
: 从功能来讲是跟task差不多,但是它是一个底层的等待对象,表示一个异步操作的最终结果,通常情况下不会在应用层级创建。
2. Asyncio
异步处理基础上手
2.1 协程和休眠
根据第1小节有关协程的介绍,简单写一个协程,使用async
关键字定义一个协程函数。再使用await
关键字挂起一个阻塞。
import asyncio |
运行结果如下:
可以看到执行代码共计2s,与预期一致。
注意:协程不能直接调用!如果直接调用协程返回的只有一个协程对象,并抛出RuntimeWarning
报错,无法执行内部逻辑。将上述案例进行修改如下。
import asyncio |
运行结果为:
2.2 任务
引入任务这一概念来实现多个协程的并发运行。当我两个任务都需要一定的运行时间时,可以通过并发操作来调整不同任务对cpu的时间占用,使得整个流程更加高效。
task
对象的常用方法如下:
asyncio.create_task(coro,*,name,context,eager_start)
创建化task
对象,其中coro
为协程对象,name
为任务的名称可自命名,默认为None
,context
为关联到该任务的contextvars.Context
对象,默认为None
,eager_satrt
为是否主动默认为False
。
task.cancel()
取消任务对象,会抛出asyncio.CancelledError
异常,可被捕获
task.done()
如果任务已执行完成,返回True
task.result()
返回任务的返回值
task.exception()
返回任务所抛出的异常
task.get_coro()
返回任务所包装的协程对象,如果任务主动完成,返回None,见3.1 主动任务工厂
task.get_name() & task.set_name
获取任务名称 & 设置任务名称
写一个案例。
import asyncio |
运行结果如下:
可以看到整体时间为5s而不是8s,这就实现了基本的异步功能。如果没有使用异步,直接逐步调用协程或者普通函数,上述代码的运行时长为8s。
2.3 任务组TaskGroup()
在python 3.11版本中新增了任务组asyncio.TaskGroup()
,它可以将很多tasks
加入到其中,并挨个等待执行。如果任务组中任何一个任务因为非人为取消的异常而失败,任务组中后续的所有任务都将会被终止。
任务组是一个异步上下文管理器,以async with
开头。
import asyncio |
运行结果如下:
可以看到任务组中的任务都成功执行了。
2.3.2 asyncio.gather()
其中,async.TaskGroup().create_task()
等价于asyncio.create_task()
,.result()
函数可以获取任务的返回值参数。
2.3.1 asyncio.gather()
与任务组进行对比的是asyncio.gather()
函数,它的参数表达式如下:
# *aws表示可等待对象组 |
与任务组方法最大的区别在于,当.gather()
函数中任务组中的任务抛出报错时,剩下的任务不会被取消,会继续执行直至任务组中的任务全部被执行。
由此可见,3.11版本新增的任务组是对.gather()
函数的一个安全性方向上的优化。
2.3.2 asyncio.wait()
此外,对于多任务并发的函数还有asyncio.wait(aws, *, timeout, return_when)
。其中aws
为可等待不能为空,且可等待对象必须为task
或者future
对象,它返回一个集合(done, pending)
,done
表示已完成的可等待对象,pending
表示待完成的可等待对象。常用的用法为:
done, pending = await asyncio.wait(aws) |
timeout
参数可以不指定,默认等待全部任务完成,若指定需要为整型或浮点型。
return_when
参数默认为ALL_COMPLETED
表示等待所有可等待对象全部完成,所有参数介绍如下:
asyncio.FIRST_COMPLETED
: 函数将在第一个可等待对象结束时取消并返回。
asyncio.FIRST_EXCEPTION
: 该函数将在任何future
对象抛出异常时返回,如果不存在future
对象则与ALL_COMPLETED
等价。
asyncio.ALL_COMPLETED
: 函数在所有可等待对象结束时返回。
下面是一个示例:
import asyncio |
运行结果如下:
可以看到已执行任务都在done
中,未执行的都在pending
中。
2.4 设置任务超时
2.4.1 asyncio.timeout()
asyncio.timeout(delay)
异步上下文管理器,delay为秒数,可为None。创建后可用Timeout.reschedule()
重新安排计划。它可以将任务超时引发的asyncio.CancelledError
转化为TimeoutError
以便捕获。
import asyncio |
运行结果如下:
可以看到抛出的错误是Timeout
,说明成功转换了取消异常。
注意:asyncio.timeout()在内部将异常进行了转化,Timeout异常是可被捕获的,所以想要捕获异常只能在该上下文管理器外部捕获。
asyncio.timeout_at(when)
与asynvio.timeout(delay)
功能差不多,只是参数中when
表示是停止等待的绝代时间,而delay
表示停止的秒数。
2.4.2 asyncio.Timeout()
asyncio.Timeout(when)
也为异步上下文管理器,when
为一个指明上下文将要过期的绝对时间,由事件的循环时钟进行计时。同样也可以通过reschedule
函数进行重新安排超时。
import asyncio |
运行结果如下:
2.4.3 asyncio.wait_for(aw, timeout)
与上述的超时方法功能一致,但是它不是异步上下文管理器。aw
参数表示可等待对象,timeout
表示等待的秒数,也可以为None
。如果可等待对象超时,同样会抛出Timeout
。
import asyncio |
运行结果如下:
2.5 任务取消
2.1 cancel()、cancelled()和uncancel()
如果想要人为主动取消task
任务,可以使用cancel()
方法来实现,它会抛出一个CancelledError
异常停止任务,这个异常是可以被捕获的。所以可以通过try...except...finally
方法来抑制cancel()
方法。所以该方法并不能保证绝对可以取消任务,只要人为干涉,它会失效。
import asyncio |
运行结果如下:
使用cancelled()
函数来判断task
对象是否被取消。如果被取消则返回True
。
使用uncancel()
方法来屏蔽asyncio.CancelledError
,以达到撤销取消操作的目的,但是这个方法不应该被经常使用,因为asyncio
很多内部函数都是通过asyncio.CancelledError
来抛出的。
2.2 shield()
如果像屏蔽取消操作而不屏蔽asyncio.CancelledError
异常,可以通过使用shield(aw)
方法来实现。aw
参数表示一个任务对象。如果传入的aw
对象为协程对象,则它会作为任务来进行调度。即:
task = asynciocreate_task(coroutine()) |
其实很容易看出来区别,第一个部分中协程被转为的任务,如果原来的协程被取消,任务不会受到影响,但是在第二部分直接调度协程,如果写成被取消,则会触发异常。
3. Asyncio高阶
3.1 主动任务工厂eager_task_factory()
该内容在python-3.12.
中新增.(2024/07/26时为最新版本)
当使用这个工厂函数时,协程将在task
对象构造期间同步地开始执行。 任务仅会在它们阻塞时被加入事件循环上的计划任务。 这可以达成性能提升因为对同步完成的协程来说可以避免循环调度的开销。
此特性会带来好处的一个常见例子是应用了缓存或记忆功能以便在可能的情况避免实际 I/O 的协程。
写一个示例:
import asyncio |
运行结果如下:
3.2 内省方法
常用的内省方法就几个,如下:
asyncio.current_task(loop=None)
返回当前运行的task
对象,没有就返回None
asyncio.all_tasks(loop=None)
返回事件循环所运行的未完成的task
对象的合集
asyncio.iscoroutine(obj)
如果传入的obj
是一个协程则返回True
3.3 线程中调度
当事件循环发生阻塞等问题时,通常可以用线程调度来调节阻塞。
3.3.1 引入线程
to_thread(func, /, *args, **kwargs)
方法可以调用一个额外的线程来执行传入的函数,以避免阻塞发生。下面写一个实例:
import time |
运行结果如下:
可以看到如果不使用线程,需要2 s,但是程序的执行只需要1 s,成功避免了事件循环的阻塞。
3.3.2 跨线程调度
asyncio.run_coroutine_threadsafe(coro, loop)
函数用于跨线程调度,即线程安全。
函数返回一个future
对象,等待其他线程的结果。需要在事件循环外部调用该函数。
# 创一个协程 |