🐛修复添加插件返回403的问题

This commit is contained in:
AkashiCoin 2024-09-02 00:59:32 +08:00
parent 8fe061738a
commit 3c39ce4ce4
18 changed files with 1090 additions and 452 deletions

View File

@ -25,5 +25,8 @@
"userinfo",
"zhenxun"
],
"python.analysis.autoImportCompletions": true
"python.analysis.autoImportCompletions": true,
"python.testing.pytestArgs": ["tests"],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

View File

@ -1,3 +1,7 @@
import io
import os
import tarfile
import zipfile
from typing import cast
from pathlib import Path
from collections.abc import Callable
@ -9,18 +13,19 @@ from nonebot.adapters.onebot.v11 import Bot
from nonebot.adapters.onebot.v11.message import Message
from tests.config import BotId, UserId, GroupId, MessageId
from tests.utils import (
get_response_json,
_v11_group_message_event,
_v11_private_message_send,
)
from tests.utils import get_response_json as _get_response_json
from tests.utils import _v11_group_message_event, _v11_private_message_send
def get_response_json(file: str) -> dict:
return _get_response_json(Path() / "auto_update", file)
def init_mocked_api(mocked_api: MockRouter) -> None:
mocked_api.get(
url="https://api.github.com/repos/HibiKier/zhenxun_bot/releases/latest",
name="release_latest",
).respond(json=get_response_json(path="release_latest.json"))
).respond(json=get_response_json("release_latest.json"))
mocked_api.get(
url="https://raw.githubusercontent.com/HibiKier/zhenxun_bot/dev/__version__",
name="dev_branch_version",
@ -38,62 +43,105 @@ def init_mocked_api(mocked_api: MockRouter) -> None:
"Location": "https://codeload.github.com/HibiKier/zhenxun_bot/legacy.tar.gz/refs/tags/v0.2.2"
},
)
import io
import tarfile
tar_buffer = io.BytesIO()
zip_bytes = io.BytesIO()
from zhenxun.builtin_plugins.auto_update.config import (
REQ_TXT_FILE,
PYPROJECT_FILE,
PYPROJECT_LOCK_FILE,
REPLACE_FOLDERS,
REQ_TXT_FILE_STRING,
PYPROJECT_FILE_STRING,
PYPROJECT_LOCK_FILE_STRING,
)
# 指定要添加到压缩文件中的文件路径列表
file_paths: list[Path] = [
PYPROJECT_FILE,
PYPROJECT_LOCK_FILE,
REQ_TXT_FILE,
file_paths: list[str] = [
PYPROJECT_FILE_STRING,
PYPROJECT_LOCK_FILE_STRING,
REQ_TXT_FILE_STRING,
]
# 打开一个tarfile对象写入到上面创建的BytesIO对象中
with tarfile.open(mode="w:gz", fileobj=tar_buffer) as tar:
_extracted_from_init_mocked_api_43(tarfile, tar, file_paths, io)
_extracted_from_init_mocked_api_43(tar, file_paths, folders=REPLACE_FOLDERS)
with zipfile.ZipFile(zip_bytes, mode="w", compression=zipfile.ZIP_DEFLATED) as zipf:
_extracted_from_init_mocked_api_zip(zipf, file_paths, folders=REPLACE_FOLDERS)
mocked_api.get(
url="https://codeload.github.com/HibiKier/zhenxun_bot/legacy.tar.gz/refs/tags/v0.2.2",
name="release_download_url_redirect",
).respond(
content=tar_buffer.getvalue(),
)
mocked_api.get(
url="https://ghproxy.cc/https://github.com/HibiKier/zhenxun_bot/archive/refs/heads/dev.zip",
name="dev_download_url",
).respond(
content=zip_bytes.getvalue(),
)
mocked_api.get(
url="https://ghproxy.cc/https://github.com/HibiKier/zhenxun_bot/archive/refs/heads/main.zip",
name="main_download_url",
).respond(
content=zip_bytes.getvalue(),
)
# TODO Rename this here and in `init_mocked_api`
def _extracted_from_init_mocked_api_43(tarfile, tar, file_paths, io):
def _extracted_from_init_mocked_api_zip(
zipf: zipfile.ZipFile, file_paths: list[str], folders: list[str] = []
):
# 假设有一个文件夹名为 folder_name
folder_name = "my_folder/"
# 添加文件夹到 ZIP 中,注意 ZIP 中文件夹路径应以 '/' 结尾
zipf.writestr(folder_name, "") # 空内容表示这是一个文件夹
for file_path in file_paths:
# 将文件添加到 ZIP 中,路径为 folder_name + file_name
zipf.writestr(f"{folder_name}{os.path.basename(file_path)}", b"new")
base_folder = f"{folder_name}zhenxun/"
zipf.writestr(base_folder, "")
for folder in folders:
zipf.writestr(f"{base_folder}{folder}/", "")
# TODO Rename this here and in `init_mocked_api`
def _extracted_from_init_mocked_api_43(
tar: tarfile.TarFile, file_paths: list[str], folders: list[str] = []
):
folder_name = "my_folder"
tarinfo = tarfile.TarInfo(folder_name)
_extracted_from__extracted_from_init_mocked_api_43_30(tarinfo, tar)
# 读取并添加指定的文件
for file_path in file_paths:
# 创建TarInfo对象
tar_buffer = io.BytesIO(b"new")
tarinfo = tarfile.TarInfo(
f"{folder_name}/{file_path}"
) # 使用文件名作为tar中的名字
tarinfo.mode = 0o644 # 设置文件夹权限
tarinfo.size = len(tar_buffer.getvalue()) # 设置文件大小
# 添加文件
tar.addfile(tarinfo, fileobj=tar_buffer)
base_folder = f"{folder_name}/zhenxun"
tarinfo = tarfile.TarInfo(base_folder)
_extracted_from__extracted_from_init_mocked_api_43_30(tarinfo, tar)
for folder in folders:
tarinfo = tarfile.TarInfo(f"{base_folder}{folder}")
_extracted_from__extracted_from_init_mocked_api_43_30(tarinfo, tar)
# TODO Rename this here and in `_extracted_from_init_mocked_api_43`
def _extracted_from__extracted_from_init_mocked_api_43_30(tarinfo, tar):
tarinfo.type = tarfile.DIRTYPE
tarinfo.mode = 0o755
tar.addfile(tarinfo)
# 读取并添加指定的文件
for file_path in file_paths:
# 读取文件内容
with open(file_path, "rb") as file:
file_content = file.read()
# 使用BytesIO创建文件内容
file_buffer = io.BytesIO(file_content)
# 创建TarInfo对象
tarinfo = tarfile.TarInfo(
f"{folder_name}/{file_path.name}"
) # 使用文件名作为tar中的名字
tarinfo.mode = 0o644 # 设置文件夹权限
tarinfo.size = len(file_content)
# 添加文件
tar.addfile(tarinfo, fileobj=file_buffer)
async def test_check_update_release(
app: App,
@ -103,20 +151,64 @@ async def test_check_update_release(
tmp_path: Path,
) -> None:
"""
测试检查更新
测试检查更新release
"""
from zhenxun.builtin_plugins.auto_update import _matcher
from zhenxun.builtin_plugins.auto_update.config import (
REPLACE_FOLDERS,
REQ_TXT_FILE_STRING,
PYPROJECT_FILE_STRING,
PYPROJECT_LOCK_FILE_STRING,
)
init_mocked_api(mocked_api=mocked_api)
mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.REPLACE_FOLDERS",
return_value=[],
)
mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.install_requirement",
return_value=None,
)
mock_tmp_path = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.TMP_PATH",
new=tmp_path / "auto_update",
)
mock_base_path = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.BASE_PATH",
new=tmp_path / "zhenxun",
)
mock_backup_path = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.BACKUP_PATH",
new=tmp_path / "backup",
)
mock_download_gz_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.DOWNLOAD_GZ_FILE",
new=mock_tmp_path / "download_latest_file.tar.gz",
)
mock_download_zip_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.DOWNLOAD_ZIP_FILE",
new=mock_tmp_path / "download_latest_file.zip",
)
mock_pyproject_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.PYPROJECT_FILE",
new=tmp_path / PYPROJECT_FILE_STRING,
)
mock_pyproject_lock_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.PYPROJECT_LOCK_FILE",
new=tmp_path / PYPROJECT_LOCK_FILE_STRING,
)
mock_req_txt_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.REQ_TXT_FILE",
new=tmp_path / REQ_TXT_FILE_STRING,
)
# 确保目录下有一个子目录,以便 os.listdir() 能返回一个目录名
mock_tmp_path.mkdir(parents=True, exist_ok=True)
for folder in REPLACE_FOLDERS:
(mock_base_path / folder).mkdir(parents=True, exist_ok=True)
mock_pyproject_file.write_bytes(b"")
mock_pyproject_lock_file.write_bytes(b"")
mock_req_txt_file.write_bytes(b"")
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
@ -150,3 +242,245 @@ async def test_check_update_release(
assert mocked_api["release_latest"].called
assert mocked_api["release_download_url"].called
assert mocked_api["release_download_url_redirect"].called
assert (mock_backup_path / PYPROJECT_FILE_STRING).exists()
assert (mock_backup_path / PYPROJECT_LOCK_FILE_STRING).exists()
assert (mock_backup_path / REQ_TXT_FILE_STRING).exists()
assert not mock_download_gz_file.exists()
assert not mock_download_zip_file.exists()
assert mock_pyproject_file.read_bytes() == b"new"
assert mock_pyproject_lock_file.read_bytes() == b"new"
assert mock_req_txt_file.read_bytes() == b"new"
for folder in REPLACE_FOLDERS:
assert not (mock_base_path / folder).exists()
for folder in REPLACE_FOLDERS:
assert (mock_backup_path / folder).exists()
async def test_check_update_dev(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试检查更新开发环境
"""
from zhenxun.builtin_plugins.auto_update import _matcher
from zhenxun.builtin_plugins.auto_update.config import (
REPLACE_FOLDERS,
REQ_TXT_FILE_STRING,
PYPROJECT_FILE_STRING,
PYPROJECT_LOCK_FILE_STRING,
)
init_mocked_api(mocked_api=mocked_api)
mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.install_requirement",
return_value=None,
)
mock_tmp_path = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.TMP_PATH",
new=tmp_path / "auto_update",
)
mock_base_path = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.BASE_PATH",
new=tmp_path / "zhenxun",
)
mock_backup_path = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.BACKUP_PATH",
new=tmp_path / "backup",
)
mock_download_gz_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.DOWNLOAD_GZ_FILE",
new=mock_tmp_path / "download_latest_file.tar.gz",
)
mock_download_zip_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.DOWNLOAD_ZIP_FILE",
new=mock_tmp_path / "download_latest_file.zip",
)
mock_pyproject_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.PYPROJECT_FILE",
new=tmp_path / PYPROJECT_FILE_STRING,
)
mock_pyproject_lock_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.PYPROJECT_LOCK_FILE",
new=tmp_path / PYPROJECT_LOCK_FILE_STRING,
)
mock_req_txt_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.REQ_TXT_FILE",
new=tmp_path / REQ_TXT_FILE_STRING,
)
# 确保目录下有一个子目录,以便 os.listdir() 能返回一个目录名
mock_tmp_path.mkdir(parents=True, exist_ok=True)
for folder in REPLACE_FOLDERS:
(mock_base_path / folder).mkdir(parents=True, exist_ok=True)
mock_pyproject_file.write_bytes(b"")
mock_pyproject_lock_file.write_bytes(b"")
mock_req_txt_file.write_bytes(b"")
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot = cast(Bot, bot)
raw_message = "检查更新 dev"
event = _v11_group_message_event(
raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID,
to_me=True,
)
ctx.receive_event(bot, event)
ctx.should_call_api(
"send_msg",
_v11_private_message_send(
message="检测真寻已更新版本更新v0.2.2 -> v0.2.2\n开始更新...",
user_id=UserId.SUPERUSER,
),
)
ctx.should_call_send(
event=event,
message=Message(
"版本更新完成\n" "版本: v0.2.2 -> v0.2.2\n" "请重新启动真寻以完成更新!"
),
result=None,
bot=bot,
)
ctx.should_finished(_matcher)
assert mocked_api["dev_download_url"].called
assert (mock_backup_path / PYPROJECT_FILE_STRING).exists()
assert (mock_backup_path / PYPROJECT_LOCK_FILE_STRING).exists()
assert (mock_backup_path / REQ_TXT_FILE_STRING).exists()
assert not mock_download_gz_file.exists()
assert not mock_download_zip_file.exists()
assert mock_pyproject_file.read_bytes() == b"new"
assert mock_pyproject_lock_file.read_bytes() == b"new"
assert mock_req_txt_file.read_bytes() == b"new"
for folder in REPLACE_FOLDERS:
assert (mock_base_path / folder).exists()
for folder in REPLACE_FOLDERS:
assert (mock_backup_path / folder).exists()
async def test_check_update_main(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试检查更新正式环境
"""
from zhenxun.builtin_plugins.auto_update import _matcher
from zhenxun.builtin_plugins.auto_update.config import (
REPLACE_FOLDERS,
REQ_TXT_FILE_STRING,
PYPROJECT_FILE_STRING,
PYPROJECT_LOCK_FILE_STRING,
)
init_mocked_api(mocked_api=mocked_api)
mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.install_requirement",
return_value=None,
)
mock_tmp_path = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.TMP_PATH",
new=tmp_path / "auto_update",
)
mock_base_path = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.BASE_PATH",
new=tmp_path / "zhenxun",
)
mock_backup_path = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.BACKUP_PATH",
new=tmp_path / "backup",
)
mock_download_gz_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.DOWNLOAD_GZ_FILE",
new=mock_tmp_path / "download_latest_file.tar.gz",
)
mock_download_zip_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.DOWNLOAD_ZIP_FILE",
new=mock_tmp_path / "download_latest_file.zip",
)
mock_pyproject_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.PYPROJECT_FILE",
new=tmp_path / PYPROJECT_FILE_STRING,
)
mock_pyproject_lock_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.PYPROJECT_LOCK_FILE",
new=tmp_path / PYPROJECT_LOCK_FILE_STRING,
)
mock_req_txt_file = mocker.patch(
"zhenxun.builtin_plugins.auto_update._data_source.REQ_TXT_FILE",
new=tmp_path / REQ_TXT_FILE_STRING,
)
# 确保目录下有一个子目录,以便 os.listdir() 能返回一个目录名
mock_tmp_path.mkdir(parents=True, exist_ok=True)
for folder in REPLACE_FOLDERS:
(mock_base_path / folder).mkdir(parents=True, exist_ok=True)
mock_pyproject_file.write_bytes(b"")
mock_pyproject_lock_file.write_bytes(b"")
mock_req_txt_file.write_bytes(b"")
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot = cast(Bot, bot)
raw_message = "检查更新 main"
event = _v11_group_message_event(
raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID,
to_me=True,
)
ctx.receive_event(bot, event)
ctx.should_call_api(
"send_msg",
_v11_private_message_send(
message="检测真寻已更新版本更新v0.2.2 -> v0.2.2\n开始更新...",
user_id=UserId.SUPERUSER,
),
)
ctx.should_call_send(
event=event,
message=Message(
"版本更新完成\n" "版本: v0.2.2 -> v0.2.2\n" "请重新启动真寻以完成更新!"
),
result=None,
bot=bot,
)
ctx.should_finished(_matcher)
assert mocked_api["main_download_url"].called
assert (mock_backup_path / PYPROJECT_FILE_STRING).exists()
assert (mock_backup_path / PYPROJECT_LOCK_FILE_STRING).exists()
assert (mock_backup_path / REQ_TXT_FILE_STRING).exists()
assert not mock_download_gz_file.exists()
assert not mock_download_zip_file.exists()
assert mock_pyproject_file.read_bytes() == b"new"
assert mock_pyproject_lock_file.read_bytes() == b"new"
assert mock_req_txt_file.read_bytes() == b"new"
for folder in REPLACE_FOLDERS:
assert (mock_base_path / folder).exists()
for folder in REPLACE_FOLDERS:
assert (mock_backup_path / folder).exists()

