zhenxun_plugin_farm/request.py

295 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import httpx
from rich.progress import (
BarColumn,
DownloadColumn,
Progress,
TextColumn,
TimeRemainingColumn,
TransferSpeedColumn,
)
from zhenxun.configs.config import Config
from zhenxun.services.log import logger
from .config import g_sPlantPath, g_sSignInPath
from .dbService import g_pDBService
from .tool import g_pToolManager
class CRequestManager:
m_sTokens = "xZ%?z5LtWV7H:0-Xnwp+bNRNQ-jbfrxG"
@classmethod
async def download(
cls,
url: str,
savePath: str,
fileName: str,
params: dict | None = None,
jsonData: dict | None = None,
) -> bool:
"""下载文件到指定路径并覆盖已存在的文件,并显示下载进度条
Args:
url (str): 文件的下载链接
savePath (str): 保存文件夹路径
fileName (str): 保存后的文件名
params (dict | None): 可选的 URL 查询参数
jsonData (dict | None): 可选的 JSON 请求体
Returns:
bool: 是否下载成功
"""
headers = {"token": cls.m_sTokens}
try:
async with httpx.AsyncClient(timeout=30.0) as client:
requestArgs: dict = {"headers": headers}
if params:
requestArgs["params"] = params
if jsonData:
requestArgs["json"] = jsonData
response = await client.request(
"GET", url, **requestArgs, follow_redirects=True
)
if response.status_code != 200:
logger.warning(
f"文件下载失败: HTTP {response.status_code} {response.text}"
)
return False
totalLength = int(response.headers.get("Content-Length", 0))
fullPath = os.path.join(savePath, fileName)
os.makedirs(os.path.dirname(fullPath), exist_ok=True)
with Progress(
TextColumn("[progress.description]{task.description}"),
BarColumn(),
DownloadColumn(),
TransferSpeedColumn(),
TimeRemainingColumn(),
transient=True,
) as progress:
task = progress.add_task(
f"[green]【真寻农场】正在下载 {fileName}", total=totalLength
)
with open(fullPath, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size=1024):
f.write(chunk)
progress.advance(task, len(chunk))
return True
except Exception as e:
logger.warning(f"下载文件异常: {e}")
return False
@classmethod
async def post(cls, endpoint: str, name: str = "", jsonData: dict = {}) -> dict:
"""发送POST请求到指定接口统一调用仅支持JSON格式数据
Args:
endpoint (str): 请求的接口路径
name (str, optional): 操作名称用于日志记录
jsonData (dict): 以JSON格式发送的数据
Raises:
ValueError: 当jsonData未提供时抛出
Returns:
dict: 返回请求结果的JSON数据
"""
baseUrl = Config.get_config("zhenxun_plugin_farm", "服务地址")
url = f"{baseUrl.rstrip('/')}:8998/{endpoint.lstrip('/')}"
headers = {"token": cls.m_sTokens}
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.post(url, json=jsonData, headers=headers)
if response.status_code == 200:
return response.json()
else:
logger.warning(
f"{name}请求失败: HTTP {response.status_code} {response.text}"
)
return {}
except httpx.RequestError as e:
logger.warning(f"{name}请求异常", e=e)
return {}
except Exception as e:
logger.warning(f"{name}处理异常", e=e)
return {}
@classmethod
async def get(cls, endpoint: str, name: str = "") -> dict:
"""发送GET请求到指定接口统一调用仅支持无体的查询
Args:
endpoint (str): 请求的接口路径
name (str, optional): 操作名称用于日志记录
Returns:
dict: 返回请求结果的JSON数据
"""
baseUrl = Config.get_config("zhenxun_plugin_farm", "服务地址")
url = f"{baseUrl.rstrip('/')}:8998/{endpoint.lstrip('/')}"
headers = {"token": cls.m_sTokens}
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(url, headers=headers)
if response.status_code == 200:
return response.json()
else:
logger.warning(
f"{name}请求失败: HTTP {response.status_code} {response.text}"
)
return {}
except httpx.RequestError as e:
logger.warning(f"{name}请求异常", e=e)
return {}
except Exception as e:
logger.warning(f"{name}处理异常", e=e)
return {}
@classmethod
async def initSignInFile(cls) -> bool:
if os.path.exists(g_sSignInPath):
try:
with open(g_sSignInPath, encoding="utf-8") as f:
content = f.read()
sign = json.loads(content)
date = sign.get("date", "")
yearMonth = g_pToolManager.dateTime().now().strftime("%Y%m")
if date == yearMonth:
logger.debug("真寻农场签到文件检查完毕")
return True
else:
logger.warning("真寻农场签到文件检查失败, 即将下载")
return await cls.downloadSignInFile()
except json.JSONDecodeError:
logger.warning("真寻农场签到文件格式错误, 即将下载")
return await cls.downloadSignInFile()
else:
return await cls.downloadSignInFile()
@classmethod
async def downloadSignInFile(cls) -> bool:
"""下载签到文件,并重命名为 sign_in.json
Returns:
bool: 是否下载成功
"""
try:
baseUrl = Config.get_config("zhenxun_plugin_farm", "服务地址")
url = f"{baseUrl.rstrip('/')}:8998/sign_in"
path = str(g_sSignInPath.parent.resolve(strict=False))
yearMonth = g_pToolManager.dateTime().now().strftime("%Y%m")
# 下载为 signTemp.json
success = await cls.download(
url=url,
savePath=path,
fileName="signTemp.json",
jsonData={"date": yearMonth},
)
if not success:
return False
# 重命名为 sign_in.json
g_pToolManager.renameFile(f"{path}/signTemp.json", "sign_in.json")
return True
except Exception as e:
logger.error("下载签到文件失败", e=e)
return False
@classmethod
async def initPlantDBFile(cls) -> bool:
"""检查本地 plant.db 版本,如远程版本更新则重新下载
Returns:
bool: 是否为最新版或成功更新
"""
versionPath = os.path.join(os.path.dirname(g_sPlantPath), "version.json")
try:
with open(versionPath, encoding="utf-8") as f:
localVersion = json.load(f).get("version", 0)
except Exception as e:
logger.warning(f"读取本地版本失败默认版本为0: {e}")
localVersion = 0
remoteInfo = await cls.get("plant_version", name="版本检查")
remoteVersion = remoteInfo.get("version")
if remoteVersion is None:
logger.warning("获取远程版本失败")
return False
if float(remoteVersion) <= float(localVersion):
logger.debug("plant.db 已为最新版本")
return True
logger.warning(
f"发现新版本 plant.db远程: {remoteVersion} / 本地: {localVersion}),开始更新..."
)
# 先断开数据库连接
await g_pDBService.cleanup()
return await cls.downloadPlantDBFile(remoteVersion)
@classmethod
async def downloadPlantDBFile(cls, remoteVersion: float) -> bool:
"""下载最新版 plant.db 并更新本地 version.json
Args:
remoteVersion (float): 远程版本号
Returns:
bool: 是否下载并更新成功
"""
baseUrl = Config.get_config("zhenxun_plugin_farm", "服务地址")
savePath = os.path.dirname(g_sPlantPath)
success = await cls.download(
url=f"{baseUrl.rstrip('/')}:8998/file/plant.db",
savePath=savePath,
fileName="plantTemp.db",
)
if not success:
return False
# 重命名为 sign_in.json
g_pToolManager.renameFile(f"{savePath}/plantTemp.db", "plant.db")
versionPath = os.path.join(savePath, "version.json")
try:
with open(versionPath, "w", encoding="utf-8") as f:
json.dump({"version": remoteVersion}, f)
logger.debug("版本文件已更新")
except Exception as e:
logger.warning(f"写入版本文件失败: {e}")
return False
await g_pDBService.plant.init()
await g_pDBService.plant.downloadPlant()
return True
g_pRequestManager = CRequestManager()