当前位置: 首页 > news >正文

Python asyncio 入门实战-2

目录

    • 简介
    • 前置代码
      • 计算协程函数花费时间的代码
    • aiohttp 的使用方法
      • 发送 Web 请求
      • 设置超时时间
    • 执行并发请求
      • 任务并发 web 请求
      • 使用 gather
      • 结果的有序性
      • 抛出异常的两种方式
      • 在请求完成时立即处理
    • 使用 wait 控制
      • 等待所有任务完成
      • 异常处理
      • 及时处理完成的任务

简介

第一部分看这里

Python asyncio 入门实战-1

第一部分我们学习了 asyncio 的协程函数、任务、Future 等知识,第二部分我们介绍一下 aiohttp 异步非阻塞 http 请求库的基本使用和 Python 中处理异步任务时最常用的几种函数:asyncio.gather()asyncio.wait()asyncio.as_completed()

前置代码

计算协程函数花费时间的代码

def async_timed():"""计算异步函数耗时"""def wrapper(func):@functools.wraps(func)async def wrapped(*args, **kwargs):print(f"开始函数 {func} 参数是 {args} {kwargs}")start = time.time()try:return await func(*args, **kwargs)finally:end = time.time()total = end - startprint(f"完成函数 {func} 共计 {total:.4f} 秒")return wrappedreturn wrapper

aiohttp 的使用方法

第一部分我们说过,python 最常用的 http 请求库之一requests库是阻塞的,调用它会阻塞事件循环中的其他任务,而aiohttp就是非阻塞的库,它可以在事件循环中以非阻塞的形式发送 http 请求。

发送 Web 请求

使用pip install aiohttp安装

import asyncio
import aiohttp
from aiohttp import ClientSessionimport functools
import time@async_timed()
async def fetch_status(session: ClientSession, url: str) -> int:"""Fetch the status code of a URL."""async with session.get(url) as response:return response.status@async_timed()
async def main() -> None:async with aiohttp.ClientSession() as session:url = 'https://example.com'status_code = await fetch_status(session, url)print(f"Status code for {url}: {status_code}")asyncio.run(main())

设置超时时间

aiohttp 的请求超时时间为 5 分钟,我们可以手动设置的低一点。

import asyncio
import aiohttp
from aiohttp import ClientSessionimport functools
import timedef async_timed():"""计算异步函数耗时"""def wrapper(func):@functools.wraps(func)async def wrapped(*args, **kwargs):print(f"开始函数 {func} 参数是 {args} {kwargs}")start = time.time()try:return await func(*args, **kwargs)finally:end = time.time()total = end - startprint(f"完成函数 {func} 共计 {total:.4f} 秒")return wrappedreturn wrapper@async_timed()
async def fetch_status(session: ClientSession, url: str) -> int:"""Fetch the status code of a URL."""async with session.get(url) as response:return response.status@async_timed()
async def main() -> None:session_timeout = aiohttp.ClientTimeout(total=0.1, connect=1)async with aiohttp.ClientSession(timeout=session_timeout) as session:url = 'https://example.com'status_code = await fetch_status(session, url)print(f"Status code for {url}: {status_code}")asyncio.run(main())

执行并发请求

任务并发 web 请求

在第一部分,我们学到了可以使用 Task 来执行协程函数,但是这有一个问题,就是如果需要同时执行数个任务的话,会比较麻烦,你可能会想到使用循环,但是这样有个陷阱

@async_timed()
async def main() -> None:delay_times = [3, 3, 3][await asyncio.create_task(asyncio.sleep(delay)) for delay in delay_times]asyncio.run(main())

执行代码你会发现这会耗时超过 9 秒,也就意味着它是串行执行的,至于为什么,是因为它没创建一个任务就await这意味着阻塞,稍微修改一下就好了

