feat(utils): 重构网络请求工具链,增强稳定性与易用性

🔧 HTTP工具优化:
  • 全局httpx.AsyncClient管理,提升连接复用效率
  • AsyncHttpx类重构,支持临时客户端和配置覆盖
  • 新增JSON请求方法(get_json/post_json),内置重试机制
  • 增强文件下载,支持多镜像回退和流式进度条

🔄 重试机制升级:
  • Retry装饰器重构,提供simple/api/download预设
  • 支持指数退避、条件重试和自定义失败处理
  • 扩展异常覆盖范围,提升网络容错能力

🏗️ 架构改进:
  • 新增AllURIsFailedError统一异常处理
  • 浏览器工具模块化,提升代码组织性
This commit is contained in:
webjoin111 2025-06-27 08:55:39 +08:00
parent 8b9ae7255b
commit c9bd957146
4 changed files with 756 additions and 254 deletions

View File

@ -1,12 +1,19 @@
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
import os
from pathlib import Path
import sys
from typing import Any, Literal
from nonebot import get_driver
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_htmlrender import get_browser
from playwright.__main__ import main
from playwright.async_api import Browser, Playwright, async_playwright
from playwright.async_api import Browser, Page, Playwright, async_playwright
from zhenxun.configs.config import BotConfig
from zhenxun.services.log import logger
from zhenxun.utils.message import MessageUtils
driver = get_driver()
@ -89,3 +96,87 @@ async def check_playwright_env():
await p.chromium.launch()
except Exception as e:
raise ImportError("加载失败Playwright 依赖不全,") from e
class BrowserIsNone(Exception):
    """Marker exception for a missing browser instance.

    Kept for backward compatibility; re-exported by the http module.
    """

    pass
class AsyncPlaywright:
    """Convenience helpers around Playwright: context-managed pages and quick screenshots."""

    @classmethod
    @asynccontextmanager
    async def new_page(
        cls, cookies: list[dict[str, Any]] | dict[str, Any] | None = None, **kwargs
    ) -> AsyncGenerator[Page, None]:
        """Open a new page in a fresh browser context; close both on exit.

        Args:
            cookies: a single cookie dict or a list of cookie dicts injected
                into the context before the page is created.
            **kwargs: forwarded to ``browser.new_context`` (e.g. ``viewport``,
                ``user_agent``).

        Yields:
            Page: the newly created Playwright page.
        """
        browser = await get_browser()
        ctx = await browser.new_context(**kwargs)
        if cookies:
            # Normalize the single-dict form to the list add_cookies expects.
            if isinstance(cookies, dict):
                cookies = [cookies]
            await ctx.add_cookies(cookies)  # type: ignore
        page = await ctx.new_page()
        try:
            yield page
        finally:
            # Always release the page and its context, even if the caller raised.
            await page.close()
            await ctx.close()

    @classmethod
    async def screenshot(
        cls,
        url: str,
        path: Path | str,
        element: str | list[str],
        *,
        wait_time: int | None = None,
        viewport_size: dict[str, int] | None = None,
        wait_until: (
            Literal["domcontentloaded", "load", "networkidle"] | None
        ) = "networkidle",
        timeout: float | None = None,
        type_: Literal["jpeg", "png"] | None = None,
        user_agent: str | None = None,
        cookies: list[dict[str, Any]] | dict[str, Any] | None = None,
        **kwargs,
    ) -> UniMessage | None:
        """Take a quick screenshot of an element on *url*.

        Intended for simple one-shot captures; drive a ``Page`` yourself for
        anything more complex.

        Args:
            url: target address.
            path: file path the image is written to.
            element: a selector, or a chain of selectors where each one is
                resolved inside the previous match.
            wait_time: per-selector wait timeout in seconds (converted to ms).
            viewport_size: context viewport; defaults to 2560x1080.
            wait_until: page load state to await after navigation.
            timeout: navigation / screenshot timeout.
            type_: image format passed to Playwright.
            user_agent: user agent for the browser context.
            cookies: cookies injected into the context.

        Returns:
            UniMessage | None: message wrapping the image path, or ``None``
            when a selector in the chain did not resolve.
        """
        if viewport_size is None:
            viewport_size = {"width": 2560, "height": 1080}
        if isinstance(path, str):
            path = Path(path)
        # Playwright expects milliseconds; callers pass seconds.
        wait_time = wait_time * 1000 if wait_time else None
        element_list = [element] if isinstance(element, str) else element
        async with cls.new_page(
            cookies,
            viewport=viewport_size,
            user_agent=user_agent,
            **kwargs,
        ) as page:
            await page.goto(url, timeout=timeout, wait_until=wait_until)
            card = page
            for e in element_list:
                if not card:
                    return None
                # Each selector narrows the search to the previous match.
                card = await card.wait_for_selector(e, timeout=wait_time)
            if card:
                await card.screenshot(path=path, timeout=timeout, type=type_)
                return MessageUtils.build_message(path)
        return None

View File

