zhenxun_bot/zhenxun/utils/http_utils.py
Rumio 1c5f66beee
feat(http_utils): 重构网络请求工具链,增强稳定性与易用性 (#1951)
*  feat(http_utils): 重构网络请求工具链,增强稳定性与易用性

🔧 HTTP工具优化:
  • 全局httpx.AsyncClient管理,提升连接复用效率
  • AsyncHttpx类重构,支持临时客户端和配置覆盖
  • 新增JSON请求方法(get_json/post_json),内置重试机制
  • 兼容httpx>=0.28.0版本

🔄 重试机制升级:
  • Retry装饰器重构,提供simple/api/download预设
  • 支持指数退避、条件重试和自定义失败处理
  • 扩展异常覆盖范围,提升网络容错能力

🏗️ 架构改进:
  • 新增AllURIsFailedError统一异常处理
  • 浏览器工具模块化,提升代码组织性

* 🚨 auto fix by pre-commit hooks

* 🎨 代码格式化

* 🐛 测试修复

---------

Co-authored-by: webjoin111 <455457521@qq.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: HibiKier <45528451+HibiKier@users.noreply.github.com>
Co-authored-by: HibiKier <775757368@qq.com>
2025-07-03 17:39:13 +08:00

695 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from collections.abc import AsyncGenerator, Awaitable, Callable, Sequence
from contextlib import asynccontextmanager
import os
from pathlib import Path
import time
from typing import Any, ClassVar, cast
import aiofiles
import httpx
from httpx import AsyncClient, AsyncHTTPTransport, HTTPStatusError, Proxy, Response
import nonebot
from rich.progress import (
BarColumn,
DownloadColumn,
Progress,
TextColumn,
TransferSpeedColumn,
)
import ujson as json
from zhenxun.configs.config import BotConfig
from zhenxun.services.log import logger
from zhenxun.utils.decorator.retry import Retry
from zhenxun.utils.exception import AllURIsFailedError
from zhenxun.utils.manager.priority_manager import PriorityLifecycle
from zhenxun.utils.user_agent import get_user_agent
from .browser import AsyncPlaywright, BrowserIsNone # noqa: F401
_SENTINEL = object()
driver = nonebot.get_driver()
_client: AsyncClient | None = None
@PriorityLifecycle.on_startup(priority=0)
async def _():
"""
在Bot启动时初始化全局httpx客户端。
"""
global _client
client_kwargs = {}
if proxy_url := BotConfig.system_proxy or None:
try:
version_parts = httpx.__version__.split(".")
major = int("".join(c for c in version_parts[0] if c.isdigit()))
minor = (
int("".join(c for c in version_parts[1] if c.isdigit()))
if len(version_parts) > 1
else 0
)
if (major, minor) >= (0, 28):
client_kwargs["proxy"] = proxy_url
else:
client_kwargs["proxies"] = proxy_url
except (ValueError, IndexError):
client_kwargs["proxy"] = proxy_url
logger.warning(
f"无法解析 httpx 版本 '{httpx.__version__}'"
"将默认使用新版 'proxy' 参数语法。"
)
_client = httpx.AsyncClient(
headers=get_user_agent(),
follow_redirects=True,
**client_kwargs,
)
logger.info("全局 httpx.AsyncClient 已启动。", "HTTPClient")
@driver.on_shutdown
async def _():
"""
在Bot关闭时关闭全局httpx客户端。
"""
if _client:
await _client.aclose()
logger.info("全局 httpx.AsyncClient 已关闭。", "HTTPClient")
def get_client() -> AsyncClient:
"""
获取全局 httpx.AsyncClient 实例。
"""
global _client
if not _client:
if not os.environ.get("PYTEST_CURRENT_TEST"):
raise RuntimeError("全局 httpx.AsyncClient 未初始化,请检查启动流程。")
# 在测试环境中创建临时客户端
logger.warning("在测试环境中创建临时HTTP客户端", "HTTPClient")
_client = httpx.AsyncClient(
headers=get_user_agent(),
follow_redirects=True,
)
return _client
def get_async_client(
proxies: dict[str, str] | None = None,
proxy: str | None = None,
verify: bool = False,
**kwargs,
) -> httpx.AsyncClient:
"""
[向后兼容] 创建 httpx.AsyncClient 实例的工厂函数。
此函数完全保留了旧版本的接口,确保现有代码无需修改即可使用。
"""
transport = kwargs.pop("transport", None) or AsyncHTTPTransport(verify=verify)
if proxies:
http_proxy = proxies.get("http://")
https_proxy = proxies.get("https://")
return httpx.AsyncClient(
mounts={
"http://": AsyncHTTPTransport(
proxy=Proxy(http_proxy) if http_proxy else None
),
"https://": AsyncHTTPTransport(
proxy=Proxy(https_proxy) if https_proxy else None
),
},
transport=transport,
**kwargs,
)
elif proxy:
return httpx.AsyncClient(
mounts={
"http://": AsyncHTTPTransport(proxy=Proxy(proxy)),
"https://": AsyncHTTPTransport(proxy=Proxy(proxy)),
},
transport=transport,
**kwargs,
)
return httpx.AsyncClient(transport=transport, **kwargs)
class AsyncHttpx:
"""
一个高级的、健壮的异步HTTP客户端工具类。
设计理念:
- **全局共享客户端**: 默认情况下,所有请求都通过一个在应用启动时初始化的全局
`httpx.AsyncClient` 实例发出。这个实例共享连接池,提高了效率和性能。
- **向后兼容与灵活性**: 完全兼容旧的API同时提供了两种方式来处理需要
特殊网络配置(如不同代理、超时)的请求:
1. **单次请求覆盖**: 在调用 `get`, `post` 等方法时,直接传入 `proxies`,
`timeout` 等参数,将为该次请求创建一个临时的、独立的客户端。
2. **临时客户端上下文**: 使用 `temporary_client()` 上下文管理器,可以
获取一个独立的、可配置的客户端,用于执行一系列需要相同特殊配置的请求。
- **健壮性**: 内置了自动重试、多镜像URL回退fallback机制并提供了便捷的
JSON解析和文件下载方法。
"""
CLIENT_KEY: ClassVar[list[str]] = [
"use_proxy",
"proxies",
"proxy",
"verify",
"headers",
]
default_proxy: ClassVar[dict[str, str] | None] = (
{
"http://": BotConfig.system_proxy,
"https://": BotConfig.system_proxy,
}
if BotConfig.system_proxy
else None
)
@classmethod
def _prepare_temporary_client_config(cls, client_kwargs: dict) -> dict:
"""
[向后兼容] 处理旧式的客户端kwargs将其转换为get_async_client可用的配置。
主要负责处理 use_proxy 标志,这是为了兼容旧版本代码中使用的 use_proxy 参数。
"""
final_config = client_kwargs.copy()
use_proxy = final_config.pop("use_proxy", True)
if "proxies" not in final_config and "proxy" not in final_config:
final_config["proxies"] = cls.default_proxy if use_proxy else None
return final_config
@classmethod
def _split_kwargs(cls, kwargs: dict) -> tuple[dict, dict]:
"""[优化] 分离客户端配置和请求参数,使逻辑更清晰。"""
client_kwargs = {k: v for k, v in kwargs.items() if k in cls.CLIENT_KEY}
request_kwargs = {k: v for k, v in kwargs.items() if k not in cls.CLIENT_KEY}
return client_kwargs, request_kwargs
@classmethod
@asynccontextmanager
async def _get_active_client_context(
cls, client: AsyncClient | None = None, **kwargs
) -> AsyncGenerator[AsyncClient, None]:
"""
内部辅助方法,根据 kwargs 决定并提供一个活动的 HTTP 客户端。
- 如果 kwargs 中有客户端配置,则创建并返回一个临时客户端。
- 否则,返回传入的 client 或全局客户端。
- 自动处理临时客户端的关闭。
"""
if kwargs:
logger.debug(f"为单次请求创建临时客户端,配置: {kwargs}")
temp_client_config = cls._prepare_temporary_client_config(kwargs)
async with get_async_client(**temp_client_config) as temp_client:
yield temp_client
else:
yield client or get_client()
@Retry.simple(log_name="内部HTTP请求")
async def _execute_request_inner(
self, client: AsyncClient, method: str, url: str, **kwargs
) -> Response:
"""
[内部] 执行单次HTTP请求的私有核心方法被重试装饰器包裹。
"""
return await client.request(method, url, **kwargs)
@classmethod
async def _single_request(
cls, method: str, url: str, *, client: AsyncClient | None = None, **kwargs
) -> Response:
"""
执行单次HTTP请求的私有方法内置了默认的重试逻辑。
"""
client_kwargs, request_kwargs = cls._split_kwargs(kwargs)
async with cls._get_active_client_context(
client=client, **client_kwargs
) as active_client:
response = await cls()._execute_request_inner(
active_client, method, url, **request_kwargs
)
response.raise_for_status()
return response
@classmethod
async def _execute_with_fallbacks(
cls,
urls: str | list[str],
worker: Callable[..., Awaitable[Any]],
*,
client: AsyncClient | None = None,
**kwargs,
) -> Any:
"""
通用执行器按顺序尝试多个URL直到成功。
参数:
urls: 单个URL或URL列表。
worker: 一个接受单个URL和其他kwargs并执行请求的协程函数。
client: 可选的HTTP客户端。
**kwargs: 传递给worker的额外参数。
"""
url_list = [urls] if isinstance(urls, str) else urls
exceptions = []
for i, url in enumerate(url_list):
try:
result = await worker(url, client=client, **kwargs)
if i > 0:
logger.info(
f"成功从镜像 '{url}' 获取资源 "
f"(在尝试了 {i} 个失败的镜像之后)。",
"AsyncHttpx:FallbackExecutor",
)
return result
except Exception as e:
exceptions.append(e)
if url != url_list[-1]:
logger.warning(
f"Worker '{worker.__name__}' on {url} failed, trying next. "
f"Error: {e.__class__.__name__}",
"AsyncHttpx:FallbackExecutor",
)
raise AllURIsFailedError(url_list, exceptions)
@classmethod
async def get(
cls,
url: str | list[str],
*,
follow_redirects: bool = True,
check_status_code: int | None = None,
client: AsyncClient | None = None,
**kwargs,
) -> Response:
"""发送 GET 请求,并返回第一个成功的响应。
说明:
本方法是 httpx.get 的高级包装,增加了多链接尝试、自动重试和统一的
客户端管理。如果提供 URL 列表,它将依次尝试直到成功为止。
用法建议:
- **常规使用**: `await AsyncHttpx.get(url)` 将使用全局客户端。
- **单次覆盖配置**: `await AsyncHttpx.get(url, timeout=5, proxies=None)`
将为本次请求创建一个独立的临时客户端。
参数:
url: 单个请求 URL 或一个 URL 列表。
follow_redirects: 是否跟随重定向。
check_status_code: (可选) 若提供,将检查响应状态码是否匹配,否则抛出异常。
client: (可选) 指定一个活动的HTTP客户端实例。若提供则忽略
`**kwargs`中的客户端配置。
**kwargs: 其他所有传递给 httpx.get 的参数 (如 `params`, `headers`,
`timeout`)。如果包含 `proxies`, `verify` 等客户端配置参数,
将创建一个临时客户端。
返回:
Response: httpx 的响应对象。
Raises:
AllURIsFailedError: 当所有提供的URL都请求失败时抛出。
"""
async def worker(current_url: str, **worker_kwargs) -> Response:
logger.info(f"开始获取 {current_url}..", "AsyncHttpx:get")
response = await cls._single_request(
"GET", current_url, follow_redirects=follow_redirects, **worker_kwargs
)
if check_status_code and response.status_code != check_status_code:
raise HTTPStatusError(
f"状态码错误: {response.status_code}!={check_status_code}",
request=response.request,
response=response,
)
return response
return await cls._execute_with_fallbacks(url, worker, client=client, **kwargs)
@classmethod
async def head(
cls, url: str | list[str], *, client: AsyncClient | None = None, **kwargs
) -> Response:
"""发送 HEAD 请求,并返回第一个成功的响应。"""
async def worker(current_url: str, **worker_kwargs) -> Response:
return await cls._single_request("HEAD", current_url, **worker_kwargs)
return await cls._execute_with_fallbacks(url, worker, client=client, **kwargs)
@classmethod
async def post(
cls, url: str | list[str], *, client: AsyncClient | None = None, **kwargs
) -> Response:
"""发送 POST 请求,并返回第一个成功的响应。"""
async def worker(current_url: str, **worker_kwargs) -> Response:
return await cls._single_request("POST", current_url, **worker_kwargs)
return await cls._execute_with_fallbacks(url, worker, client=client, **kwargs)
@classmethod
async def get_content(
cls, url: str | list[str], *, client: AsyncClient | None = None, **kwargs
) -> bytes:
"""获取指定 URL 的二进制内容。"""
res = await cls.get(url, client=client, **kwargs)
return res.content
@classmethod
@Retry.api(
log_name="JSON请求",
exception=(json.JSONDecodeError,),
return_on_failure=_SENTINEL,
)
async def _request_and_parse_json(
cls, method: str, url: str, *, client: AsyncClient | None = None, **kwargs
) -> Any:
"""
[私有] 执行单个HTTP请求并解析JSON用于内部统一处理。
"""
async with cls._get_active_client_context(
client=client, **kwargs
) as active_client:
_, request_kwargs = cls._split_kwargs(kwargs)
response = await active_client.request(method, url, **request_kwargs)
response.raise_for_status()
return response.json()
@classmethod
async def get_json(
cls,
url: str | list[str],
*,
default: Any = None,
raise_on_failure: bool = False,
client: AsyncClient | None = None,
**kwargs,
) -> Any:
"""
发送GET请求并自动解析为JSON支持重试和多链接尝试。
说明:
这是一个高度便捷的方法封装了请求、重试、JSON解析和错误处理。
它会在网络错误或JSON解析错误时自动重试。
如果所有尝试都失败,它会安全地返回一个默认值。
参数:
url: 单个请求 URL 或一个备用 URL 列表。
default: (可选) 当所有尝试都失败时返回的默认值默认为None。
raise_on_failure: (可选) 如果为 True, 当所有尝试失败时将抛出
`AllURIsFailedError` 异常, 默认为 False.
client: (可选) 指定的HTTP客户端。
**kwargs: 其他所有传递给 httpx.get 的参数。
例如 `params`, `headers`, `timeout`等。
返回:
Any: 解析后的JSON数据或在失败时返回 `default` 值。
Raises:
AllURIsFailedError: 当 `raise_on_failure` 为 True 且所有URL都请求失败时抛出
"""
async def worker(current_url: str, **worker_kwargs):
logger.debug(f"开始GET JSON: {current_url}", "AsyncHttpx:get_json")
return await cls._request_and_parse_json(
"GET", current_url, **worker_kwargs
)
try:
result = await cls._execute_with_fallbacks(
url, worker, client=client, **kwargs
)
return default if result is _SENTINEL else result
except AllURIsFailedError as e:
logger.error(f"所有URL的JSON GET均失败: {e}", "AsyncHttpx:get_json")
if raise_on_failure:
raise e
return default
@classmethod
async def post_json(
cls,
url: str | list[str],
*,
json: Any = None,
data: Any = None,
default: Any = None,
raise_on_failure: bool = False,
client: AsyncClient | None = None,
**kwargs,
) -> Any:
"""
发送POST请求并自动解析为JSON功能与 get_json 类似。
参数:
url: 单个请求 URL 或一个备用 URL 列表。
json: (可选) 作为请求体发送的JSON数据。
data: (可选) 作为请求体发送的表单数据。
default: (可选) 当所有尝试都失败时返回的默认值默认为None。
raise_on_failure: (可选) 如果为 True, 当所有尝试失败时将抛出
AllURIsFailedError 异常, 默认为 False.
client: (可选) 指定的HTTP客户端。
**kwargs: 其他所有传递给 httpx.post 的参数。
返回:
Any: 解析后的JSON数据或在失败时返回 `default` 值。
"""
if json is not None:
kwargs["json"] = json
if data is not None:
kwargs["data"] = data
async def worker(current_url: str, **worker_kwargs):
logger.debug(f"开始POST JSON: {current_url}", "AsyncHttpx:post_json")
return await cls._request_and_parse_json(
"POST", current_url, **worker_kwargs
)
try:
result = await cls._execute_with_fallbacks(
url, worker, client=client, **kwargs
)
return default if result is _SENTINEL else result
except AllURIsFailedError as e:
logger.error(f"所有URL的JSON POST均失败: {e}", "AsyncHttpx:post_json")
if raise_on_failure:
raise e
return default
@classmethod
@Retry.api(log_name="文件下载(流式)")
async def _stream_download(
cls, url: str, path: Path, *, client: AsyncClient | None = None, **kwargs
) -> None:
"""
执行单个流式下载的私有方法,被重试装饰器包裹。
"""
async with cls._get_active_client_context(
client=client, **kwargs
) as active_client:
async with active_client.stream("GET", url, **kwargs) as response:
response.raise_for_status()
total = int(response.headers.get("Content-Length", 0))
with Progress(
TextColumn(path.name),
"[progress.percentage]{task.percentage:>3.0f}%",
BarColumn(bar_width=None),
DownloadColumn(),
TransferSpeedColumn(),
) as progress:
task_id = progress.add_task("Download", total=total)
async with aiofiles.open(path, "wb") as f:
async for chunk in response.aiter_bytes():
await f.write(chunk)
progress.update(task_id, advance=len(chunk))
@classmethod
async def download_file(
cls,
url: str | list[str],
path: str | Path,
*,
stream: bool = False,
client: AsyncClient | None = None,
**kwargs,
) -> bool:
"""下载文件到指定路径。
说明:
支持多链接尝试和流式下载(带进度条)。
参数:
url: 单个文件 URL 或一个备用 URL 列表。
path: 文件保存的本地路径。
stream: (可选) 是否使用流式下载,适用于大文件,默认为 False。
client: (可选) 指定的HTTP客户端。
**kwargs: 其他所有传递给 get() 方法或 httpx.stream() 的参数。
返回:
bool: 是否下载成功。
"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
async def worker(current_url: str, **worker_kwargs) -> bool:
if not stream:
content = await cls.get_content(current_url, **worker_kwargs)
async with aiofiles.open(path, "wb") as f:
await f.write(content)
else:
await cls._stream_download(current_url, path, **worker_kwargs)
logger.info(
f"下载 {current_url} 成功 -> {path.absolute()}",
"AsyncHttpx:download",
)
return True
try:
return await cls._execute_with_fallbacks(
url, worker, client=client, **kwargs
)
except AllURIsFailedError:
logger.error(
f"所有URL下载均失败 -> {path.absolute()}", "AsyncHttpx:download"
)
return False
@classmethod
async def gather_download_file(
cls,
url_list: Sequence[list[str] | str],
path_list: Sequence[str | Path],
*,
limit_async_number: int = 5,
**kwargs,
) -> list[bool]:
"""并发下载多个文件,支持为每个文件提供备用镜像链接。
说明:
使用 asyncio.Semaphore 来控制并发请求的数量。
对于 url_list 中的每个元素,如果它是一个列表,则会依次尝试直到下载成功。
参数:
url_list: 包含所有文件下载任务的列表。每个元素可以是:
- 一个字符串 (str): 代表该任务的唯一URL。
- 一个字符串列表 (list[str]): 代表该任务的多个备用/镜像URL。
path_list: 与 url_list 对应的文件保存路径列表。
limit_async_number: (可选) 最大并发下载数,默认为 5。
**kwargs: 其他所有传递给 download_file() 方法的参数。
返回:
list[bool]: 对应每个下载任务是否成功。
"""
if len(url_list) != len(path_list):
raise ValueError("URL 列表和路径列表的长度必须相等")
semaphore = asyncio.Semaphore(limit_async_number)
async def _download_with_semaphore(
urls_for_one_path: str | list[str], path: str | Path
):
async with semaphore:
return await cls.download_file(urls_for_one_path, path, **kwargs)
tasks = [
_download_with_semaphore(url_group, path)
for url_group, path in zip(url_list, path_list)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
final_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
url_info = (
url_list[i]
if isinstance(url_list[i], str)
else ", ".join(url_list[i])
)
logger.error(f"并发下载任务 ({url_info}) 时发生错误", e=result)
final_results.append(False)
else:
final_results.append(cast(bool, result))
return final_results
@classmethod
async def get_fastest_mirror(cls, url_list: list[str]) -> list[str]:
"""测试并返回最快的镜像地址。
说明:
通过并发发送 HEAD 请求来测试每个 URL 的响应时间和可用性,并按响应速度排序。
参数:
url_list: 需要测试的镜像 URL 列表。
返回:
list[str]: 按从快到慢的顺序包含了所有可用的 URL。
"""
assert url_list
async def head_mirror(client: type[AsyncHttpx], url: str) -> dict[str, Any]:
begin_time = time.time()
response = await client.head(url=url, timeout=6)
elapsed_time = (time.time() - begin_time) * 1000
content_length = int(response.headers.get("content-length", 0))
return {
"url": url,
"elapsed_time": elapsed_time,
"content_length": content_length,
}
logger.debug(f"开始获取最快镜像,可能需要一段时间... | URL列表{url_list}")
results = await asyncio.gather(
*(head_mirror(cls, url) for url in url_list),
return_exceptions=True,
)
_results: list[dict[str, Any]] = []
for result in results:
if isinstance(result, BaseException):
logger.warning(f"获取镜像失败,错误:{result}")
else:
logger.debug(f"获取镜像成功,结果:{result}")
_results.append(result)
_results = sorted(iter(_results), key=lambda r: r["elapsed_time"])
return [result["url"] for result in _results]
@classmethod
@asynccontextmanager
async def temporary_client(cls, **kwargs) -> AsyncGenerator[AsyncClient, None]:
"""
创建一个临时的、可配置的HTTP客户端上下文并直接返回该客户端实例。
此方法返回一个标准的 `httpx.AsyncClient`,它不使用全局连接池,
拥有独立的配置(如代理、headers、超时等),并在退出上下文后自动关闭。
适用于需要用一套特殊网络配置执行一系列请求的场景。
用法:
async with AsyncHttpx.temporary_client(proxies=None, timeout=5) as client:
# client 是一个标准的 httpx.AsyncClient 实例
response1 = await client.get("http://some.internal.api/1")
response2 = await client.get("http://some.internal.api/2")
data = response2.json()
参数:
**kwargs: 所有传递给 `httpx.AsyncClient` 构造函数的参数。
例如: `proxies`, `headers`, `verify`, `timeout`,
`follow_redirects`。
Yields:
httpx.AsyncClient: 一个配置好的、临时的客户端实例。
"""
async with get_async_client(**kwargs) as client:
yield client