品划网络做网站,潍坊淘宝网站建设,网站建设大题,seo最好的cms系统在 Python 3.8 以后的版本中#xff0c;异步编程变得越来越重要。本文将系统介绍 Python 标准库中的异步编程工具#xff0c;带领大家掌握 async/await 语法和 asyncio 的使用。
从一个简单的场景开始
假设我们在处理一些耗时的 I/O 操作#xff0c;比如读取多个文件或处理…在 Python 3.8 以后的版本中异步编程变得越来越重要。本文将系统介绍 Python 标准库中的异步编程工具带领大家掌握 async/await 语法和 asyncio 的使用。
从一个简单的场景开始
假设我们在处理一些耗时的 I/O 操作比如读取多个文件或处理多个数据。为了模拟这种场景我们先用 time.sleep() 来代表耗时操作
import time
import randomdef process_item(item):# 模拟耗时操作print(f处理中{item})process_time random.uniform(0.5, 2.0)time.sleep(process_time)return f处理完成{item}耗时 {process_time:.2f} 秒def process_all_items():items [任务A, 任务B, 任务C, 任务D]results []for item in items:result process_item(item)results.append(result)return resultsif __name__ __main__:start time.time()results process_all_items()end time.time()print(\n.join(results))print(f总耗时{end - start:.2f} 秒)处理中任务A
处理中任务B
处理中任务C
处理中任务D
处理完成任务A耗时 1.97 秒
处理完成任务B耗时 1.28 秒
处理完成任务C耗时 0.66 秒
处理完成任务D耗时 1.80 秒
总耗时5.72 秒这段代码的问题很明显每个任务都必须等待前一个任务完成才能开始。如果有4个任务每个任务平均耗时1秒那么总耗时就接近4秒。
认识 async/await
Python 引入了 async/await 语法来支持异步编程。当我们在函数定义前加上 async 关键字时这个函数就变成了一个协程coroutine。而 await 关键字则用于等待一个协程完成。让我们改写上面的代码
import asyncio
import random
import timeasync def process_item(item):print(f处理中{item})# async 定义的函数变成了协程process_time random.uniform(0.5, 2.0)# time.sleep() 换成 asyncio.sleep()await asyncio.sleep(process_time) # await 等待异步操作完成return f处理完成{item}耗时 {process_time:.2f} 秒async def process_all_items():items [任务A, 任务B, 任务C, 任务D]# 创建任务列表tasks [asyncio.create_task(process_item(item))for item in items]print(开始处理)results await asyncio.gather(*tasks)return resultsasync def main():start time.time()results await process_all_items()end time.time()print(\n.join(results))print(f总耗时{end - start:.2f} 秒)if __name__ __main__:asyncio.run(main())开始处理
处理中任务A
处理中任务B
处理中任务C
处理中任务D
处理完成任务A耗时 1.97 秒
处理完成任务B耗时 0.80 秒
处理完成任务C耗时 0.83 秒
处理完成任务D耗时 1.46 秒
总耗时1.97 秒让我们详细解释这段代码的执行过程
当函数被 async 关键字修饰后调用该函数不会直接执行函数体而是返回一个协程对象await 关键字只能在 async 函数内使用它表示等待这个操作完成后再继续asyncio.create_task() 将协程包装成一个任务该任务会被事件循环调度执行asyncio.gather() 并发运行多个任务并等待它们全部完成asyncio.run() 创建事件循环运行 main() 协程直到它完成
使用 asyncio.wait_for 添加超时控制
在实际应用中我们往往需要为异步操作设置超时时间
import asyncio
import random
import timeasync def process_item(item):process_time random.uniform(0.5, 2.0)try:# 设置1秒超时await asyncio.wait_for(asyncio.sleep(process_time),timeout1.0)return f处理完成{item}耗时 {process_time:.2f} 秒except asyncio.TimeoutError:return f处理超时{item}async def main():items [任务A, 任务B, 任务C, 任务D]tasks [asyncio.create_task(process_item(item))for item in items]start time.time()results await asyncio.gather(*tasks, return_exceptionsTrue)end time.time()print(\n.join(results))print(f总耗时{end - start:.2f} 秒)if __name__ __main__:asyncio.run(main())处理超时任务A
处理完成任务B耗时 0.94 秒
处理超时任务C
处理完成任务D耗时 0.78 秒
总耗时1.00 秒使用异步上下文管理器
Python 中的 with 语句可以用于资源管理类似地异步编程中我们可以使用 async with 。一个类要支持异步上下文管理需要实现 __aenter__ 和 __aexit__ 方法
import asyncio
import randomclass AsyncResource:async def __aenter__(self):# 异步初始化资源print(正在初始化资源...)await asyncio.sleep(0.1)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):# 异步清理资源print(正在清理资源...)await asyncio.sleep(0.1)async def process(self, item):# 异步处理任务print(f正在处理任务{item})process_time random.uniform(0.5, 2.0)await asyncio.sleep(process_time)return f处理完成{item}耗时 {process_time:.2f} 秒async def main():items [任务A, 任务B, 任务C]async with AsyncResource() as resource:tasks [asyncio.create_task(resource.process(item))for item in items]results await asyncio.gather(*tasks)print(\n.join(results))if __name__ __main__:asyncio.run(main())正在初始化资源...
正在处理任务任务A
正在处理任务任务B
正在处理任务任务C
正在清理资源...
处理完成任务A耗时 1.31 秒
处理完成任务B耗时 0.77 秒
处理完成任务C耗时 0.84 秒使用事件循环执行阻塞操作 run_in_executor
在异步编程中我们可能会遇到一些无法避免的阻塞操作比如调用传统的同步API。这时asyncio.get_running_loop() 和 run_in_executor 就显得特别重要
import asyncio
import time
import requests # 一个同步的HTTP客户端库async def blocking_operation():# 获取当前事件循环loop asyncio.get_running_loop()# 在线程池中执行阻塞操作result await loop.run_in_executor(None, # 使用默认的线程池执行器requests.get, # 要执行的阻塞函数http://httpbin.org/delay/1 # 函数参数)return result.status_codeasync def non_blocking_operation():await asyncio.sleep(1)return 非阻塞操作完成async def main():# 同时执行阻塞和非阻塞操作tasks [asyncio.create_task(blocking_operation()),asyncio.create_task(non_blocking_operation())]start time.time()results await asyncio.gather(*tasks)end time.time()print(f操作结果{results})print(f总耗时{end - start:.2f} 秒)if __name__ __main__:asyncio.run(main())输出
操作结果[200, 非阻塞操作完成]
总耗时1.99 秒这个例子展示了如何在异步程序中优雅地处理同步操作。如果不使用 run_in_executor阻塞操作会阻塞整个事件循环导致其他任务无法执行
requests.get() 是同步操作,会阻塞当前线程事件循环运行在主线程上如果直接在协程中调用 requests.get() 整个事件循环都会被阻塞其他任务无法在这期间执行run_in_executor 会将阻塞操作放到另一个线程中执行主线程的事件循环可以继续处理其他任务当线程池中的操作完成时结果会被返回给事件循环
最佳实践是
尽量使用原生支持异步的库(如 aiohttp)如果必须使用同步库就用 run_in_executor对于 CPU 密集型任务也可以用 run_in_executor 放到进程池中执行
任务取消优雅地终止异步操作
有时我们需要取消正在执行的异步任务比如用户中断操作或超时处理
import asyncio
import randomasync def long_operation(name):try:print(f{name} 开始执行)while True: # 模拟一个持续运行的操作await asyncio.sleep(0.5)print(f{name} 正在执行...)except asyncio.CancelledError:print(f{name} 被取消了)raise # 重要继续传播取消信号async def main():# 创建三个任务task1 asyncio.create_task(long_operation(任务1))task2 asyncio.create_task(long_operation(任务2))task3 asyncio.create_task(long_operation(任务3))# 等待1秒后取消task1await asyncio.sleep(1)task1.cancel()# 等待2秒后取消其余任务await asyncio.sleep(1)task2.cancel()task3.cancel()try:# 等待所有任务完成或被取消await asyncio.gather(task1, task2, task3, return_exceptionsTrue)except asyncio.CancelledError:print(某个任务被取消了)if __name__ __main__:asyncio.run(main())输出
任务1 开始执行
任务2 开始执行
任务3 开始执行
任务1 正在执行...
任务2 正在执行...
任务3 正在执行...
任务1 被取消了
任务2 正在执行...
任务3 正在执行...
任务2 正在执行...
任务3 正在执行...
任务2 被取消了
任务3 被取消了这个例子展示了如何正确处理任务取消
任务可以在执行过程中被取消被取消的任务会抛出 CancelledError我们应该适当处理取消信号确保资源被正确清理
深入理解协程为什么需要 async/await
协程Coroutine是一种特殊的函数它可以在执行过程中暂停并在之后从暂停的地方继续执行。当我们使用 async 定义一个函数时我们实际上是在定义一个协程
import asyncio# 这是一个普通函数
def normal_function():return Hello# 这是一个协程
async def coroutine_function():await asyncio.sleep(1)return Hello# 让我们看看它们的区别
print(normal_function) # function normal_function at 0x1052cc040
print(coroutine_function) # function coroutine_function at 0x1054b9790# 调用它们的结果不同
print(normal_function()) # 直接返回: Hello
print(coroutine_function()) # RuntimeWarning: coroutine coroutine_function was never awaited
# coroutine object coroutine_function at 0x105962e40await 如何与事件循环协作
协程Coroutine的核心在于它可以在执行过程中主动交出控制权让其他代码有机会执行。让我们通过一个详细的例子来理解这个过程
import asyncioasync def task1():print(任务1开始)print(任务1准备休眠)await asyncio.sleep(2) # 关键点1交出控制权print(任务1休眠结束)async def task2():print(任务2开始)print(任务2准备休眠)await asyncio.sleep(1) # 关键点2交出控制权print(任务2休眠结束)async def main():# 同时执行两个任务await asyncio.gather(task1(), task2())asyncio.run(main())这段代码的输出会是
任务1开始
任务1准备休眠
任务2开始
任务2准备休眠
任务2休眠结束 # 1秒后
任务1休眠结束 # 2秒后让我们详细解释执行过程 当程序遇到 await asyncio.sleep(2) 时 这个 sleep 操作被注册到事件循环中Python 记录当前的执行位置task1 主动交出控制权重要task1 并没有停止运行而是被暂停了等待之后恢复 事件循环接管控制权后 寻找其他可以执行的协程这里是 task2开始执行 task2直到遇到 await asyncio.sleep(1)task2 也交出控制权被暂停 事件循环继续工作 管理一个计时器追踪这两个 sleep 操作1秒后发现 task2 的 sleep 时间到了恢复 task2 的执行打印任务2休眠结束2秒到时恢复 task1 的执行打印任务1休眠结束
这就像是一个指挥家事件循环在指挥一个管弦乐队多个协程
当某个乐器协程需要休息时它举手示意await指挥家看到后立即指挥其他乐器演奏当休息时间到了指挥家会示意这个乐器继续演奏
代码验证
import asyncio
import timeasync def report_time(name, sleep_time):print(f{time.strftime(%H:%M:%S)} - {name}开始)await asyncio.sleep(sleep_time)print(f{time.strftime(%H:%M:%S)} - {name}结束)async def main():# 同时执行多个任务await asyncio.gather(report_time(任务A, 2),report_time(任务B, 1),report_time(任务C, 3))asyncio.run(main())输出
00:19:26 - 任务A开始
00:19:26 - 任务B开始
00:19:26 - 任务C开始
00:19:27 - 任务B结束
00:19:28 - 任务A结束
00:19:29 - 任务C结束这种机制的优势在于
单线程执行没有线程切换开销协程主动交出控制权而不是被操作系统强制切换比起回调地狱代码更清晰易读错误处理更直观可以使用普通的 try/except
理解了这个机制我们就能更好地使用异步编程
在 await 的时候其他协程有机会执行耗时操作应该是真正的异步操作比如 asyncio.sleep 不要在协程中使用阻塞操作那样会卡住整个事件循环
小结
Python 的异步编程主要依赖以下概念
async/await 语法定义和等待协程asyncio 模块提供事件循环和任务调度Task 对象表示待执行的工作单元异步上下文管理器管理异步资源
使用异步编程的关键点
I/O 密集型任务最适合使用异步编程所有耗时操作都应该是真正的异步操作注意处理超时和异常情况合理使用 asyncio.gather() 和 asyncio.wait_for()
异步编程不是万能的但在处理 I/O 密集型任务时确实能带来显著的性能提升。合理使用这些工具能让我们的程序更高效、更优雅。