Repair self-test

This commit is contained in:
HibiKier 2025-01-09 10:29:49 +08:00 committed by BalconyJH
parent 73d2ed444e
commit 0462703b13
5 changed files with 99 additions and 42 deletions

View File

@ -1,3 +1,4 @@
from collections import namedtuple
from collections.abc import Callable
from pathlib import Path
import platform
@ -25,6 +26,40 @@ cpuinfo_get_cpu_info = {"brand_raw": "Intel(R) Core(TM) i7-10700K"}
def init_mocker(mocker: MockerFixture, tmp_path: Path):
mock_psutil = mocker.patch("zhenxun.builtin_plugins.check.data_source.psutil")
# Define namedtuples for complex return values
CpuFreqs = namedtuple("CpuFreqs", ["current"]) # noqa: PYI024
VirtualMemoryInfo = namedtuple("VirtualMemoryInfo", ["used", "total", "percent"]) # noqa: PYI024
SwapInfo = namedtuple("SwapInfo", ["used", "total", "percent"]) # noqa: PYI024
DiskUsage = namedtuple("DiskUsage", ["used", "total", "free", "percent"]) # noqa: PYI024
# Set specific return values for psutil methods
mock_psutil.cpu_percent.return_value = 1.0 # CPU 使用率
mock_psutil.cpu_freq.return_value = CpuFreqs(current=0.0) # CPU 频率
mock_psutil.cpu_count.return_value = 1 # CPU 核心数
# Memory Info
mock_psutil.virtual_memory.return_value = VirtualMemoryInfo(
used=1 * 1024**3, # 1 GB in bytes for used memory
total=1 * 1024**3, # 1 GB in bytes for total memory
percent=100.0, # 100% of memory used
)
# Swap Info
mock_psutil.swap_memory.return_value = SwapInfo(
used=1 * 1024**3, # 1 GB in bytes for used swap space
total=1 * 1024**3, # 1 GB in bytes for total swap space
percent=100.0, # 100% of swap space used
)
# Disk Usage
mock_psutil.disk_usage.return_value = DiskUsage(
used=1 * 1024**3, # 1 GB in bytes for used disk space
total=1 * 1024**3, # 1 GB in bytes for total disk space
free=0, # No free space
percent=100.0, # 100% of disk space used
)
mock_cpuinfo = mocker.patch("zhenxun.builtin_plugins.check.data_source.cpuinfo")
mock_cpuinfo.get_cpu_info.return_value = cpuinfo_get_cpu_info
@ -95,30 +130,35 @@ async def test_check(
)
ctx.receive_event(bot=bot, event=event)
ctx.should_ignore_rule(_self_check_matcher)
print("Template to pic call args:", mock_template_to_pic.call_args_list)
data = {
"cpu_info": f"{mock_psutil.cpu_percent.return_value}% "
+ f"- {mock_psutil.cpu_freq.return_value.current}Ghz "
+ f"[{mock_psutil.cpu_count.return_value} core]",
"cpu_process": mock_psutil.cpu_percent.return_value,
"ram_info": f"{round(mock_psutil.virtual_memory.return_value.used / (1024 ** 3), 1)}" # noqa: E501
+ f" / {round(mock_psutil.virtual_memory.return_value.total / (1024 ** 3), 1)}"
+ " GB",
"ram_process": mock_psutil.virtual_memory.return_value.percent,
"swap_info": f"{round(mock_psutil.swap_memory.return_value.used / (1024 ** 3), 1)}" # noqa: E501
+ f" / {round(mock_psutil.swap_memory.return_value.total / (1024 ** 3), 1)} GB",
"swap_process": mock_psutil.swap_memory.return_value.percent,
"disk_info": f"{round(mock_psutil.disk_usage.return_value.used / (1024 ** 3), 1)}" # noqa: E501
+ f" / {round(mock_psutil.disk_usage.return_value.total / (1024 ** 3), 1)} GB",
"disk_process": mock_psutil.disk_usage.return_value.percent,
"brand_raw": cpuinfo_get_cpu_info["brand_raw"],
"baidu": "red",
"google": "red",
"system": f"{platform_uname.system} " f"{platform_uname.release}",
"version": __get_version(),
"plugin_count": len(nonebot.get_loaded_plugins()),
"nickname": BotConfig.self_nickname,
}
mock_template_to_pic.assert_awaited_once_with(
template_path=str((mock_template_path_new / "check").absolute()),
template_name="main.html",
templates={
"data": {
"cpu_info": "1.0% - 1.0Ghz [1 core]",
"cpu_process": 1.0,
"ram_info": "1.0 / 1.0 GB",
"ram_process": 100.0,
"swap_info": "1.0 / 1.0 GB",
"swap_process": 100.0,
"disk_info": "1.0 / 1.0 GB",
"disk_process": 100.0,
"brand_raw": cpuinfo_get_cpu_info["brand_raw"],
"baidu": "red",
"google": "red",
"system": f"{platform_uname.system} " f"{platform_uname.release}",
"version": __get_version(),
"plugin_count": len(nonebot.get_loaded_plugins()),
"nickname": BotConfig.self_nickname,
}
},
templates={"data": data},
pages={
"viewport": {"width": 195, "height": 750},
"base_url": f"file://{mock_template_path_new.absolute()}",

View File

@ -47,7 +47,7 @@ def pytest_configure(config: pytest.Config) -> None:
},
"host": "127.0.0.1",
"port": 8080,
"log_level": "DEBUG",
"log_level": "INFO",
}
@ -60,9 +60,7 @@ def _init_bot(nonebug_init: None):
nonebot.load_plugin("nonebot_plugin_alconna")
nonebot.load_plugin("nonebot_plugin_apscheduler")
nonebot.load_plugin("nonebot_plugin_userinfo")
nonebot.load_plugin("nonebot_plugin_htmlrender")
nonebot.load_plugins("zhenxun/builtin_plugins")
nonebot.load_plugins("zhenxun/plugins")

