zhenxun_bot/zhenxun/utils/utils.py
HibiKier 1e2aa99207
Bugfix/fix aliyun (#2036)
* 🐛 修复数据库超时问题

* 🔧 移除帮助图片清理功能.

*  更新插件商店功能,允许在添加插件时指定源类型为 None。优化插件 ID 查找逻辑,增强代码可读性。新增 zhenxun/ui 模块导入。

* 🔧 优化数据访问和数据库上下文逻辑,移除不必要的全局变量和日志信息,调整日志级别为调试,提升代码可读性和性能。

*  增强插件商店功能,支持在下载文件时指定稀疏检出路径和目标目录。优化二进制文件处理逻辑,提升文件下载的准确性和效率。

*  增强阿里云和GitHub的文件管理功能,新增Git不可用异常处理,优化稀疏检出逻辑,提升代码可读性和稳定性。

*  增强插件下载功能,新增对下载结果的异常处理,确保在Git不可用时抛出相应异常信息。优化错误提示,提升用户体验。

*  增强插件商店功能,优化添加插件时的提示信息,明确区分插件模块和名称。新增 Windows 下删除只读文件的处理逻辑,提升插件管理的稳定性和用户体验。

*  优化文件内容获取逻辑,新增对非二进制文件的UTF-8解码处理,提升文件读取的稳定性和准确性。
2025-08-29 14:57:08 +08:00

268 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
import os
from pathlib import Path
import stat
import time
from types import TracebackType
from typing import Any, ClassVar
import httpx
from nonebot_plugin_uninfo import Uninfo
import pypinyin
from zhenxun.configs.config import Config
from zhenxun.services.log import logger
from .limiters import CountLimiter, FreqLimiter, UserBlockLimiter # noqa: F401
@dataclass
class EntityIDs:
user_id: str
"""用户id"""
group_id: str | None
"""群组id"""
channel_id: str | None
"""频道id"""
class ResourceDirManager:
"""
临时文件管理器
"""
temp_path: ClassVar[set[Path]] = set()
@classmethod
def __tree_append(cls, path: Path, deep: int = 1, current: int = 0):
"""递归添加文件夹"""
if current >= deep and deep != -1:
return
path = path.resolve() # 标准化路径
for f in os.listdir(path):
file = (path / f).resolve() # 标准化子路径
if file.is_dir():
if file not in cls.temp_path:
cls.temp_path.add(file)
logger.debug(f"添加临时文件夹: {file}")
cls.__tree_append(file, deep, current + 1)
@classmethod
def add_temp_dir(cls, path: str | Path, tree: bool = False, deep: int = 1):
"""添加临时清理文件夹,这些文件夹会被自动清理
参数:
path: 文件夹路径
tree: 是否递归添加文件夹
deep: 深度, -1 为无限深度
"""
if isinstance(path, str):
path = Path(path)
if path not in cls.temp_path:
cls.temp_path.add(path)
logger.debug(f"添加临时文件夹: {path}")
if tree:
cls.__tree_append(path, deep)
def is_binary_file(file_path: str) -> bool:
"""判断是否为二进制文件
参数:
file_path: 文件路径
返回:
bool: 是否为二进制文件
"""
# fmt: off
# 精简但包含图片和字体的二进制文件扩展名集合
BINARY_EXTENSIONS = frozenset({
# 图片文件
"jpg", "jpeg", "png", "gif", "bmp", "ico", "webp", "tiff", "tif", "svg",
# 字体文件
"ttf", "otf", "woff", "woff2", "eot",
# 压缩文件
"zip", "rar", "7z", "tar", "gz", "bz2", "xz",
# 可执行文件和库
"exe", "dll", "so", "dylib",
# 文档文件
"pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx",
# 多媒体文件
"mp3", "mp4", "avi", "mov", "wmv", "flv",
# 其他常见二进制文件
"bin", "dat", "db", "class", "pyc"
})
# 使用os.path.splitext高效提取扩展名
_, ext = os.path.splitext(file_path)
# 去除点号并转换为小写
ext_clean = ext.lstrip(".").lower()
return ext_clean in BINARY_EXTENSIONS
def cn2py(word: str) -> str:
"""将字符串转化为拼音
参数:
word: 文本
"""
return "".join("".join(i) for i in pypinyin.pinyin(word, style=pypinyin.NORMAL))
async def get_user_avatar(uid: int | str) -> bytes | None:
"""快捷获取用户头像
参数:
uid: 用户id
"""
url = f"http://q1.qlogo.cn/g?b=qq&nk={uid}&s=160"
async with httpx.AsyncClient() as client:
for _ in range(3):
try:
return (await client.get(url)).content
except Exception:
logger.error("获取用户头像错误", "Util", target=uid)
return None
async def get_group_avatar(gid: int | str) -> bytes | None:
"""快捷获取用群头像
参数:
gid: 群号
"""
url = f"http://p.qlogo.cn/gh/{gid}/{gid}/640/"
async with httpx.AsyncClient() as client:
for _ in range(3):
try:
return (await client.get(url)).content
except Exception:
logger.error("获取群头像错误", "Util", target=gid)
return None
def change_pixiv_image_links(
url: str, size: str | None = None, nginx_url: str | None = None
) -> str:
"""根据配置改变图片大小和反代链接
参数:
url: 图片原图链接
size: 模式
nginx_url: 反代
返回:
str: url
"""
if size == "master":
img_sp = url.rsplit(".", maxsplit=1)
url = img_sp[0]
img_type = img_sp[1]
url = url.replace("original", "master") + f"_master1200.{img_type}"
if not nginx_url:
nginx_url = Config.get_config("pixiv", "PIXIV_NGINX_URL")
if nginx_url:
url = (
url.replace("i.pximg.net", nginx_url)
.replace("i.pixiv.cat", nginx_url)
.replace("i.pixiv.re", nginx_url)
.replace("_webp", "")
)
return url
def change_img_md5(path_file: str | Path) -> bool:
"""改变图片MD5
参数:
path_file: 图片路径
返还:
bool: 是否修改成功
"""
try:
with open(path_file, "a") as f:
f.write(str(int(time.time() * 1000)))
return True
except Exception as e:
logger.warning(f"改变图片MD5错误 Path{path_file}", e=e)
return False
def is_valid_date(date_text: str, separator: str = "-") -> bool:
"""日期是否合法
参数:
date_text: 日期
separator: 分隔符
返回:
bool: 日期是否合法
"""
try:
datetime.strptime(date_text, f"%Y{separator}%m{separator}%d")
return True
except ValueError:
return False
def get_entity_ids(session: Uninfo) -> EntityIDs:
"""获取用户id群组id频道id
参数:
session: Uninfo
返回:
EntityIDs: 用户id群组id频道id
"""
user_id = session.user.id
group_id = None
channel_id = None
if session.group:
if session.group.parent:
group_id = session.group.parent.id
channel_id = session.group.id
else:
group_id = session.group.id
return EntityIDs(user_id=user_id, group_id=group_id, channel_id=channel_id)
def is_number(text: str) -> bool:
"""是否为数字
参数:
text: 文本
返回:
bool: 是否为数字
"""
try:
float(text)
return True
except ValueError:
return False
def win_on_rm_error(
func: Callable[[str], Any],
path: str,
_exc_info: tuple[type[BaseException], BaseException, TracebackType],
) -> None:
"""Windows下删除只读文件/目录时的回调。
去除只读属性后重试删除,避免 WinError 5。
"""
try:
os.chmod(path, stat.S_IWRITE)
except Exception:
# 即使去除权限失败也继续尝试
pass
try:
func(path)
except Exception:
# 仍失败则记录调试日志并忽略,交由上层继续处理
logger.debug(f"删除失败重试仍失败: {path}")