Improve httpx compatibility (#1915)

* Improve httpx compatibility

* Also fix the SSL error when talking to the Tencent (tx) image servers

* 🚨 auto fix by pre-commit hooks

* Fix the issues that failed the checks

* 🚨 auto fix by pre-commit hooks

* Adapt to httpx 0.28.0+

* 🚨 auto fix by pre-commit hooks

* 🎨 Code formatting

* Code cleanup

* 🎨 Code formatting

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: HibiKier <45528451+HibiKier@users.noreply.github.com>
molanp 2025-06-16 11:14:19 +08:00 committed by GitHub
parent 4cc800c832
commit 62b0b02466


@@ -1,219 +1,205 @@
 import asyncio
-from asyncio.exceptions import TimeoutError
-from collections.abc import AsyncGenerator
+from collections.abc import AsyncGenerator, Sequence
 from contextlib import asynccontextmanager
 from pathlib import Path
 import time
-from typing import Any, ClassVar, Literal
+from typing import Any, ClassVar, Literal, cast
 
 import aiofiles
-from anyio import EndOfStream
 import httpx
-from httpx import ConnectTimeout, HTTPStatusError, Response
+from httpx import AsyncHTTPTransport, HTTPStatusError, Response
 from nonebot_plugin_alconna import UniMessage
 from nonebot_plugin_htmlrender import get_browser
+from packaging.version import parse as parse_version
 from playwright.async_api import Page
-from retrying import retry
-import rich
+from rich.progress import (
+    BarColumn,
+    DownloadColumn,
+    Progress,
+    TextColumn,
+    TransferSpeedColumn,
+)
 
 from zhenxun.configs.config import BotConfig
 from zhenxun.services.log import logger
 from zhenxun.utils.message import MessageUtils
 from zhenxun.utils.user_agent import get_user_agent
 
-# from .browser import get_browser
+
+def get_async_client(
+    proxies: dict[str, str] | None = None, verify: bool = False, **kwargs
+) -> httpx.AsyncClient:
+    check_httpx_version = parse_version(httpx.__version__) >= parse_version("0.28.0")
+    transport = kwargs.pop("transport", None) or AsyncHTTPTransport(verify=verify)
+    if not check_httpx_version:
+        return httpx.AsyncClient(proxies=proxies, transport=transport, **kwargs)  # type: ignore
+    proxy_str = None
+    if proxies:
+        proxy_str = proxies.get("http://") or proxies.get("https://")
+        if not proxy_str:
+            logger.warning(
+                f"No valid proxy URL could be extracted from the proxy dict {proxies}; the proxy was ignored."
+            )
+    return httpx.AsyncClient(proxy=proxy_str, transport=transport, **kwargs)  # type: ignore
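For context on the version gate above: httpx 0.28 removed the `proxies` mapping argument in favor of a single `proxy` URL, which is exactly the incompatibility `get_async_client` shims over. A minimal sketch of the two call styles, with a made-up proxy address (not part of the commit):

import httpx
from httpx import AsyncHTTPTransport
from packaging.version import parse as parse_version

transport = AsyncHTTPTransport(verify=False)  # verify now rides on the transport

if parse_version(httpx.__version__) >= parse_version("0.28.0"):
    # httpx >= 0.28: only a single proxy URL is accepted
    client = httpx.AsyncClient(proxy="http://127.0.0.1:7890", transport=transport)
else:
    # httpx < 0.28: a scheme -> proxy-URL mapping was allowed
    client = httpx.AsyncClient(
        proxies={
            "http://": "http://127.0.0.1:7890",
            "https://": "http://127.0.0.1:7890",
        },
        transport=transport,
    )
# (in real code, use "async with client:" or await client.aclose())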
 class AsyncHttpx:
-    proxy: ClassVar[dict[str, str | None]] = {
-        "http://": BotConfig.system_proxy,
-        "https://": BotConfig.system_proxy,
-    }
+    default_proxy: ClassVar[dict[str, str] | None] = (
+        {
+            "http://": BotConfig.system_proxy,
+            "https://": BotConfig.system_proxy,
+        }
+        if BotConfig.system_proxy
+        else None
+    )
+
+    @classmethod
+    @asynccontextmanager
+    async def _create_client(
+        cls,
+        *,
+        use_proxy: bool = True,
+        proxy: dict[str, str] | None = None,
+        headers: dict[str, str] | None = None,
+        verify: bool = False,
+        **kwargs,
+    ) -> AsyncGenerator[httpx.AsyncClient, None]:
+        """Create a private, pre-configured httpx.AsyncClient context manager.
+
+        Note:
+            Used internally so clients are created in one place, with shared
+            proxy and header handling and less duplicated code.
+
+        Args:
+            use_proxy: whether to use the default proxy defined on the class.
+            proxy: an explicitly specified proxy, overriding the default.
+            headers: custom request headers merged into the client.
+            verify: whether to verify SSL certificates.
+            **kwargs: any other arguments passed through to httpx.AsyncClient.
+
+        Returns:
+            AsyncGenerator[httpx.AsyncClient, None]: generator
+        """
+        proxies_to_use = proxy or (cls.default_proxy if use_proxy else None)
+        final_headers = get_user_agent()
+        if headers:
+            final_headers.update(headers)
+        async with get_async_client(
+            proxies=proxies_to_use, verify=verify, headers=final_headers, **kwargs
+        ) as client:
+            yield client
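To illustrate how the new `_create_client` helper resolves proxies (an explicit `proxy` beats the class default, and `use_proxy=False` forces a direct connection), here is a hypothetical usage sketch of this internal helper. The import path `zhenxun.utils.http_utils` and the proxy address are assumptions, not shown in this view:

import asyncio

from zhenxun.utils.http_utils import AsyncHttpx  # assumed module path


async def demo() -> None:
    # An explicit proxy dict overrides the class-level default_proxy.
    async with AsyncHttpx._create_client(
        proxy={"https://": "http://10.0.0.1:8080"}  # made-up proxy address
    ) as client:
        resp = await client.get("https://example.com")
        print(resp.status_code)

    # use_proxy=False forces a direct connection even if default_proxy is set.
    async with AsyncHttpx._create_client(use_proxy=False) as client:
        resp = await client.get("https://example.com")
        print(resp.status_code)


asyncio.run(demo())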
     @classmethod
-    @retry(stop_max_attempt_number=3)
     async def get(
         cls,
         url: str | list[str],
         *,
-        params: dict[str, Any] | None = None,
-        headers: dict[str, str] | None = None,
-        cookies: dict[str, str] | None = None,
-        verify: bool = True,
-        use_proxy: bool = True,
-        proxy: dict[str, str] | None = None,
-        timeout: int = 30,  # noqa: ASYNC109
         check_status_code: int | None = None,
         **kwargs,
-    ) -> Response:
-        """Get
-
-        Args:
-            url: url
-            params: params
-            headers: request headers
-            cookies: cookies
-            verify: verify
-            use_proxy: use the default proxy
-            proxy: specify a proxy
-            timeout: timeout
-            check_status_code: check the status code
-        """
+    ) -> Response:  # sourcery skip: use-assigned-variable
+        """Send a GET request and return the first successful response.
+
+        Note:
+            A high-level wrapper around httpx.get that adds multi-URL
+            fallback and unified proxy management. If a list of URLs is
+            given, each one is tried in turn until one succeeds.
+
+        Args:
+            url: a single request URL, or a list of URLs.
+            check_status_code: (optional) if given, the response status code
+                is checked against it and an exception is raised on mismatch.
+            **kwargs: any other arguments passed through to httpx.get
+                (e.g. `params`, `headers`, `timeout`).
+
+        Returns:
+            Response: Response
+        """
         urls = [url] if isinstance(url, str) else url
-        return await cls._get_first_successful(
-            urls,
-            params=params,
-            headers=headers,
-            cookies=cookies,
-            verify=verify,
-            use_proxy=use_proxy,
-            proxy=proxy,
-            timeout=timeout,
-            check_status_code=check_status_code,
-            **kwargs,
-        )
-
-    @classmethod
-    async def _get_first_successful(
-        cls,
-        urls: list[str],
-        check_status_code: int | None = None,
-        **kwargs,
-    ) -> Response:
         last_exception = None
-        for url in urls:
+        for current_url in urls:
             try:
-                logger.info(f"Fetching {url}..")
-                response = await cls._get_single(url, **kwargs)
+                logger.info(f"Fetching {current_url}..")
+                async with cls._create_client(**kwargs) as client:
+                    # Extract only the kwargs that client.get itself supports
+                    get_kwargs = {
+                        k: v
+                        for k, v in kwargs.items()
+                        if k not in ["use_proxy", "proxy", "verify", "headers"]
+                    }
+                    response = await client.get(current_url, **get_kwargs)
                 if check_status_code and response.status_code != check_status_code:
-                    status_code = response.status_code
-                    raise Exception(f"Unexpected status code: {status_code}!={check_status_code}")
+                    raise HTTPStatusError(
+                        f"Unexpected status code: {response.status_code}!={check_status_code}",
+                        request=response.request,
+                        response=response,
+                    )
                 return response
             except Exception as e:
                 last_exception = e
-                if url != urls[-1]:
-                    logger.warning(f"Failed to fetch {url}, trying the next one")
+                if current_url != urls[-1]:
+                    logger.warning(f"Failed to fetch {current_url}, trying the next one", e=e)
         raise last_exception or Exception("All URLs failed")
     @classmethod
-    async def _get_single(
-        cls,
-        url: str,
-        *,
-        params: dict[str, Any] | None = None,
-        headers: dict[str, str] | None = None,
-        cookies: dict[str, str] | None = None,
-        verify: bool = True,
-        use_proxy: bool = True,
-        proxy: dict[str, str] | None = None,
-        timeout: int = 30,  # noqa: ASYNC109
-        **kwargs,
-    ) -> Response:
-        if not headers:
-            headers = get_user_agent()
-        _proxy = proxy or (cls.proxy if use_proxy else None)
-        async with httpx.AsyncClient(proxies=_proxy, verify=verify) as client:  # type: ignore
-            return await client.get(
-                url,
-                params=params,
-                headers=headers,
-                cookies=cookies,
-                timeout=timeout,
-                **kwargs,
-            )
-
-    @classmethod
-    async def head(
-        cls,
-        url: str,
-        *,
-        params: dict[str, Any] | None = None,
-        headers: dict[str, str] | None = None,
-        cookies: dict[str, str] | None = None,
-        verify: bool = True,
-        use_proxy: bool = True,
-        proxy: dict[str, str] | None = None,
-        timeout: int = 30,  # noqa: ASYNC109
-        **kwargs,
-    ) -> Response:
-        """Get
-
-        Args:
-            url: url
-            params: params
-            headers: request headers
-            cookies: cookies
-            verify: verify
-            use_proxy: use the default proxy
-            proxy: specify a proxy
-            timeout: timeout
-        """
-        if not headers:
-            headers = get_user_agent()
-        _proxy = proxy or (cls.proxy if use_proxy else None)
-        async with httpx.AsyncClient(proxies=_proxy, verify=verify) as client:  # type: ignore
-            return await client.head(
-                url,
-                params=params,
-                headers=headers,
-                cookies=cookies,
-                timeout=timeout,
-                **kwargs,
-            )
-
-    @classmethod
-    async def post(
-        cls,
-        url: str,
-        *,
-        data: dict[str, Any] | None = None,
-        content: Any = None,
-        files: Any = None,
-        verify: bool = True,
-        use_proxy: bool = True,
-        proxy: dict[str, str] | None = None,
-        json: dict[str, Any] | None = None,
-        params: dict[str, str] | None = None,
-        headers: dict[str, str] | None = None,
-        cookies: dict[str, str] | None = None,
-        timeout: int = 30,  # noqa: ASYNC109
-        **kwargs,
-    ) -> Response:
-        """
-        Note:
-            Post
-
-        Args:
-            url: url
-            data: data
-            content: content
-            files: files
-            use_proxy: whether to use the default proxy
-            proxy: specify a proxy
-            json: json
-            params: params
-            headers: request headers
-            cookies: cookies
-            timeout: timeout
-        """
-        if not headers:
-            headers = get_user_agent()
-        _proxy = proxy or (cls.proxy if use_proxy else None)
-        async with httpx.AsyncClient(proxies=_proxy, verify=verify) as client:  # type: ignore
-            return await client.post(
-                url,
-                content=content,
-                data=data,
-                files=files,
-                json=json,
-                params=params,
-                headers=headers,
-                cookies=cookies,
-                timeout=timeout,
-                **kwargs,
-            )
+    async def head(cls, url: str, **kwargs) -> Response:
+        """Send a HEAD request.
+
+        Note:
+            A wrapper around httpx.head, typically used to inspect resource
+            metadata (such as size and type).
+
+        Args:
+            url: the request URL.
+            **kwargs: any other arguments passed through to httpx.head
+                (e.g. `headers`, `timeout`, `allow_redirects`).
+
+        Returns:
+            Response: Response
+        """
+        async with cls._create_client(**kwargs) as client:
+            head_kwargs = {
+                k: v
+                for k, v in kwargs.items()
+                if k not in ["use_proxy", "proxy", "verify"]
+            }
+            return await client.head(url, **head_kwargs)
+
+    @classmethod
+    async def post(cls, url: str, **kwargs) -> Response:
+        """Send a POST request.
+
+        Note:
+            A wrapper around httpx.post with unified proxy and client
+            management.
+
+        Args:
+            url: the request URL.
+            **kwargs: any other arguments passed through to httpx.post
+                (e.g. `data`, `json`, `content`).
+
+        Returns:
+            Response: Response
+        """
+        async with cls._create_client(**kwargs) as client:
+            post_kwargs = {
+                k: v
+                for k, v in kwargs.items()
+                if k not in ["use_proxy", "proxy", "verify"]
+            }
+            return await client.post(url, **post_kwargs)
 
     @classmethod
     async def get_content(cls, url: str, **kwargs) -> bytes:
+        """Fetch the binary content of the given URL.
+
+        Note:
+            A convenience method, equivalent to calling get() and then
+            accessing the .content attribute.
+
+        Args:
+            url: the request URL.
+            **kwargs: all arguments passed through to the get() method.
+
+        Returns:
+            bytes: the response body as a raw byte stream (bytes).
+        """
         res = await cls.get(url, **kwargs)
         return res.content
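The same pattern holds for the slimmed-down `head()`, `post()`, and `get_content()`: client-level keys (`use_proxy`, `proxy`, `verify`) are consumed by `_create_client` and stripped before the request call, while request-level kwargs ride through. A hedged usage sketch with placeholder URLs:

import asyncio
from pathlib import Path

from zhenxun.utils.http_utils import AsyncHttpx  # assumed module path


async def main() -> None:
    # head() is a cheap way to inspect metadata before a full download.
    resp = await AsyncHttpx.head(
        "https://example.com/big-file.bin",  # placeholder URL
        follow_redirects=True,
        timeout=10,
    )
    print(resp.headers.get("Content-Length"))

    # get_content() is get() plus .content in one call.
    data = await AsyncHttpx.get_content("https://example.com/logo.png")
    Path("logo.png").write_bytes(data)


asyncio.run(main())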
@@ -223,195 +209,143 @@ class AsyncHttpx:
         url: str | list[str],
         path: str | Path,
         *,
-        params: dict[str, str] | None = None,
-        verify: bool = True,
-        use_proxy: bool = True,
-        proxy: dict[str, str] | None = None,
-        headers: dict[str, str] | None = None,
-        cookies: dict[str, str] | None = None,
-        timeout: int = 30,  # noqa: ASYNC109
         stream: bool = False,
-        follow_redirects: bool = True,
         **kwargs,
     ) -> bool:
-        """Download a file
+        """Download a file to the given path.
+
+        Note:
+            Supports multi-URL fallback and streaming download (with a
+            progress bar).
 
         Args:
-            url: url
-            path: save path
-            params: params
-            verify: verify
-            use_proxy: use a proxy
-            proxy: specify a proxy
-            headers: request headers
-            cookies: cookies
-            timeout: timeout
-            stream: use streaming download (chunked writes + progress bar, for large files)
+            url: a single file URL, or a list of fallback URLs.
+            path: the local path to save the file to.
+            stream: (optional) use streaming download, suited to large files.
+                Defaults to False.
+            **kwargs: any other arguments passed through to get() or
+                httpx.stream().
+
+        Returns:
+            bool: whether the download succeeded.
         """
-        if isinstance(path, str):
-            path = Path(path)
+        path = Path(path)
         path.parent.mkdir(parents=True, exist_ok=True)
-        try:
-            for _ in range(3):
-                if not isinstance(url, list):
-                    url = [url]
-                for u in url:
-                    try:
-                        if not stream:
-                            response = await cls.get(
-                                u,
-                                params=params,
-                                headers=headers,
-                                cookies=cookies,
-                                use_proxy=use_proxy,
-                                proxy=proxy,
-                                timeout=timeout,
-                                follow_redirects=follow_redirects,
-                                **kwargs,
-                            )
-                            response.raise_for_status()
-                            content = response.content
-                            async with aiofiles.open(path, "wb") as wf:
-                                await wf.write(content)
-                            logger.info(f"Downloaded {u}.. Path: {path.absolute()}")
-                        else:
-                            if not headers:
-                                headers = get_user_agent()
-                            _proxy = proxy or (cls.proxy if use_proxy else None)
-                            async with httpx.AsyncClient(
-                                proxies=_proxy,  # type: ignore
-                                verify=verify,
-                            ) as client:
-                                async with client.stream(
-                                    "GET",
-                                    u,
-                                    params=params,
-                                    headers=headers,
-                                    cookies=cookies,
-                                    timeout=timeout,
-                                    follow_redirects=True,
-                                    **kwargs,
-                                ) as response:
-                                    response.raise_for_status()
-                                    logger.info(
-                                        f"Starting download of {path.name}.. "
-                                        f"Url: {u}.. "
-                                        f"Path: {path.absolute()}"
-                                    )
-                                    async with aiofiles.open(path, "wb") as wf:
-                                        total = int(
-                                            response.headers.get("Content-Length", 0)
-                                        )
-                                        with rich.progress.Progress(  # type: ignore
-                                            rich.progress.TextColumn(path.name),  # type: ignore
-                                            "[progress.percentage]{task.percentage:>3.0f}%",  # type: ignore
-                                            rich.progress.BarColumn(bar_width=None),  # type: ignore
-                                            rich.progress.DownloadColumn(),  # type: ignore
-                                            rich.progress.TransferSpeedColumn(),  # type: ignore
-                                        ) as progress:
-                                            download_task = progress.add_task(
-                                                "Download",
-                                                total=total or None,
-                                            )
-                                            async for chunk in response.aiter_bytes():
-                                                await wf.write(chunk)
-                                                await wf.flush()
-                                                progress.update(
-                                                    download_task,
-                                                    completed=response.num_bytes_downloaded,
-                                                )
-                                            logger.info(
-                                                f"Downloaded {u}.. Path: {path.absolute()}"
-                                            )
-                        return True
-                    except (TimeoutError, ConnectTimeout, HTTPStatusError):
-                        logger.warning(f"Download of {u} failed.. trying the next address..")
-        except EndOfStream as e:
-            logger.warning(
-                f"EndOfStream while downloading {url} Path: {path.absolute()}", e=e
-            )
-            if path.exists():
-                return True
-            logger.error(f"Download of {url} timed out.. Path: {path.absolute()}")
-        except Exception as e:
-            logger.error(f"Error downloading {url} Path: {path.absolute()}", e=e)
+
+        urls = [url] if isinstance(url, str) else url
+
+        for current_url in urls:
+            try:
+                if not stream:
+                    response = await cls.get(current_url, **kwargs)
+                    response.raise_for_status()
+                    async with aiofiles.open(path, "wb") as f:
+                        await f.write(response.content)
+                else:
+                    async with cls._create_client(**kwargs) as client:
+                        stream_kwargs = {
+                            k: v
+                            for k, v in kwargs.items()
+                            if k not in ["use_proxy", "proxy", "verify"]
+                        }
+                        async with client.stream(
+                            "GET", current_url, **stream_kwargs
+                        ) as response:
+                            response.raise_for_status()
+                            total = int(response.headers.get("Content-Length", 0))
+
+                            with Progress(
+                                TextColumn(path.name),
+                                "[progress.percentage]{task.percentage:>3.0f}%",
+                                BarColumn(bar_width=None),
+                                DownloadColumn(),
+                                TransferSpeedColumn(),
+                            ) as progress:
+                                task_id = progress.add_task("Download", total=total)
+                                async with aiofiles.open(path, "wb") as f:
+                                    async for chunk in response.aiter_bytes():
+                                        await f.write(chunk)
+                                        progress.update(task_id, advance=len(chunk))
+
+                logger.info(f"Downloaded {current_url} -> {path.absolute()}")
+                return True
+
+            except Exception as e:
+                logger.warning(f"Download of {current_url} failed, trying the next one. Error: {e}")
+
+        logger.error(f"All URLs {urls} failed to download -> {path.absolute()}")
         return False
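A usage sketch for the rewritten `download_file()`, with placeholder URLs and paths; `stream=True` selects the chunked path with the rich progress bar:

import asyncio

from zhenxun.utils.http_utils import AsyncHttpx  # assumed module path


async def main() -> None:
    # Small file: one-shot download (URL and paths are placeholders).
    await AsyncHttpx.download_file("https://example.com/avatar.jpg", "data/avatar.jpg")

    # Large file with mirror fallback: stream=True writes chunk by chunk
    # and renders the rich progress bar while doing so.
    ok = await AsyncHttpx.download_file(
        ["https://example.com/model.bin", "https://mirror.example.org/model.bin"],
        "data/model.bin",
        stream=True,
    )
    print("success:", ok)


asyncio.run(main())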
     @classmethod
     async def gather_download_file(
         cls,
-        url_list: list[str] | list[list[str]],
-        path_list: list[str | Path],
+        url_list: Sequence[list[str] | str],
+        path_list: Sequence[str | Path],
         *,
-        limit_async_number: int | None = None,
-        params: dict[str, str] | None = None,
-        use_proxy: bool = True,
-        proxy: dict[str, str] | None = None,
-        headers: dict[str, str] | None = None,
-        cookies: dict[str, str] | None = None,
-        timeout: int = 30,  # noqa: ASYNC109
+        limit_async_number: int = 5,
         **kwargs,
     ) -> list[bool]:
-        """Download files concurrently, in batches
+        """Download multiple files concurrently, with optional mirror URLs per file.
+
+        Note:
+            An asyncio.Semaphore caps the number of concurrent requests.
+            For each element of url_list that is itself a list, the URLs are
+            tried in order until the download succeeds.
 
         Args:
-            url_list: list of urls
-            path_list: list of save paths
-            limit_async_number: cap on concurrent requests
-            params: params
-            use_proxy: use a proxy
-            proxy: specify a proxy
-            headers: request headers
-            cookies: cookies
-            timeout: timeout
+            url_list: the list of download tasks; each element can be:
+                - a string (str): the single URL for that task.
+                - a list of strings (list[str]): fallback/mirror URLs for that task.
+            path_list: the save-path list corresponding to url_list.
+            limit_async_number: (optional) maximum concurrent downloads. Defaults to 5.
+            **kwargs: any other arguments passed through to download_file().
+
+        Returns:
+            list[bool]: whether each download task succeeded.
         """
-        if n := len(url_list) != len(path_list):
-            raise UrlPathNumberNotEqual(
-                f"URL count does not match path count! URLs: {len(url_list)}, paths: {len(path_list)}"
-            )
-        if limit_async_number and n > limit_async_number:
-            m = float(n) / limit_async_number
-            x = 0
-            j = limit_async_number
-            _split_url_list = []
-            _split_path_list = []
-            for _ in range(int(m)):
-                _split_url_list.append(url_list[x:j])
-                _split_path_list.append(path_list[x:j])
-                x += limit_async_number
-                j += limit_async_number
-            if int(m) < m:
-                _split_url_list.append(url_list[j:])
-                _split_path_list.append(path_list[j:])
-        else:
-            _split_url_list = [url_list]
-            _split_path_list = [path_list]
-        tasks = []
-        result_ = []
-        for x, y in zip(_split_url_list, _split_path_list):
-            tasks.extend(
-                asyncio.create_task(
-                    cls.download_file(
-                        url,
-                        path,
-                        params=params,
-                        headers=headers,
-                        cookies=cookies,
-                        use_proxy=use_proxy,
-                        timeout=timeout,
-                        proxy=proxy,
-                        **kwargs,
-                    )
-                )
-                for url, path in zip(x, y)
-            )
-            _x = await asyncio.gather(*tasks)
-            result_ = result_ + list(_x)
-            tasks.clear()
-        return result_
+        if len(url_list) != len(path_list):
+            raise ValueError("The URL list and the path list must have the same length")
+
+        semaphore = asyncio.Semaphore(limit_async_number)
+
+        async def _download_with_semaphore(
+            urls_for_one_path: str | list[str], path: str | Path
+        ):
+            async with semaphore:
+                return await cls.download_file(urls_for_one_path, path, **kwargs)
+
+        tasks = [
+            _download_with_semaphore(url_group, path)
+            for url_group, path in zip(url_list, path_list)
+        ]
+
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+
+        final_results = []
+        for i, result in enumerate(results):
+            if isinstance(result, Exception):
+                url_info = (
+                    url_list[i]
+                    if isinstance(url_list[i], str)
+                    else ", ".join(url_list[i])
+                )
+                logger.error(f"Error in concurrent download task ({url_info})", e=result)
+                final_results.append(False)
+            else:
+                # download_file returns a bool, so it can be appended directly
+                final_results.append(cast(bool, result))
+
+        return final_results
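How the semaphore-based `gather_download_file()` is meant to be called, per its docstring; URLs and paths are placeholders and the import path is assumed:

import asyncio

from zhenxun.utils.http_utils import AsyncHttpx  # assumed module path


async def main() -> None:
    # Each entry is either a single URL or a list of mirrors for one file;
    # at most limit_async_number downloads run at the same time.
    results = await AsyncHttpx.gather_download_file(
        [
            "https://example.com/a.png",
            ["https://example.com/b.png", "https://mirror.example.org/b.png"],
        ],
        ["data/a.png", "data/b.png"],
        limit_async_number=2,
    )
    print(results)  # e.g. [True, False]


asyncio.run(main())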
     @classmethod
     async def get_fastest_mirror(cls, url_list: list[str]) -> list[str]:
+        """Test mirror URLs and return the fastest ones.
+
+        Note:
+            Sends concurrent HEAD requests to measure each URL's response
+            time and availability, then sorts them by response speed.
+
+        Args:
+            url_list: the mirror URLs to test.
+
+        Returns:
+            list[str]: all reachable URLs, ordered from fastest to slowest.
+        """
         assert url_list
 
         async def head_mirror(client: type[AsyncHttpx], url: str) -> dict[str, Any]:
@@ -480,7 +414,7 @@ class AsyncPlaywright:
         wait_until: (
             Literal["domcontentloaded", "load", "networkidle"] | None
         ) = "networkidle",
-        timeout: float | None = None,  # noqa: ASYNC109
+        timeout: float | None = None,
         type_: Literal["jpeg", "png"] | None = None,
         user_agent: str | None = None,
         cookies: list[dict[str, Any]] | dict[str, Any] | None = None,
@@ -524,9 +458,5 @@ class AsyncPlaywright:
         return None
 
 
-class UrlPathNumberNotEqual(Exception):
-    pass
-
-
 class BrowserIsNone(Exception):
     pass