View File

@ -7,6 +7,7 @@ from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.utils import is_number
from .data_source import ShopManage
@ -31,10 +32,10 @@ __plugin_meta__ = PluginMetadata(
_matcher = on_alconna(
Alconna(
"插件商店",
Subcommand("add", Args["plugin_id", int | str]),
Subcommand("remove", Args["plugin_id", int | str]),
Subcommand("add", Args["plugin_id", str]),
Subcommand("remove", Args["plugin_id", str]),
Subcommand("search", Args["plugin_name_or_author", str]),
Subcommand("update", Args["plugin_id", int | str]),
Subcommand("update", Args["plugin_id", str]),
Subcommand("update_all"),
),
permission=SUPERUSER,
@ -90,12 +91,12 @@ async def _(session: EventSession):
@_matcher.assign("add")
async def _(session: EventSession, plugin_id: int | str):
async def _(session: EventSession, plugin_id: str):
try:
if isinstance(plugin_id, str):
await MessageUtils.build_message(f"正在添加插件 Module: {plugin_id}").send()
else:
if is_number(plugin_id):
await MessageUtils.build_message(f"正在添加插件 Id: {plugin_id}").send()
else:
await MessageUtils.build_message(f"正在添加插件 Module: {plugin_id}").send()
result = await ShopManage.add_plugin(plugin_id)
except Exception as e:
logger.error(f"添加插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)
@ -107,7 +108,7 @@ async def _(session: EventSession, plugin_id: int | str):
@_matcher.assign("remove")
async def _(session: EventSession, plugin_id: int | str):
async def _(session: EventSession, plugin_id: str):
try:
result = await ShopManage.remove_plugin(plugin_id)
except Exception as e:
@ -138,12 +139,12 @@ async def _(session: EventSession, plugin_name_or_author: str):
@_matcher.assign("update")
async def _(session: EventSession, plugin_id: int | str):
async def _(session: EventSession, plugin_id: str):
try:
if isinstance(plugin_id, str):
await MessageUtils.build_message(f"正在更新插件 Module: {plugin_id}").send()
else:
if is_number(plugin_id):
await MessageUtils.build_message(f"正在更新插件 Id: {plugin_id}").send()
else:
await MessageUtils.build_message(f"正在更新插件 Module: {plugin_id}").send()
result = await ShopManage.update_plugin(plugin_id)
except Exception as e:
logger.error(f"更新插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)

View File

@ -14,6 +14,7 @@ from zhenxun.utils.github_utils import GithubUtils
from zhenxun.utils.github_utils.models import RepoAPI
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle
from zhenxun.utils.utils import is_number
from .config import BASE_PATH, DEFAULT_GITHUB_URL, EXTRA_GITHUB_URL
@ -175,7 +176,7 @@ class ShopManage:
)
@classmethod
async def add_plugin(cls, plugin_id: int | str) -> str:
async def add_plugin(cls, plugin_id: str) -> str:
"""添加插件
参数:
@ -268,7 +269,7 @@ class ShopManage:
raise Exception("插件下载失败")
@classmethod
async def remove_plugin(cls, plugin_id: int | str) -> str:
async def remove_plugin(cls, plugin_id: str) -> str:
"""移除插件
参数:
@ -344,7 +345,7 @@ class ShopManage:
)
@classmethod
async def update_plugin(cls, plugin_id: int | str) -> str:
async def update_plugin(cls, plugin_id: str) -> str:
"""更新插件
参数:
@ -441,12 +442,13 @@ class ShopManage:
)
@classmethod
async def _resolve_plugin_key(cls, plugin_id: int | str) -> str:
async def _resolve_plugin_key(cls, plugin_id: str) -> str:
data: dict[str, StorePluginInfo] = await cls.get_data()
if isinstance(plugin_id, int):
if plugin_id < 0 or plugin_id >= len(data):
if is_number(plugin_id):
idx = int(plugin_id)
if idx < 0 or idx >= len(data):
raise ValueError("插件ID不存在...")
return list(data.keys())[plugin_id]
return list(data.keys())[idx]
elif isinstance(plugin_id, str):
if plugin_id not in [v.module for k, v in data.items()]:
raise ValueError("插件Module不存在...")

View File

@ -228,3 +228,19 @@ def is_valid_date(date_text: str, separator: str = "-") -> bool:
return True
except ValueError:
return False
def is_number(text: str) -> bool:
"""是否为数字
参数:
text: 文本
返回:
bool: 是否为数字
"""
try:
float(text)
return True
except ValueError:
return False