Merge pull request #1572 from molanp/dev

feat(plugin_store): 添加搜索插件功能和更新插件功能及更新配置URL
This commit is contained in:
HibiKier 2024-08-24 20:05:33 +08:00 committed by GitHub
commit 7390799c80
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 146 additions and 13 deletions

View File

@ -3,7 +3,7 @@ from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args, Subcommand, on_alconna
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData
from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
@ -17,6 +17,8 @@ __plugin_meta__ = PluginMetadata(
插件商店 : 查看当前的插件商店
添加插件 id : 添加插件
移除插件 id : 移除插件
搜索插件 name or author : 搜索插件
更新插件 id : 更新插件
""".strip(),
extra=PluginExtraData(
author="HibiKier",
@ -30,13 +32,14 @@ _matcher = on_alconna(
"插件商店",
Subcommand("add", Args["plugin_id", int]),
Subcommand("remove", Args["plugin_id", int]),
Subcommand("search", Args["plugin_name_or_author", str]),
Subcommand("update", Args["plugin_id", int]),
),
permission=SUPERUSER,
priority=1,
block=True,
)
_matcher.shortcut(
r"添加插件",
command="插件商店",
@ -51,6 +54,19 @@ _matcher.shortcut(
prefix=True,
)
_matcher.shortcut(
r"搜索插件",
command="插件商店",
arguments=["search", "{%0}"],
prefix=True,
)
_matcher.shortcut(
r"更新插件",
command="插件商店",
arguments=["update", "{%0}"],
prefix=True,
)
@_matcher.assign("$main")
async def _(session: EventSession):
@ -66,6 +82,9 @@ async def _(session: EventSession):
@_matcher.assign("add")
async def _(session: EventSession, plugin_id: int):
try:
await MessageUtils.build_message(
f"正在添加插件 Id: {plugin_id}"
).send()
result = await ShopManage.add_plugin(plugin_id)
except Exception as e:
logger.error(f"添加插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)
@ -73,7 +92,7 @@ async def _(session: EventSession, plugin_id: int):
f"添加插件 Id: {plugin_id} 失败 e: {e}"
).finish()
logger.info(f"添加插件 Id: {plugin_id}", "插件商店", session=session)
await MessageUtils.build_message(result).finish()
await MessageUtils.build_message(result).send()
@_matcher.assign("remove")
@ -86,4 +105,31 @@ async def _(session: EventSession, plugin_id: int):
f"移除插件 Id: {plugin_id} 失败 e: {e}"
).finish()
logger.info(f"移除插件 Id: {plugin_id}", "插件商店", session=session)
await MessageUtils.build_message(result).finish()
await MessageUtils.build_message(result).send()
@_matcher.assign("search")
async def _(session: EventSession, plugin_name_or_author: str):
try:
result = await ShopManage.search_plugin(plugin_name_or_author)
except Exception as e:
logger.error(f"搜索插件 name: {plugin_name_or_author}失败", "插件商店", session=session, e=e)
await MessageUtils.build_message(
f"搜索插件 name: {plugin_name_or_author} 失败 e: {e}"
).finish()
logger.info(f"搜索插件 name: {plugin_name_or_author}", "插件商店", session=session)
await MessageUtils.build_message(result).send()
@_matcher.assign("update")
async def _(session: EventSession, plugin_id: int):
try:
await MessageUtils.build_message(
f"正在更新插件 Id: {plugin_id}"
).send()
result = await ShopManage.update_plugin(plugin_id)
except Exception as e:
logger.error(f"更新插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)
await MessageUtils.build_message(
f"更新插件 Id: {plugin_id} 失败 e: {e}"
).finish()
logger.info(f"更新插件 Id: {plugin_id}", "插件商店", session=session)
await MessageUtils.build_message(result).send()

View File

@ -6,6 +6,7 @@ from pathlib import Path
import nonebot
import ujson as json
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle
@ -150,18 +151,25 @@ class ShopManage:
for k in data.copy():
if data[k]["plugin_type"]:
data[k]["plugin_type"] = cls.type2name[data[k]["plugin_type"]]
suc_plugin = [p.name for p in nonebot.get_loaded_plugins()]
plugin_list = await PluginInfo.filter(load_status=True).values_list("module", "version")
suc_plugin = {p[0]: p[1] for p in plugin_list if p[1]}
data_list = [
[
"已安装" if v[1]["module"] in suc_plugin else "",
i,
v[0],
v[1]["description"],
v[1]["author"],
v[1]["version"],
v[1]["plugin_type"],
"已安装" if plugin_info[1]["module"] in suc_plugin else "",
id,
plugin_info[0],
plugin_info[1]["description"],
plugin_info[1]["author"],
(
f"{suc_plugin[plugin_info[1]['module']]} (有更新->{plugin_info[1]['version']})"
if plugin_info[1]["module"] in suc_plugin
and plugin_info[1]["version"] != suc_plugin[plugin_info[1]["module"]]
else plugin_info[1]["version"]
),
plugin_info[1]["plugin_type"],
]
for i, v in enumerate(data.items())
for id, plugin_info in enumerate(data.items())
]
return await ImageTemplate.table_page(
"插件列表",
@ -227,3 +235,82 @@ class ShopManage:
else:
path.unlink()
return f"插件 {plugin_key} 移除成功!"
@classmethod
async def search_plugin(cls, plugin_name_or_author: str) -> BuildImage | str:
data: dict = await cls.__get_data()
column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"]
for k in data.copy():
if data[k]["plugin_type"]:
data[k]["plugin_type"] = cls.type2name[data[k]["plugin_type"]]
plugin_list = await PluginInfo.filter(load_status=True).values_list("module", "version")
suc_plugin = {p[0]: p[1] for p in plugin_list if p[1]}
filtered_data = [
(id, plugin_info)
for id, plugin_info in enumerate(data.items())
if plugin_name_or_author.lower() in plugin_info[0].lower() or
plugin_name_or_author.lower() in plugin_info[1]["author"].lower()
]
data_list = [
[
"已安装" if plugin_info[1]["module"] in suc_plugin else "",
id,
plugin_info[0],
plugin_info[1]["description"],
plugin_info[1]["author"],
(
f"{suc_plugin[plugin_info[1]['module']]} (有更新->{plugin_info[1]['version']})"
if plugin_info[1]["module"] in suc_plugin
and plugin_info[1]["version"] != suc_plugin[plugin_info[1]["module"]]
else plugin_info[1]["version"]
),
plugin_info[1]["plugin_type"],
]
for id, plugin_info in filtered_data
]
if not data_list:
return "未找到相关插件..."
return await ImageTemplate.table_page(
"插件列表",
f"通过安装/卸载插件 ID 来管理插件",
column_name,
data_list,
text_style=row_style,
)
@classmethod
async def update_plugin(cls, plugin_id: int) -> str:
data: dict = await cls.__get_data()
if plugin_id < 0 or plugin_id >= len(data):
return "插件ID不存在..."
plugin_key = list(data.keys())[plugin_id]
plugin_info = data[plugin_key]
module_path_split = plugin_info["module_path"].split(".")
url_path = None
path = BASE_PATH
if len(module_path_split) == 2:
"""单个文件或文件夹"""
if plugin_info["is_dir"]:
url_path = "/".join(module_path_split)
else:
url_path = "/".join(module_path_split) + ".py"
else:
"""嵌套文件或文件夹"""
for p in module_path_split[:-1]:
path = path / p
path.mkdir(parents=True, exist_ok=True)
if plugin_info["is_dir"]:
url_path = f"{'/'.join(module_path_split)}"
else:
url_path = f"{'/'.join(module_path_split)}.py"
if not url_path:
return "插件下载地址构建失败..."
logger.debug(f"尝试下载插件 URL: {url_path}", "插件管理")
await download_file(DOWNLOAD_URL.format(url_path))
# 安装依赖
plugin_path = BASE_PATH / "/".join(module_path_split)
install_requirement(plugin_path)
return f"插件 {plugin_key} 更新成功!"