diff --git a/zhenxun/builtin_plugins/plugin_store/data_source.py b/zhenxun/builtin_plugins/plugin_store/data_source.py index d8372008..2e01a8c8 100644 --- a/zhenxun/builtin_plugins/plugin_store/data_source.py +++ b/zhenxun/builtin_plugins/plugin_store/data_source.py @@ -1,9 +1,7 @@ -import os import shutil import subprocess from pathlib import Path -import nonebot import ujson as json from zhenxun.models.plugin_info import PluginInfo @@ -68,7 +66,6 @@ async def download_file(url: str): url: 插件详情url 异常: - ValueError: 访问失败 ValueError: 下载失败 """ data_list = [] @@ -139,6 +136,36 @@ class ShopManage: raise ValueError(f"下载错误, code: {res.status_code}") return json.loads(res.text) + @classmethod + def version_check(cls, plugin_info: dict, suc_plugin: dict[str, str]): + module = plugin_info["module"] + if module in suc_plugin: + if plugin_info["version"] != suc_plugin[module]: + return f"{suc_plugin[module]} (有更新->{plugin_info['version']})" + return plugin_info["version"] + + @classmethod + def get_url_path(cls, module_path: str, is_dir: bool) -> str: + url_path = None + path = BASE_PATH + module_path_split = module_path.split(".") + if len(module_path_split) == 2: + """单个文件或文件夹""" + if is_dir: + url_path = "/".join(module_path_split) + else: + url_path = "/".join(module_path_split) + ".py" + else: + """嵌套文件或文件夹""" + for p in module_path_split[:-1]: + path = path / p + path.mkdir(parents=True, exist_ok=True) + if is_dir: + url_path = f"{'/'.join(module_path_split)}" + else: + url_path = f"{'/'.join(module_path_split)}.py" + return url_path + @classmethod async def get_plugins_info(cls) -> BuildImage | str: """插件列表 @@ -151,9 +178,10 @@ class ShopManage: for k in data.copy(): if data[k]["plugin_type"]: data[k]["plugin_type"] = cls.type2name[data[k]["plugin_type"]] - plugin_list = await PluginInfo.filter(load_status=True).values_list("module", "version") + plugin_list = await PluginInfo.filter(load_status=True).values_list( + "module", "version" + ) suc_plugin = {p[0]: p[1] for p in plugin_list if p[1]} - data_list = [ [ "已安装" if plugin_info[1]["module"] in suc_plugin else "", @@ -161,12 +189,7 @@ class ShopManage: plugin_info[0], plugin_info[1]["description"], plugin_info[1]["author"], - ( - f"{suc_plugin[plugin_info[1]['module']]} (有更新->{plugin_info[1]['version']})" - if plugin_info[1]["module"] in suc_plugin - and plugin_info[1]["version"] != suc_plugin[plugin_info[1]["module"]] - else plugin_info[1]["version"] - ), + cls.version_check(plugin_info[1], suc_plugin), plugin_info[1]["plugin_type"], ] for id, plugin_info in enumerate(data.items()) @@ -181,29 +204,21 @@ class ShopManage: @classmethod async def add_plugin(cls, plugin_id: int) -> str: + """添加插件 + + 参数: + plugin_id: 插件id + + 返回: + str: 返回消息 + """ data: dict = await cls.__get_data() if plugin_id < 0 or plugin_id >= len(data): return "插件ID不存在..." plugin_key = list(data.keys())[plugin_id] plugin_info = data[plugin_key] module_path_split = plugin_info["module_path"].split(".") - url_path = None - path = BASE_PATH - if len(module_path_split) == 2: - """单个文件或文件夹""" - if plugin_info["is_dir"]: - url_path = "/".join(module_path_split) - else: - url_path = "/".join(module_path_split) + ".py" - else: - """嵌套文件或文件夹""" - for p in module_path_split[:-1]: - path = path / p - path.mkdir(parents=True, exist_ok=True) - if plugin_info["is_dir"]: - url_path = f"{'/'.join(module_path_split)}" - else: - url_path = f"{'/'.join(module_path_split)}.py" + url_path = cls.get_url_path(plugin_info["module_path"], plugin_info["is_dir"]) if not url_path: return "插件下载地址构建失败..." logger.debug(f"尝试下载插件 URL: {url_path}", "插件管理") @@ -217,6 +232,14 @@ class ShopManage: @classmethod async def remove_plugin(cls, plugin_id: int) -> str: + """移除插件 + + 参数: + plugin_id: 插件id + + 返回: + str: 返回消息 + """ data: dict = await cls.__get_data() if plugin_id < 0 or plugin_id >= len(data): return "插件ID不存在..." @@ -235,21 +258,31 @@ class ShopManage: else: path.unlink() return f"插件 {plugin_key} 移除成功!" - + @classmethod async def search_plugin(cls, plugin_name_or_author: str) -> BuildImage | str: + """搜索插件 + + 参数: + plugin_name_or_author: 插件名称或作者 + + 返回: + BuildImage | str: 返回消息 + """ data: dict = await cls.__get_data() column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"] for k in data.copy(): if data[k]["plugin_type"]: data[k]["plugin_type"] = cls.type2name[data[k]["plugin_type"]] - plugin_list = await PluginInfo.filter(load_status=True).values_list("module", "version") + plugin_list = await PluginInfo.filter(load_status=True).values_list( + "module", "version" + ) suc_plugin = {p[0]: p[1] for p in plugin_list if p[1]} filtered_data = [ (id, plugin_info) for id, plugin_info in enumerate(data.items()) - if plugin_name_or_author.lower() in plugin_info[0].lower() or - plugin_name_or_author.lower() in plugin_info[1]["author"].lower() + if plugin_name_or_author.lower() in plugin_info[0].lower() + or plugin_name_or_author.lower() in plugin_info[1]["author"].lower() ] data_list = [ @@ -259,12 +292,7 @@ class ShopManage: plugin_info[0], plugin_info[1]["description"], plugin_info[1]["author"], - ( - f"{suc_plugin[plugin_info[1]['module']]} (有更新->{plugin_info[1]['version']})" - if plugin_info[1]["module"] in suc_plugin - and plugin_info[1]["version"] != suc_plugin[plugin_info[1]["module"]] - else plugin_info[1]["version"] - ), + cls.version_check(plugin_info[1], suc_plugin), plugin_info[1]["plugin_type"], ] for id, plugin_info in filtered_data @@ -278,32 +306,26 @@ class ShopManage: data_list, text_style=row_style, ) - + @classmethod async def update_plugin(cls, plugin_id: int) -> str: + """更新插件 + + 参数: + plugin_id: 插件id + + 返回: + str: 返回消息 + """ data: dict = await cls.__get_data() if plugin_id < 0 or plugin_id >= len(data): return "插件ID不存在..." plugin_key = list(data.keys())[plugin_id] plugin_info = data[plugin_key] module_path_split = plugin_info["module_path"].split(".") - url_path = None - path = BASE_PATH - if len(module_path_split) == 2: - """单个文件或文件夹""" - if plugin_info["is_dir"]: - url_path = "/".join(module_path_split) - else: - url_path = "/".join(module_path_split) + ".py" - else: - """嵌套文件或文件夹""" - for p in module_path_split[:-1]: - path = path / p - path.mkdir(parents=True, exist_ok=True) - if plugin_info["is_dir"]: - url_path = f"{'/'.join(module_path_split)}" - else: - url_path = f"{'/'.join(module_path_split)}.py" + url_path = url_path = cls.get_url_path( + plugin_info["module_path"], plugin_info["is_dir"] + ) if not url_path: return "插件下载地址构建失败..." logger.debug(f"尝试下载插件 URL: {url_path}", "插件管理")