# zhenxun_bot/utils/http_utils.py
from typing import Dict, Union, Optional, List, Any, Literal
from pathlib import Path
from asyncio.exceptions import TimeoutError

from httpx import Response, ConnectTimeout
from nonebot.adapters.onebot.v11 import MessageSegment
from playwright.async_api import Page
from retrying import retry
import asyncio
import aiofiles
import httpx

from services.log import logger
from utils.user_agent import get_user_agent

from .utils import get_local_proxy
from .message_builder import image
from .browser import get_browser


class AsyncHttpx:

    proxy = {"http://": get_local_proxy(), "https://": get_local_proxy()}

@classmethod
    @retry(stop_max_attempt_number=3)
async def get(
cls,
url: str,
*,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
cookies: Optional[Dict[str, str]] = None,
use_proxy: bool = True,
        proxy: Optional[Dict[str, str]] = None,
timeout: Optional[int] = 30,
        **kwargs,
) -> Response:
"""
2021-12-16 11:16:28 +08:00
说明
Get
参数
:param url: url
:param params: params
:param headers: 请求头
:param cookies: cookies
:param use_proxy: 使用默认代理
:param proxy: 指定代理
:param timeout: 超时时间
2021-11-23 21:44:59 +08:00
"""
if not headers:
headers = get_user_agent()
        proxy = proxy or (cls.proxy if use_proxy else None)
async with httpx.AsyncClient(proxies=proxy) as client:
return await client.get(
url,
params=params,
headers=headers,
cookies=cookies,
timeout=timeout,
                **kwargs,
            )
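
    # Usage sketch (hypothetical URL; must be awaited inside a running
    # event loop):
    #
    #     resp = await AsyncHttpx.get("https://example.com/api", use_proxy=False)
    #     resp.raise_for_status()
    #     data = resp.json()
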
@classmethod
async def post(
cls,
url: str,
*,
data: Optional[Dict[str, str]] = None,
content: Any = None,
files: Any = None,
use_proxy: bool = True,
        proxy: Optional[Dict[str, str]] = None,
        json: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, str]] = None,
headers: Optional[Dict[str, str]] = None,
cookies: Optional[Dict[str, str]] = None,
timeout: Optional[int] = 30,
        **kwargs,
) -> Response:
"""
2021-12-16 11:16:28 +08:00
说明
Post
参数
:param url: url
:param data: data
:param content: content
:param files: files
:param use_proxy: 是否默认代理
:param proxy: 指定代理
:param json: json
:param params: params
:param headers: 请求头
:param cookies: cookies
:param timeout: 超时时间
2021-11-23 21:44:59 +08:00
"""
if not headers:
headers = get_user_agent()
        proxy = proxy or (cls.proxy if use_proxy else None)
async with httpx.AsyncClient(proxies=proxy) as client:
return await client.post(
url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
timeout=timeout,
                **kwargs,
            )
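
    # Usage sketch (hypothetical URL and payload):
    #
    #     resp = await AsyncHttpx.post(
    #         "https://example.com/api/submit", json={"id": 1}, use_proxy=False
    #     )
    #     result = resp.json()
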
@classmethod
async def download_file(
cls,
url: str,
path: Union[str, Path],
*,
params: Optional[Dict[str, str]] = None,
use_proxy: bool = True,
        proxy: Optional[Dict[str, str]] = None,
headers: Optional[Dict[str, str]] = None,
cookies: Optional[Dict[str, str]] = None,
timeout: Optional[int] = 30,
        **kwargs,
) -> bool:
"""
2021-12-16 11:16:28 +08:00
说明
下载文件
参数
:param url: url
:param path: 存储路径
:param params: params
:param use_proxy: 使用代理
:param proxy: 指定代理
:param headers: 请求头
:param cookies: cookies
:param timeout: 超时时间
2021-11-23 21:44:59 +08:00
"""
if isinstance(path, str):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
        try:
            for _ in range(3):
                try:
                    content = (
                        await cls.get(
                            url,
                            params=params,
                            headers=headers,
                            cookies=cookies,
                            use_proxy=use_proxy,
                            proxy=proxy,
                            timeout=timeout,
                            **kwargs,
                        )
                    ).content
                    async with aiofiles.open(path, "wb") as wf:
                        await wf.write(content)
                    logger.info(f"Downloaded {url} successfully. Path: {path.absolute()}")
                    return True
                except (TimeoutError, ConnectTimeout):
                    # Retry on timeouts, up to three attempts in total.
                    pass
            else:
                # The for-else branch runs when all three attempts timed out.
                logger.error(f"Downloading {url} timed out. Path: {path.absolute()}")
        except Exception as e:
            logger.error(
                f"Unknown error {type(e)}: {e} while downloading {url}. "
                f"Path: {path.absolute()}"
            )
        return False
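
    # Usage sketch (hypothetical URL and path); returns False once all
    # three attempts have failed:
    #
    #     ok = await AsyncHttpx.download_file(
    #         "https://example.com/image.png", Path("resources/image.png")
    #     )
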
@classmethod
async def gather_download_file(
cls,
url_list: List[str],
path_list: List[Union[str, Path]],
*,
limit_async_number: Optional[int] = None,
params: Optional[Dict[str, str]] = None,
use_proxy: bool = True,
        proxy: Optional[Dict[str, str]] = None,
headers: Optional[Dict[str, str]] = None,
cookies: Optional[Dict[str, str]] = None,
timeout: Optional[int] = 30,
        **kwargs,
) -> List[bool]:
"""
2021-12-16 11:16:28 +08:00
说明
分组同时下载文件
参数
:param url_list: url列表
:param path_list: 存储路径列表
:param limit_async_number: 限制同时请求数量
:param params: params
:param use_proxy: 使用代理
:param proxy: 指定代理
:param headers: 请求头
:param cookies: cookies
:param timeout: 超时时间
2021-11-23 21:44:59 +08:00
"""
        if (n := len(url_list)) != len(path_list):
            raise UrlPathNumberNotEqual(
                f"Url count does not match Path count: "
                f"url: {len(url_list)}, path: {len(path_list)}"
            )
        if limit_async_number and n > limit_async_number:
            # Split the urls and paths into chunks of at most
            # limit_async_number items each.
            m = float(n) / limit_async_number
            x = 0
            j = limit_async_number
            _split_url_list = []
            _split_path_list = []
            for _ in range(int(m)):
                _split_url_list.append(url_list[x:j])
                _split_path_list.append(path_list[x:j])
                x += limit_async_number
                j += limit_async_number
            if int(m) < m:
                # Collect the leftover items into a final, smaller chunk.
                _split_url_list.append(url_list[x:])
                _split_path_list.append(path_list[x:])
        else:
            _split_url_list = [url_list]
            _split_path_list = [path_list]
        tasks = []
        result_ = []
        # Download chunk by chunk: each chunk's downloads run concurrently,
        # and the next chunk starts only after the current one finishes.
        for x, y in zip(_split_url_list, _split_path_list):
for url, path in zip(x, y):
tasks.append(
asyncio.create_task(
cls.download_file(
url,
path,
params=params,
headers=headers,
cookies=cookies,
use_proxy=use_proxy,
timeout=timeout,
                            proxy=proxy,
                            **kwargs,
)
)
)
_x = await asyncio.gather(*tasks)
result_ = result_ + list(_x)
tasks.clear()
return result_
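
    # Usage sketch (hypothetical URLs and paths); with limit_async_number=5,
    # at most 5 downloads run at the same time:
    #
    #     urls = [f"https://example.com/img_{i}.png" for i in range(10)]
    #     paths = [Path(f"resources/img_{i}.png") for i in range(10)]
    #     results = await AsyncHttpx.gather_download_file(
    #         urls, paths, limit_async_number=5
    #     )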


class AsyncPlaywright:
@classmethod
    async def _new_page(cls, user_agent: Optional[str] = None, **kwargs) -> Page:
"""
2021-12-16 11:16:28 +08:00
说明
获取一个新页面
参数
:param user_agent: 请求头
2021-11-23 21:44:59 +08:00
"""
browser = await get_browser()
        if browser:
            return await browser.new_page(user_agent=user_agent, **kwargs)
        raise BrowserIsNone("Failed to get Browser...")

@classmethod
async def goto(
cls,
url: str,
*,
timeout: Optional[float] = 100000,
wait_until: Optional[
Literal["domcontentloaded", "load", "networkidle"]
] = "networkidle",
        referer: Optional[str] = None,
        **kwargs,
) -> Optional[Page]:
"""
2021-12-16 11:16:28 +08:00
说明
goto
参数
:param url: 网址
:param timeout: 超时限制
:param wait_until: 等待类型
:param referer:
2021-11-23 21:44:59 +08:00
"""
page = None
try:
            page = await cls._new_page(**kwargs)
await page.goto(url, timeout=timeout, wait_until=wait_until, referer=referer)
return page
except Exception as e:
logger.warning(f"Playwright 访问 url{url} 发生错误 {type(e)}{e}")
if page:
await page.close()
return None
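
    # Usage sketch (hypothetical URL); the caller owns the returned page
    # and must close it:
    #
    #     page = await AsyncPlaywright.goto("https://example.com")
    #     if page:
    #         title = await page.title()
    #         await page.close()
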
@classmethod
async def screenshot(
cls,
url: str,
path: Union[Path, str],
        element: Union[str, List[str]],
        *,
        wait_time: Optional[int] = None,
        viewport_size: Optional[Dict[str, int]] = None,
        wait_until: Optional[
            Literal["domcontentloaded", "load", "networkidle"]
        ] = "networkidle",
        timeout: Optional[float] = None,
        type_: Optional[Literal["jpeg", "png"]] = None,
        **kwargs,
) -> Optional[MessageSegment]:
"""
2021-12-16 11:16:28 +08:00
说明
截图该方法仅用于简单快捷截图复杂截图请操作 page
参数
:param url: 网址
:param path: 存储路径
:param element: 元素选择
2022-04-04 20:33:37 +08:00
:param wait_time: 等待截取超时时间
2021-12-16 11:16:28 +08:00
:param viewport_size: 窗口大小
:param wait_until: 等待类型
:param timeout: 超时限制
:param type_: 保存类型
2021-11-23 21:44:59 +08:00
"""
page = None
if viewport_size is None:
viewport_size = dict(width=2560, height=1080)
if isinstance(path, str):
path = Path(path)
try:
            page = await cls.goto(url, wait_until=wait_until, **kwargs)
            await page.set_viewport_size(viewport_size)
            if isinstance(element, str):
if wait_time:
card = await page.wait_for_selector(element, timeout=wait_time)
else:
card = await page.query_selector(element)
            else:
                # Walk the selector chain, narrowing to nested elements.
                card = page
                for e in element:
if wait_time:
card = await card.wait_for_selector(e, timeout=wait_time)
else:
card = await card.query_selector(e)
await card.screenshot(path=path, timeout=timeout, type=type_)
return image(path)
except Exception as e:
logger.warning(f"Playwright 截图 url{url} element{element} 发生错误 {type(e)}{e}")
2021-12-01 14:03:34 +08:00
finally:
2021-11-23 21:44:59 +08:00
if page:
await page.close()
return None
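
    # Usage sketch (hypothetical URL, path, and selector); wait_time is
    # passed to Playwright's wait_for_selector, which takes milliseconds:
    #
    #     msg = await AsyncPlaywright.screenshot(
    #         "https://example.com", "screenshot.png", "#content", wait_time=5000
    #     )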


class UrlPathNumberNotEqual(Exception):
pass


class BrowserIsNone(Exception):
pass