View File

@ -1,147 +0,0 @@
from typing import cast
from pathlib import Path
from collections.abc import Callable
from nonebug import App
from respx import MockRouter
from pytest_mock import MockerFixture
from nonebot.adapters.onebot.v11 import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from tests.config import BotId, UserId, GroupId, MessageId
from tests.utils import get_response_json, _v11_group_message_event
def init_mocked_api(mocked_api: MockRouter) -> None:
mocked_api.get(
"https://cdn.jsdelivr.net/gh/zhenxun-org/zhenxun_bot_plugins/plugins.json",
name="basic_plugins",
).respond(200, json=get_response_json("basic_plugins.json"))
mocked_api.get(
"https://raw.githubusercontent.com/zhenxun-org/zhenxun_bot_plugins_index/index/plugins.json",
name="extra_plugins",
).respond(200, json=get_response_json("extra_plugins.json"))
mocked_api.get(
"https://api.github.com/repos/zhenxun-org/zhenxun_bot_plugins/contents/plugins/search_image?ref=main",
name="search_image_plugin_api",
).respond(200, json=get_response_json("search_image_plugin_api.json"))
mocked_api.get(
"https://raw.githubusercontent.com/zhenxun-org/zhenxun_bot_plugins/main/plugins/search_image/__init__.py",
name="search_image_plugin_file_init",
).respond(content=b"")
mocked_api.get(
"https://api.github.com/repos/xuanerwa/zhenxun_github_sub/contents/",
name="github_sub_plugin_contents",
).respond(json=get_response_json("github_sub_plugin_contents.json"))
mocked_api.get(
"https://api.github.com/repos/xuanerwa/zhenxun_github_sub/contents/github_sub?ref=main",
name="github_sub_plugin_api",
).respond(json=get_response_json("github_sub_plugin_api.json"))
mocked_api.get(
"https://raw.githubusercontent.com/xuanerwa/zhenxun_github_sub/main/github_sub/__init__.py",
name="github_sub_plugin_file_init",
).respond(content=b"")
async def test_add_plugin_basic(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试添加基础插件
"""
from zhenxun.builtin_plugins.plugin_store import _matcher
init_mocked_api(mocked_api=mocked_api)
mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.BASE_PATH",
return_value=tmp_path / "zhenxun",
)
plugin_id = 1
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message = f"添加插件 {plugin_id}"
event: GroupMessageEvent = _v11_group_message_event(
message=raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID,
to_me=True,
)
ctx.receive_event(bot=bot, event=event)
ctx.should_call_send(
event=event,
message=Message(message=f"正在添加插件 Id: {plugin_id}"),
result=None,
bot=bot,
)
ctx.should_call_send(
event=event,
message=Message(message="插件 识图 安装成功! 重启后生效"),
result=None,
bot=bot,
)
assert mocked_api["basic_plugins"].called
assert mocked_api["extra_plugins"].called
assert mocked_api["search_image_plugin_api"].called
assert mocked_api["search_image_plugin_file_init"].called
async def test_add_plugin_extra(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试添加额外插件
"""
from zhenxun.builtin_plugins.plugin_store import _matcher
init_mocked_api(mocked_api=mocked_api)
mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.BASE_PATH",
return_value=tmp_path / "zhenxun",
)
plugin_id = 3
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message: str = f"添加插件 {plugin_id}"
event: GroupMessageEvent = _v11_group_message_event(
message=raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID,
to_me=True,
)
ctx.receive_event(bot=bot, event=event)
ctx.should_call_send(
event=event,
message=Message(message=f"正在添加插件 Id: {plugin_id}"),
result=None,
bot=bot,
)
ctx.should_call_send(
event=event,
message=Message(message="插件 github订阅 安装成功! 重启后生效"),
result=None,
bot=bot,
)
assert mocked_api["basic_plugins"].called
assert mocked_api["extra_plugins"].called
assert mocked_api["github_sub_plugin_contents"].called
assert mocked_api["github_sub_plugin_api"].called
assert mocked_api["github_sub_plugin_file_init"].called

