From 1c5f66beeee9fd9230fc67f95eb91eb6c35693a0 Mon Sep 17 00:00:00 2001 From: Rumio <32546670+webjoin111@users.noreply.github.com> Date: Thu, 3 Jul 2025 17:39:13 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(http=5Futils):=20=E9=87=8D?= =?UTF-8?q?=E6=9E=84=E7=BD=91=E7=BB=9C=E8=AF=B7=E6=B1=82=E5=B7=A5=E5=85=B7?= =?UTF-8?q?=E9=93=BE=EF=BC=8C=E5=A2=9E=E5=BC=BA=E7=A8=B3=E5=AE=9A=E6=80=A7?= =?UTF-8?q?=E4=B8=8E=E6=98=93=E7=94=A8=E6=80=A7=20(#1951)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ✨ feat(http_utils): 重构网络请求工具链,增强稳定性与易用性 🔧 HTTP工具优化: • 全局httpx.AsyncClient管理,提升连接复用效率 • AsyncHttpx类重构,支持临时客户端和配置覆盖 • 新增JSON请求方法(get_json/post_json),内置重试机制 • 兼容httpx>=0.28.0版本 🔄 重试机制升级: • Retry装饰器重构,提供simple/api/download预设 • 支持指数退避、条件重试和自定义失败处理 • 扩展异常覆盖范围,提升网络容错能力 🏗️ 架构改进: • 新增AllURIsFailedError统一异常处理 • 浏览器工具模块化,提升代码组织性 * :rotating_light: auto fix by pre-commit hooks * :art: 代码格式化 * :bug: 测试修复 --------- Co-authored-by: webjoin111 <455457521@qq.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: HibiKier <45528451+HibiKier@users.noreply.github.com> Co-authored-by: HibiKier <775757368@qq.com> --- tests/conftest.py | 1 + zhenxun/utils/browser.py | 165 ++++---- zhenxun/utils/decorator/retry.py | 224 +++++++++- zhenxun/utils/exception.py | 20 + zhenxun/utils/http_utils.py | 692 ++++++++++++++++++++----------- 5 files changed, 771 insertions(+), 331 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 0fce1583..d6a7e9fa 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -116,6 +116,7 @@ async def app(app: App, tmp_path: Path, mocker: MockerFixture): await init() # await driver._lifespan.startup() os.environ["AIOCACHE_DISABLE"] = "1" + os.environ["PYTEST_CURRENT_TEST"] = "1" yield app diff --git a/zhenxun/utils/browser.py b/zhenxun/utils/browser.py index ca2e7755..310ed606 100644 --- a/zhenxun/utils/browser.py +++ b/zhenxun/utils/browser.py @@ -1,91 +1,94 @@ -import os -import sys +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any, Literal -from nonebot import get_driver -from playwright.__main__ import main -from playwright.async_api import Browser, Playwright, async_playwright +from nonebot_plugin_alconna import UniMessage +from nonebot_plugin_htmlrender import get_browser +from playwright.async_api import Page -from zhenxun.configs.config import BotConfig -from zhenxun.services.log import logger - -driver = get_driver() - -_playwright: Playwright | None = None -_browser: Browser | None = None +from zhenxun.utils.message import MessageUtils -# @driver.on_startup -# async def start_browser(): -# global _playwright -# global _browser -# install() -# await check_playwright_env() -# _playwright = await async_playwright().start() -# _browser = await _playwright.chromium.launch() +class BrowserIsNone(Exception): + pass -# @driver.on_shutdown -# async def shutdown_browser(): -# if _browser: -# await _browser.close() -# if _playwright: -# await _playwright.stop() # type: ignore +class AsyncPlaywright: + @classmethod + @asynccontextmanager + async def new_page( + cls, cookies: list[dict[str, Any]] | dict[str, Any] | None = None, **kwargs + ) -> AsyncGenerator[Page, None]: + """获取一个新页面 - -# def get_browser() -> Browser: -# if not _browser: -# raise RuntimeError("playwright is not initalized") -# return _browser - - -def install(): - """自动安装、更新 Chromium""" - - def set_env_variables(): - 
os.environ["PLAYWRIGHT_DOWNLOAD_HOST"] = ( - "https://npmmirror.com/mirrors/playwright/" - ) - if BotConfig.system_proxy: - os.environ["HTTPS_PROXY"] = BotConfig.system_proxy - - def restore_env_variables(): - os.environ.pop("PLAYWRIGHT_DOWNLOAD_HOST", None) - if BotConfig.system_proxy: - os.environ.pop("HTTPS_PROXY", None) - if original_proxy is not None: - os.environ["HTTPS_PROXY"] = original_proxy - - def try_install_chromium(): + 参数: + cookies: cookies + """ + browser = await get_browser() + ctx = await browser.new_context(**kwargs) + if cookies: + if isinstance(cookies, dict): + cookies = [cookies] + await ctx.add_cookies(cookies) # type: ignore + page = await ctx.new_page() try: - sys.argv = ["", "install", "chromium"] - main() - except SystemExit as e: - return e.code == 0 - return False + yield page + finally: + await page.close() + await ctx.close() - logger.info("检查 Chromium 更新") + @classmethod + async def screenshot( + cls, + url: str, + path: Path | str, + element: str | list[str], + *, + wait_time: int | None = None, + viewport_size: dict[str, int] | None = None, + wait_until: ( + Literal["domcontentloaded", "load", "networkidle"] | None + ) = "networkidle", + timeout: float | None = None, + type_: Literal["jpeg", "png"] | None = None, + user_agent: str | None = None, + cookies: list[dict[str, Any]] | dict[str, Any] | None = None, + **kwargs, + ) -> UniMessage | None: + """截图,该方法仅用于简单快捷截图,复杂截图请操作 page - original_proxy = os.environ.get("HTTPS_PROXY") - set_env_variables() - - success = try_install_chromium() - - if not success: - logger.info("Chromium 更新失败,尝试从原始仓库下载,速度较慢") - os.environ["PLAYWRIGHT_DOWNLOAD_HOST"] = "" - success = try_install_chromium() - - restore_env_variables() - - if not success: - raise RuntimeError("未知错误,Chromium 下载失败") - - -async def check_playwright_env(): - """检查 Playwright 依赖""" - logger.info("检查 Playwright 依赖") - try: - async with async_playwright() as p: - await p.chromium.launch() - except Exception as e: - raise ImportError("加载失败,Playwright 依赖不全,") from e + 参数: + url: 网址 + path: 存储路径 + element: 元素选择 + wait_time: 等待截取超时时间 + viewport_size: 窗口大小 + wait_until: 等待类型 + timeout: 超时限制 + type_: 保存类型 + user_agent: user_agent + cookies: cookies + """ + if viewport_size is None: + viewport_size = {"width": 2560, "height": 1080} + if isinstance(path, str): + path = Path(path) + wait_time = wait_time * 1000 if wait_time else None + element_list = [element] if isinstance(element, str) else element + async with cls.new_page( + cookies, + viewport=viewport_size, + user_agent=user_agent, + **kwargs, + ) as page: + await page.goto(url, timeout=timeout, wait_until=wait_until) + card = page + for e in element_list: + if not card: + return None + card = await card.wait_for_selector(e, timeout=wait_time) + if card: + await card.screenshot(path=path, timeout=timeout, type=type_) + return MessageUtils.build_message(path) + return None diff --git a/zhenxun/utils/decorator/retry.py b/zhenxun/utils/decorator/retry.py index ddc55584..e81aa334 100644 --- a/zhenxun/utils/decorator/retry.py +++ b/zhenxun/utils/decorator/retry.py @@ -1,24 +1,226 @@ +from collections.abc import Callable +from functools import partial, wraps +from typing import Any, Literal + from anyio import EndOfStream -from httpx import ConnectError, HTTPStatusError, TimeoutException -from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed +from httpx import ( + ConnectError, + HTTPStatusError, + RemoteProtocolError, + StreamError, + TimeoutException, +) +from nonebot.utils import 
is_coroutine_callable +from tenacity import ( + RetryCallState, + retry, + retry_if_exception_type, + retry_if_result, + stop_after_attempt, + wait_exponential, + wait_fixed, +) + +from zhenxun.services.log import logger + +LOG_COMMAND = "RetryDecorator" +_SENTINEL = object() + + +def _log_before_sleep(log_name: str | None, retry_state: RetryCallState): + """ + tenacity 重试前的日志记录回调函数。 + """ + func_name = retry_state.fn.__name__ if retry_state.fn else "unknown_function" + log_context = f"函数 '{func_name}'" + if log_name: + log_context = f"操作 '{log_name}' ({log_context})" + + reason = "" + if retry_state.outcome: + if exc := retry_state.outcome.exception(): + reason = f"触发异常: {exc.__class__.__name__}({exc})" + else: + reason = f"不满足结果条件: result={retry_state.outcome.result()}" + + wait_time = ( + getattr(retry_state.next_action, "sleep", 0) if retry_state.next_action else 0 + ) + logger.warning( + f"{log_context} 第 {retry_state.attempt_number} 次重试... " + f"等待 {wait_time:.2f} 秒. {reason}", + LOG_COMMAND, + ) class Retry: @staticmethod - def api( - retry_count: int = 3, wait: int = 1, exception: tuple[type[Exception], ...] = () + def simple( + stop_max_attempt: int = 3, + wait_fixed_seconds: int = 2, + exception: tuple[type[Exception], ...] = (), + *, + log_name: str | None = None, + on_failure: Callable[[Exception], Any] | None = None, + return_on_failure: Any = _SENTINEL, ): - """接口调用重试""" + """ + 一个简单的、用于通用网络请求的重试装饰器预设。 + + 参数: + stop_max_attempt: 最大重试次数。 + wait_fixed_seconds: 固定等待策略的等待秒数。 + exception: 额外需要重试的异常类型元组。 + log_name: 用于日志记录的操作名称。 + on_failure: (可选) 所有重试失败后的回调。 + return_on_failure: (可选) 所有重试失败后的返回值。 + """ + return Retry.api( + stop_max_attempt=stop_max_attempt, + wait_fixed_seconds=wait_fixed_seconds, + exception=exception, + strategy="fixed", + log_name=log_name, + on_failure=on_failure, + return_on_failure=return_on_failure, + ) + + @staticmethod + def download( + stop_max_attempt: int = 3, + exception: tuple[type[Exception], ...] = (), + *, + wait_exp_multiplier: int = 2, + wait_exp_max: int = 15, + log_name: str | None = None, + on_failure: Callable[[Exception], Any] | None = None, + return_on_failure: Any = _SENTINEL, + ): + """ + 一个适用于文件下载的重试装饰器预设,使用指数退避策略。 + + 参数: + stop_max_attempt: 最大重试次数。 + exception: 额外需要重试的异常类型元组。 + wait_exp_multiplier: 指数退避的乘数。 + wait_exp_max: 指数退避的最大等待时间。 + log_name: 用于日志记录的操作名称。 + on_failure: (可选) 所有重试失败后的回调。 + return_on_failure: (可选) 所有重试失败后的返回值。 + """ + return Retry.api( + stop_max_attempt=stop_max_attempt, + exception=exception, + strategy="exponential", + wait_exp_multiplier=wait_exp_multiplier, + wait_exp_max=wait_exp_max, + log_name=log_name, + on_failure=on_failure, + return_on_failure=return_on_failure, + ) + + @staticmethod + def api( + stop_max_attempt: int = 3, + wait_fixed_seconds: int = 1, + exception: tuple[type[Exception], ...] 
= (), + *, + strategy: Literal["fixed", "exponential"] = "fixed", + retry_on_result: Callable[[Any], bool] | None = None, + wait_exp_multiplier: int = 1, + wait_exp_max: int = 10, + log_name: str | None = None, + on_failure: Callable[[Exception], Any] | None = None, + return_on_failure: Any = _SENTINEL, + ): + """ + 通用、可配置的API调用重试装饰器。 + + 参数: + stop_max_attempt: 最大重试次数。 + wait_fixed_seconds: 固定等待策略的等待秒数。 + exception: 额外需要重试的异常类型元组。 + strategy: 重试等待策略, 'fixed' (固定) 或 'exponential' (指数退避)。 + retry_on_result: 一个回调函数,接收函数返回值。如果返回 True,则触发重试。 + 例如 `lambda r: r.status_code != 200` + wait_exp_multiplier: 指数退避的乘数。 + wait_exp_max: 指数退避的最大等待时间。 + log_name: 用于日志记录的操作名称,方便区分不同的重试场景。 + on_failure: (可选) 当所有重试都失败后,在抛出异常或返回默认值之前, + 会调用此函数,并将最终的异常实例作为参数传入。 + return_on_failure: (可选) 如果设置了此参数,当所有重试失败后, + 将不再抛出异常,而是返回此参数指定的值。 + """ base_exceptions = ( TimeoutException, ConnectError, HTTPStatusError, + StreamError, + RemoteProtocolError, EndOfStream, *exception, ) - return retry( - reraise=True, - stop=stop_after_attempt(retry_count), - wait=wait_fixed(wait), - retry=retry_if_exception_type(base_exceptions), - ) + + def decorator(func: Callable) -> Callable: + if strategy == "exponential": + wait_strategy = wait_exponential( + multiplier=wait_exp_multiplier, max=wait_exp_max + ) + else: + wait_strategy = wait_fixed(wait_fixed_seconds) + + retry_conditions = retry_if_exception_type(base_exceptions) + if retry_on_result: + retry_conditions |= retry_if_result(retry_on_result) + + log_callback = partial(_log_before_sleep, log_name) + + tenacity_retry_decorator = retry( + stop=stop_after_attempt(stop_max_attempt), + wait=wait_strategy, + retry=retry_conditions, + before_sleep=log_callback, + reraise=True, + ) + + decorated_func = tenacity_retry_decorator(func) + + if return_on_failure is _SENTINEL: + return decorated_func + + if is_coroutine_callable(func): + + @wraps(func) + async def async_wrapper(*args, **kwargs): + try: + return await decorated_func(*args, **kwargs) + except Exception as e: + if on_failure: + if is_coroutine_callable(on_failure): + await on_failure(e) + else: + on_failure(e) + return return_on_failure + + return async_wrapper + else: + + @wraps(func) + def sync_wrapper(*args, **kwargs): + try: + return decorated_func(*args, **kwargs) + except Exception as e: + if on_failure: + if is_coroutine_callable(on_failure): + logger.error( + f"不能在同步函数 '{func.__name__}' 中调用异步的 " + f"on_failure 回调。", + LOG_COMMAND, + ) + else: + on_failure(e) + return return_on_failure + + return sync_wrapper + + return decorator diff --git a/zhenxun/utils/exception.py b/zhenxun/utils/exception.py index 8ec925ec..9ab664f4 100644 --- a/zhenxun/utils/exception.py +++ b/zhenxun/utils/exception.py @@ -64,3 +64,23 @@ class GoodsNotFound(Exception): """ pass + + +class AllURIsFailedError(Exception): + """ + 当所有备用URL都尝试失败后抛出此异常 + """ + + def __init__(self, urls: list[str], exceptions: list[Exception]): + self.urls = urls + self.exceptions = exceptions + super().__init__( + f"All {len(urls)} URIs failed. 
Last exception: {exceptions[-1]}" + ) + + def __str__(self) -> str: + exc_info = "\n".join( + f" - {url}: {exc.__class__.__name__}({exc})" + for url, exc in zip(self.urls, self.exceptions) + ) + return f"All {len(self.urls)} URIs failed:\n{exc_info}" diff --git a/zhenxun/utils/http_utils.py b/zhenxun/utils/http_utils.py index 0ccf777f..9f00e9af 100644 --- a/zhenxun/utils/http_utils.py +++ b/zhenxun/utils/http_utils.py @@ -1,16 +1,15 @@ import asyncio -from collections.abc import AsyncGenerator, Sequence +from collections.abc import AsyncGenerator, Awaitable, Callable, Sequence from contextlib import asynccontextmanager +import os from pathlib import Path import time -from typing import Any, ClassVar, Literal, cast +from typing import Any, ClassVar, cast import aiofiles import httpx -from httpx import AsyncHTTPTransport, HTTPStatusError, Proxy, Response -from nonebot_plugin_alconna import UniMessage -from nonebot_plugin_htmlrender import get_browser -from playwright.async_api import Page +from httpx import AsyncClient, AsyncHTTPTransport, HTTPStatusError, Proxy, Response +import nonebot from rich.progress import ( BarColumn, DownloadColumn, @@ -18,13 +17,84 @@ from rich.progress import ( TextColumn, TransferSpeedColumn, ) +import ujson as json from zhenxun.configs.config import BotConfig from zhenxun.services.log import logger -from zhenxun.utils.message import MessageUtils +from zhenxun.utils.decorator.retry import Retry +from zhenxun.utils.exception import AllURIsFailedError +from zhenxun.utils.manager.priority_manager import PriorityLifecycle from zhenxun.utils.user_agent import get_user_agent -CLIENT_KEY = ["use_proxy", "proxies", "proxy", "verify", "headers"] +from .browser import AsyncPlaywright, BrowserIsNone # noqa: F401 + +_SENTINEL = object() + +driver = nonebot.get_driver() +_client: AsyncClient | None = None + + +@PriorityLifecycle.on_startup(priority=0) +async def _(): + """ + 在Bot启动时初始化全局httpx客户端。 + """ + global _client + client_kwargs = {} + if proxy_url := BotConfig.system_proxy or None: + try: + version_parts = httpx.__version__.split(".") + major = int("".join(c for c in version_parts[0] if c.isdigit())) + minor = ( + int("".join(c for c in version_parts[1] if c.isdigit())) + if len(version_parts) > 1 + else 0 + ) + if (major, minor) >= (0, 28): + client_kwargs["proxy"] = proxy_url + else: + client_kwargs["proxies"] = proxy_url + except (ValueError, IndexError): + client_kwargs["proxy"] = proxy_url + logger.warning( + f"无法解析 httpx 版本 '{httpx.__version__}'," + "将默认使用新版 'proxy' 参数语法。" + ) + + _client = httpx.AsyncClient( + headers=get_user_agent(), + follow_redirects=True, + **client_kwargs, + ) + + logger.info("全局 httpx.AsyncClient 已启动。", "HTTPClient") + + +@driver.on_shutdown +async def _(): + """ + 在Bot关闭时关闭全局httpx客户端。 + """ + if _client: + await _client.aclose() + logger.info("全局 httpx.AsyncClient 已关闭。", "HTTPClient") + + +def get_client() -> AsyncClient: + """ + 获取全局 httpx.AsyncClient 实例。 + """ + global _client + if not _client: + if not os.environ.get("PYTEST_CURRENT_TEST"): + raise RuntimeError("全局 httpx.AsyncClient 未初始化,请检查启动流程。") + # 在测试环境中创建临时客户端 + logger.warning("在测试环境中创建临时HTTP客户端", "HTTPClient") + _client = httpx.AsyncClient( + headers=get_user_agent(), + follow_redirects=True, + ) + return _client def get_async_client( @@ -33,6 +103,10 @@ def get_async_client( verify: bool = False, **kwargs, ) -> httpx.AsyncClient: + """ + [向后兼容] 创建 httpx.AsyncClient 实例的工厂函数。 + 此函数完全保留了旧版本的接口,确保现有代码无需修改即可使用。 + """ transport = kwargs.pop("transport", None) or 
AsyncHTTPTransport(verify=verify) if proxies: http_proxy = proxies.get("http://") @@ -62,6 +136,30 @@ def get_async_client( class AsyncHttpx: + """ + 一个高级的、健壮的异步HTTP客户端工具类。 + + 设计理念: + - **全局共享客户端**: 默认情况下,所有请求都通过一个在应用启动时初始化的全局 + `httpx.AsyncClient` 实例发出。这个实例共享连接池,提高了效率和性能。 + - **向后兼容与灵活性**: 完全兼容旧的API,同时提供了两种方式来处理需要 + 特殊网络配置(如不同代理、超时)的请求: + 1. **单次请求覆盖**: 在调用 `get`, `post` 等方法时,直接传入 `proxies`, + `timeout` 等参数,将为该次请求创建一个临时的、独立的客户端。 + 2. **临时客户端上下文**: 使用 `temporary_client()` 上下文管理器,可以 + 获取一个独立的、可配置的客户端,用于执行一系列需要相同特殊配置的请求。 + - **健壮性**: 内置了自动重试、多镜像URL回退(fallback)机制,并提供了便捷的 + JSON解析和文件下载方法。 + """ + + CLIENT_KEY: ClassVar[list[str]] = [ + "use_proxy", + "proxies", + "proxy", + "verify", + "headers", + ] + default_proxy: ClassVar[dict[str, str] | None] = ( { "http://": BotConfig.system_proxy, @@ -72,155 +170,346 @@ class AsyncHttpx: ) @classmethod - @asynccontextmanager - async def _create_client( - cls, - *, - use_proxy: bool = True, - proxies: dict[str, str] | None = None, - proxy: str | None = None, - headers: dict[str, str] | None = None, - verify: bool = False, - **kwargs, - ) -> AsyncGenerator[httpx.AsyncClient, None]: - """创建一个私有的、配置好的 httpx.AsyncClient 上下文管理器。 + def _prepare_temporary_client_config(cls, client_kwargs: dict) -> dict: + """ + [向后兼容] 处理旧式的客户端kwargs,将其转换为get_async_client可用的配置。 + 主要负责处理 use_proxy 标志,这是为了兼容旧版本代码中使用的 use_proxy 参数。 + """ + final_config = client_kwargs.copy() - 说明: - 此方法用于内部统一创建客户端,处理代理和请求头逻辑,减少代码重复。 + use_proxy = final_config.pop("use_proxy", True) + + if "proxies" not in final_config and "proxy" not in final_config: + final_config["proxies"] = cls.default_proxy if use_proxy else None + return final_config + + @classmethod + def _split_kwargs(cls, kwargs: dict) -> tuple[dict, dict]: + """[优化] 分离客户端配置和请求参数,使逻辑更清晰。""" + client_kwargs = {k: v for k, v in kwargs.items() if k in cls.CLIENT_KEY} + request_kwargs = {k: v for k, v in kwargs.items() if k not in cls.CLIENT_KEY} + return client_kwargs, request_kwargs + + @classmethod + @asynccontextmanager + async def _get_active_client_context( + cls, client: AsyncClient | None = None, **kwargs + ) -> AsyncGenerator[AsyncClient, None]: + """ + 内部辅助方法,根据 kwargs 决定并提供一个活动的 HTTP 客户端。 + - 如果 kwargs 中有客户端配置,则创建并返回一个临时客户端。 + - 否则,返回传入的 client 或全局客户端。 + - 自动处理临时客户端的关闭。 + """ + if kwargs: + logger.debug(f"为单次请求创建临时客户端,配置: {kwargs}") + temp_client_config = cls._prepare_temporary_client_config(kwargs) + async with get_async_client(**temp_client_config) as temp_client: + yield temp_client + else: + yield client or get_client() + + @Retry.simple(log_name="内部HTTP请求") + async def _execute_request_inner( + self, client: AsyncClient, method: str, url: str, **kwargs + ) -> Response: + """ + [内部] 执行单次HTTP请求的私有核心方法,被重试装饰器包裹。 + """ + return await client.request(method, url, **kwargs) + + @classmethod + async def _single_request( + cls, method: str, url: str, *, client: AsyncClient | None = None, **kwargs + ) -> Response: + """ + 执行单次HTTP请求的私有方法,内置了默认的重试逻辑。 + """ + client_kwargs, request_kwargs = cls._split_kwargs(kwargs) + + async with cls._get_active_client_context( + client=client, **client_kwargs + ) as active_client: + response = await cls()._execute_request_inner( + active_client, method, url, **request_kwargs + ) + response.raise_for_status() + return response + + @classmethod + async def _execute_with_fallbacks( + cls, + urls: str | list[str], + worker: Callable[..., Awaitable[Any]], + *, + client: AsyncClient | None = None, + **kwargs, + ) -> Any: + """ + 通用执行器,按顺序尝试多个URL,直到成功。 参数: - use_proxy: 是否使用在类中定义的默认代理。 - proxies: 
手动指定的代理,会覆盖默认代理。 - proxy: 单个代理,用于兼容旧版本,不再使用 - headers: 需要合并到客户端的自定义请求头。 - verify: 是否验证 SSL 证书。 - **kwargs: 其他所有传递给 httpx.AsyncClient 的参数。 - - 返回: - AsyncGenerator[httpx.AsyncClient, None]: 生成器。 + urls: 单个URL或URL列表。 + worker: 一个接受单个URL和其他kwargs并执行请求的协程函数。 + client: 可选的HTTP客户端。 + **kwargs: 传递给worker的额外参数。 """ - proxies_to_use = proxies or (cls.default_proxy if use_proxy else None) + url_list = [urls] if isinstance(urls, str) else urls + exceptions = [] - final_headers = get_user_agent() - if headers: - final_headers.update(headers) + for i, url in enumerate(url_list): + try: + result = await worker(url, client=client, **kwargs) + if i > 0: + logger.info( + f"成功从镜像 '{url}' 获取资源 " + f"(在尝试了 {i} 个失败的镜像之后)。", + "AsyncHttpx:FallbackExecutor", + ) + return result + except Exception as e: + exceptions.append(e) + if url != url_list[-1]: + logger.warning( + f"Worker '{worker.__name__}' on {url} failed, trying next. " + f"Error: {e.__class__.__name__}", + "AsyncHttpx:FallbackExecutor", + ) - async with get_async_client( - proxies=proxies_to_use, - proxy=proxy, - verify=verify, - headers=final_headers, - **kwargs, - ) as client: - yield client + raise AllURIsFailedError(url_list, exceptions) @classmethod async def get( cls, url: str | list[str], *, + follow_redirects: bool = True, check_status_code: int | None = None, + client: AsyncClient | None = None, **kwargs, - ) -> Response: # sourcery skip: use-assigned-variable + ) -> Response: """发送 GET 请求,并返回第一个成功的响应。 说明: - 本方法是 httpx.get 的高级包装,增加了多链接尝试、自动重试和统一的代理管理。 - 如果提供 URL 列表,它将依次尝试直到成功为止。 + 本方法是 httpx.get 的高级包装,增加了多链接尝试、自动重试和统一的 + 客户端管理。如果提供 URL 列表,它将依次尝试直到成功为止。 + + 用法建议: + - **常规使用**: `await AsyncHttpx.get(url)` 将使用全局客户端。 + - **单次覆盖配置**: `await AsyncHttpx.get(url, timeout=5, proxies=None)` + 将为本次请求创建一个独立的临时客户端。 参数: url: 单个请求 URL 或一个 URL 列表。 + follow_redirects: 是否跟随重定向。 check_status_code: (可选) 若提供,将检查响应状态码是否匹配,否则抛出异常。 - **kwargs: 其他所有传递给 httpx.get 的参数 - (如 `params`, `headers`, `timeout`等)。 + client: (可选) 指定一个活动的HTTP客户端实例。若提供,则忽略 + `**kwargs`中的客户端配置。 + **kwargs: 其他所有传递给 httpx.get 的参数 (如 `params`, `headers`, + `timeout`)。如果包含 `proxies`, `verify` 等客户端配置参数, + 将创建一个临时客户端。 返回: - Response: Response + Response: httpx 的响应对象。 + + Raises: + AllURIsFailedError: 当所有提供的URL都请求失败时抛出。 """ - urls = [url] if isinstance(url, str) else url - last_exception = None - for current_url in urls: - try: - logger.info(f"开始获取 {current_url}..") - client_kwargs = {k: v for k, v in kwargs.items() if k in CLIENT_KEY} - for key in CLIENT_KEY: - kwargs.pop(key, None) - async with cls._create_client(**client_kwargs) as client: - response = await client.get(current_url, **kwargs) - if check_status_code and response.status_code != check_status_code: - raise HTTPStatusError( - f"状态码错误: {response.status_code}!={check_status_code}", - request=response.request, - response=response, - ) - return response - except Exception as e: - last_exception = e - if current_url != urls[-1]: - logger.warning(f"获取 {current_url} 失败, 尝试下一个", e=e) + async def worker(current_url: str, **worker_kwargs) -> Response: + logger.info(f"开始获取 {current_url}..", "AsyncHttpx:get") + response = await cls._single_request( + "GET", current_url, follow_redirects=follow_redirects, **worker_kwargs + ) + if check_status_code and response.status_code != check_status_code: + raise HTTPStatusError( + f"状态码错误: {response.status_code}!={check_status_code}", + request=response.request, + response=response, + ) + return response - raise last_exception or Exception("所有URL都获取失败") + return await cls._execute_with_fallbacks(url, worker, 
client=client, **kwargs) @classmethod - async def head(cls, url: str, **kwargs) -> Response: - """发送 HEAD 请求。 + async def head( + cls, url: str | list[str], *, client: AsyncClient | None = None, **kwargs + ) -> Response: + """发送 HEAD 请求,并返回第一个成功的响应。""" - 说明: - 本方法是对 httpx.head 的封装,通常用于检查资源的元信息(如大小、类型)。 + async def worker(current_url: str, **worker_kwargs) -> Response: + return await cls._single_request("HEAD", current_url, **worker_kwargs) - 参数: - url: 请求的 URL。 - **kwargs: 其他所有传递给 httpx.head 的参数 - (如 `headers`, `timeout`, `allow_redirects`)。 - - 返回: - Response: Response - """ - client_kwargs = {k: v for k, v in kwargs.items() if k in CLIENT_KEY} - for key in CLIENT_KEY: - kwargs.pop(key, None) - async with cls._create_client(**client_kwargs) as client: - return await client.head(url, **kwargs) + return await cls._execute_with_fallbacks(url, worker, client=client, **kwargs) @classmethod - async def post(cls, url: str, **kwargs) -> Response: - """发送 POST 请求。 + async def post( + cls, url: str | list[str], *, client: AsyncClient | None = None, **kwargs + ) -> Response: + """发送 POST 请求,并返回第一个成功的响应。""" - 说明: - 本方法是对 httpx.post 的封装,提供了统一的代理和客户端管理。 + async def worker(current_url: str, **worker_kwargs) -> Response: + return await cls._single_request("POST", current_url, **worker_kwargs) - 参数: - url: 请求的 URL。 - **kwargs: 其他所有传递给 httpx.post 的参数 - (如 `data`, `json`, `content` 等)。 - - 返回: - Response: Response。 - """ - client_kwargs = {k: v for k, v in kwargs.items() if k in CLIENT_KEY} - for key in CLIENT_KEY: - kwargs.pop(key, None) - async with cls._create_client(**client_kwargs) as client: - return await client.post(url, **kwargs) + return await cls._execute_with_fallbacks(url, worker, client=client, **kwargs) @classmethod - async def get_content(cls, url: str, **kwargs) -> bytes: - """获取指定 URL 的二进制内容。 - - 说明: - 这是一个便捷方法,等同于调用 get() 后再访问 .content 属性。 - - 参数: - url: 请求的 URL。 - **kwargs: 所有传递给 get() 方法的参数。 - - 返回: - bytes: 响应内容的二进制字节流 (bytes)。 - """ - res = await cls.get(url, **kwargs) + async def get_content( + cls, url: str | list[str], *, client: AsyncClient | None = None, **kwargs + ) -> bytes: + """获取指定 URL 的二进制内容。""" + res = await cls.get(url, client=client, **kwargs) return res.content + @classmethod + @Retry.api( + log_name="JSON请求", + exception=(json.JSONDecodeError,), + return_on_failure=_SENTINEL, + ) + async def _request_and_parse_json( + cls, method: str, url: str, *, client: AsyncClient | None = None, **kwargs + ) -> Any: + """ + [私有] 执行单个HTTP请求并解析JSON,用于内部统一处理。 + """ + async with cls._get_active_client_context( + client=client, **kwargs + ) as active_client: + _, request_kwargs = cls._split_kwargs(kwargs) + response = await active_client.request(method, url, **request_kwargs) + response.raise_for_status() + return response.json() + + @classmethod + async def get_json( + cls, + url: str | list[str], + *, + default: Any = None, + raise_on_failure: bool = False, + client: AsyncClient | None = None, + **kwargs, + ) -> Any: + """ + 发送GET请求并自动解析为JSON,支持重试和多链接尝试。 + + 说明: + 这是一个高度便捷的方法,封装了请求、重试、JSON解析和错误处理。 + 它会在网络错误或JSON解析错误时自动重试。 + 如果所有尝试都失败,它会安全地返回一个默认值。 + + 参数: + url: 单个请求 URL 或一个备用 URL 列表。 + default: (可选) 当所有尝试都失败时返回的默认值,默认为None。 + raise_on_failure: (可选) 如果为 True, 当所有尝试失败时将抛出 + `AllURIsFailedError` 异常, 默认为 False. 
+ client: (可选) 指定的HTTP客户端。 + **kwargs: 其他所有传递给 httpx.get 的参数。 + 例如 `params`, `headers`, `timeout`等。 + + 返回: + Any: 解析后的JSON数据,或在失败时返回 `default` 值。 + + Raises: + AllURIsFailedError: 当 `raise_on_failure` 为 True 且所有URL都请求失败时抛出 + """ + + async def worker(current_url: str, **worker_kwargs): + logger.debug(f"开始GET JSON: {current_url}", "AsyncHttpx:get_json") + return await cls._request_and_parse_json( + "GET", current_url, **worker_kwargs + ) + + try: + result = await cls._execute_with_fallbacks( + url, worker, client=client, **kwargs + ) + return default if result is _SENTINEL else result + except AllURIsFailedError as e: + logger.error(f"所有URL的JSON GET均失败: {e}", "AsyncHttpx:get_json") + if raise_on_failure: + raise e + return default + + @classmethod + async def post_json( + cls, + url: str | list[str], + *, + json: Any = None, + data: Any = None, + default: Any = None, + raise_on_failure: bool = False, + client: AsyncClient | None = None, + **kwargs, + ) -> Any: + """ + 发送POST请求并自动解析为JSON,功能与 get_json 类似。 + + 参数: + url: 单个请求 URL 或一个备用 URL 列表。 + json: (可选) 作为请求体发送的JSON数据。 + data: (可选) 作为请求体发送的表单数据。 + default: (可选) 当所有尝试都失败时返回的默认值,默认为None。 + raise_on_failure: (可选) 如果为 True, 当所有尝试失败时将抛出 + AllURIsFailedError 异常, 默认为 False. + client: (可选) 指定的HTTP客户端。 + **kwargs: 其他所有传递给 httpx.post 的参数。 + + 返回: + Any: 解析后的JSON数据,或在失败时返回 `default` 值。 + """ + if json is not None: + kwargs["json"] = json + if data is not None: + kwargs["data"] = data + + async def worker(current_url: str, **worker_kwargs): + logger.debug(f"开始POST JSON: {current_url}", "AsyncHttpx:post_json") + return await cls._request_and_parse_json( + "POST", current_url, **worker_kwargs + ) + + try: + result = await cls._execute_with_fallbacks( + url, worker, client=client, **kwargs + ) + return default if result is _SENTINEL else result + except AllURIsFailedError as e: + logger.error(f"所有URL的JSON POST均失败: {e}", "AsyncHttpx:post_json") + if raise_on_failure: + raise e + return default + + @classmethod + @Retry.api(log_name="文件下载(流式)") + async def _stream_download( + cls, url: str, path: Path, *, client: AsyncClient | None = None, **kwargs + ) -> None: + """ + 执行单个流式下载的私有方法,被重试装饰器包裹。 + """ + async with cls._get_active_client_context( + client=client, **kwargs + ) as active_client: + async with active_client.stream("GET", url, **kwargs) as response: + response.raise_for_status() + total = int(response.headers.get("Content-Length", 0)) + + with Progress( + TextColumn(path.name), + "[progress.percentage]{task.percentage:>3.0f}%", + BarColumn(bar_width=None), + DownloadColumn(), + TransferSpeedColumn(), + ) as progress: + task_id = progress.add_task("Download", total=total) + async with aiofiles.open(path, "wb") as f: + async for chunk in response.aiter_bytes(): + await f.write(chunk) + progress.update(task_id, advance=len(chunk)) + @classmethod async def download_file( cls, @@ -228,6 +517,7 @@ class AsyncHttpx: path: str | Path, *, stream: bool = False, + client: AsyncClient | None = None, **kwargs, ) -> bool: """下载文件到指定路径。 @@ -239,6 +529,7 @@ class AsyncHttpx: url: 单个文件 URL 或一个备用 URL 列表。 path: 文件保存的本地路径。 stream: (可选) 是否使用流式下载,适用于大文件,默认为 False。 + client: (可选) 指定的HTTP客户端。 **kwargs: 其他所有传递给 get() 方法或 httpx.stream() 的参数。 返回: @@ -247,49 +538,29 @@ class AsyncHttpx: path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) - urls = [url] if isinstance(url, str) else url + async def worker(current_url: str, **worker_kwargs) -> bool: + if not stream: + content = await cls.get_content(current_url, **worker_kwargs) + async with aiofiles.open(path, "wb") 
as f: + await f.write(content) + else: + await cls._stream_download(current_url, path, **worker_kwargs) - for current_url in urls: - try: - if not stream: - response = await cls.get(current_url, **kwargs) - response.raise_for_status() - async with aiofiles.open(path, "wb") as f: - await f.write(response.content) - else: - async with cls._create_client(**kwargs) as client: - stream_kwargs = { - k: v - for k, v in kwargs.items() - if k not in ["use_proxy", "proxy", "verify"] - } - async with client.stream( - "GET", current_url, **stream_kwargs - ) as response: - response.raise_for_status() - total = int(response.headers.get("Content-Length", 0)) + logger.info( + f"下载 {current_url} 成功 -> {path.absolute()}", + "AsyncHttpx:download", + ) + return True - with Progress( - TextColumn(path.name), - "[progress.percentage]{task.percentage:>3.0f}%", - BarColumn(bar_width=None), - DownloadColumn(), - TransferSpeedColumn(), - ) as progress: - task_id = progress.add_task("Download", total=total) - async with aiofiles.open(path, "wb") as f: - async for chunk in response.aiter_bytes(): - await f.write(chunk) - progress.update(task_id, advance=len(chunk)) - - logger.info(f"下载 {current_url} 成功 -> {path.absolute()}") - return True - - except Exception as e: - logger.warning(f"下载 {current_url} 失败,尝试下一个。错误: {e}") - - logger.error(f"所有URL {urls} 下载均失败 -> {path.absolute()}") - return False + try: + return await cls._execute_with_fallbacks( + url, worker, client=client, **kwargs + ) + except AllURIsFailedError: + logger.error( + f"所有URL下载均失败 -> {path.absolute()}", "AsyncHttpx:download" + ) + return False @classmethod async def gather_download_file( @@ -346,7 +617,6 @@ class AsyncHttpx: logger.error(f"并发下载任务 ({url_info}) 时发生错误", e=result) final_results.append(False) else: - # download_file 返回的是 bool,可以直接附加 final_results.append(cast(bool, result)) return final_results @@ -395,86 +665,30 @@ class AsyncHttpx: _results = sorted(iter(_results), key=lambda r: r["elapsed_time"]) return [result["url"] for result in _results] - -class AsyncPlaywright: @classmethod @asynccontextmanager - async def new_page( - cls, cookies: list[dict[str, Any]] | dict[str, Any] | None = None, **kwargs - ) -> AsyncGenerator[Page, None]: - """获取一个新页面 + async def temporary_client(cls, **kwargs) -> AsyncGenerator[AsyncClient, None]: + """ + 创建一个临时的、可配置的HTTP客户端上下文,并直接返回该客户端实例。 + + 此方法返回一个标准的 `httpx.AsyncClient`,它不使用全局连接池, + 拥有独立的配置(如代理、headers、超时等),并在退出上下文后自动关闭。 + 适用于需要用一套特殊网络配置执行一系列请求的场景。 + + 用法: + async with AsyncHttpx.temporary_client(proxies=None, timeout=5) as client: + # client 是一个标准的 httpx.AsyncClient 实例 + response1 = await client.get("http://some.internal.api/1") + response2 = await client.get("http://some.internal.api/2") + data = response2.json() 参数: - cookies: cookies + **kwargs: 所有传递给 `httpx.AsyncClient` 构造函数的参数。 + 例如: `proxies`, `headers`, `verify`, `timeout`, + `follow_redirects`。 + + Yields: + httpx.AsyncClient: 一个配置好的、临时的客户端实例。 """ - browser = await get_browser() - ctx = await browser.new_context(**kwargs) - if cookies: - if isinstance(cookies, dict): - cookies = [cookies] - await ctx.add_cookies(cookies) # type: ignore - page = await ctx.new_page() - try: - yield page - finally: - await page.close() - await ctx.close() - - @classmethod - async def screenshot( - cls, - url: str, - path: Path | str, - element: str | list[str], - *, - wait_time: int | None = None, - viewport_size: dict[str, int] | None = None, - wait_until: ( - Literal["domcontentloaded", "load", "networkidle"] | None - ) = "networkidle", - timeout: float | None = 
None, - type_: Literal["jpeg", "png"] | None = None, - user_agent: str | None = None, - cookies: list[dict[str, Any]] | dict[str, Any] | None = None, - **kwargs, - ) -> UniMessage | None: - """截图,该方法仅用于简单快捷截图,复杂截图请操作 page - - 参数: - url: 网址 - path: 存储路径 - element: 元素选择 - wait_time: 等待截取超时时间 - viewport_size: 窗口大小 - wait_until: 等待类型 - timeout: 超时限制 - type_: 保存类型 - user_agent: user_agent - cookies: cookies - """ - if viewport_size is None: - viewport_size = {"width": 2560, "height": 1080} - if isinstance(path, str): - path = Path(path) - wait_time = wait_time * 1000 if wait_time else None - element_list = [element] if isinstance(element, str) else element - async with cls.new_page( - cookies, - viewport=viewport_size, - user_agent=user_agent, - **kwargs, - ) as page: - await page.goto(url, timeout=timeout, wait_until=wait_until) - card = page - for e in element_list: - if not card: - return None - card = await card.wait_for_selector(e, timeout=wait_time) - if card: - await card.screenshot(path=path, timeout=timeout, type=type_) - return MessageUtils.build_message(path) - return None - - -class BrowserIsNone(Exception): - pass + async with get_async_client(**kwargs) as client: + yield client
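
A minimal usage sketch of the client APIs introduced in this patch (AsyncHttpx.get_json / download_file / temporary_client and the Retry presets). It is not part of the diff above; the endpoint URLs, payloads, and function names are illustrative placeholders.

from pathlib import Path

from zhenxun.utils.decorator.retry import Retry
from zhenxun.utils.http_utils import AsyncHttpx


async def fetch_profile(user_id: int) -> dict:
    # get_json tries each mirror in order, retries on network/JSON errors,
    # and returns `default` instead of raising unless raise_on_failure=True.
    return await AsyncHttpx.get_json(
        [
            f"https://api.example.com/users/{user_id}",     # placeholder primary
            f"https://mirror.example.com/users/{user_id}",  # placeholder fallback
        ],
        default={},
        timeout=10,
    )


async def grab_release(url: str, dest: Path) -> bool:
    # download_file streams large files and returns False once every URL fails.
    return await AsyncHttpx.download_file(url, dest, stream=True)


async def query_internal_api() -> dict:
    # temporary_client yields an isolated httpx.AsyncClient with its own
    # proxy/timeout settings; it is closed automatically on exit.
    async with AsyncHttpx.temporary_client(proxies=None, timeout=5) as client:
        resp = await client.get("http://internal.example/api/status")  # placeholder
        resp.raise_for_status()
        return resp.json()


@Retry.api(
    strategy="exponential",
    log_name="上游接口",
    retry_on_result=lambda r: not r or r.get("code") != 0,  # retry on business-level failure
    return_on_failure={"code": -1},
)
async def call_upstream(payload: dict) -> dict:
    # Once retries are exhausted, the decorator swallows the error and returns
    # the `return_on_failure` value instead of raising.
    return await AsyncHttpx.post_json(
        "https://upstream.example/api", json=payload, default={}
    )

Because the JSON helpers already retry and fall back across URLs internally, the outer Retry.api layer in call_upstream is only needed when the business-level result, rather than the transport, has to be re-checked.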