🎨 插件商店代码优化

This commit is contained in:
HibiKier 2024-08-24 20:24:05 +08:00
parent 7390799c80
commit ceb8f663c0

View File

@ -1,9 +1,7 @@
import os
import shutil
import subprocess
from pathlib import Path
import nonebot
import ujson as json
from zhenxun.models.plugin_info import PluginInfo
@ -68,7 +66,6 @@ async def download_file(url: str):
url: 插件详情url
异常:
ValueError: 访问失败
ValueError: 下载失败
"""
data_list = []
@ -139,6 +136,36 @@ class ShopManage:
raise ValueError(f"下载错误, code: {res.status_code}")
return json.loads(res.text)
@classmethod
def version_check(cls, plugin_info: dict, suc_plugin: dict[str, str]):
module = plugin_info["module"]
if module in suc_plugin:
if plugin_info["version"] != suc_plugin[module]:
return f"{suc_plugin[module]} (有更新->{plugin_info['version']})"
return plugin_info["version"]
@classmethod
def get_url_path(cls, module_path: str, is_dir: bool) -> str:
url_path = None
path = BASE_PATH
module_path_split = module_path.split(".")
if len(module_path_split) == 2:
"""单个文件或文件夹"""
if is_dir:
url_path = "/".join(module_path_split)
else:
url_path = "/".join(module_path_split) + ".py"
else:
"""嵌套文件或文件夹"""
for p in module_path_split[:-1]:
path = path / p
path.mkdir(parents=True, exist_ok=True)
if is_dir:
url_path = f"{'/'.join(module_path_split)}"
else:
url_path = f"{'/'.join(module_path_split)}.py"
return url_path
@classmethod
async def get_plugins_info(cls) -> BuildImage | str:
"""插件列表
@ -151,9 +178,10 @@ class ShopManage:
for k in data.copy():
if data[k]["plugin_type"]:
data[k]["plugin_type"] = cls.type2name[data[k]["plugin_type"]]
plugin_list = await PluginInfo.filter(load_status=True).values_list("module", "version")
plugin_list = await PluginInfo.filter(load_status=True).values_list(
"module", "version"
)
suc_plugin = {p[0]: p[1] for p in plugin_list if p[1]}
data_list = [
[
"已安装" if plugin_info[1]["module"] in suc_plugin else "",
@ -161,12 +189,7 @@ class ShopManage:
plugin_info[0],
plugin_info[1]["description"],
plugin_info[1]["author"],
(
f"{suc_plugin[plugin_info[1]['module']]} (有更新->{plugin_info[1]['version']})"
if plugin_info[1]["module"] in suc_plugin
and plugin_info[1]["version"] != suc_plugin[plugin_info[1]["module"]]
else plugin_info[1]["version"]
),
cls.version_check(plugin_info[1], suc_plugin),
plugin_info[1]["plugin_type"],
]
for id, plugin_info in enumerate(data.items())
@ -181,29 +204,21 @@ class ShopManage:
@classmethod
async def add_plugin(cls, plugin_id: int) -> str:
"""添加插件
参数:
plugin_id: 插件id
返回:
str: 返回消息
"""
data: dict = await cls.__get_data()
if plugin_id < 0 or plugin_id >= len(data):
return "插件ID不存在..."
plugin_key = list(data.keys())[plugin_id]
plugin_info = data[plugin_key]
module_path_split = plugin_info["module_path"].split(".")
url_path = None
path = BASE_PATH
if len(module_path_split) == 2:
"""单个文件或文件夹"""
if plugin_info["is_dir"]:
url_path = "/".join(module_path_split)
else:
url_path = "/".join(module_path_split) + ".py"
else:
"""嵌套文件或文件夹"""
for p in module_path_split[:-1]:
path = path / p
path.mkdir(parents=True, exist_ok=True)
if plugin_info["is_dir"]:
url_path = f"{'/'.join(module_path_split)}"
else:
url_path = f"{'/'.join(module_path_split)}.py"
url_path = cls.get_url_path(plugin_info["module_path"], plugin_info["is_dir"])
if not url_path:
return "插件下载地址构建失败..."
logger.debug(f"尝试下载插件 URL: {url_path}", "插件管理")
@ -217,6 +232,14 @@ class ShopManage:
@classmethod
async def remove_plugin(cls, plugin_id: int) -> str:
"""移除插件
参数:
plugin_id: 插件id
返回:
str: 返回消息
"""
data: dict = await cls.__get_data()
if plugin_id < 0 or plugin_id >= len(data):
return "插件ID不存在..."
@ -238,18 +261,28 @@ class ShopManage:
@classmethod
async def search_plugin(cls, plugin_name_or_author: str) -> BuildImage | str:
"""搜索插件
参数:
plugin_name_or_author: 插件名称或作者
返回:
BuildImage | str: 返回消息
"""
data: dict = await cls.__get_data()
column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"]
for k in data.copy():
if data[k]["plugin_type"]:
data[k]["plugin_type"] = cls.type2name[data[k]["plugin_type"]]
plugin_list = await PluginInfo.filter(load_status=True).values_list("module", "version")
plugin_list = await PluginInfo.filter(load_status=True).values_list(
"module", "version"
)
suc_plugin = {p[0]: p[1] for p in plugin_list if p[1]}
filtered_data = [
(id, plugin_info)
for id, plugin_info in enumerate(data.items())
if plugin_name_or_author.lower() in plugin_info[0].lower() or
plugin_name_or_author.lower() in plugin_info[1]["author"].lower()
if plugin_name_or_author.lower() in plugin_info[0].lower()
or plugin_name_or_author.lower() in plugin_info[1]["author"].lower()
]
data_list = [
@ -259,12 +292,7 @@ class ShopManage:
plugin_info[0],
plugin_info[1]["description"],
plugin_info[1]["author"],
(
f"{suc_plugin[plugin_info[1]['module']]} (有更新->{plugin_info[1]['version']})"
if plugin_info[1]["module"] in suc_plugin
and plugin_info[1]["version"] != suc_plugin[plugin_info[1]["module"]]
else plugin_info[1]["version"]
),
cls.version_check(plugin_info[1], suc_plugin),
plugin_info[1]["plugin_type"],
]
for id, plugin_info in filtered_data
@ -281,29 +309,23 @@ class ShopManage:
@classmethod
async def update_plugin(cls, plugin_id: int) -> str:
"""更新插件
参数:
plugin_id: 插件id
返回:
str: 返回消息
"""
data: dict = await cls.__get_data()
if plugin_id < 0 or plugin_id >= len(data):
return "插件ID不存在..."
plugin_key = list(data.keys())[plugin_id]
plugin_info = data[plugin_key]
module_path_split = plugin_info["module_path"].split(".")
url_path = None
path = BASE_PATH
if len(module_path_split) == 2:
"""单个文件或文件夹"""
if plugin_info["is_dir"]:
url_path = "/".join(module_path_split)
else:
url_path = "/".join(module_path_split) + ".py"
else:
"""嵌套文件或文件夹"""
for p in module_path_split[:-1]:
path = path / p
path.mkdir(parents=True, exist_ok=True)
if plugin_info["is_dir"]:
url_path = f"{'/'.join(module_path_split)}"
else:
url_path = f"{'/'.join(module_path_split)}.py"
url_path = url_path = cls.get_url_path(
plugin_info["module_path"], plugin_info["is_dir"]
)
if not url_path:
return "插件下载地址构建失败..."
logger.debug(f"尝试下载插件 URL: {url_path}", "插件管理")