@ -1,24 +1,226 @@
from collections.abc import Callable
from functools import partial, wraps
from typing import Any, Literal
from anyio import EndOfStream
from httpx import ConnectError, HTTPStatusError, TimeoutException
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from httpx import (
ConnectError,
HTTPStatusError,
RemoteProtocolError,
StreamError,
TimeoutException,
)
from nonebot.utils import is_coroutine_callable
from tenacity import (
RetryCallState,
retry,
retry_if_exception_type,
retry_if_result,
stop_after_attempt,
wait_exponential,
wait_fixed,
)
from zhenxun.services.log import logger
LOG_COMMAND = "RetryDecorator"
_SENTINEL = object()
def _log_before_sleep(log_name: str | None, retry_state: RetryCallState):
    """Tenacity ``before_sleep`` hook: log why the next retry will happen."""
    fn = retry_state.fn
    func_name = fn.__name__ if fn else "unknown_function"
    if log_name:
        log_context = f"操作 '{log_name}' (函数 '{func_name}')"
    else:
        log_context = f"函数 '{func_name}'"
    reason = ""
    outcome = retry_state.outcome
    if outcome:
        exc = outcome.exception()
        if exc:
            reason = f"触发异常: {exc.__class__.__name__}({exc})"
        else:
            reason = f"不满足结果条件: result={outcome.result()}"
    next_action = retry_state.next_action
    wait_time = getattr(next_action, "sleep", 0) if next_action else 0
    logger.warning(
        f"{log_context}{retry_state.attempt_number} 次重试... "
        f"等待 {wait_time:.2f} 秒. {reason}",
        LOG_COMMAND,
    )
class Retry:
    """Factory of tenacity-based retry decorator presets (fixed / exponential)."""

    @staticmethod
    def simple(
        stop_max_attempt: int = 3,
        wait_fixed_seconds: int = 2,
        exception: tuple[type[Exception], ...] = (),
        *,
        log_name: str | None = None,
        on_failure: Callable[[Exception], Any] | None = None,
        return_on_failure: Any = _SENTINEL,
    ):
        """Preset for plain network calls: fixed-interval retries.

        Args:
            stop_max_attempt: maximum number of attempts.
            wait_fixed_seconds: seconds between attempts.
            exception: extra exception types that should trigger a retry.
            log_name: operation name used in retry log lines.
            on_failure: optional callback invoked with the final exception
                after all attempts failed.
            return_on_failure: optional value returned instead of raising
                once all attempts failed.
        """
        return Retry.api(
            stop_max_attempt=stop_max_attempt,
            wait_fixed_seconds=wait_fixed_seconds,
            exception=exception,
            strategy="fixed",
            log_name=log_name,
            on_failure=on_failure,
            return_on_failure=return_on_failure,
        )

    @staticmethod
    def download(
        stop_max_attempt: int = 3,
        exception: tuple[type[Exception], ...] = (),
        *,
        wait_exp_multiplier: int = 2,
        wait_exp_max: int = 15,
        log_name: str | None = None,
        on_failure: Callable[[Exception], Any] | None = None,
        return_on_failure: Any = _SENTINEL,
    ):
        """Preset for file downloads: exponential backoff between attempts.

        Args:
            stop_max_attempt: maximum number of attempts.
            exception: extra exception types that should trigger a retry.
            wait_exp_multiplier: multiplier of the exponential backoff.
            wait_exp_max: cap (seconds) on the backoff wait.
            log_name: operation name used in retry log lines.
            on_failure: optional callback invoked with the final exception.
            return_on_failure: optional value returned instead of raising.
        """
        return Retry.api(
            stop_max_attempt=stop_max_attempt,
            exception=exception,
            strategy="exponential",
            wait_exp_multiplier=wait_exp_multiplier,
            wait_exp_max=wait_exp_max,
            log_name=log_name,
            on_failure=on_failure,
            return_on_failure=return_on_failure,
        )

    @staticmethod
    def api(
        stop_max_attempt: int = 3,
        wait_fixed_seconds: int = 1,
        exception: tuple[type[Exception], ...] = (),
        *,
        strategy: Literal["fixed", "exponential"] = "fixed",
        retry_on_result: Callable[[Any], bool] | None = None,
        wait_exp_multiplier: int = 1,
        wait_exp_max: int = 10,
        log_name: str | None = None,
        on_failure: Callable[[Exception], Any] | None = None,
        return_on_failure: Any = _SENTINEL,
    ):
        """Fully configurable retry decorator for API calls.

        Args:
            stop_max_attempt: maximum number of attempts.
            wait_fixed_seconds: wait (seconds) for the 'fixed' strategy.
            exception: extra exception types that should trigger a retry.
            strategy: 'fixed' or 'exponential' wait strategy.
            retry_on_result: predicate on the return value; ``True`` retries
                (e.g. ``lambda r: r.status_code != 200``).
            wait_exp_multiplier: multiplier of the exponential backoff.
            wait_exp_max: cap (seconds) on the backoff wait.
            log_name: operation name used in retry log lines.
            on_failure: called with the final exception after all attempts
                failed, before raising / returning the fallback.
            return_on_failure: if set, suppress the final exception and
                return this value instead.
        """
        # Network-layer exceptions always retried, plus caller-provided ones.
        base_exceptions = (
            TimeoutException,
            ConnectError,
            HTTPStatusError,
            StreamError,
            RemoteProtocolError,
            EndOfStream,
            *exception,
        )

        def decorator(func: Callable) -> Callable:
            if strategy == "exponential":
                wait_strategy = wait_exponential(
                    multiplier=wait_exp_multiplier, max=wait_exp_max
                )
            else:
                wait_strategy = wait_fixed(wait_fixed_seconds)
            retry_conditions = retry_if_exception_type(base_exceptions)
            if retry_on_result:
                retry_conditions |= retry_if_result(retry_on_result)
            log_callback = partial(_log_before_sleep, log_name)
            tenacity_retry_decorator = retry(
                stop=stop_after_attempt(stop_max_attempt),
                wait=wait_strategy,
                retry=retry_conditions,
                before_sleep=log_callback,
                reraise=True,
            )
            decorated_func = tenacity_retry_decorator(func)
            # Without a fallback value, expose the tenacity wrapper as-is.
            if return_on_failure is _SENTINEL:
                return decorated_func
            if is_coroutine_callable(func):

                @wraps(func)
                async def async_wrapper(*args, **kwargs):
                    try:
                        return await decorated_func(*args, **kwargs)
                    except Exception as e:
                        if on_failure:
                            if is_coroutine_callable(on_failure):
                                await on_failure(e)
                            else:
                                on_failure(e)
                        return return_on_failure

                return async_wrapper
            else:

                @wraps(func)
                def sync_wrapper(*args, **kwargs):
                    try:
                        return decorated_func(*args, **kwargs)
                    except Exception as e:
                        if on_failure:
                            # An async callback cannot be awaited here; log and skip.
                            if is_coroutine_callable(on_failure):
                                logger.error(
                                    f"不能在同步函数 '{func.__name__}' 中调用异步的 "
                                    f"on_failure 回调。",
                                    LOG_COMMAND,
                                )
                            else:
                                on_failure(e)
                        return return_on_failure

                return sync_wrapper

        return decorator

