# zhenxun_bot/zhenxun/utils/http_utils.py
import asyncio
from asyncio.exceptions import TimeoutError
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, Literal

import aiofiles
import httpx
import rich.progress
from httpx import ConnectTimeout, Response
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_htmlrender import get_browser
from playwright.async_api import Page
from retrying import retry

from zhenxun.configs.config import BotConfig
from zhenxun.services.log import logger
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.user_agent import get_user_agent

# from .browser import get_browser


class AsyncHttpx:
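    # httpx-style proxy mapping built from the bot-wide proxy setting; it is
    # applied whenever a request opts in via ``use_proxy``.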
    proxy = {"http://": BotConfig.system_proxy, "https://": BotConfig.system_proxy}

@classmethod
@retry(stop_max_attempt_number=3)
async def get(
cls,
url: str,
*,
params: Dict[str, Any] | None = None,
headers: Dict[str, str] | None = None,
cookies: Dict[str, str] | None = None,
verify: bool = True,
use_proxy: bool = True,
proxy: Dict[str, str] | None = None,
timeout: int = 30,
**kwargs,
) -> Response:
"""Get
参数:
url: url
params: params
headers: 请求头
cookies: cookies
verify: verify
use_proxy: 使用默认代理
proxy: 指定代理
timeout: 超时时间
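
        Example (a minimal sketch; the URL is hypothetical):
            resp = await AsyncHttpx.get("https://example.com/api")
            resp.raise_for_status()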
"""
if not headers:
headers = get_user_agent()
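        # Proxy precedence: an explicit ``proxy`` argument first, then the
        # class-level default when ``use_proxy`` is set, otherwise no proxy.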
_proxy = proxy if proxy else cls.proxy if use_proxy else None
async with httpx.AsyncClient(proxies=_proxy, verify=verify) as client: # type: ignore
return await client.get(
url,
params=params,
headers=headers,
cookies=cookies,
timeout=timeout,
**kwargs,
)

    @classmethod
async def post(
cls,
url: str,
*,
        data: Dict[str, Any] | None = None,
content: Any = None,
files: Any = None,
verify: bool = True,
use_proxy: bool = True,
proxy: Dict[str, str] | None = None,
json: Dict[str, Any] | None = None,
params: Dict[str, str] | None = None,
headers: Dict[str, str] | None = None,
cookies: Dict[str, str] | None = None,
timeout: int = 30,
**kwargs,
) -> Response:
"""
说明:
Post
参数:
url: url
data: data
content: content
files: files
use_proxy: 是否默认代理
proxy: 指定代理
json: json
params: params
headers: 请求头
cookies: cookies
timeout: 超时时间
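
        Example (a minimal sketch; URL and payload are hypothetical):
            resp = await AsyncHttpx.post(
                "https://example.com/api", json={"key": "value"}
            )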
"""
if not headers:
headers = get_user_agent()
_proxy = proxy if proxy else cls.proxy if use_proxy else None
async with httpx.AsyncClient(proxies=_proxy, verify=verify) as client: # type: ignore
return await client.post(
url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
timeout=timeout,
**kwargs,
)

    @classmethod
async def download_file(
cls,
url: str,
path: str | Path,
*,
params: Dict[str, str] | None = None,
verify: bool = True,
use_proxy: bool = True,
proxy: Dict[str, str] | None = None,
headers: Dict[str, str] | None = None,
cookies: Dict[str, str] | None = None,
timeout: int = 30,
stream: bool = False,
**kwargs,
) -> bool:
"""下载文件
参数:
url: url
path: 存储路径
params: params
verify: verify
use_proxy: 使用代理
proxy: 指定代理
headers: 请求头
cookies: cookies
timeout: 超时时间
stream: 是否使用流式下载流式写入+进度条适用于下载大文件
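
        Example (a minimal sketch; URL and path are hypothetical):
            ok = await AsyncHttpx.download_file(
                "https://example.com/big.zip", "data/big.zip", stream=True
            )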
"""
if isinstance(path, str):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
try:
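            # Up to three attempts; timeout errors simply trigger the next
            # attempt, anything else is logged by the outer handler.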
for _ in range(3):
if not stream:
try:
content = (
await cls.get(
url,
params=params,
headers=headers,
cookies=cookies,
use_proxy=use_proxy,
proxy=proxy,
                                timeout=timeout,
                                verify=verify,
**kwargs,
)
).content
async with aiofiles.open(path, "wb") as wf:
await wf.write(content)
logger.info(f"下载 {url} 成功.. Path{path.absolute()}")
return True
except (TimeoutError, ConnectTimeout):
pass
else:
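                    # Streaming branch: fetch the body in chunks and show a
                    # rich progress bar while writing to disk.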
if not headers:
headers = get_user_agent()
_proxy = proxy if proxy else cls.proxy if use_proxy else None
try:
async with httpx.AsyncClient(
proxies=_proxy, verify=verify # type: ignore
) as client:
async with client.stream(
"GET",
url,
params=params,
headers=headers,
cookies=cookies,
timeout=timeout,
**kwargs,
) as response:
                                logger.info(
                                    f"Starting download of {path.name}. "
                                    f"Path: {path.absolute()}"
                                )
async with aiofiles.open(path, "wb") as wf:
                                    # Content-Length may be missing; None gives
                                    # an indeterminate bar instead of a KeyError.
                                    content_length = response.headers.get(
                                        "Content-Length"
                                    )
                                    total = (
                                        int(content_length) if content_length else None
                                    )
                                    with rich.progress.Progress(
                                        rich.progress.TextColumn(path.name),
                                        "[progress.percentage]{task.percentage:>3.0f}%",
                                        rich.progress.BarColumn(bar_width=None),
                                        rich.progress.DownloadColumn(),
                                        rich.progress.TransferSpeedColumn(),
                                    ) as progress:
download_task = progress.add_task(
"Download", total=total
)
async for chunk in response.aiter_bytes():
await wf.write(chunk)
await wf.flush()
progress.update(
download_task,
completed=response.num_bytes_downloaded,
)
                                logger.info(
                                    f"Downloaded {url} successfully. "
                                    f"Path: {path.absolute()}"
                                )
return True
except (TimeoutError, ConnectTimeout):
pass
else:
logger.error(f"下载 {url} 下载超时.. Path{path.absolute()}")
except Exception as e:
logger.error(f"下载 {url} 错误 Path{path.absolute()}", e=e)
return False

    @classmethod
async def gather_download_file(
cls,
url_list: list[str],
path_list: list[str | Path],
*,
limit_async_number: int | None = None,
params: Dict[str, str] | None = None,
use_proxy: bool = True,
proxy: Dict[str, str] | None = None,
headers: Dict[str, str] | None = None,
cookies: Dict[str, str] | None = None,
timeout: int = 30,
**kwargs,
) -> list[bool]:
"""分组同时下载文件
参数:
url_list: url列表
path_list: 存储路径列表
limit_async_number: 限制同时请求数量
params: params
use_proxy: 使用代理
proxy: 指定代理
headers: 请求头
cookies: cookies
timeout: 超时时间
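
        Example (a minimal sketch; URLs and paths are hypothetical):
            results = await AsyncHttpx.gather_download_file(
                ["https://example.com/a.jpg", "https://example.com/b.jpg"],
                ["imgs/a.jpg", "imgs/b.jpg"],
                limit_async_number=2,
            )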
"""
        if (n := len(url_list)) != len(path_list):
            raise UrlPathNumberNotEqual(
                f"URL count does not match path count: "
                f"urls={len(url_list)}, paths={len(path_list)}"
            )
if limit_async_number and n > limit_async_number:
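            # Split the work into batches of at most ``limit_async_number``
            # downloads; each batch is awaited before the next one starts.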
m = float(n) / limit_async_number
x = 0
j = limit_async_number
_split_url_list = []
_split_path_list = []
for _ in range(int(m)):
_split_url_list.append(url_list[x:j])
_split_path_list.append(path_list[x:j])
x += limit_async_number
j += limit_async_number
            if int(m) < m:
                # Leftover tail shorter than a full batch; ``x`` already
                # points just past the last full batch.
                _split_url_list.append(url_list[x:])
                _split_path_list.append(path_list[x:])
else:
_split_url_list = [url_list]
_split_path_list = [path_list]
tasks = []
result_ = []
for x, y in zip(_split_url_list, _split_path_list):
for url, path in zip(x, y):
tasks.append(
asyncio.create_task(
cls.download_file(
url,
path,
params=params,
headers=headers,
cookies=cookies,
use_proxy=use_proxy,
timeout=timeout,
proxy=proxy,
**kwargs,
)
)
)
_x = await asyncio.gather(*tasks)
result_ = result_ + list(_x)
tasks.clear()
return result_


class AsyncPlaywright:
@classmethod
@asynccontextmanager
async def new_page(cls, **kwargs) -> AsyncGenerator[Page, None]:
"""获取一个新页面
参数:
user_agent: 请求头
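
        Example (a minimal sketch; the URL is hypothetical):
            async with AsyncPlaywright.new_page() as page:
                await page.goto("https://example.com")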
"""
browser = await get_browser()
ctx = await browser.new_context(**kwargs)
page = await ctx.new_page()
try:
yield page
finally:
await page.close()
await ctx.close()

    @classmethod
async def screenshot(
cls,
url: str,
path: Path | str,
element: str | list[str],
*,
wait_time: int | None = None,
viewport_size: Dict[str, int] | None = None,
wait_until: (
Literal["domcontentloaded", "load", "networkidle"] | None
) = "networkidle",
timeout: float | None = None,
type_: Literal["jpeg", "png"] | None = None,
user_agent: str | None = None,
**kwargs,
    ) -> UniMessage | None:
"""截图,该方法仅用于简单快捷截图,复杂截图请操作 page
参数:
url: 网址
path: 存储路径
element: 元素选择
wait_time: 等待截取超时时间
viewport_size: 窗口大小
wait_until: 等待类型
timeout: 超时限制
type_: 保存类型
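
        Example (a minimal sketch; URL and selector are hypothetical):
            msg = await AsyncPlaywright.screenshot(
                "https://example.com", "shot.png", "#content"
            )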
"""
if viewport_size is None:
viewport_size = dict(width=2560, height=1080)
if isinstance(path, str):
path = Path(path)
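        # Playwright expects milliseconds; the caller passes seconds.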
wait_time = wait_time * 1000 if wait_time else None
if isinstance(element, str):
element_list = [element]
else:
element_list = element
async with cls.new_page(
viewport=viewport_size,
user_agent=user_agent,
**kwargs,
) as page:
await page.goto(url, timeout=timeout, wait_until=wait_until)
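            # Resolve the selector chain: each selector is looked up inside
            # the previous match, so nested elements can be targeted.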
card = page
for e in element_list:
if not card:
return None
card = await card.wait_for_selector(e, timeout=wait_time)
if card:
await card.screenshot(path=path, timeout=timeout, type=type_)
                return MessageUtils.build_message(path)
return None


class UrlPathNumberNotEqual(Exception):
    pass


class BrowserIsNone(Exception):
    pass