@async_timed()
async def main() -> None:delay_times = [3, 3, 3]tasks = [ asyncio.create_task(asyncio.sleep(delay)) for delay in delay_times][await task for task in tasks]asyncio.run(main())

这样就是先统一创建任务,然后运行了。

虽然这能解决一次性创建多个任务的需求,但还是有缺点的,最大的缺点就是会有失败的请求,如果一个请求失败了,前面已经完成的结果我们无法拿到并且使用。

使用 gather

为了解决上面提到的问题,我们需要学习一个新的方法,asyncio.gather它直接翻译为 收集,顾名思义,收集一系列可等待对象执行

@async_timed()
async def main() -> None:urls = ['https://www.example.com'] * 1000request_list = [fetch_status(url) for url in urls]status_code = await asyncio.gather(*request_list)print(status_code)asyncio.run(main())

对比一下,比同步省下不少时间,接下来,我们要着重介绍 gather 的不同用法了。

结果的有序性

async def task(name, delay):await asyncio.sleep(delay)print(f"Task {name} done (after {delay}s)")return f"Result from {name}"async def main():results = await asyncio.gather(task("A", 3),task("B", 1),task("C", 2),)print("\nFinal results (ordered):")print(results)

如果你运行上面这段示例,会返回如下的结果

Task B done (after 1s)
Task C done (after 2s)
Task A done (after 3s)Final results (ordered):
['Result from A', 'Result from B', 'Result from C']

我们看到,任务 A 耗时最长,但是还是在第一位,也就是说,gather 返回的结果默认是按顺序的。

抛出异常的两种方式

@async_timed()
async def main():async with aiohttp.ClientSession() as session:urls = ['https://www.example.com', 'bad']tasks = [fetch_status(session, url) for url in urls]status_code = await asyncio.gather(*tasks)print(f"Status codes: {status_code}")

默认方式是直接返回异常调用栈,同时也不会取消其他正在运行的任务。

我们可以通过设置return_exceptions参数来修改默认方式。

@async_timed()
async def main():async with aiohttp.ClientSession() as session:urls = ['https://www.example.com', 'bad']tasks = [fetch_status(session, url) for url in urls]results = await asyncio.gather(*tasks, return_exceptions=True)exceptions = [res for res in results if isinstance(res, Exception)]success = [res for res in results if not isinstance(res, Exception)]print(f"All results: {results}")print(f"Exceptions: {exceptions}")print(f"Success: {success}")

这样,我们就可以针对成功的结果和异常的结果分别处理,虽然这样还是有点笨笨的,没关系,后面我们还会介绍一种方法。

在请求完成时立即处理

gather 方法有一个问题,就是它的结果必须要全部执行完成之后才可用,我们需要一个可以在每个任务完成时,都可以使用这些已完成任务结果的方法,巧了,as_completed就是这样一个方法。

@async_timed()
async def main():async with aiohttp.ClientSession() as session:fetchers = [fetch_status(session, 'https://www.example.com', delay=1),fetch_status(session, 'https://www.example.org', delay=2),fetch_status(session, 'https://www.example.net', delay=3)]for f in asyncio.as_completed(fetchers):status = await fprint(f"Status: {status}")

设置超时时间

@async_timed()
async def main():async with aiohttp.ClientSession() as session:fetchers = [fetch_status(session, 'https://www.example.com', delay=1),fetch_status(session, 'https://www.example.org', delay=2),fetch_status(session, 'https://www.example.net', delay=3)]for f in asyncio.as_completed(fetchers):status = await fprint(f"Status: {status}")

使用 wait 控制

通过前面的内容,我们知道了gatheras_completed的一些缺点,那就是没有简单的方法来取消正在运行的任务。而wait提供了更具体的控制来处理成功或者失败的请求,它返回两个集合,第一个集合是完成或者失败的任务结果集合,第二个是还没有执行的任务的集合。

等待所有任务完成

默认情况下,wait 会返回所有任务已完成的结果

