From e618b19283c9a624d1eadde05bb09787ca4a6d70 Mon Sep 17 00:00:00 2001 From: AkashiCoin Date: Sat, 7 Sep 2024 16:17:29 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20=E4=BF=AE=E5=A4=8D=E8=87=AA?= =?UTF-8?q?=E6=A3=80=E5=9C=A8ARM=E4=B8=8A=E7=9A=84=E9=97=AE=E9=A2=98=20(#1?= =?UTF-8?q?607)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 🐛 修复自检在ARM上的问题 * :white_check_mark: 优化测试 --- tests/builtin_plugins/check/test_check.py | 231 +++++++++++++++++++ zhenxun/builtin_plugins/check/data_source.py | 67 ++++-- 2 files changed, 284 insertions(+), 14 deletions(-) create mode 100644 tests/builtin_plugins/check/test_check.py diff --git a/tests/builtin_plugins/check/test_check.py b/tests/builtin_plugins/check/test_check.py new file mode 100644 index 00000000..5dbd3f6c --- /dev/null +++ b/tests/builtin_plugins/check/test_check.py @@ -0,0 +1,231 @@ +import platform +from typing import cast +from pathlib import Path +from collections.abc import Callable + +import nonebot +from nonebug import App +from respx import MockRouter +from pytest_mock import MockerFixture +from nonebot.adapters.onebot.v11 import Bot +from nonebot.adapters.onebot.v11.event import GroupMessageEvent + +from tests.utils import _v11_group_message_event +from tests.config import BotId, UserId, GroupId, MessageId + +platform_uname = platform.uname_result( + system="Linux", + node="zhenxun", + release="5.15.0-1027-azure", + version="#1 SMP Debian 5.15.0-1027-azure", + machine="x86_64", +) # type: ignore +cpuinfo_get_cpu_info = {"brand_raw": "Intel(R) Core(TM) i7-10700K"} + + +def init_mocker(mocker: MockerFixture, tmp_path: Path): + mock_psutil = mocker.patch("zhenxun.builtin_plugins.check.data_source.psutil") + mock_cpuinfo = mocker.patch("zhenxun.builtin_plugins.check.data_source.cpuinfo") + mock_cpuinfo.get_cpu_info.return_value = cpuinfo_get_cpu_info + + mock_platform = mocker.patch("zhenxun.builtin_plugins.check.data_source.platform") + mock_platform.uname.return_value = platform_uname + + mock_template_to_pic = mocker.patch("zhenxun.builtin_plugins.check.template_to_pic") + mock_template_to_pic_return = mocker.AsyncMock() + mock_template_to_pic.return_value = mock_template_to_pic_return + + mock_build_message = mocker.patch( + "zhenxun.builtin_plugins.check.MessageUtils.build_message" + ) + mock_build_message_return = mocker.AsyncMock() + mock_build_message.return_value = mock_build_message_return + + mock_template_path_new = tmp_path / "resources" / "template" + mocker.patch( + "zhenxun.builtin_plugins.check.TEMPLATE_PATH", new=mock_template_path_new + ) + return ( + mock_psutil, + mock_cpuinfo, + mock_platform, + mock_template_to_pic, + mock_template_to_pic_return, + mock_build_message, + mock_build_message_return, + mock_template_path_new, + ) + + +async def test_check( + app: App, + mocker: MockerFixture, + mocked_api: MockRouter, + create_bot: Callable, + tmp_path: Path, +) -> None: + """ + 测试自检 + """ + from zhenxun.configs.config import BotConfig + from zhenxun.builtin_plugins.check import _matcher + from zhenxun.builtin_plugins.check.data_source import __get_version + + ( + mock_psutil, + mock_cpuinfo, + mock_platform, + mock_template_to_pic, + mock_template_to_pic_return, + mock_build_message, + mock_build_message_return, + mock_template_path_new, + ) = init_mocker(mocker, tmp_path) + async with app.test_matcher(_matcher) as ctx: + bot = create_bot(ctx) + bot: Bot = cast(Bot, bot) + raw_message = "自检" + event: GroupMessageEvent = _v11_group_message_event( + message=raw_message, + self_id=BotId.QQ_BOT, + user_id=UserId.SUPERUSER, + group_id=GroupId.GROUP_ID_LEVEL_5, + message_id=MessageId.MESSAGE_ID_3, + to_me=True, + ) + ctx.receive_event(bot=bot, event=event) + mock_template_to_pic.assert_awaited_once_with( + template_path=str((mock_template_path_new / "check").absolute()), + template_name="main.html", + templates={ + "data": { + "cpu_info": "1.0% - 1.0Ghz [1 core]", + "cpu_process": 1.0, + "ram_info": "1.0 / 1.0 GB", + "ram_process": 100.0, + "swap_info": "1.0 / 1.0 GB", + "swap_process": 100.0, + "disk_info": "1.0 / 1.0 GB", + "disk_process": 100.0, + "brand_raw": cpuinfo_get_cpu_info["brand_raw"], + "baidu": "red", + "google": "red", + "system": f"{platform_uname.system} " f"{platform_uname.release}", + "version": __get_version(), + "plugin_count": len(nonebot.get_loaded_plugins()), + "nickname": BotConfig.self_nickname, + } + }, + pages={ + "viewport": {"width": 195, "height": 750}, + "base_url": f"file://{mock_template_path_new.absolute()}", + }, + wait=2, + ) + mock_template_to_pic.assert_awaited_once() + mock_build_message.assert_called_once_with(mock_template_to_pic_return) + mock_build_message_return.send.assert_awaited_once() + + +async def test_check_arm( + app: App, + mocker: MockerFixture, + mocked_api: MockRouter, + create_bot: Callable, + tmp_path: Path, +) -> None: + """ + 测试自检(arm) + """ + from zhenxun.configs.config import BotConfig + from zhenxun.builtin_plugins.check import _matcher + from zhenxun.builtin_plugins.check.data_source import __get_version + + platform_uname_arm = platform.uname_result( + system="Linux", + node="zhenxun", + release="5.15.0-1017-oracle", + version="#22~20.04.1-Ubuntu SMP Wed Aug 24 11:13:15 UTC 2022", + machine="aarch64", + ) # type: ignore + mock_subprocess_check_output = mocker.patch( + "zhenxun.builtin_plugins.check.data_source.subprocess.check_output" + ) + mock_environ_copy = mocker.patch( + "zhenxun.builtin_plugins.check.data_source.os.environ.copy" + ) + mock_environ_copy_return = mocker.MagicMock() + mock_environ_copy.return_value = mock_environ_copy_return + ( + mock_psutil, + mock_cpuinfo, + mock_platform, + mock_template_to_pic, + mock_template_to_pic_return, + mock_build_message, + mock_build_message_return, + mock_template_path_new, + ) = init_mocker(mocker, tmp_path) + + mock_platform.uname.return_value = platform_uname_arm + mock_cpuinfo.get_cpu_info.return_value = {} + mock_psutil.cpu_freq.return_value = {} + + async with app.test_matcher(_matcher) as ctx: + bot = create_bot(ctx) + bot: Bot = cast(Bot, bot) + raw_message = "自检" + event: GroupMessageEvent = _v11_group_message_event( + message=raw_message, + self_id=BotId.QQ_BOT, + user_id=UserId.SUPERUSER, + group_id=GroupId.GROUP_ID_LEVEL_5, + message_id=MessageId.MESSAGE_ID_3, + to_me=True, + ) + ctx.receive_event(bot=bot, event=event) + mock_template_to_pic.assert_awaited_once_with( + template_path=str((mock_template_path_new / "check").absolute()), + template_name="main.html", + templates={ + "data": { + "cpu_info": "1.0% - 0.0Ghz [1 core]", + "cpu_process": 1.0, + "ram_info": "1.0 / 1.0 GB", + "ram_process": 100.0, + "swap_info": "1.0 / 1.0 GB", + "swap_process": 100.0, + "disk_info": "1.0 / 1.0 GB", + "disk_process": 100.0, + "brand_raw": "", + "baidu": "red", + "google": "red", + "system": f"{platform_uname_arm.system} " + f"{platform_uname_arm.release}", + "version": __get_version(), + "plugin_count": len(nonebot.get_loaded_plugins()), + "nickname": BotConfig.self_nickname, + } + }, + pages={ + "viewport": {"width": 195, "height": 750}, + "base_url": f"file://{mock_template_path_new.absolute()}", + }, + wait=2, + ) + mock_subprocess_check_output.assert_has_calls( + [ + mocker.call(["lscpu"], env=mock_environ_copy_return), + mocker.call().decode(), + mocker.call().decode().splitlines(), + mocker.call().decode().splitlines().__iter__(), + mocker.call(["dmidecode", "-s", "processor-frequency"]), + mocker.call().decode(), + mocker.call().decode().split(), + mocker.call().decode().split().__getitem__(0), + mocker.call().decode().split().__getitem__().__float__(), + ] # type: ignore + ) + mock_template_to_pic.assert_awaited_once() + mock_build_message.assert_called_once_with(mock_template_to_pic_return) + mock_build_message_return.send.assert_awaited_once() diff --git a/zhenxun/builtin_plugins/check/data_source.py b/zhenxun/builtin_plugins/check/data_source.py index 325e413a..34d7fc0b 100644 --- a/zhenxun/builtin_plugins/check/data_source.py +++ b/zhenxun/builtin_plugins/check/data_source.py @@ -1,22 +1,24 @@ +import os import platform -from dataclasses import dataclass +import subprocess from pathlib import Path +from dataclasses import dataclass +import psutil import cpuinfo import nonebot -import psutil -from httpx import ConnectTimeout, NetworkError -from nonebot.utils import run_sync from pydantic import BaseModel +from nonebot.utils import run_sync -from zhenxun.configs.config import BotConfig from zhenxun.services.log import logger +from zhenxun.configs.config import BotConfig from zhenxun.utils.http_utils import AsyncHttpx BAIDU_URL = "https://www.baidu.com/" GOOGLE_URL = "https://www.google.com/" VERSION_FILE = Path() / "__version__" +ARM_KEY = "aarch64" @dataclass @@ -31,9 +33,11 @@ class CPUInfo: @classmethod def get_cpu_info(cls): cpu_core = psutil.cpu_count(logical=False) - cpu_usage = psutil.cpu_percent(interval=1) - cpu_freq = round(psutil.cpu_freq().current / 1000, 2) - + cpu_usage = psutil.cpu_percent(interval=0.1) + if _cpu_freq := psutil.cpu_freq(): + cpu_freq = round(_cpu_freq.current / 1000, 2) + else: + cpu_freq = 0 return CPUInfo(core=cpu_core, usage=cpu_usage, freq=cpu_freq) @@ -102,8 +106,9 @@ class SystemInfo(BaseModel): def get_system_info(self): return { - "cpu_info": f"{self.cpu.usage}% - {self.cpu.freq}Ghz [{self.cpu.core} core]", - "cpu_process": psutil.cpu_percent(), + "cpu_info": f"{self.cpu.usage}% - {self.cpu.freq}Ghz " + f"[{self.cpu.core} core]", + "cpu_process": self.cpu.usage, "ram_info": f"{self.ram.usage} / {self.ram.total} GB", "ram_process": ( 0 if self.ram.total == 0 else (self.ram.usage / self.ram.total * 100) @@ -120,14 +125,14 @@ class SystemInfo(BaseModel): @run_sync -def __build_status() -> dict: +def __build_status() -> SystemInfo: """获取 `CPU` `RAM` `SWAP` `DISK` 信息""" cpu = CPUInfo.get_cpu_info() ram = RAMInfo.get_ram_info() swap = SwapMemory.get_swap_info() disk = DiskInfo.get_disk_info() - return SystemInfo(cpu=cpu, ram=ram, swap=swap, disk=disk).get_system_info() + return SystemInfo(cpu=cpu, ram=ram, swap=swap, disk=disk) async def __get_network_info(): @@ -155,16 +160,50 @@ def __get_version() -> str | None: return None +def __get_arm_cpu(): + env = os.environ.copy() + env["LC_ALL"] = "en_US.UTF-8" + cpu_info = subprocess.check_output(["lscpu"], env=env).decode() + model_name = "" + cpu_freq = 0 + for line in cpu_info.splitlines(): + if "Model name" in line: + model_name = line.split(":")[1].strip() + if "CPU MHz" in line: + cpu_freq = float(line.split(":")[1].strip()) + return model_name, cpu_freq + + +def __get_arm_oracle_cpu_freq(): + cpu_freq = subprocess.check_output( + ["dmidecode", "-s", "processor-frequency"] + ).decode() + return round(float(cpu_freq.split()[0]) / 1000, 2) + + async def get_status_info() -> dict: """获取信息""" data = await __build_status() + + system = platform.uname() + if system.machine == ARM_KEY and not ( + cpuinfo.get_cpu_info().get("brand_raw") and data.cpu.freq + ): + model_name, cpu_freq = __get_arm_cpu() + if not data.cpu.freq: + data.cpu.freq = cpu_freq or __get_arm_oracle_cpu_freq() + data = data.get_system_info() + data["brand_raw"] = model_name + else: + data = data.get_system_info() + data["brand_raw"] = cpuinfo.get_cpu_info().get("brand_raw", "Unknown") + baidu, google = await __get_network_info() data["baidu"] = "#8CC265" if baidu else "red" data["google"] = "#8CC265" if google else "red" - system = platform.uname() + data["system"] = f"{system.system} {system.release}" data["version"] = __get_version() - data["brand_raw"] = cpuinfo.get_cpu_info()["brand_raw"] data["plugin_count"] = len(nonebot.get_loaded_plugins()) data["nickname"] = BotConfig.self_nickname return data