View File

@ -0,0 +1,309 @@
# ruff: noqa: ASYNC230
from typing import cast
from pathlib import Path
from collections.abc import Callable
from nonebug import App
from respx import MockRouter
from pytest_mock import MockerFixture
from nonebot.adapters.onebot.v11 import Bot
from nonebot.adapters.onebot.v11.message import Message
from nonebot.adapters.onebot.v11.event import GroupMessageEvent
from tests.utils import _v11_group_message_event
from tests.config import BotId, UserId, GroupId, MessageId
from tests.utils import get_response_json as _get_response_json
def get_response_json(file: str) -> dict:
return _get_response_json(Path() / "plugin_store", file=file)
def init_mocked_api(mocked_api: MockRouter) -> None:
mocked_api.get(
"https://data.jsdelivr.com/v1/packages/gh/xuanerwa/zhenxun_github_sub@main",
name="github_sub_plugin_metadata",
).respond(json=get_response_json("github_sub_plugin_metadata.json"))
mocked_api.get(
"https://data.jsdelivr.com/v1/packages/gh/zhenxun-org/zhenxun_bot_plugins@main",
name="zhenxun_bot_plugins_metadata",
).respond(json=get_response_json("zhenxun_bot_plugins_metadata.json"))
mocked_api.get(
"https://cdn.jsdelivr.net/gh/zhenxun-org/zhenxun_bot_plugins/plugins.json",
name="basic_plugins",
).respond(200, json=get_response_json("basic_plugins.json"))
mocked_api.get(
"https://raw.githubusercontent.com/zhenxun-org/zhenxun_bot_plugins_index/index/plugins.json",
name="extra_plugins",
).respond(200, json=get_response_json("extra_plugins.json"))
mocked_api.get(
"https://raw.githubusercontent.com/zhenxun-org/zhenxun_bot_plugins/main/plugins/search_image/__init__.py",
name="search_image_plugin_file_init",
).respond(content=b"")
mocked_api.get(
"https://raw.githubusercontent.com/zhenxun-org/zhenxun_bot_plugins/main/plugins/alapi/jitang.py",
name="jitang_plugin_file",
).respond(content=b"")
mocked_api.get(
"https://raw.githubusercontent.com/xuanerwa/zhenxun_github_sub/main/github_sub/__init__.py",
name="github_sub_plugin_file_init",
).respond(content=b"")
async def test_add_plugin_basic(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试添加基础插件
"""
from zhenxun.builtin_plugins.plugin_store import _matcher
init_mocked_api(mocked_api=mocked_api)
mock_base_path = mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.BASE_PATH",
new=tmp_path / "zhenxun",
)
plugin_id = 1
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message = f"添加插件 {plugin_id}"
event: GroupMessageEvent = _v11_group_message_event(
message=raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID,
to_me=True,
)
ctx.receive_event(bot=bot, event=event)
ctx.should_call_send(
event=event,
message=Message(message=f"正在添加插件 Id: {plugin_id}"),
result=None,
bot=bot,
)
ctx.should_call_send(
event=event,
message=Message(message="插件 识图 安装成功! 重启后生效"),
result=None,
bot=bot,
)
assert mocked_api["basic_plugins"].called
assert mocked_api["extra_plugins"].called
assert mocked_api["zhenxun_bot_plugins_metadata"].called
assert mocked_api["search_image_plugin_file_init"].called
assert (mock_base_path / "plugins" / "search_image" / "__init__.py").is_file()
async def test_add_plugin_basic_is_not_dir(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试添加基础插件插件不是目录
"""
from zhenxun.builtin_plugins.plugin_store import _matcher
init_mocked_api(mocked_api=mocked_api)
mock_base_path = mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.BASE_PATH",
new=tmp_path / "zhenxun",
)
plugin_id = 0
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message = f"添加插件 {plugin_id}"
event: GroupMessageEvent = _v11_group_message_event(
message=raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID,
to_me=True,
)
ctx.receive_event(bot=bot, event=event)
ctx.should_call_send(
event=event,
message=Message(message=f"正在添加插件 Id: {plugin_id}"),
result=None,
bot=bot,
)
ctx.should_call_send(
event=event,
message=Message(message="插件 鸡汤 安装成功! 重启后生效"),
result=None,
bot=bot,
)
assert mocked_api["basic_plugins"].called
assert mocked_api["extra_plugins"].called
assert mocked_api["zhenxun_bot_plugins_metadata"].called
assert mocked_api["jitang_plugin_file"].called
assert (mock_base_path / "plugins" / "alapi" / "jitang.py").is_file()
async def test_add_plugin_extra(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试添加额外插件
"""
from zhenxun.builtin_plugins.plugin_store import _matcher
init_mocked_api(mocked_api=mocked_api)
mock_base_path = mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.BASE_PATH",
new=tmp_path / "zhenxun",
)
plugin_id = 3
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message: str = f"添加插件 {plugin_id}"
event: GroupMessageEvent = _v11_group_message_event(
message=raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID,
to_me=True,
)
ctx.receive_event(bot=bot, event=event)
ctx.should_call_send(
event=event,
message=Message(message=f"正在添加插件 Id: {plugin_id}"),
result=None,
bot=bot,
)
ctx.should_call_send(
event=event,
message=Message(message="插件 github订阅 安装成功! 重启后生效"),
result=None,
bot=bot,
)
assert mocked_api["basic_plugins"].called
assert mocked_api["extra_plugins"].called
assert mocked_api["github_sub_plugin_metadata"].called
assert mocked_api["github_sub_plugin_file_init"].called
assert (mock_base_path / "plugins" / "github_sub" / "__init__.py").is_file()
async def test_update_plugin_basic(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试更新基础插件
"""
from zhenxun.builtin_plugins.plugin_store import _matcher
init_mocked_api(mocked_api=mocked_api)
mock_base_path = mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.BASE_PATH",
new=tmp_path / "zhenxun",
)
plugin_id = 1
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message = f"更新插件 {plugin_id}"
event: GroupMessageEvent = _v11_group_message_event(
message=raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID,
to_me=True,
)
ctx.receive_event(bot=bot, event=event)
ctx.should_call_send(
event=event,
message=Message(message=f"正在更新插件 Id: {plugin_id}"),
result=None,
bot=bot,
)
ctx.should_call_send(
event=event,
message=Message(message="插件 识图 更新成功! 重启后生效"),
result=None,
bot=bot,
)
assert mocked_api["basic_plugins"].called
assert mocked_api["extra_plugins"].called
assert mocked_api["zhenxun_bot_plugins_metadata"].called
assert mocked_api["search_image_plugin_file_init"].called
assert (mock_base_path / "plugins" / "search_image" / "__init__.py").is_file()
async def test_remove_plugin(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试删除插件
"""
from zhenxun.builtin_plugins.plugin_store import _matcher
init_mocked_api(mocked_api=mocked_api)
mock_base_path = mocker.patch(
"zhenxun.builtin_plugins.plugin_store.data_source.BASE_PATH",
new=tmp_path / "zhenxun",
)
plugin_path = mock_base_path / "plugins" / "search_image"
plugin_path.mkdir(parents=True, exist_ok=True)
with open(plugin_path / "__init__.py", "w") as f:
f.write("")
plugin_id = 1
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message = f"移除插件 {plugin_id}"
event: GroupMessageEvent = _v11_group_message_event(
message=raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID,
to_me=True,
)
ctx.receive_event(bot=bot, event=event)
ctx.should_call_send(
event=event,
message=Message(message="插件 识图 移除成功! 重启后生效"),
result=None,
bot=bot,
)
assert mocked_api["basic_plugins"].called
assert mocked_api["extra_plugins"].called
assert not (mock_base_path / "plugins" / "search_image" / "__init__.py").is_file()

View File

@ -73,10 +73,13 @@ async def app(app: App, tmp_path: Path, mocker: MockerFixture):
mock_config_path.TEMP_PATH = tmp_path / "resources" / "temp"
# mock_config_path.TEMP_PATH.mkdir(parents=True, exist_ok=True)
mocker.patch("zhenxun.configs.path_config", return_value=mock_config_path)
mocker.patch("zhenxun.configs.path_config", new=mock_config_path)
await init()
# await driver._lifespan.startup()
yield app
# await driver._lifespan.shutdown()
await disconnect()

View File

@ -1,18 +0,0 @@
[
{
"name": "__init__.py",
"path": "github_sub/__init__.py",
"sha": "7d17fd49fe82fa3897afcef61b2c694ed93a4ba3",
"size": 7551,
"url": "https://api.github.com/repos/xuanerwa/zhenxun_github_sub/contents/github_sub/__init__.py?ref=main",
"html_url": "https://github.com/xuanerwa/zhenxun_github_sub/blob/main/github_sub/__init__.py",
"git_url": "https://api.github.com/repos/xuanerwa/zhenxun_github_sub/git/blobs/7d17fd49fe82fa3897afcef61b2c694ed93a4ba3",
"download_url": "https://raw.githubusercontent.com/xuanerwa/zhenxun_github_sub/main/github_sub/__init__.py",
"type": "file",
"_links": {
"self": "https://api.github.com/repos/xuanerwa/zhenxun_github_sub/contents/github_sub/__init__.py?ref=main",
"git": "https://api.github.com/repos/xuanerwa/zhenxun_github_sub/git/blobs/7d17fd49fe82fa3897afcef61b2c694ed93a4ba3",
"html": "https://github.com/xuanerwa/zhenxun_github_sub/blob/main/github_sub/__init__.py"
}
}
]

View File

@ -1,18 +0,0 @@
[
{
"name": "github_sub",
"path": "github_sub",
"sha": "0f7d76bcf472e2ab0610fa542b067633d6e3ae7e",
"size": 0,
"url": "https://api.github.com/repos/xuanerwa/zhenxun_github_sub/contents/github_sub?ref=main",
"html_url": "https://github.com/xuanerwa/zhenxun_github_sub/tree/main/github_sub",
"git_url": "https://api.github.com/repos/xuanerwa/zhenxun_github_sub/git/trees/0f7d76bcf472e2ab0610fa542b067633d6e3ae7e",
"download_url": null,
"type": "dir",
"_links": {
"self": "https://api.github.com/repos/xuanerwa/zhenxun_github_sub/contents/github_sub?ref=main",
"git": "https://api.github.com/repos/xuanerwa/zhenxun_github_sub/git/trees/0f7d76bcf472e2ab0610fa542b067633d6e3ae7e",
"html": "https://github.com/xuanerwa/zhenxun_github_sub/tree/main/github_sub"
}
}
]

View File

@ -0,0 +1,23 @@
{
"type": "gh",
"name": "xuanerwa/zhenxun_github_sub",
"version": "main",
"default": null,
"files": [
{
"type": "directory",
"name": "github_sub",
"files": [
{
"type": "file",
"name": "__init__.py",
"hash": "z1C5BBK0+atbDghbyRlF2xIDwk0HQdHM1yXQZkF7/t8=",
"size": 7551
}
]
}
],
"links": {
"stats": "https://data.jsdelivr.com/v1/stats/packages/gh/xuanerwa/zhenxun_github_sub@main"
}
}

View File

@ -0,0 +1,71 @@
{
"type": "gh",
"name": "zhenxun-org/zhenxun_bot_plugins",
"version": "main",
"default": null,
"files": [
{
"type": "directory",
"name": "plugins",
"files": [
{
"type": "directory",
"name": "search_image",
"files": [
{
"type": "file",
"name": "__init__.py",
"hash": "a4Yp9HPoBzMwvnQDT495u0yYqTQWofkOyHxEi1FdVb0=",
"size": 3010
}
]
},
{
"type": "directory",
"name": "alapi",
"files": [
{
"type": "file",
"name": "__init__.py",
"hash": "ndDxtO0pAq3ZTb4RdqW7FTDgOGC/RjS1dnwdaQfT0uQ=",
"size": 284
},
{
"type": "file",
"name": "_data_source.py",
"hash": "KOLqtj4TQWWQco5bA4tWFc7A0z1ruMyDk1RiKeqJHRA=",
"size": 919
},
{
"type": "file",
"name": "comments_163.py",
"hash": "Q5pZsj1Pj+EJMdKYcPtLqejcXAWUQIoXVQG49PZPaSI=",
"size": 1593
},
{
"type": "file",
"name": "cover.py",
"hash": "QSjtcy0oVrjaRiAWZKmUJlp0L4DQqEcdYNmExNo9mgc=",
"size": 1438
},
{
"type": "file",
"name": "jitang.py",
"hash": "xh43Osxt0xogTH448gUMC+/DaSGmCFme8DWUqC25IbU=",
"size": 1411
},
{
"type": "file",
"name": "poetry.py",
"hash": "Aj2unoNQboj3/0LhIrYU+dCa5jvMdpjMYXYUayhjuz4=",
"size": 1530
}
]
}
]
}
],
"links": {
"stats": "https://data.jsdelivr.com/v1/stats/packages/gh/zhenxun-org/zhenxun_bot_plugins@main"
}
}

View File

@ -1,18 +0,0 @@
[
{
"name": "__init__.py",
"path": "plugins/search_image/__init__.py",
"sha": "38e86de0caafe6c3e88d973fb5c4bc9d1430d213",
"size": 3010,
"url": "https://api.github.com/repos/zhenxun-org/zhenxun_bot_plugins/contents/plugins/search_image/__init__.py?ref=main",
"html_url": "https://github.com/zhenxun-org/zhenxun_bot_plugins/blob/main/plugins/search_image/__init__.py",
"git_url": "https://api.github.com/repos/zhenxun-org/zhenxun_bot_plugins/git/blobs/38e86de0caafe6c3e88d973fb5c4bc9d1430d213",
"download_url": "https://raw.githubusercontent.com/zhenxun-org/zhenxun_bot_plugins/main/plugins/search_image/__init__.py",
"type": "file",
"_links": {
"self": "https://api.github.com/repos/zhenxun-org/zhenxun_bot_plugins/contents/plugins/search_image/__init__.py?ref=main",
"git": "https://api.github.com/repos/zhenxun-org/zhenxun_bot_plugins/git/blobs/38e86de0caafe6c3e88d973fb5c4bc9d1430d213",
"html": "https://github.com/zhenxun-org/zhenxun_bot_plugins/blob/main/plugins/search_image/__init__.py"
}
}
]

View File

@ -5,18 +5,20 @@ from nonebot.adapters.onebot.v11.event import Sender
from nonebot.adapters.onebot.v11 import Message, MessageSegment, GroupMessageEvent
def get_response_json(path: str) -> dict:
def get_response_json(base_path: Path, file: str) -> dict:
try:
return json.loads(
(Path(__file__).parent / "response" / path).read_text(encoding="utf8")
(Path(__file__).parent / "response" / base_path / file).read_text(
encoding="utf8"
)
)
except (FileNotFoundError, json.JSONDecodeError) as e:
raise ValueError(f"Error reading or parsing JSON file: {e}") from e
def get_content_bytes(path: str) -> bytes:
def get_content_bytes(base_path: Path, path: str) -> bytes:
try:
return (Path(__file__).parent / "content" / path).read_bytes()
return (Path(__file__).parent / "content" / base_path / path).read_bytes()
except FileNotFoundError as e:
raise ValueError(f"Error reading file: {e}") from e

View File

@ -1,3 +1,4 @@
import re
from pathlib import Path
BASE_PATH = Path() / "zhenxun"
@ -13,7 +14,14 @@ CONFIG_INDEX_URL = "https://raw.githubusercontent.com/zhenxun-org/zhenxun_bot_pl
CONFIG_INDEX_CDN_URL = "https://cdn.jsdelivr.net/gh/zhenxun-org/zhenxun_bot_plugins_index@index/plugins.json"
"""插件索引库信息文件cdn"""
DOWNLOAD_URL = (
"https://api.github.com/repos/zhenxun-org/zhenxun_bot_plugins/contents/{}?ref=main"
DEFAULT_GITHUB_URL = "https://github.com/zhenxun-org/zhenxun_bot_plugins/tree/main"
GITHUB_REPO_URL_PATTERN = re.compile(
r"^https://github.com/(?P<owner>[^/]+)/(?P<repo>[^/]+)(/tree/(?P<branch>[^/]+))?$"
)
"""插件下载地址"""
"""github仓库地址正则"""
JSD_PACKAGE_API_FORMAT = (
"https://data.jsdelivr.com/v1/packages/gh/{owner}/{repo}@{branch}"
)
"""jsdelivr包地址格式"""

View File

@ -1,22 +1,30 @@
import re
import shutil
import subprocess
from pathlib import Path
import aiofiles
import ujson as json
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.utils.image_utils import RowStyle, BuildImage, ImageTemplate
from zhenxun.builtin_plugins.auto_update.config import REQ_TXT_FILE_STRING
from zhenxun.builtin_plugins.plugin_store.models import (
FileInfo,
FileType,
RepoInfo,
JsdPackageInfo,
StorePluginInfo,
)
from .config import (
BASE_PATH,
CONFIG_URL,
DOWNLOAD_URL,
CONFIG_INDEX_URL,
DEFAULT_GITHUB_URL,
CONFIG_INDEX_CDN_URL,
JSD_PACKAGE_API_FORMAT,
GITHUB_REPO_URL_PATTERN,
)
@ -36,66 +44,65 @@ def row_style(column: str, text: str) -> RowStyle:
return style
async def recurrence_get_url(
url: str,
data_list: list[tuple[str, str]],
ignore_list: list[str] | None = None,
api_url: str | None = None,
):
"""递归获取目录下所有文件
def full_files_path(
jsd_package_info: JsdPackageInfo, module_path: str, is_dir: bool = True
) -> list[FileInfo]:
"""
获取文件路径
参数:
url: 信息url
data_list: 数据列表
jsd_package_info: JsdPackageInfo
module_path: 模块路径
is_dir: 是否为目录
异常:
ValueError: 访问错误
返回:
list[FileInfo]: 文件路径
"""
if ignore_list is None:
ignore_list = []
logger.debug(f"访问插件下载信息 URL: {url}", "插件管理")
res = await AsyncHttpx.get(url)
if res.status_code != 200:
raise ValueError(f"访问错误, code: {res.status_code}")
json_data = res.json()
if isinstance(json_data, list):
data_list.extend((v.get("download_url"), v["path"]) for v in json_data)
else:
data_list.append((json_data.get("download_url"), json_data["path"]))
for download_url, path in data_list:
if not download_url:
_url = api_url + path if api_url else DOWNLOAD_URL.format(path)
if _url not in ignore_list:
ignore_list.append(_url)
await recurrence_get_url(_url, data_list, ignore_list, api_url)
paths: list[str] = module_path.split(".")
cur_files: list[FileInfo] = jsd_package_info.files
for path in paths:
for cur_file in cur_files:
if (
cur_file.type == FileType.DIR
and cur_file.name == path
and cur_file.files
and (is_dir or path != paths[-1])
):
cur_files = cur_file.files
break
if not is_dir and path == paths[-1] and cur_file.name.split(".")[0] == path:
return cur_files
else:
raise ValueError(f"模块路径 {module_path} 不存在")
return cur_files
async def download_file(url: str, _is: bool = False, api_url: str | None = None):
"""下载文件
def recurrence_files(
files: list[FileInfo], dir_path: str, is_dir: bool = True
) -> list[str]:
"""
递归获取文件路径
参数:
url: 插件详情url
_is: 是否为第三方插件
url_start : 第三方插件url
files: 文件列表
dir_path: 目录路径
is_dir: 是否为目录
异常:
ValueError: 下载失败
返回:
list[str]: 文件路径
"""
data_list = []
await recurrence_get_url(url, data_list, api_url=api_url)
for download_url, path in data_list:
if download_url and "." in path:
logger.debug(f"下载文件: {path}", "插件管理")
base_path = BASE_PATH / "plugins" if _is else BASE_PATH
file = base_path / path
file.parent.mkdir(parents=True, exist_ok=True)
r = await AsyncHttpx.get(download_url)
if r.status_code != 200:
raise ValueError(f"文件下载错误, code: {r.status_code}")
content = r.text.replace("\r\n", "\n") # 统一换行符为 UNIX 风格
async with aiofiles.open(file, "w", encoding="utf8") as f:
logger.debug(f"写入文件: {file}", "插件管理")
await f.write(content)
paths = []
for file in files:
if is_dir and file.type == FileType.DIR and file.files:
paths.extend(
recurrence_files(file.files, f"{dir_path}/{file.name}", is_dir)
)
elif file.type == FileType.FILE:
if dir_path.endswith(file.name):
paths.append(dir_path)
elif is_dir:
paths.append(f"{dir_path}/{file.name}")
return paths
def install_requirement(plugin_path: Path):
@ -132,17 +139,8 @@ def install_requirement(plugin_path: Path):
class ShopManage:
type2name = { # noqa: RUF012
"NORMAL": "普通插件",
"ADMIN": "管理员插件",
"SUPERUSER": "超级用户插件",
"ADMIN_SUPERUSER": "管理员/超级用户插件",
"DEPENDANT": "依赖插件",
"HIDDEN": "其他插件",
}
@classmethod
async def __get_data(cls) -> dict:
async def __get_data(cls) -> dict[str, StorePluginInfo]:
"""获取插件信息数据
异常:
@ -165,36 +163,42 @@ class ShopManage:
# 解析并合并返回的 JSON 数据
data1 = json.loads(res.text)
data2 = json.loads(res2.text)
return {**data1, **data2}
return {
name: StorePluginInfo(**detail)
for name, detail in {**data1, **data2}.items()
}
@classmethod
def version_check(cls, plugin_info: dict, suc_plugin: dict[str, str]):
module = plugin_info["module"]
if module in suc_plugin and plugin_info["version"] != suc_plugin[module]:
return f"{suc_plugin[module]} (有更新->{plugin_info['version']})"
return plugin_info["version"]
def version_check(cls, plugin_info: StorePluginInfo, suc_plugin: dict[str, str]):
"""版本检查
参数:
plugin_info: StorePluginInfo
suc_plugin: dict[str, str]
返回:
str: 版本号
"""
module = plugin_info.module
if cls.check_version_is_new(plugin_info, suc_plugin):
return f"{suc_plugin[module]} (有更新->{plugin_info.version})"
return plugin_info.version
@classmethod
def get_url_path(cls, module_path: str, is_dir: bool) -> str:
url_path = None
path = BASE_PATH
module_path_split = module_path.split(".")
if len(module_path_split) == 2:
"""单个文件或文件夹"""
if is_dir:
url_path = "/".join(module_path_split)
else:
url_path = "/".join(module_path_split) + ".py"
else:
"""嵌套文件或文件夹"""
for p in module_path_split[:-1]:
path = path / p
path.mkdir(parents=True, exist_ok=True)
if is_dir:
url_path = f"{'/'.join(module_path_split)}"
else:
url_path = f"{'/'.join(module_path_split)}.py"
return url_path
def check_version_is_new(
cls, plugin_info: StorePluginInfo, suc_plugin: dict[str, str]
):
"""检查版本是否有更新
参数:
plugin_info: StorePluginInfo
suc_plugin: dict[str, str]
返回:
bool: 是否有更新
"""
module = plugin_info.module
return module in suc_plugin and plugin_info.version != suc_plugin[module]
@classmethod
async def get_plugins_info(cls) -> BuildImage | str:
@ -203,24 +207,21 @@ class ShopManage:
返回:
BuildImage | str: 返回消息
"""
data: dict = await cls.__get_data()
data: dict[str, StorePluginInfo] = await cls.__get_data()
column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"]
for k in data.copy():
if data[k]["plugin_type"]:
data[k]["plugin_type"] = cls.type2name[data[k]["plugin_type"]]
plugin_list = await PluginInfo.filter(load_status=True).values_list(
"module", "version"
)
suc_plugin = {p[0]: (p[1] or "0.1") for p in plugin_list}
data_list = [
[
"已安装" if plugin_info[1]["module"] in suc_plugin else "",
"已安装" if plugin_info[1].module in suc_plugin else "",
id,
plugin_info[0],
plugin_info[1]["description"],
plugin_info[1]["author"],
plugin_info[1].description,
plugin_info[1].author,
cls.version_check(plugin_info[1], suc_plugin),
plugin_info[1]["plugin_type"],
plugin_info[1].plugin_type_name,
]
for id, plugin_info in enumerate(data.items())
]
@ -242,57 +243,101 @@ class ShopManage:
返回:
str: 返回消息
"""
data: dict = await cls.__get_data()
data: dict[str, StorePluginInfo] = await cls.__get_data()
if plugin_id < 0 or plugin_id >= len(data):
return "插件ID不存在..."
plugin_key = list(data.keys())[plugin_id]
plugin_info = data[plugin_key]
module_path_split = plugin_info["module_path"].split(".")
url_path = cls.get_url_path(plugin_info["module_path"], plugin_info["is_dir"])
if not url_path and plugin_info["module_path"]:
return "插件下载地址构建失败..."
logger.debug(f"尝试下载插件 URL: {url_path}", "插件管理")
github_url = plugin_info.get("github_url")
if github_url:
if not (r := re.search(r"github\.com/([^/]+/[^/]+)", github_url)):
return "github地址格式错误"
github_path = r[1]
api_url = f"https://api.github.com/repos/{github_path}/contents/"
download_url = f"{api_url}{url_path}?ref=main"
else:
download_url = DOWNLOAD_URL.format(url_path)
api_url = None
await download_file(download_url, bool(github_url), api_url)
# 安装依赖
plugin_path = BASE_PATH / "/".join(module_path_split)
if url_path and github_url and api_url:
plugin_path = BASE_PATH / "plugins" / "/".join(module_path_split)
res = await AsyncHttpx.get(api_url)
if res.status_code != 200:
return f"访问错误, code: {res.status_code}"
json_data = res.json()
if requirement_file := next(
(
v
for v in json_data
if v["name"] in ["requirements.txt", "requirement.txt"]
),
None,
):
r = await AsyncHttpx.get(requirement_file.get("download_url"))
if r.status_code != 200:
raise ValueError(f"文件下载错误, code: {r.status_code}")
requirement_path = plugin_path / requirement_file["name"]
async with aiofiles.open(requirement_path, "w", encoding="utf8") as f:
logger.debug(f"写入文件: {requirement_path}", "插件管理")
await f.write(r.text)
install_requirement(plugin_path)
is_external = True
if plugin_info.github_url is None:
plugin_info.github_url = DEFAULT_GITHUB_URL
is_external = False
logger.info(f"正在安装插件 {plugin_key}...")
await cls.install_plugin_with_repo(
plugin_info.github_url,
plugin_info.module_path,
plugin_info.is_dir,
is_external,
)
return f"插件 {plugin_key} 安装成功! 重启后生效"
@classmethod
async def get_repo_package_info(cls, repo_info: RepoInfo) -> JsdPackageInfo:
"""获取插件包信息
参数:
repo_info: 仓库信息
返回:
JsdPackageInfo: 插件包信息
"""
jsd_package_url: str = JSD_PACKAGE_API_FORMAT.format(
owner=repo_info.owner, repo=repo_info.repo, branch=repo_info.branch
)
res = await AsyncHttpx.get(url=jsd_package_url)
if res.status_code != 200:
raise ValueError(f"下载错误, code: {res.status_code}")
return JsdPackageInfo(**res.json())
@classmethod
def expand_github_url(cls, github_url: str) -> RepoInfo:
if matched := GITHUB_REPO_URL_PATTERN.match(github_url):
return RepoInfo(**matched.groupdict()) # type: ignore
raise ValueError("github地址格式错误")
@classmethod
async def install_plugin_with_repo(
cls, github_url: str, module_path: str, is_dir: bool, is_external: bool = False
):
repo_info = cls.expand_github_url(github_url)
logger.debug(f"成功获取仓库信息: {repo_info}", "插件管理")
jsd_package_info: JsdPackageInfo = await cls.get_repo_package_info(
repo_info=repo_info
)
files = full_files_path(jsd_package_info, module_path, is_dir)
files = recurrence_files(
files,
module_path.replace(".", "/") + ("" if is_dir else ".py"),
is_dir,
)
logger.debug(f"获取插件文件列表: {files}", "插件管理")
download_urls = [repo_info.get_download_url_with_path(file) for file in files]
base_path = BASE_PATH / "plugins" if is_external else BASE_PATH
download_paths: list[Path | str] = [base_path / file for file in files]
logger.debug(f"插件下载路径: {download_paths}", "插件管理")
result = await AsyncHttpx.gather_download_file(download_urls, download_paths)
for _id, success in enumerate(result):
if not success:
break
else:
# 安装依赖
plugin_path = base_path / "/".join(module_path.split("."))
req_files = recurrence_files(
jsd_package_info.files, REQ_TXT_FILE_STRING, False
)
req_files.extend(
recurrence_files(jsd_package_info.files, "requirement.txt", False)
)
logger.debug(f"获取插件依赖文件列表: {req_files}", "插件管理")
req_download_urls = [
repo_info.get_download_url_with_path(file) for file in req_files
]
req_paths: list[Path | str] = [plugin_path / file for file in req_files]
logger.debug(f"插件依赖文件下载路径: {req_paths}", "插件管理")
if req_files:
result = await AsyncHttpx.gather_download_file(
req_download_urls, req_paths
)
for _id, success in enumerate(result):
if not success:
break
else:
logger.debug(f"插件依赖文件列表: {req_paths}", "插件管理")
install_requirement(plugin_path)
raise Exception("插件依赖文件下载失败")
return True
raise Exception("插件下载失败")
@classmethod
async def remove_plugin(cls, plugin_id: int) -> str:
"""移除插件
@ -303,22 +348,22 @@ class ShopManage:
返回:
str: 返回消息
"""
data: dict = await cls.__get_data()
data = await cls.__get_data()
if plugin_id < 0 or plugin_id >= len(data):
return "插件ID不存在..."
plugin_key = list(data.keys())[plugin_id]
plugin_info = data[plugin_key]
path = BASE_PATH
if plugin_info.get("github_url"):
if plugin_info.github_url:
path = BASE_PATH / "plugins"
for p in plugin_info["module_path"].split("."):
for p in plugin_info.module_path.split("."):
path = path / p
if not plugin_info["is_dir"]:
if not plugin_info.is_dir:
path = Path(f"{path}.py")
if not path.exists():
return f"插件 {plugin_key} 不存在..."
logger.debug(f"尝试移除插件 {plugin_key} 文件: {path}", "插件管理")
if plugin_info["is_dir"]:
if plugin_info.is_dir:
shutil.rmtree(path)
else:
path.unlink()
@ -334,10 +379,7 @@ class ShopManage:
返回:
BuildImage | str: 返回消息
"""
data: dict = await cls.__get_data()
for k in data.copy():
if data[k]["plugin_type"]:
data[k]["plugin_type"] = cls.type2name[data[k]["plugin_type"]]
data = await cls.__get_data()
plugin_list = await PluginInfo.filter(load_status=True).values_list(
"module", "version"
)
@ -346,18 +388,18 @@ class ShopManage:
(id, plugin_info)
for id, plugin_info in enumerate(data.items())
if plugin_name_or_author.lower() in plugin_info[0].lower()
or plugin_name_or_author.lower() in plugin_info[1]["author"].lower()
or plugin_name_or_author.lower() in plugin_info[1].author.lower()
]
data_list = [
[
"已安装" if plugin_info[1]["module"] in suc_plugin else "",
"已安装" if plugin_info[1].module in suc_plugin else "",
id,
plugin_info[0],
plugin_info[1]["description"],
plugin_info[1]["author"],
plugin_info[1].description,
plugin_info[1].author,
cls.version_check(plugin_info[1], suc_plugin),
plugin_info[1]["plugin_type"],
plugin_info[1].plugin_type_name,
]
for id, plugin_info in filtered_data
]
@ -382,53 +424,27 @@ class ShopManage:
返回:
str: 返回消息
"""
data: dict = await cls.__get_data()
data = await cls.__get_data()
if plugin_id < 0 or plugin_id >= len(data):
return "插件ID不存在..."
plugin_key = list(data.keys())[plugin_id]
logger.info(f"尝试更新插件 {plugin_key}", "插件管理")
plugin_info = data[plugin_key]
module_path_split = plugin_info["module_path"].split(".")
url_path = cls.get_url_path(plugin_info["module_path"], plugin_info["is_dir"])
if not url_path and plugin_info["module_path"]:
return "插件下载地址构建失败..."
logger.debug(f"尝试下载插件 URL: {url_path}", "插件管理")
github_url = plugin_info.get("github_url")
if github_url:
if not (r := re.search(r"github\.com/([^/]+/[^/]+)", github_url)):
return "github地址格式错误..."
github_path = r[1]
api_url = f"https://api.github.com/repos/{github_path}/contents/"
download_url = f"{api_url}{url_path}?ref=main"
else:
download_url = DOWNLOAD_URL.format(url_path)
api_url = None
await download_file(download_url, bool(github_url), api_url)
# 安装依赖
plugin_path = BASE_PATH / "/".join(module_path_split)
if url_path and github_url and api_url:
plugin_path = BASE_PATH / "plugins" / "/".join(module_path_split)
res = await AsyncHttpx.get(api_url)
if res.status_code != 200:
return f"访问错误, code: {res.status_code}"
json_data = res.json()
if requirement_file := next(
(
v
for v in json_data
if v["name"] in ["requirements.txt", "requirement.txt"]
),
None,
):
r = await AsyncHttpx.get(requirement_file.get("download_url"))
if r.status_code != 200:
raise ValueError(f"文件下载错误, code: {r.status_code}")
requirement_path = plugin_path / requirement_file["name"]
async with aiofiles.open(requirement_path, "w", encoding="utf8") as f:
logger.debug(f"写入文件: {requirement_path}", "插件管理")
await f.write(r.text)
install_requirement(plugin_path)
plugin_list = await PluginInfo.filter(load_status=True).values_list(
"module", "version"
)
suc_plugin = {p[0]: p[1] for p in plugin_list if p[1]}
logger.debug(f"当前插件列表: {suc_plugin}", "插件管理")
if cls.check_version_is_new(plugin_info, suc_plugin):
return f"插件 {plugin_key} 已是最新版本"
is_external = True
if plugin_info.github_url is None:
plugin_info.github_url = DEFAULT_GITHUB_URL
is_external = False
await cls.install_plugin_with_repo(
plugin_info.github_url,
plugin_info.module_path,
plugin_info.is_dir,
is_external,
)
return f"插件 {plugin_key} 更新成功! 重启后生效"

View File

@ -0,0 +1,70 @@
from strenum import StrEnum
from pydantic import BaseModel, validator
from zhenxun.utils.enum import PluginType
type2name: dict[str, str] = {
"NORMAL": "普通插件",
"ADMIN": "管理员插件",
"SUPERUSER": "超级用户插件",
"ADMIN_SUPERUSER": "管理员/超级用户插件",
"DEPENDANT": "依赖插件",
"HIDDEN": "其他插件",
}
class StorePluginInfo(BaseModel):
"""插件信息"""
module: str
module_path: str
description: str
usage: str
author: str
version: str
plugin_type: PluginType
is_dir: bool
github_url: str | None
@property
def plugin_type_name(self):
return type2name[self.plugin_type.value]
class RepoInfo(BaseModel):
"""仓库信息"""
owner: str
repo: str
branch: str | None
@validator("branch", pre=True, always=True)
def set_default_branch(cls, v):
return "main" if v is None else v
def get_download_url_with_path(self, path: str):
return f"https://raw.githubusercontent.com/{self.owner}/{self.repo}/{self.branch}/{path}"
class FileType(StrEnum):
"""文件类型"""
FILE = "file"
DIR = "directory"
class FileInfo(BaseModel):
"""文件信息"""
type: FileType
name: str
files: list["FileInfo"] | None
class JsdPackageInfo(BaseModel):
"""jsd包信息"""
type: str
name: str
version: str
files: list[FileInfo]