Repair self-test

This commit is contained in:
HibiKier 2025-01-09 10:29:49 +08:00 committed by BalconyJH
parent 3ef700c167
commit 2df4fad9c5
No known key found for this signature in database
GPG Key ID: FF602923BD2A1FAF
5 changed files with 99 additions and 42 deletions

View File

@ -1,3 +1,4 @@
from collections import namedtuple
from collections.abc import Callable from collections.abc import Callable
from pathlib import Path from pathlib import Path
import platform import platform
@ -25,6 +26,40 @@ cpuinfo_get_cpu_info = {"brand_raw": "Intel(R) Core(TM) i7-10700K"}
def init_mocker(mocker: MockerFixture, tmp_path: Path): def init_mocker(mocker: MockerFixture, tmp_path: Path):
mock_psutil = mocker.patch("zhenxun.builtin_plugins.check.data_source.psutil") mock_psutil = mocker.patch("zhenxun.builtin_plugins.check.data_source.psutil")
# Define namedtuples for complex return values
CpuFreqs = namedtuple("CpuFreqs", ["current"]) # noqa: PYI024
VirtualMemoryInfo = namedtuple("VirtualMemoryInfo", ["used", "total", "percent"]) # noqa: PYI024
SwapInfo = namedtuple("SwapInfo", ["used", "total", "percent"]) # noqa: PYI024
DiskUsage = namedtuple("DiskUsage", ["used", "total", "free", "percent"]) # noqa: PYI024
# Set specific return values for psutil methods
mock_psutil.cpu_percent.return_value = 1.0 # CPU 使用率
mock_psutil.cpu_freq.return_value = CpuFreqs(current=0.0) # CPU 频率
mock_psutil.cpu_count.return_value = 1 # CPU 核心数
# Memory Info
mock_psutil.virtual_memory.return_value = VirtualMemoryInfo(
used=1 * 1024**3, # 1 GB in bytes for used memory
total=1 * 1024**3, # 1 GB in bytes for total memory
percent=100.0, # 100% of memory used
)
# Swap Info
mock_psutil.swap_memory.return_value = SwapInfo(
used=1 * 1024**3, # 1 GB in bytes for used swap space
total=1 * 1024**3, # 1 GB in bytes for total swap space
percent=100.0, # 100% of swap space used
)
# Disk Usage
mock_psutil.disk_usage.return_value = DiskUsage(
used=1 * 1024**3, # 1 GB in bytes for used disk space
total=1 * 1024**3, # 1 GB in bytes for total disk space
free=0, # No free space
percent=100.0, # 100% of disk space used
)
mock_cpuinfo = mocker.patch("zhenxun.builtin_plugins.check.data_source.cpuinfo") mock_cpuinfo = mocker.patch("zhenxun.builtin_plugins.check.data_source.cpuinfo")
mock_cpuinfo.get_cpu_info.return_value = cpuinfo_get_cpu_info mock_cpuinfo.get_cpu_info.return_value = cpuinfo_get_cpu_info
@ -95,30 +130,35 @@ async def test_check(
) )
ctx.receive_event(bot=bot, event=event) ctx.receive_event(bot=bot, event=event)
ctx.should_ignore_rule(_self_check_matcher) ctx.should_ignore_rule(_self_check_matcher)
print("Template to pic call args:", mock_template_to_pic.call_args_list)
data = {
"cpu_info": f"{mock_psutil.cpu_percent.return_value}% "
+ f"- {mock_psutil.cpu_freq.return_value.current}Ghz "
+ f"[{mock_psutil.cpu_count.return_value} core]",
"cpu_process": mock_psutil.cpu_percent.return_value,
"ram_info": f"{round(mock_psutil.virtual_memory.return_value.used / (1024 ** 3), 1)}" # noqa: E501
+ f" / {round(mock_psutil.virtual_memory.return_value.total / (1024 ** 3), 1)}"
+ " GB",
"ram_process": mock_psutil.virtual_memory.return_value.percent,
"swap_info": f"{round(mock_psutil.swap_memory.return_value.used / (1024 ** 3), 1)}" # noqa: E501
+ f" / {round(mock_psutil.swap_memory.return_value.total / (1024 ** 3), 1)} GB",
"swap_process": mock_psutil.swap_memory.return_value.percent,
"disk_info": f"{round(mock_psutil.disk_usage.return_value.used / (1024 ** 3), 1)}" # noqa: E501
+ f" / {round(mock_psutil.disk_usage.return_value.total / (1024 ** 3), 1)} GB",
"disk_process": mock_psutil.disk_usage.return_value.percent,
"brand_raw": cpuinfo_get_cpu_info["brand_raw"],
"baidu": "red",
"google": "red",
"system": f"{platform_uname.system} " f"{platform_uname.release}",
"version": __get_version(),
"plugin_count": len(nonebot.get_loaded_plugins()),
"nickname": BotConfig.self_nickname,
}
mock_template_to_pic.assert_awaited_once_with( mock_template_to_pic.assert_awaited_once_with(
template_path=str((mock_template_path_new / "check").absolute()), template_path=str((mock_template_path_new / "check").absolute()),
template_name="main.html", template_name="main.html",
templates={ templates={"data": data},
"data": {
"cpu_info": "1.0% - 1.0Ghz [1 core]",
"cpu_process": 1.0,
"ram_info": "1.0 / 1.0 GB",
"ram_process": 100.0,
"swap_info": "1.0 / 1.0 GB",
"swap_process": 100.0,
"disk_info": "1.0 / 1.0 GB",
"disk_process": 100.0,
"brand_raw": cpuinfo_get_cpu_info["brand_raw"],
"baidu": "red",
"google": "red",
"system": f"{platform_uname.system} " f"{platform_uname.release}",
"version": __get_version(),
"plugin_count": len(nonebot.get_loaded_plugins()),
"nickname": BotConfig.self_nickname,
}
},
pages={ pages={
"viewport": {"width": 195, "height": 750}, "viewport": {"width": 195, "height": 750},
"base_url": f"file://{mock_template_path_new.absolute()}", "base_url": f"file://{mock_template_path_new.absolute()}",

View File

@ -47,7 +47,7 @@ def pytest_configure(config: pytest.Config) -> None:
}, },
"host": "127.0.0.1", "host": "127.0.0.1",
"port": 8080, "port": 8080,
"log_level": "DEBUG", "log_level": "INFO",
} }
@ -60,9 +60,7 @@ def _init_bot(nonebug_init: None):
nonebot.load_plugin("nonebot_plugin_alconna") nonebot.load_plugin("nonebot_plugin_alconna")
nonebot.load_plugin("nonebot_plugin_apscheduler") nonebot.load_plugin("nonebot_plugin_apscheduler")
nonebot.load_plugin("nonebot_plugin_userinfo")
nonebot.load_plugin("nonebot_plugin_htmlrender") nonebot.load_plugin("nonebot_plugin_htmlrender")
nonebot.load_plugins("zhenxun/builtin_plugins") nonebot.load_plugins("zhenxun/builtin_plugins")
nonebot.load_plugins("zhenxun/plugins") nonebot.load_plugins("zhenxun/plugins")