View File

@ -64,3 +64,23 @@ class GoodsNotFound(Exception):
"""
pass
class AllURIsFailedError(Exception):
    """Raised when every candidate / mirror URL has been tried and failed.

    Attributes:
        urls: the URLs that were attempted, in order.
        exceptions: the exception raised for each URL (parallel to *urls*).
    """

    def __init__(self, urls: list[str], exceptions: list[Exception]):
        self.urls = urls
        self.exceptions = exceptions
        # Guard: avoid IndexError when constructed with an empty exception list.
        last = f" Last exception: {exceptions[-1]}" if exceptions else ""
        super().__init__(f"All {len(urls)} URIs failed.{last}")

    def __str__(self) -> str:
        # One line per URL/exception pair; zip tolerates length mismatches.
        exc_info = "\n".join(
            f"  - {url}: {exc.__class__.__name__}({exc})"
            for url, exc in zip(self.urls, self.exceptions)
        )
        return f"All {len(self.urls)} URIs failed:\n{exc_info}"

View File

@ -1,16 +1,15 @@
import asyncio
from collections.abc import AsyncGenerator, Sequence
from collections.abc import AsyncGenerator, Awaitable, Callable, Sequence
from contextlib import asynccontextmanager
import json
from pathlib import Path
import time
from typing import Any, ClassVar, Literal, cast
from typing import Any, ClassVar, cast
import aiofiles
import httpx
from httpx import AsyncHTTPTransport, HTTPStatusError, Proxy, Response
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_htmlrender import get_browser
from playwright.async_api import Page
from httpx import AsyncClient, AsyncHTTPTransport, HTTPStatusError, Proxy, Response
import nonebot
from rich.progress import (
BarColumn,
DownloadColumn,
@ -21,18 +20,60 @@ from rich.progress import (
from zhenxun.configs.config import BotConfig
from zhenxun.services.log import logger
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.decorator.retry import Retry
from zhenxun.utils.exception import AllURIsFailedError
from zhenxun.utils.user_agent import get_user_agent
CLIENT_KEY = ["use_proxy", "proxies", "proxy", "verify", "headers"]
from .browser import AsyncPlaywright, BrowserIsNone # noqa: F401
_SENTINEL = object()
driver = nonebot.get_driver()
_client: AsyncClient | None = None
@driver.on_startup
async def _init_client():
    """Create the process-wide ``httpx.AsyncClient`` when the bot starts.

    Uses the configured system proxy (if any), a default User-Agent, and
    follows redirects; default requests share this client's connection pool.
    """
    global _client
    proxy_url = BotConfig.system_proxy if BotConfig.system_proxy else None
    _client = httpx.AsyncClient(
        proxies=proxy_url, headers=get_user_agent(), follow_redirects=True
    )
    logger.info("全局 httpx.AsyncClient 已启动。", "HTTPClient")
@driver.on_shutdown
async def _close_client():
    """Close the process-wide ``httpx.AsyncClient`` on bot shutdown."""
    # No-op if startup never ran (e.g. startup hook failed early).
    if _client:
        await _client.aclose()
        logger.info("全局 httpx.AsyncClient 已关闭。", "HTTPClient")
def get_client() -> AsyncClient:
    """Return the shared, process-wide ``httpx.AsyncClient``.

    Raises:
        RuntimeError: when called before the startup hook created the client.
    """
    client = _client
    if not client:
        raise RuntimeError("全局 httpx.AsyncClient 未初始化,请检查启动流程。")
    return client
def get_async_client(
proxies: dict[str, str] | None = None,
proxy: str | None = None,
proxy: str | None = None, # 向后兼容:保留旧版本的 proxy 参数
verify: bool = False,
**kwargs,
) -> httpx.AsyncClient:
"""
[向后兼容] 创建 httpx.AsyncClient 实例的工厂函数
此函数完全保留了旧版本的接口确保现有代码无需修改即可使用
"""
transport = kwargs.pop("transport", None) or AsyncHTTPTransport(verify=verify)
if proxies:
http_proxy = proxies.get("http://")
@ -62,6 +103,32 @@ def get_async_client(
class AsyncHttpx:
"""
一个高级的健壮的异步HTTP客户端工具类
设计理念:
- **全局共享客户端**: 默认情况下所有请求都通过一个在应用启动时初始化的全局
`httpx.AsyncClient` 实例发出这个实例共享连接池提高了效率和性能
- **向后兼容与灵活性**: 完全兼容旧的API同时提供了两种方式来处理需要
特殊网络配置如不同代理超时的请求
1. **单次请求覆盖**: 在调用 `get`, `post` 等方法时直接传入 `proxies`,
`timeout` 等参数将为该次请求创建一个临时的独立的客户端
2. **临时客户端上下文**: 使用 `temporary_client()` 上下文管理器可以
获取一个独立的可配置的客户端用于执行一系列需要相同特殊配置的请求
- **健壮性**: 内置了自动重试多镜像URL回退fallback机制并提供了便捷的
JSON解析和文件下载方法
"""
# [向后兼容] 保留旧版本的客户端配置键名,确保现有代码可以正常工作
CLIENT_KEY: ClassVar[list[str]] = [
"use_proxy",
"proxies",
"proxy",
"verify",
"headers",
]
# [向后兼容] 保持与旧版本相同的默认代理结构
default_proxy: ClassVar[dict[str, str] | None] = (
{
"http://": BotConfig.system_proxy,
@ -72,155 +139,351 @@ class AsyncHttpx:
)
@classmethod
@asynccontextmanager
async def _create_client(
cls,
*,
use_proxy: bool = True,
proxies: dict[str, str] | None = None,
proxy: str | None = None,
headers: dict[str, str] | None = None,
verify: bool = False,
**kwargs,
) -> AsyncGenerator[httpx.AsyncClient, None]:
"""创建一个私有的、配置好的 httpx.AsyncClient 上下文管理器。
说明:
此方法用于内部统一创建客户端处理代理和请求头逻辑减少代码重复
参数:
use_proxy: 是否使用在类中定义的默认代理
proxies: 手动指定的代理会覆盖默认代理
proxy: 单个代理,用于兼容旧版本不再使用
headers: 需要合并到客户端的自定义请求头
verify: 是否验证 SSL 证书
**kwargs: 其他所有传递给 httpx.AsyncClient 的参数
返回:
AsyncGenerator[httpx.AsyncClient, None]: 生成器
def _prepare_temporary_client_config(cls, client_kwargs: dict) -> dict:
"""
proxies_to_use = proxies or (cls.default_proxy if use_proxy else None)
[向后兼容] 处理旧式的客户端kwargs将其转换为get_async_client可用的配置
主要负责处理 use_proxy 标志这是为了兼容旧版本代码中使用的 use_proxy 参数
"""
final_config = client_kwargs.copy()
final_headers = get_user_agent()
if headers:
final_headers.update(headers)
use_proxy = final_config.pop("use_proxy", True)
async with get_async_client(
proxies=proxies_to_use,
proxy=proxy,
verify=verify,
headers=final_headers,
**kwargs,
) as client:
yield client
if "proxies" not in final_config and "proxy" not in final_config:
if use_proxy:
final_config["proxies"] = cls.default_proxy
else:
final_config["proxies"] = None
return final_config
@classmethod
def _split_kwargs(cls, kwargs: dict) -> tuple[dict, dict]:
"""[优化] 分离客户端配置和请求参数,使逻辑更清晰。"""
client_kwargs = {k: v for k, v in kwargs.items() if k in cls.CLIENT_KEY}
request_kwargs = {k: v for k, v in kwargs.items() if k not in cls.CLIENT_KEY}
return client_kwargs, request_kwargs
    @classmethod
    @asynccontextmanager
    async def _get_active_client_context(
        cls, client: AsyncClient | None = None, **kwargs
    ) -> AsyncGenerator[AsyncClient, None]:
        """Yield the HTTP client to use for a request.

        - Any client-level kwargs force a temporary per-use client (closed
          automatically on exit).
        - Otherwise, yield the explicitly passed *client*, falling back to
          the global shared client.
        """
        if kwargs:
            logger.debug(f"为单次请求创建临时客户端,配置: {kwargs}")
            temp_client_config = cls._prepare_temporary_client_config(kwargs)
            async with get_async_client(**temp_client_config) as temp_client:
                yield temp_client
        else:
            yield client or get_client()
    @Retry.simple(log_name="内部HTTP请求")
    async def _execute_request_inner(
        self, client: AsyncClient, method: str, url: str, **kwargs
    ) -> Response:
        """Perform a single HTTP request; wrapped by the retry preset.

        NOTE(review): defined as an instance method and invoked via
        ``cls()._execute_request_inner`` in ``_single_request`` — confirm this
        per-call instantiation is intentional.
        """
        response = await client.request(method, url, **kwargs)
        return response
    @classmethod
    async def _single_request(
        cls, method: str, url: str, *, client: AsyncClient | None = None, **kwargs
    ) -> Response:
        """Run one HTTP request with the built-in retry logic.

        Splits *kwargs* into client configuration (which may spawn a temporary
        client) and per-request options, then delegates to the retried
        ``_execute_request_inner``.
        """
        client_kwargs, request_kwargs = cls._split_kwargs(kwargs)
        async with cls._get_active_client_context(
            client=client, **client_kwargs
        ) as active_client:
            response = await cls()._execute_request_inner(
                active_client, method, url, **request_kwargs
            )
            return response
    @classmethod
    async def _execute_with_fallbacks(
        cls,
        urls: str | list[str],
        worker: Callable[..., Awaitable[Any]],
        *,
        client: AsyncClient | None = None,
        **kwargs,
    ) -> Any:
        """Try *worker* against each URL in order until one succeeds.

        Args:
            urls: one URL or a list of fallback URLs.
            worker: coroutine taking a single URL plus the extra kwargs.
            client: optional HTTP client forwarded to the worker.
            **kwargs: extra arguments forwarded to the worker.

        Returns:
            Any: the first successful worker result.

        Raises:
            AllURIsFailedError: when every URL failed; carries the per-URL
                exceptions.
        """
        url_list = [urls] if isinstance(urls, str) else urls
        exceptions = []
        for i, url in enumerate(url_list):
            try:
                result = await worker(url, client=client, **kwargs)
                if i > 0:
                    # A mirror succeeded after earlier failures — worth noting.
                    logger.info(
                        f"成功从镜像 '{url}' 获取资源 "
                        f"(在尝试了 {i} 个失败的镜像之后)。",
                        "AsyncHttpx:FallbackExecutor",
                    )
                return result
            except Exception as e:
                exceptions.append(e)
                if url != url_list[-1]:
                    logger.warning(
                        f"Worker '{worker.__name__}' on {url} failed, trying next. "
                        f"Error: {e.__class__.__name__}",
                        "AsyncHttpx:FallbackExecutor",
                    )
        raise AllURIsFailedError(url_list, exceptions)