import asyncioasync def task(name, delay):await asyncio.sleep(delay)print(f"Task {name} done")return f"Result from {name}"@async_timed()
async def main():tasks = [asyncio.create_task(task("A", 2)),asyncio.create_task(task("B", 1)),asyncio.create_task(task("C", 3)),]# wait for all to completedone, pending = await asyncio.wait(tasks)print("\nDone tasks:")for d in done:print(d.result())  # 获取每个已完成任务的返回值asyncio.run(main())

异常处理

@async_timed()
async def main():async with aiohttp.ClientSession() as session:good_result = fetch_status(session, "https://www.example.com")bad_result = fetch_status(session, "bad.com")fetchers = [asyncio.create_task(good_result), asyncio.create_task(bad_result)]done, pending = await asyncio.wait(fetchers)for done_task in done:if done_task.exception() is None:print(done_task.result())else:logging.error("请求失败", exc_info=done_task.exception())

我们可以使用 exception方法处理异常。我们可以再加一段代码,使得当发生异常时,后面的任务都取消

@async_timed()
async def main():async with aiohttp.ClientSession() as session:good_result = fetch_status(session, "https://www.example.com")bad_result = fetch_status(session, "bad.com")fetchers = [asyncio.create_task(good_result), asyncio.create_task(bad_result)]done, pending = await asyncio.wait(fetchers)for done_task in done:if done_task.exception() is None:print(done_task.result())else:logging.error("请求失败", exc_info=done_task.exception())

及时处理完成的任务

参数 return_when 默认是 asyncio.ALL_COMPLETED即完成所有任务,遇到异常时返回参数是asyncio.FIRST_EXCEPTION,现在我们介绍至少有一个结果时返回,它的参数是 asyncio.FIRST_COMPLETED

async def main():async with aiohttp.ClientSession() as session:fetchers = [asyncio.create_task(fetch_status(session, "https://www.example.com")),asyncio.create_task(fetch_status(session, "https://www.example.com")),asyncio.create_task(fetch_status(session, "https://www.example.com")),]done, pending = await asyncio.wait(fetchers, return_when=asyncio.FIRST_COMPLETED)print(f"当前完成任务数量 {len(done)}")print(f"当前未完成任务数量 {len(pending)}")for done_task in done:print(await done_task)

它会返回一个已完成任务,和两个待执行任务。

相关文章:

  • 游戏引擎学习第226天
  • 381_C++_decrypt解密数据、encrypt加密数据,帧头和数据buffer分开
  • Nacos-Controller 2.0:使用 Nacos 高效管理你的 K8s 配置
  • 0415美团面试题目详解
  • MapSet 2 (Set)
  • Vulhub-DarkHole靶机通关攻略
  • 代码随想录算法训练营第十八天
  • redisson分布式锁--实际应用!!!
  • 决策树简介
  • redis -- redis介绍,性能(与mysql性能对比),使用场景,CAP介绍
  • gravity`(控制 View 内部内容的对齐方式)
  • Hikyuu C++与Python层交互机制
  • Vue 3中的setup【与Vue 2的区别】
  • 深度学习--深度学习概念、框架以及构造
  • GIT工具学习【1】:新安装git预操作
  • candence17.4原理图编号
  • 你了解哪些Java限流算法?
  • 深入解析操作系统的文件系统:从存储介质到数据管理的核心引擎
  • 猿辅导集团推首个教育AI范式小猿AI 聚焦家校应用场景发布3款新品
  • VGA显示
  • 运油-20亮相中埃空军联训
  • 北京理工大学:教师宫某涉嫌师德失范,暂停其一切职务活动
  • 语言天才、魔方大师,击败王楚钦前他豪言:我能比中国球员强
  • 美伊第二轮核问题间接谈判结束,伊方称“结果是建设性的”
  • 人民文学奖颁出,董宇辉获传播贡献奖
  • 伊朗艺术中的中国风