View File

@ -7,6 +7,7 @@ from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils
from zhenxun.utils.utils import is_number
from .data_source import ShopManage from .data_source import ShopManage
@ -31,10 +32,10 @@ __plugin_meta__ = PluginMetadata(
_matcher = on_alconna( _matcher = on_alconna(
Alconna( Alconna(
"插件商店", "插件商店",
Subcommand("add", Args["plugin_id", int | str]), Subcommand("add", Args["plugin_id", str]),
Subcommand("remove", Args["plugin_id", int | str]), Subcommand("remove", Args["plugin_id", str]),
Subcommand("search", Args["plugin_name_or_author", str]), Subcommand("search", Args["plugin_name_or_author", str]),
Subcommand("update", Args["plugin_id", int | str]), Subcommand("update", Args["plugin_id", str]),
Subcommand("update_all"), Subcommand("update_all"),
), ),
permission=SUPERUSER, permission=SUPERUSER,
@ -90,12 +91,12 @@ async def _(session: EventSession):
@_matcher.assign("add") @_matcher.assign("add")
async def _(session: EventSession, plugin_id: int | str): async def _(session: EventSession, plugin_id: str):
try: try:
if isinstance(plugin_id, str): if is_number(plugin_id):
await MessageUtils.build_message(f"正在添加插件 Module: {plugin_id}").send()
else:
await MessageUtils.build_message(f"正在添加插件 Id: {plugin_id}").send() await MessageUtils.build_message(f"正在添加插件 Id: {plugin_id}").send()
else:
await MessageUtils.build_message(f"正在添加插件 Module: {plugin_id}").send()
result = await ShopManage.add_plugin(plugin_id) result = await ShopManage.add_plugin(plugin_id)
except Exception as e: except Exception as e:
logger.error(f"添加插件 Id: {plugin_id}失败", "插件商店", session=session, e=e) logger.error(f"添加插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)
@ -107,7 +108,7 @@ async def _(session: EventSession, plugin_id: int | str):
@_matcher.assign("remove") @_matcher.assign("remove")
async def _(session: EventSession, plugin_id: int | str): async def _(session: EventSession, plugin_id: str):
try: try:
result = await ShopManage.remove_plugin(plugin_id) result = await ShopManage.remove_plugin(plugin_id)
except Exception as e: except Exception as e:
@ -138,12 +139,12 @@ async def _(session: EventSession, plugin_name_or_author: str):
@_matcher.assign("update") @_matcher.assign("update")
async def _(session: EventSession, plugin_id: int | str): async def _(session: EventSession, plugin_id: str):
try: try:
if isinstance(plugin_id, str): if is_number(plugin_id):
await MessageUtils.build_message(f"正在更新插件 Module: {plugin_id}").send()
else:
await MessageUtils.build_message(f"正在更新插件 Id: {plugin_id}").send() await MessageUtils.build_message(f"正在更新插件 Id: {plugin_id}").send()
else:
await MessageUtils.build_message(f"正在更新插件 Module: {plugin_id}").send()
result = await ShopManage.update_plugin(plugin_id) result = await ShopManage.update_plugin(plugin_id)
except Exception as e: except Exception as e:
logger.error(f"更新插件 Id: {plugin_id}失败", "插件商店", session=session, e=e) logger.error(f"更新插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)

View File

@ -14,6 +14,7 @@ from zhenxun.utils.github_utils import GithubUtils
from zhenxun.utils.github_utils.models import RepoAPI from zhenxun.utils.github_utils.models import RepoAPI
from zhenxun.utils.http_utils import AsyncHttpx from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle
from zhenxun.utils.utils import is_number
from .config import BASE_PATH, DEFAULT_GITHUB_URL, EXTRA_GITHUB_URL from .config import BASE_PATH, DEFAULT_GITHUB_URL, EXTRA_GITHUB_URL
@ -175,7 +176,7 @@ class ShopManage:
) )
@classmethod @classmethod
async def add_plugin(cls, plugin_id: int | str) -> str: async def add_plugin(cls, plugin_id: str) -> str:
"""添加插件 """添加插件
参数: 参数:
@ -268,7 +269,7 @@ class ShopManage:
raise Exception("插件下载失败") raise Exception("插件下载失败")
@classmethod @classmethod
async def remove_plugin(cls, plugin_id: int | str) -> str: async def remove_plugin(cls, plugin_id: str) -> str:
"""移除插件 """移除插件
参数: 参数:
@ -344,7 +345,7 @@ class ShopManage:
) )
@classmethod @classmethod
async def update_plugin(cls, plugin_id: int | str) -> str: async def update_plugin(cls, plugin_id: str) -> str:
"""更新插件 """更新插件
参数: 参数:
@ -441,12 +442,13 @@ class ShopManage:
) )
@classmethod @classmethod
async def _resolve_plugin_key(cls, plugin_id: int | str) -> str: async def _resolve_plugin_key(cls, plugin_id: str) -> str:
data: dict[str, StorePluginInfo] = await cls.get_data() data: dict[str, StorePluginInfo] = await cls.get_data()
if isinstance(plugin_id, int): if is_number(plugin_id):
if plugin_id < 0 or plugin_id >= len(data): idx = int(plugin_id)
if idx < 0 or idx >= len(data):
raise ValueError("插件ID不存在...") raise ValueError("插件ID不存在...")
return list(data.keys())[plugin_id] return list(data.keys())[idx]
elif isinstance(plugin_id, str): elif isinstance(plugin_id, str):
if plugin_id not in [v.module for k, v in data.items()]: if plugin_id not in [v.module for k, v in data.items()]:
raise ValueError("插件Module不存在...") raise ValueError("插件Module不存在...")

View File

@ -228,3 +228,19 @@ def is_valid_date(date_text: str, separator: str = "-") -> bool:
return True return True
except ValueError: except ValueError:
return False return False
def is_number(text: str) -> bool:
"""是否为数字
参数:
text: 文本
返回:
bool: 是否为数字
"""
try:
float(text)
return True
except ValueError:
return False