@classmethod
async def get(
cls,
url: str | list[str],
*,
follow_redirects: bool = True,
check_status_code: int | None = None,
client: AsyncClient | None = None,
**kwargs,
) -> Response: # sourcery skip: use-assigned-variable
) -> Response:
"""发送 GET 请求,并返回第一个成功的响应。
说明:
本方法是 httpx.get 的高级包装增加了多链接尝试自动重试和统一的代理管理
如果提供 URL 列表它将依次尝试直到成功为止
本方法是 httpx.get 的高级包装增加了多链接尝试自动重试和统一的
客户端管理如果提供 URL 列表它将依次尝试直到成功为止
用法建议:
- **常规使用**: `await AsyncHttpx.get(url)` 将使用全局客户端
- **单次覆盖配置**: `await AsyncHttpx.get(url, timeout=5, proxies=None)`
将为本次请求创建一个独立的临时客户端
参数:
url: 单个请求 URL 或一个 URL 列表
follow_redirects: 是否跟随重定向
check_status_code: (可选) 若提供将检查响应状态码是否匹配否则抛出异常
**kwargs: 其他所有传递给 httpx.get 的参数
( `params`, `headers`, `timeout`)
client: (可选) 指定一个活动的HTTP客户端实例若提供则忽略
`**kwargs`中的客户端配置
**kwargs: 其他所有传递给 httpx.get 的参数 ( `params`, `headers`,
`timeout`)如果包含 `proxies`, `verify` 等客户端配置参数
将创建一个临时客户端
返回:
Response: Response
Response: httpx 的响应对象
Raises:
AllURIsFailedError: 当所有提供的URL都请求失败时抛出
"""
urls = [url] if isinstance(url, str) else url
last_exception = None
for current_url in urls:
try:
logger.info(f"开始获取 {current_url}..")
client_kwargs = {k: v for k, v in kwargs.items() if k in CLIENT_KEY}
for key in CLIENT_KEY:
kwargs.pop(key, None)
async with cls._create_client(**client_kwargs) as client:
response = await client.get(current_url, **kwargs)
if check_status_code and response.status_code != check_status_code:
raise HTTPStatusError(
f"状态码错误: {response.status_code}!={check_status_code}",
request=response.request,
response=response,
)
return response
except Exception as e:
last_exception = e
if current_url != urls[-1]:
logger.warning(f"获取 {current_url} 失败, 尝试下一个", e=e)
async def worker(current_url: str, **worker_kwargs) -> Response:
logger.info(f"开始获取 {current_url}..", "AsyncHttpx:get")
response = await cls._single_request(
"GET", current_url, follow_redirects=follow_redirects, **worker_kwargs
)
if check_status_code and response.status_code != check_status_code:
raise HTTPStatusError(
f"状态码错误: {response.status_code}!={check_status_code}",
request=response.request,
response=response,
)
return response
raise last_exception or Exception("所有URL都获取失败")
return await cls._execute_with_fallbacks(url, worker, client=client, **kwargs)
@classmethod
async def head(cls, url: str, **kwargs) -> Response:
"""发送 HEAD 请求。
async def head(
cls, url: str | list[str], *, client: AsyncClient | None = None, **kwargs
) -> Response:
"""发送 HEAD 请求,并返回第一个成功的响应。"""
说明:
本方法是对 httpx.head 的封装通常用于检查资源的元信息如大小类型
async def worker(current_url: str, **worker_kwargs) -> Response:
return await cls._single_request("HEAD", current_url, **worker_kwargs)
参数:
url: 请求的 URL
**kwargs: 其他所有传递给 httpx.head 的参数
( `headers`, `timeout`, `allow_redirects`)
返回:
Response: Response
"""
client_kwargs = {k: v for k, v in kwargs.items() if k in CLIENT_KEY}
for key in CLIENT_KEY:
kwargs.pop(key, None)
async with cls._create_client(**client_kwargs) as client:
return await client.head(url, **kwargs)
return await cls._execute_with_fallbacks(url, worker, client=client, **kwargs)
@classmethod
async def post(cls, url: str, **kwargs) -> Response:
"""发送 POST 请求。
async def post(
cls, url: str | list[str], *, client: AsyncClient | None = None, **kwargs
) -> Response:
"""发送 POST 请求,并返回第一个成功的响应。"""
说明:
本方法是对 httpx.post 的封装提供了统一的代理和客户端管理
async def worker(current_url: str, **worker_kwargs) -> Response:
return await cls._single_request("POST", current_url, **worker_kwargs)
参数:
url: 请求的 URL
**kwargs: 其他所有传递给 httpx.post 的参数
( `data`, `json`, `content` )
返回:
Response: Response
"""
client_kwargs = {k: v for k, v in kwargs.items() if k in CLIENT_KEY}
for key in CLIENT_KEY:
kwargs.pop(key, None)
async with cls._create_client(**client_kwargs) as client:
return await client.post(url, **kwargs)
return await cls._execute_with_fallbacks(url, worker, client=client, **kwargs)
@classmethod
async def get_content(cls, url: str, **kwargs) -> bytes:
"""获取指定 URL 的二进制内容。
说明:
这是一个便捷方法等同于调用 get() 后再访问 .content 属性
参数:
url: 请求的 URL
**kwargs: 所有传递给 get() 方法的参数
返回:
bytes: 响应内容的二进制字节流 (bytes)
"""
res = await cls.get(url, **kwargs)
async def get_content(
cls, url: str | list[str], *, client: AsyncClient | None = None, **kwargs
) -> bytes:
"""获取指定 URL 的二进制内容。"""
res = await cls.get(url, client=client, **kwargs)
return res.content
    # Shared GET/POST JSON path: request + raise_for_status + JSON decode.
    @classmethod
    @Retry.api(
        log_name="JSON请求",
        exception=(json.JSONDecodeError,),
        return_on_failure=_SENTINEL,
    )
    async def _request_and_parse_json(
        cls, method: str, url: str, *, client: AsyncClient | None = None, **kwargs
    ) -> Any:
        """Run one request and decode its JSON body; retried by ``Retry.api``.

        ``return_on_failure=_SENTINEL`` passes THIS module's sentinel (a
        distinct object from the retry module's internal one), so after
        exhausting retries the decorator returns ``_SENTINEL`` instead of
        raising; ``get_json``/``post_json`` translate it into ``default``.

        NOTE(review): ``**kwargs`` goes to ``_get_active_client_context``
        unsplit — request-only keys (e.g. ``json``/``params``) would reach the
        temporary-client factory when present; confirm this path is exercised
        only with client-safe kwargs.
        """
        async with cls._get_active_client_context(
            client=client, **kwargs
        ) as active_client:
            _, request_kwargs = cls._split_kwargs(kwargs)
            response = await active_client.request(method, url, **request_kwargs)
            response.raise_for_status()
            return response.json()
    @classmethod
    async def get_json(
        cls,
        url: str | list[str],
        *,
        default: Any = None,
        raise_on_failure: bool = False,
        client: AsyncClient | None = None,
        **kwargs,
    ) -> Any:
        """GET a URL (or fallbacks) and decode the JSON body.

        Wraps request, retry, JSON parsing and error handling; retries on
        network and JSON-decode errors, and returns *default* when everything
        fails (unless *raise_on_failure*).

        Args:
            url: one URL or a list of fallback URLs.
            default: value returned when all attempts fail.
            raise_on_failure: raise ``AllURIsFailedError`` instead of
                returning *default*.
            client: optional HTTP client to use.
            **kwargs: forwarded to the request (``params``, ``headers``,
                ``timeout``, ...).

        Returns:
            Any: the parsed JSON, or *default* on total failure.

        Raises:
            AllURIsFailedError: only when *raise_on_failure* is True.
        """

        async def worker(current_url: str, **worker_kwargs):
            logger.debug(f"开始GET JSON: {current_url}", "AsyncHttpx:get_json")
            return await cls._request_and_parse_json(
                "GET", current_url, **worker_kwargs
            )

        try:
            result = await cls._execute_with_fallbacks(
                url, worker, client=client, **kwargs
            )
            # _SENTINEL marks "retries exhausted" from the retry decorator.
            return default if result is _SENTINEL else result
        except AllURIsFailedError as e:
            logger.error(f"所有URL的JSON GET均失败: {e}", "AsyncHttpx:get_json")
            if raise_on_failure:
                raise e
            return default
    @classmethod
    async def post_json(
        cls,
        url: str | list[str],
        *,
        json: Any = None,
        data: Any = None,
        default: Any = None,
        raise_on_failure: bool = False,
        client: AsyncClient | None = None,
        **kwargs,
    ) -> Any:
        """POST to a URL (or fallbacks) and decode the JSON response.

        Same contract as ``get_json``.

        Args:
            url: one URL or a list of fallback URLs.
            json: optional JSON request body.
            data: optional form-data request body.
            default: value returned when all attempts fail.
            raise_on_failure: raise ``AllURIsFailedError`` instead of
                returning *default*.
            client: optional HTTP client to use.
            **kwargs: forwarded to the request.

        Returns:
            Any: the parsed JSON, or *default* on total failure.
        """
        if json is not None:
            kwargs["json"] = json
        if data is not None:
            kwargs["data"] = data

        async def worker(current_url: str, **worker_kwargs):
            logger.debug(f"开始POST JSON: {current_url}", "AsyncHttpx:post_json")
            return await cls._request_and_parse_json(
                "POST", current_url, **worker_kwargs
            )

        try:
            result = await cls._execute_with_fallbacks(
                url, worker, client=client, **kwargs
            )
            # _SENTINEL marks "retries exhausted" from the retry decorator.
            return default if result is _SENTINEL else result
        except AllURIsFailedError as e:
            logger.error(f"所有URL的JSON POST均失败: {e}", "AsyncHttpx:post_json")
            if raise_on_failure:
                raise e
            return default
    @classmethod
    @Retry.api(log_name="文件下载(流式)")
    async def _stream_download(
        cls, url: str, path: Path, *, client: AsyncClient | None = None, **kwargs
    ) -> None:
        """Stream *url* into *path*, rendering a rich progress bar; retried on failure.

        NOTE(review): ``**kwargs`` is passed both to the client context and to
        ``client.stream`` — client-level keys (``proxies``/``verify``/...)
        would also reach ``stream()``; confirm callers never mix them here.
        """
        async with cls._get_active_client_context(
            client=client, **kwargs
        ) as active_client:
            async with active_client.stream("GET", url, **kwargs) as response:
                response.raise_for_status()
                # Content-Length may be absent; total then stays 0.
                total = int(response.headers.get("Content-Length", 0))
                with Progress(
                    TextColumn(path.name),
                    "[progress.percentage]{task.percentage:>3.0f}%",
                    BarColumn(bar_width=None),
                    DownloadColumn(),
                    TransferSpeedColumn(),
                ) as progress:
                    task_id = progress.add_task("Download", total=total)
                    async with aiofiles.open(path, "wb") as f:
                        async for chunk in response.aiter_bytes():
                            await f.write(chunk)
                            progress.update(task_id, advance=len(chunk))
