feat(http_utils): 优化AsyncHttpx类,解决并发下载问题 (#1968)

- 分离客户端配置和请求参数,避免不必要的临时客户端创建
- 添加可选下载进度条,解决并发下载时Progress实例冲突
- 优化 AsyncHttpx 方法文档字符串

Co-authored-by: webjoin111 <455457521@qq.com>
This commit is contained in:
Rumio 2025-07-11 10:13:02 +08:00 committed by GitHub
parent acfed0837a
commit 99eacdfc12
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -137,19 +137,13 @@ def get_async_client(
class AsyncHttpx:
"""
一个高级的健壮的异步HTTP客户端工具类
高性能异步HTTP客户端工具类
设计理念:
- **全局共享客户端**: 默认情况下所有请求都通过一个在应用启动时初始化的全局
`httpx.AsyncClient` 实例发出这个实例共享连接池提高了效率和性能
- **向后兼容与灵活性**: 完全兼容旧的API同时提供了两种方式来处理需要
特殊网络配置如不同代理超时的请求
1. **单次请求覆盖**: 在调用 `get`, `post` 等方法时直接传入 `proxies`,
`timeout` 等参数将为该次请求创建一个临时的独立的客户端
2. **临时客户端上下文**: 使用 `temporary_client()` 上下文管理器可以
获取一个独立的可配置的客户端用于执行一系列需要相同特殊配置的请求
- **健壮性**: 内置了自动重试多镜像URL回退fallback机制并提供了便捷的
JSON解析和文件下载方法
特性:
- 全局共享连接池提升性能
- 支持临时客户端配置代理超时等
- 内置重试机制和多URL回退
- 提供JSON解析和文件下载功能
"""
CLIENT_KEY: ClassVar[list[str]] = [
@ -157,7 +151,6 @@ class AsyncHttpx:
"proxies",
"proxy",
"verify",
"headers",
]
default_proxy: ClassVar[dict[str, str] | None] = (
@ -290,15 +283,6 @@ class AsyncHttpx:
) -> Response:
"""发送 GET 请求,并返回第一个成功的响应。
说明:
本方法是 httpx.get 的高级包装增加了多链接尝试自动重试和统一的
客户端管理如果提供 URL 列表它将依次尝试直到成功为止
用法建议:
- **常规使用**: `await AsyncHttpx.get(url)` 将使用全局客户端
- **单次覆盖配置**: `await AsyncHttpx.get(url, timeout=5, proxies=None)`
将为本次请求创建一个独立的临时客户端
参数:
url: 单个请求 URL 或一个 URL 列表
follow_redirects: 是否跟随重定向
@ -312,7 +296,7 @@ class AsyncHttpx:
返回:
Response: httpx 的响应对象
Raises:
异常:
AllURIsFailedError: 当所有提供的URL都请求失败时抛出
"""
@ -373,10 +357,11 @@ class AsyncHttpx:
"""
[私有] 执行单个HTTP请求并解析JSON用于内部统一处理
"""
client_kwargs, request_kwargs = cls._split_kwargs(kwargs)
async with cls._get_active_client_context(
client=client, **kwargs
client=client, **client_kwargs
) as active_client:
_, request_kwargs = cls._split_kwargs(kwargs)
response = await active_client.request(method, url, **request_kwargs)
response.raise_for_status()
return response.json()
@ -394,11 +379,6 @@ class AsyncHttpx:
"""
发送GET请求并自动解析为JSON支持重试和多链接尝试
说明:
这是一个高度便捷的方法封装了请求重试JSON解析和错误处理
它会在网络错误或JSON解析错误时自动重试
如果所有尝试都失败它会安全地返回一个默认值
参数:
url: 单个请求 URL 或一个备用 URL 列表
default: (可选) 当所有尝试都失败时返回的默认值默认为None
@ -411,7 +391,7 @@ class AsyncHttpx:
返回:
Any: 解析后的JSON数据或在失败时返回 `default`
Raises:
异常:
AllURIsFailedError: `raise_on_failure` True 且所有URL都请求失败时抛出
"""
@ -490,25 +470,33 @@ class AsyncHttpx:
"""
执行单个流式下载的私有方法被重试装饰器包裹
"""
client_kwargs, request_kwargs = cls._split_kwargs(kwargs)
show_progress = request_kwargs.pop("show_progress", False)
async with cls._get_active_client_context(
client=client, **kwargs
client=client, **client_kwargs
) as active_client:
async with active_client.stream("GET", url, **kwargs) as response:
async with active_client.stream("GET", url, **request_kwargs) as response:
response.raise_for_status()
total = int(response.headers.get("Content-Length", 0))
with Progress(
TextColumn(path.name),
"[progress.percentage]{task.percentage:>3.0f}%",
BarColumn(bar_width=None),
DownloadColumn(),
TransferSpeedColumn(),
) as progress:
task_id = progress.add_task("Download", total=total)
if show_progress:
with Progress(
TextColumn(path.name),
"[progress.percentage]{task.percentage:>3.0f}%",
BarColumn(bar_width=None),
DownloadColumn(),
TransferSpeedColumn(),
) as progress:
task_id = progress.add_task("Download", total=total)
async with aiofiles.open(path, "wb") as f:
async for chunk in response.aiter_bytes():
await f.write(chunk)
progress.update(task_id, advance=len(chunk))
else:
async with aiofiles.open(path, "wb") as f:
async for chunk in response.aiter_bytes():
await f.write(chunk)
progress.update(task_id, advance=len(chunk))
@classmethod
async def download_file(
@ -517,6 +505,7 @@ class AsyncHttpx:
path: str | Path,
*,
stream: bool = False,
show_progress: bool = False,
client: AsyncClient | None = None,
**kwargs,
) -> bool:
@ -529,6 +518,7 @@ class AsyncHttpx:
url: 单个文件 URL 或一个备用 URL 列表
path: 文件保存的本地路径
stream: (可选) 是否使用流式下载适用于大文件默认为 False
show_progress: (可选) stream=True 是否显示下载进度条默认为 False
client: (可选) 指定的HTTP客户端
**kwargs: 其他所有传递给 get() 方法或 httpx.stream() 的参数
@ -544,7 +534,9 @@ class AsyncHttpx:
async with aiofiles.open(path, "wb") as f:
await f.write(content)
else:
await cls._stream_download(current_url, path, **worker_kwargs)
await cls._stream_download(
current_url, path, show_progress=show_progress, **worker_kwargs
)
logger.info(
f"下载 {current_url} 成功 -> {path.absolute()}",
@ -573,10 +565,6 @@ class AsyncHttpx:
) -> list[bool]:
"""并发下载多个文件,支持为每个文件提供备用镜像链接。
说明:
使用 asyncio.Semaphore 来控制并发请求的数量
对于 url_list 中的每个元素如果它是一个列表则会依次尝试直到下载成功
参数:
url_list: 包含所有文件下载任务的列表每个元素可以是
- 一个字符串 (str): 代表该任务的唯一URL
@ -625,9 +613,6 @@ class AsyncHttpx:
async def get_fastest_mirror(cls, url_list: list[str]) -> list[str]:
"""测试并返回最快的镜像地址。
说明:
通过并发发送 HEAD 请求来测试每个 URL 的响应时间和可用性并按响应速度排序
参数:
url_list: 需要测试的镜像 URL 列表
@ -671,23 +656,12 @@ class AsyncHttpx:
"""
创建一个临时的可配置的HTTP客户端上下文并直接返回该客户端实例
此方法返回一个标准的 `httpx.AsyncClient`它不使用全局连接池
拥有独立的配置(如代理headers超时等)并在退出上下文后自动关闭
适用于需要用一套特殊网络配置执行一系列请求的场景
用法:
async with AsyncHttpx.temporary_client(proxies=None, timeout=5) as client:
# client 是一个标准的 httpx.AsyncClient 实例
response1 = await client.get("http://some.internal.api/1")
response2 = await client.get("http://some.internal.api/2")
data = response2.json()
参数:
**kwargs: 所有传递给 `httpx.AsyncClient` 构造函数的参数
例如: `proxies`, `headers`, `verify`, `timeout`,
`follow_redirects`
Yields:
返回:
httpx.AsyncClient: 一个配置好的临时的客户端实例
"""
async with get_async_client(**kwargs) as client: