zhenxun_bot/zhenxun/utils/utils.py
HibiKier f9a38a26b2
Some checks are pending
检查bot是否运行正常 / bot check (push) Waiting to run
CodeQL Code Security Analysis / Analyze (${{ matrix.language }}) (none, javascript-typescript) (push) Waiting to run
CodeQL Code Security Analysis / Analyze (${{ matrix.language }}) (none, python) (push) Waiting to run
Sequential Lint and Type Check / ruff-call (push) Waiting to run
Sequential Lint and Type Check / pyright-call (push) Blocked by required conditions
Release Drafter / Update Release Draft (push) Waiting to run
Force Sync to Aliyun / sync (push) Waiting to run
Update Version / update-version (push) Waiting to run
🐛 修复群组申请通知 (#2026)
*  修复一些bug

- 移除不必要的定时器类,简化代码结构
- 优化好友请求处理逻辑,确保在自动同意和手动处理之间的清晰区分
- 更新缓存机制,避免重复处理相同的好友请求
- 新增判断文件是否为二进制文件的功能,提升文件处理的准确性
- 优化缓存字典的过期检查逻辑,提高性能和可读性

*  更新 get_async_client 函数,支持字符串类型的代理参数

- 修改 proxies 参数类型,允许传入字符串形式的代理地址
- 增强代理处理逻辑,将字符串代理转换为字典格式,提升灵活性和可用性
2025-08-19 16:20:52 +08:00

227 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from dataclasses import dataclass
from datetime import datetime
import os
from pathlib import Path
import time
from typing import ClassVar
import httpx
from nonebot_plugin_uninfo import Uninfo
import pypinyin
from zhenxun.configs.config import Config
from zhenxun.services.log import logger
from .limiters import CountLimiter, FreqLimiter, UserBlockLimiter # noqa: F401
@dataclass
class EntityIDs:
user_id: str
"""用户id"""
group_id: str | None
"""群组id"""
channel_id: str | None
"""频道id"""
class ResourceDirManager:
"""
临时文件管理器
"""
temp_path: ClassVar[set[Path]] = set()
@classmethod
def __tree_append(cls, path: Path, deep: int = 1, current: int = 0):
"""递归添加文件夹"""
if current >= deep and deep != -1:
return
path = path.resolve() # 标准化路径
for f in os.listdir(path):
file = (path / f).resolve() # 标准化子路径
if file.is_dir():
if file not in cls.temp_path:
cls.temp_path.add(file)
logger.debug(f"添加临时文件夹: {file}")
cls.__tree_append(file, deep, current + 1)
@classmethod
def add_temp_dir(cls, path: str | Path, tree: bool = False, deep: int = 1):
"""添加临时清理文件夹,这些文件夹会被自动清理
参数:
path: 文件夹路径
tree: 是否递归添加文件夹
deep: 深度, -1 为无限深度
"""
if isinstance(path, str):
path = Path(path)
if path not in cls.temp_path:
cls.temp_path.add(path)
logger.debug(f"添加临时文件夹: {path}")
if tree:
cls.__tree_append(path, deep)
def is_binary_file(file_path: str) -> bool:
"""判断是否为二进制文件"""
binary_extensions = {
".jpg",
".jpeg",
".png",
".gif",
".bmp",
".ico",
".pdf",
".zip",
".rar",
".7z",
".exe",
".dll",
}
return any(file_path.lower().endswith(ext) for ext in binary_extensions)
def cn2py(word: str) -> str:
"""将字符串转化为拼音
参数:
word: 文本
"""
return "".join("".join(i) for i in pypinyin.pinyin(word, style=pypinyin.NORMAL))
async def get_user_avatar(uid: int | str) -> bytes | None:
"""快捷获取用户头像
参数:
uid: 用户id
"""
url = f"http://q1.qlogo.cn/g?b=qq&nk={uid}&s=160"
async with httpx.AsyncClient() as client:
for _ in range(3):
try:
return (await client.get(url)).content
except Exception:
logger.error("获取用户头像错误", "Util", target=uid)
return None
async def get_group_avatar(gid: int | str) -> bytes | None:
"""快捷获取用群头像
参数:
gid: 群号
"""
url = f"http://p.qlogo.cn/gh/{gid}/{gid}/640/"
async with httpx.AsyncClient() as client:
for _ in range(3):
try:
return (await client.get(url)).content
except Exception:
logger.error("获取群头像错误", "Util", target=gid)
return None
def change_pixiv_image_links(
url: str, size: str | None = None, nginx_url: str | None = None
) -> str:
"""根据配置改变图片大小和反代链接
参数:
url: 图片原图链接
size: 模式
nginx_url: 反代
返回:
str: url
"""
if size == "master":
img_sp = url.rsplit(".", maxsplit=1)
url = img_sp[0]
img_type = img_sp[1]
url = url.replace("original", "master") + f"_master1200.{img_type}"
if not nginx_url:
nginx_url = Config.get_config("pixiv", "PIXIV_NGINX_URL")
if nginx_url:
url = (
url.replace("i.pximg.net", nginx_url)
.replace("i.pixiv.cat", nginx_url)
.replace("i.pixiv.re", nginx_url)
.replace("_webp", "")
)
return url
def change_img_md5(path_file: str | Path) -> bool:
"""改变图片MD5
参数:
path_file: 图片路径
返还:
bool: 是否修改成功
"""
try:
with open(path_file, "a") as f:
f.write(str(int(time.time() * 1000)))
return True
except Exception as e:
logger.warning(f"改变图片MD5错误 Path{path_file}", e=e)
return False
def is_valid_date(date_text: str, separator: str = "-") -> bool:
"""日期是否合法
参数:
date_text: 日期
separator: 分隔符
返回:
bool: 日期是否合法
"""
try:
datetime.strptime(date_text, f"%Y{separator}%m{separator}%d")
return True
except ValueError:
return False
def get_entity_ids(session: Uninfo) -> EntityIDs:
"""获取用户id群组id频道id
参数:
session: Uninfo
返回:
EntityIDs: 用户id群组id频道id
"""
user_id = session.user.id
group_id = None
channel_id = None
if session.group:
if session.group.parent:
group_id = session.group.parent.id
channel_id = session.group.id
else:
group_id = session.group.id
return EntityIDs(user_id=user_id, group_id=group_id, channel_id=channel_id)
def is_number(text: str) -> bool:
"""是否为数字
参数:
text: 文本
返回:
bool: 是否为数字
"""
try:
float(text)
return True
except ValueError:
return False