@classmethod
async def download_file(
cls,
@ -228,6 +491,7 @@ class AsyncHttpx:
path: str | Path,
*,
stream: bool = False,
client: AsyncClient | None = None,
**kwargs,
) -> bool:
"""下载文件到指定路径。
@ -239,6 +503,7 @@ class AsyncHttpx:
url: 单个文件 URL 或一个备用 URL 列表
path: 文件保存的本地路径
stream: (可选) 是否使用流式下载适用于大文件默认为 False
client: (可选) 指定的HTTP客户端
**kwargs: 其他所有传递给 get() 方法或 httpx.stream() 的参数
返回:
@ -247,49 +512,29 @@ class AsyncHttpx:
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
urls = [url] if isinstance(url, str) else url
async def worker(current_url: str, **worker_kwargs) -> bool:
if not stream:
content = await cls.get_content(current_url, **worker_kwargs)
async with aiofiles.open(path, "wb") as f:
await f.write(content)
else:
await cls._stream_download(current_url, path, **worker_kwargs)
for current_url in urls:
try:
if not stream:
response = await cls.get(current_url, **kwargs)
response.raise_for_status()
async with aiofiles.open(path, "wb") as f:
await f.write(response.content)
else:
async with cls._create_client(**kwargs) as client:
stream_kwargs = {
k: v
for k, v in kwargs.items()
if k not in ["use_proxy", "proxy", "verify"]
}
async with client.stream(
"GET", current_url, **stream_kwargs
) as response:
response.raise_for_status()
total = int(response.headers.get("Content-Length", 0))
logger.info(
f"下载 {current_url} 成功 -> {path.absolute()}",
"AsyncHttpx:download",
)
return True
with Progress(
TextColumn(path.name),
"[progress.percentage]{task.percentage:>3.0f}%",
BarColumn(bar_width=None),
DownloadColumn(),
TransferSpeedColumn(),
) as progress:
task_id = progress.add_task("Download", total=total)
async with aiofiles.open(path, "wb") as f:
async for chunk in response.aiter_bytes():
await f.write(chunk)
progress.update(task_id, advance=len(chunk))
logger.info(f"下载 {current_url} 成功 -> {path.absolute()}")
return True
except Exception as e:
logger.warning(f"下载 {current_url} 失败,尝试下一个。错误: {e}")
logger.error(f"所有URL {urls} 下载均失败 -> {path.absolute()}")
return False
try:
return await cls._execute_with_fallbacks(
url, worker, client=client, **kwargs
)
except AllURIsFailedError:
logger.error(
f"所有URL下载均失败 -> {path.absolute()}", "AsyncHttpx:download"
)
return False
@classmethod
async def gather_download_file(
@ -346,7 +591,6 @@ class AsyncHttpx:
logger.error(f"并发下载任务 ({url_info}) 时发生错误", e=result)
final_results.append(False)
else:
# download_file 返回的是 bool可以直接附加
final_results.append(cast(bool, result))
return final_results
@ -396,85 +640,30 @@ class AsyncHttpx:
return [result["url"] for result in _results]
class AsyncPlaywright:
@classmethod
@asynccontextmanager
async def new_page(
cls, cookies: list[dict[str, Any]] | dict[str, Any] | None = None, **kwargs
) -> AsyncGenerator[Page, None]:
"""获取一个新页面
async def temporary_client(cls, **kwargs) -> AsyncGenerator[AsyncClient, None]:
"""
创建一个临时的可配置的HTTP客户端上下文并直接返回该客户端实例
此方法返回一个标准的 `httpx.AsyncClient`它不使用全局连接池
拥有独立的配置(如代理headers超时等)并在退出上下文后自动关闭
适用于需要用一套特殊网络配置执行一系列请求的场景
用法:
async with AsyncHttpx.temporary_client(proxies=None, timeout=5) as client:
# client 是一个标准的 httpx.AsyncClient 实例
response1 = await client.get("http://some.internal.api/1")
response2 = await client.get("http://some.internal.api/2")
data = response2.json()
参数:
cookies: cookies
**kwargs: 所有传递给 `httpx.AsyncClient` 构造函数的参数
例如: `proxies`, `headers`, `verify`, `timeout`,
`follow_redirects`
Yields:
httpx.AsyncClient: 一个配置好的临时的客户端实例
"""
browser = await get_browser()
ctx = await browser.new_context(**kwargs)
if cookies:
if isinstance(cookies, dict):
cookies = [cookies]
await ctx.add_cookies(cookies) # type: ignore
page = await ctx.new_page()
try:
yield page
finally:
await page.close()
await ctx.close()
@classmethod
async def screenshot(
cls,
url: str,
path: Path | str,
element: str | list[str],
*,
wait_time: int | None = None,
viewport_size: dict[str, int] | None = None,
wait_until: (
Literal["domcontentloaded", "load", "networkidle"] | None
) = "networkidle",
timeout: float | None = None,
type_: Literal["jpeg", "png"] | None = None,
user_agent: str | None = None,
cookies: list[dict[str, Any]] | dict[str, Any] | None = None,
**kwargs,
) -> UniMessage | None:
"""截图,该方法仅用于简单快捷截图,复杂截图请操作 page
参数:
url: 网址
path: 存储路径
element: 元素选择
wait_time: 等待截取超时时间
viewport_size: 窗口大小
wait_until: 等待类型
timeout: 超时限制
type_: 保存类型
user_agent: user_agent
cookies: cookies
"""
if viewport_size is None:
viewport_size = {"width": 2560, "height": 1080}
if isinstance(path, str):
path = Path(path)
wait_time = wait_time * 1000 if wait_time else None
element_list = [element] if isinstance(element, str) else element
async with cls.new_page(
cookies,
viewport=viewport_size,
user_agent=user_agent,
**kwargs,
) as page:
await page.goto(url, timeout=timeout, wait_until=wait_until)
card = page
for e in element_list:
if not card:
return None
card = await card.wait_for_selector(e, timeout=wait_time)
if card:
await card.screenshot(path=path, timeout=timeout, type=type_)
return MessageUtils.build_message(path)
return None
class BrowserIsNone(Exception):
pass
async with get_async_client(**kwargs) as client:
yield client