🐛 修复自检在ARM上的问题 (#1607)

* 🐛 修复自检在ARM上的问题

*  优化测试
This commit is contained in:
AkashiCoin 2024-09-07 16:17:29 +08:00 committed by GitHub
parent 62fd93c1ff
commit e618b19283
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 284 additions and 14 deletions

View File

@ -0,0 +1,231 @@
import platform
from typing import cast
from pathlib import Path
from collections.abc import Callable
import nonebot
from nonebug import App
from respx import MockRouter
from pytest_mock import MockerFixture
from nonebot.adapters.onebot.v11 import Bot
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from tests.utils import _v11_group_message_event
from tests.config import BotId, UserId, GroupId, MessageId
platform_uname = platform.uname_result(
system="Linux",
node="zhenxun",
release="5.15.0-1027-azure",
version="#1 SMP Debian 5.15.0-1027-azure",
machine="x86_64",
) # type: ignore
cpuinfo_get_cpu_info = {"brand_raw": "Intel(R) Core(TM) i7-10700K"}
def init_mocker(mocker: MockerFixture, tmp_path: Path):
mock_psutil = mocker.patch("zhenxun.builtin_plugins.check.data_source.psutil")
mock_cpuinfo = mocker.patch("zhenxun.builtin_plugins.check.data_source.cpuinfo")
mock_cpuinfo.get_cpu_info.return_value = cpuinfo_get_cpu_info
mock_platform = mocker.patch("zhenxun.builtin_plugins.check.data_source.platform")
mock_platform.uname.return_value = platform_uname
mock_template_to_pic = mocker.patch("zhenxun.builtin_plugins.check.template_to_pic")
mock_template_to_pic_return = mocker.AsyncMock()
mock_template_to_pic.return_value = mock_template_to_pic_return
mock_build_message = mocker.patch(
"zhenxun.builtin_plugins.check.MessageUtils.build_message"
)
mock_build_message_return = mocker.AsyncMock()
mock_build_message.return_value = mock_build_message_return
mock_template_path_new = tmp_path / "resources" / "template"
mocker.patch(
"zhenxun.builtin_plugins.check.TEMPLATE_PATH", new=mock_template_path_new
)
return (
mock_psutil,
mock_cpuinfo,
mock_platform,
mock_template_to_pic,
mock_template_to_pic_return,
mock_build_message,
mock_build_message_return,
mock_template_path_new,
)
async def test_check(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试自检
"""
from zhenxun.configs.config import BotConfig
from zhenxun.builtin_plugins.check import _matcher
from zhenxun.builtin_plugins.check.data_source import __get_version
(
mock_psutil,
mock_cpuinfo,
mock_platform,
mock_template_to_pic,
mock_template_to_pic_return,
mock_build_message,
mock_build_message_return,
mock_template_path_new,
) = init_mocker(mocker, tmp_path)
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message = "自检"
event: GroupMessageEvent = _v11_group_message_event(
message=raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID_3,
to_me=True,
)
ctx.receive_event(bot=bot, event=event)
mock_template_to_pic.assert_awaited_once_with(
template_path=str((mock_template_path_new / "check").absolute()),
template_name="main.html",
templates={
"data": {
"cpu_info": "1.0% - 1.0Ghz [1 core]",
"cpu_process": 1.0,
"ram_info": "1.0 / 1.0 GB",
"ram_process": 100.0,
"swap_info": "1.0 / 1.0 GB",
"swap_process": 100.0,
"disk_info": "1.0 / 1.0 GB",
"disk_process": 100.0,
"brand_raw": cpuinfo_get_cpu_info["brand_raw"],
"baidu": "red",
"google": "red",
"system": f"{platform_uname.system} " f"{platform_uname.release}",
"version": __get_version(),
"plugin_count": len(nonebot.get_loaded_plugins()),
"nickname": BotConfig.self_nickname,
}
},
pages={
"viewport": {"width": 195, "height": 750},
"base_url": f"file://{mock_template_path_new.absolute()}",
},
wait=2,
)
mock_template_to_pic.assert_awaited_once()
mock_build_message.assert_called_once_with(mock_template_to_pic_return)
mock_build_message_return.send.assert_awaited_once()
async def test_check_arm(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试自检arm
"""
from zhenxun.configs.config import BotConfig
from zhenxun.builtin_plugins.check import _matcher
from zhenxun.builtin_plugins.check.data_source import __get_version
platform_uname_arm = platform.uname_result(
system="Linux",
node="zhenxun",
release="5.15.0-1017-oracle",
version="#22~20.04.1-Ubuntu SMP Wed Aug 24 11:13:15 UTC 2022",
machine="aarch64",
) # type: ignore
mock_subprocess_check_output = mocker.patch(
"zhenxun.builtin_plugins.check.data_source.subprocess.check_output"
)
mock_environ_copy = mocker.patch(
"zhenxun.builtin_plugins.check.data_source.os.environ.copy"
)
mock_environ_copy_return = mocker.MagicMock()
mock_environ_copy.return_value = mock_environ_copy_return
(
mock_psutil,
mock_cpuinfo,
mock_platform,
mock_template_to_pic,
mock_template_to_pic_return,
mock_build_message,
mock_build_message_return,
mock_template_path_new,
) = init_mocker(mocker, tmp_path)
mock_platform.uname.return_value = platform_uname_arm
mock_cpuinfo.get_cpu_info.return_value = {}
mock_psutil.cpu_freq.return_value = {}
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message = "自检"
event: GroupMessageEvent = _v11_group_message_event(
message=raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID_3,
to_me=True,
)
ctx.receive_event(bot=bot, event=event)
mock_template_to_pic.assert_awaited_once_with(
template_path=str((mock_template_path_new / "check").absolute()),
template_name="main.html",
templates={
"data": {
"cpu_info": "1.0% - 0.0Ghz [1 core]",
"cpu_process": 1.0,
"ram_info": "1.0 / 1.0 GB",
"ram_process": 100.0,
"swap_info": "1.0 / 1.0 GB",
"swap_process": 100.0,
"disk_info": "1.0 / 1.0 GB",
"disk_process": 100.0,
"brand_raw": "",
"baidu": "red",
"google": "red",
"system": f"{platform_uname_arm.system} "
f"{platform_uname_arm.release}",
"version": __get_version(),
"plugin_count": len(nonebot.get_loaded_plugins()),
"nickname": BotConfig.self_nickname,
}
},
pages={
"viewport": {"width": 195, "height": 750},
"base_url": f"file://{mock_template_path_new.absolute()}",
},
wait=2,
)
mock_subprocess_check_output.assert_has_calls(
[
mocker.call(["lscpu"], env=mock_environ_copy_return),
mocker.call().decode(),
mocker.call().decode().splitlines(),
mocker.call().decode().splitlines().__iter__(),
mocker.call(["dmidecode", "-s", "processor-frequency"]),
mocker.call().decode(),
mocker.call().decode().split(),
mocker.call().decode().split().__getitem__(0),
mocker.call().decode().split().__getitem__().__float__(),
] # type: ignore
)
mock_template_to_pic.assert_awaited_once()
mock_build_message.assert_called_once_with(mock_template_to_pic_return)
mock_build_message_return.send.assert_awaited_once()

View File

@ -1,22 +1,24 @@
import os
import platform
from dataclasses import dataclass
import subprocess
from pathlib import Path
from dataclasses import dataclass
import psutil
import cpuinfo
import nonebot
import psutil
from httpx import ConnectTimeout, NetworkError
from nonebot.utils import run_sync
from pydantic import BaseModel
from nonebot.utils import run_sync
from zhenxun.configs.config import BotConfig
from zhenxun.services.log import logger
from zhenxun.configs.config import BotConfig
from zhenxun.utils.http_utils import AsyncHttpx
BAIDU_URL = "https://www.baidu.com/"
GOOGLE_URL = "https://www.google.com/"
VERSION_FILE = Path() / "__version__"
ARM_KEY = "aarch64"
@dataclass
@ -31,9 +33,11 @@ class CPUInfo:
@classmethod
def get_cpu_info(cls):
cpu_core = psutil.cpu_count(logical=False)
cpu_usage = psutil.cpu_percent(interval=1)
cpu_freq = round(psutil.cpu_freq().current / 1000, 2)
cpu_usage = psutil.cpu_percent(interval=0.1)
if _cpu_freq := psutil.cpu_freq():
cpu_freq = round(_cpu_freq.current / 1000, 2)
else:
cpu_freq = 0
return CPUInfo(core=cpu_core, usage=cpu_usage, freq=cpu_freq)
@ -102,8 +106,9 @@ class SystemInfo(BaseModel):
def get_system_info(self):
return {
"cpu_info": f"{self.cpu.usage}% - {self.cpu.freq}Ghz [{self.cpu.core} core]",
"cpu_process": psutil.cpu_percent(),
"cpu_info": f"{self.cpu.usage}% - {self.cpu.freq}Ghz "
f"[{self.cpu.core} core]",
"cpu_process": self.cpu.usage,
"ram_info": f"{self.ram.usage} / {self.ram.total} GB",
"ram_process": (
0 if self.ram.total == 0 else (self.ram.usage / self.ram.total * 100)
@ -120,14 +125,14 @@ class SystemInfo(BaseModel):
@run_sync
def __build_status() -> dict:
def __build_status() -> SystemInfo:
"""获取 `CPU` `RAM` `SWAP` `DISK` 信息"""
cpu = CPUInfo.get_cpu_info()
ram = RAMInfo.get_ram_info()
swap = SwapMemory.get_swap_info()
disk = DiskInfo.get_disk_info()
return SystemInfo(cpu=cpu, ram=ram, swap=swap, disk=disk).get_system_info()
return SystemInfo(cpu=cpu, ram=ram, swap=swap, disk=disk)
async def __get_network_info():
@ -155,16 +160,50 @@ def __get_version() -> str | None:
return None
def __get_arm_cpu():
env = os.environ.copy()
env["LC_ALL"] = "en_US.UTF-8"
cpu_info = subprocess.check_output(["lscpu"], env=env).decode()
model_name = ""
cpu_freq = 0
for line in cpu_info.splitlines():
if "Model name" in line:
model_name = line.split(":")[1].strip()
if "CPU MHz" in line:
cpu_freq = float(line.split(":")[1].strip())
return model_name, cpu_freq
def __get_arm_oracle_cpu_freq():
cpu_freq = subprocess.check_output(
["dmidecode", "-s", "processor-frequency"]
).decode()
return round(float(cpu_freq.split()[0]) / 1000, 2)
async def get_status_info() -> dict:
"""获取信息"""
data = await __build_status()
system = platform.uname()
if system.machine == ARM_KEY and not (
cpuinfo.get_cpu_info().get("brand_raw") and data.cpu.freq
):
model_name, cpu_freq = __get_arm_cpu()
if not data.cpu.freq:
data.cpu.freq = cpu_freq or __get_arm_oracle_cpu_freq()
data = data.get_system_info()
data["brand_raw"] = model_name
else:
data = data.get_system_info()
data["brand_raw"] = cpuinfo.get_cpu_info().get("brand_raw", "Unknown")
baidu, google = await __get_network_info()
data["baidu"] = "#8CC265" if baidu else "red"
data["google"] = "#8CC265" if google else "red"
system = platform.uname()
data["system"] = f"{system.system} {system.release}"
data["version"] = __get_version()
data["brand_raw"] = cpuinfo.get_cpu_info()["brand_raw"]
data["plugin_count"] = len(nonebot.get_loaded_plugins())
data["nickname"] = BotConfig.self_nickname
return data