重构插件商店,支持Gitee插件管理,更新相关逻辑和配置

This commit is contained in:
HibiKier 2025-06-20 10:48:12 +08:00
parent 811fae6208
commit 6e58818e8b
9 changed files with 448 additions and 176 deletions

View File

@ -1,20 +0,0 @@
name: Sync to GitCode (Only on PR Merge to Main)
on:
pull_request:
types: [closed] # 监听 PR 关闭事件
branches: [main] # 仅当目标分支是 main 时才触发
jobs:
sync:
if: github.event.pull_request.merged == true # 仅当 PR 被合并时运行
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
ref: main # 检出 main 分支
fetch-depth: 0 # 获取完整历史
- name: Push to GitCode
run: |
git remote add gitcode https://qq_41605780:${{ secrets.GITCODE_TOKEN }}@gitcode.com/qq_41605780/zhenxun_bot.git
git push gitcode HEAD:main --force-with-lease

View File

@ -11,6 +11,8 @@
"displayname", "displayname",
"flmt", "flmt",
"getbbox", "getbbox",
"gitcode",
"GITEE",
"hibiapi", "hibiapi",
"httpx", "httpx",
"jsdelivr", "jsdelivr",

View File

@ -9,7 +9,7 @@ from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils
from zhenxun.utils.utils import is_number from zhenxun.utils.utils import is_number
from .data_source import ShopManage from .data_source import StoreManager
__plugin_meta__ = PluginMetadata( __plugin_meta__ = PluginMetadata(
name="插件商店", name="插件商店",
@ -82,7 +82,7 @@ _matcher.shortcut(
@_matcher.assign("$main") @_matcher.assign("$main")
async def _(session: EventSession): async def _(session: EventSession):
try: try:
result = await ShopManage.get_plugins_info() result = await StoreManager.get_plugins_info()
logger.info("查看插件列表", "插件商店", session=session) logger.info("查看插件列表", "插件商店", session=session)
await MessageUtils.build_message(result).send() await MessageUtils.build_message(result).send()
except Exception as e: except Exception as e:
@ -97,7 +97,7 @@ async def _(session: EventSession, plugin_id: str):
await MessageUtils.build_message(f"正在添加插件 Id: {plugin_id}").send() await MessageUtils.build_message(f"正在添加插件 Id: {plugin_id}").send()
else: else:
await MessageUtils.build_message(f"正在添加插件 Module: {plugin_id}").send() await MessageUtils.build_message(f"正在添加插件 Module: {plugin_id}").send()
result = await ShopManage.add_plugin(plugin_id) result = await StoreManager.add_plugin(plugin_id)
except Exception as e: except Exception as e:
logger.error(f"添加插件 Id: {plugin_id}失败", "插件商店", session=session, e=e) logger.error(f"添加插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)
await MessageUtils.build_message( await MessageUtils.build_message(
@ -110,7 +110,7 @@ async def _(session: EventSession, plugin_id: str):
@_matcher.assign("remove") @_matcher.assign("remove")
async def _(session: EventSession, plugin_id: str): async def _(session: EventSession, plugin_id: str):
try: try:
result = await ShopManage.remove_plugin(plugin_id) result = await StoreManager.remove_plugin(plugin_id)
except Exception as e: except Exception as e:
logger.error(f"移除插件 Id: {plugin_id}失败", "插件商店", session=session, e=e) logger.error(f"移除插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)
await MessageUtils.build_message( await MessageUtils.build_message(
@ -123,7 +123,7 @@ async def _(session: EventSession, plugin_id: str):
@_matcher.assign("search") @_matcher.assign("search")
async def _(session: EventSession, plugin_name_or_author: str): async def _(session: EventSession, plugin_name_or_author: str):
try: try:
result = await ShopManage.search_plugin(plugin_name_or_author) result = await StoreManager.search_plugin(plugin_name_or_author)
except Exception as e: except Exception as e:
logger.error( logger.error(
f"搜索插件 name: {plugin_name_or_author}失败", f"搜索插件 name: {plugin_name_or_author}失败",
@ -145,7 +145,7 @@ async def _(session: EventSession, plugin_id: str):
await MessageUtils.build_message(f"正在更新插件 Id: {plugin_id}").send() await MessageUtils.build_message(f"正在更新插件 Id: {plugin_id}").send()
else: else:
await MessageUtils.build_message(f"正在更新插件 Module: {plugin_id}").send() await MessageUtils.build_message(f"正在更新插件 Module: {plugin_id}").send()
result = await ShopManage.update_plugin(plugin_id) result = await StoreManager.update_plugin(plugin_id)
except Exception as e: except Exception as e:
logger.error(f"更新插件 Id: {plugin_id}失败", "插件商店", session=session, e=e) logger.error(f"更新插件 Id: {plugin_id}失败", "插件商店", session=session, e=e)
await MessageUtils.build_message( await MessageUtils.build_message(
@ -159,7 +159,7 @@ async def _(session: EventSession, plugin_id: str):
async def _(session: EventSession): async def _(session: EventSession):
try: try:
await MessageUtils.build_message("正在更新全部插件").send() await MessageUtils.build_message("正在更新全部插件").send()
result = await ShopManage.update_all_plugin() result = await StoreManager.update_all_plugin()
except Exception as e: except Exception as e:
logger.error("更新全部插件失败", "插件商店", session=session, e=e) logger.error("更新全部插件失败", "插件商店", session=session, e=e)
await MessageUtils.build_message(f"更新全部插件失败 e: {e}").finish() await MessageUtils.build_message(f"更新全部插件失败 e: {e}").finish()

View File

@ -10,5 +10,13 @@ DEFAULT_GITHUB_URL = "https://github.com/zhenxun-org/zhenxun_bot_plugins/tree/ma
EXTRA_GITHUB_URL = "https://github.com/zhenxun-org/zhenxun_bot_plugins_index/tree/index" EXTRA_GITHUB_URL = "https://github.com/zhenxun-org/zhenxun_bot_plugins_index/tree/index"
"""插件库索引github仓库地址""" """插件库索引github仓库地址"""
DEFAULT_GITCODE_RAW_URL = "https://raw.gitcode.com/gh_mirrors/zh/zhenxun_bot/raw/main" GITEE_RAW_URL = "https://gitee.com/two_Dimension/zhenxun_bot_plugins/raw/main"
"""伴生插件gitcode仓库地址""" """GITEE仓库文件内容"""
GITEE_CONTENTS_URL = (
"https://gitee.com/api/v5/repos/two_Dimension/zhenxun_bot_plugins/contents"
)
"""GITEE仓库文件列表获取"""
LOG_COMMAND = "插件商店"

View File

@ -1,12 +1,11 @@
from pathlib import Path from pathlib import Path
import shutil import shutil
import subprocess
from aiocache import cached from aiocache import cached
import ujson as json import ujson as json
from zhenxun.builtin_plugins.auto_update.config import REQ_TXT_FILE_STRING from zhenxun.builtin_plugins.auto_update.config import REQ_TXT_FILE_STRING
from zhenxun.builtin_plugins.plugin_store.models import StorePluginInfo from zhenxun.builtin_plugins.plugin_store.models import GiteeContents, StorePluginInfo
from zhenxun.models.plugin_info import PluginInfo from zhenxun.models.plugin_info import PluginInfo
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.services.plugin_init import PluginInitManager from zhenxun.services.plugin_init import PluginInitManager
@ -14,9 +13,17 @@ from zhenxun.utils.github_utils import GithubUtils
from zhenxun.utils.github_utils.models import RepoAPI from zhenxun.utils.github_utils.models import RepoAPI
from zhenxun.utils.http_utils import AsyncHttpx from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle
from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager
from zhenxun.utils.utils import is_number from zhenxun.utils.utils import is_number
from .config import BASE_PATH, DEFAULT_GITHUB_URL, EXTRA_GITHUB_URL from .config import (
BASE_PATH,
DEFAULT_GITHUB_URL,
EXTRA_GITHUB_URL,
GITEE_CONTENTS_URL,
GITEE_RAW_URL,
LOG_COMMAND,
)
def row_style(column: str, text: str) -> RowStyle: def row_style(column: str, text: str) -> RowStyle:
@ -39,72 +46,87 @@ def install_requirement(plugin_path: Path):
requirement_files = ["requirement.txt", "requirements.txt"] requirement_files = ["requirement.txt", "requirements.txt"]
requirement_paths = [plugin_path / file for file in requirement_files] requirement_paths = [plugin_path / file for file in requirement_files]
existing_requirements = next( if existing_requirements := next(
(path for path in requirement_paths if path.exists()), None (path for path in requirement_paths if path.exists()), None
) ):
VirtualEnvPackageManager.install_requirement(existing_requirements)
if not existing_requirements:
logger.debug(
f"No requirement.txt found for plugin: {plugin_path.name}", "插件管理"
)
return
try:
result = subprocess.run(
["poetry", "run", "pip", "install", "-r", str(existing_requirements)],
check=True,
capture_output=True,
text=True,
)
logger.debug(
"Successfully installed dependencies for"
f" plugin: {plugin_path.name}. Output:\n{result.stdout}",
"插件管理",
)
except subprocess.CalledProcessError:
logger.error(
f"Failed to install dependencies for plugin: {plugin_path.name}. "
" Error:\n{e.stderr}"
)
class ShopManage: class StoreManager:
@classmethod @classmethod
@cached(60) async def get_github_plugins(cls) -> list[StorePluginInfo]:
async def get_data(cls) -> dict[str, StorePluginInfo]: """获取github插件列表信息
"""获取插件信息数据
异常:
ValueError: 访问请求失败
返回: 返回:
dict: 插件信息数据 list[StorePluginInfo]: 插件列表数据
""" """
default_github_repo = GithubUtils.parse_github_url(DEFAULT_GITHUB_URL) return []
extra_github_repo = GithubUtils.parse_github_url(EXTRA_GITHUB_URL) repo_info = GithubUtils.parse_github_url(DEFAULT_GITHUB_URL)
for repo_info in [default_github_repo, extra_github_repo]:
if await repo_info.update_repo_commit(): if await repo_info.update_repo_commit():
logger.info(f"获取最新提交: {repo_info.branch}", "插件管理") logger.info(f"获取最新提交: {repo_info.branch}", LOG_COMMAND)
else: else:
logger.warning(f"获取最新提交失败: {repo_info}", "插件管理") logger.warning(f"获取最新提交失败: {repo_info}", LOG_COMMAND)
default_github_url = await default_github_repo.get_raw_download_urls( default_github_url = await repo_info.get_raw_download_urls("plugins.json")
"plugins.json" response = await AsyncHttpx.get(default_github_url, check_status_code=200)
if response.status_code == 200:
return [StorePluginInfo(**detail) for detail in json.loads(response.text)]
else:
logger.warning(
f"获取github插件列表失败: {response.status_code}", LOG_COMMAND
) )
extra_github_url = await extra_github_repo.get_raw_download_urls("plugins.json") return []
res = await AsyncHttpx.get(default_github_url, check_status_code=200)
res2 = await AsyncHttpx.get(extra_github_url, check_status_code=200)
# 检查请求结果 @classmethod
if res.status_code != 200 or res2.status_code != 200: async def get_gitee_plugins(cls) -> list[StorePluginInfo]:
raise ValueError(f"下载错误, code: {res.status_code}, {res2.status_code}") """获取gitcode插件列表信息
# 解析并合并返回的 JSON 数据 返回:
data1 = json.loads(res.text) list[StorePluginInfo]: 插件列表数据
data2 = json.loads(res2.text) """
return { url = f"{GITEE_RAW_URL}/plugins.json"
name: StorePluginInfo(**detail) response = await AsyncHttpx.get(url, check_status_code=200)
for name, detail in {**data1, **data2}.items() if response.status_code == 200:
} return [StorePluginInfo(**detail) for detail in json.loads(response.text)]
else:
logger.warning(
f"获取gitee插件列表失败: {response.status_code}", LOG_COMMAND
)
return []
@classmethod
async def get_extra_plugins(cls) -> list[StorePluginInfo]:
"""获取额外插件列表信息
返回:
list[StorePluginInfo]: 插件列表数据
"""
repo_info = GithubUtils.parse_github_url(EXTRA_GITHUB_URL)
if await repo_info.update_repo_commit():
logger.info(f"获取最新提交: {repo_info.branch}", LOG_COMMAND)
else:
logger.warning(f"获取最新提交失败: {repo_info}", LOG_COMMAND)
extra_github_url = await repo_info.get_raw_download_urls("plugins.json")
response = await AsyncHttpx.get(extra_github_url, check_status_code=200)
if response.status_code == 200:
return [StorePluginInfo(**detail) for detail in json.loads(response.text)]
else:
logger.warning(
f"获取github扩展插件列表失败: {response.status_code}", LOG_COMMAND
)
return []
@classmethod
@cached(60)
async def get_data(cls) -> list[StorePluginInfo]:
"""获取插件信息数据
返回:
list[StorePluginInfo]: 插件信息数据
"""
plugins = await cls.get_github_plugins() or await cls.get_gitee_plugins()
# extra_plugins = await cls.get_extra_plugins()
extra_plugins = []
return [*plugins, *extra_plugins]
@classmethod @classmethod
def version_check(cls, plugin_info: StorePluginInfo, suc_plugin: dict[str, str]): def version_check(cls, plugin_info: StorePluginInfo, suc_plugin: dict[str, str]):
@ -112,7 +134,7 @@ class ShopManage:
参数: 参数:
plugin_info: StorePluginInfo plugin_info: StorePluginInfo
suc_plugin: dict[str, str] suc_plugin: 模块名: 版本号
返回: 返回:
str: 版本号 str: 版本号
@ -132,7 +154,7 @@ class ShopManage:
参数: 参数:
plugin_info: StorePluginInfo plugin_info: StorePluginInfo
suc_plugin: dict[str, str] suc_plugin: 模块名: 版本号
返回: 返回:
bool: 是否有更新 bool: 是否有更新
@ -156,21 +178,21 @@ class ShopManage:
返回: 返回:
BuildImage | str: 返回消息 BuildImage | str: 返回消息
""" """
data: dict[str, StorePluginInfo] = await cls.get_data() plugin_list: list[StorePluginInfo] = await cls.get_data()
column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"] column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"]
plugin_list = await cls.get_loaded_plugins("module", "version") db_plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "0.1") for p in plugin_list} suc_plugin = {p[0]: (p[1] or "0.1") for p in db_plugin_list}
data_list = [ data_list = [
[ [
"已安装" if plugin_info[1].module in suc_plugin else "", "已安装" if plugin_info.module in suc_plugin else "",
id, id,
plugin_info[0], plugin_info.name,
plugin_info[1].description, plugin_info.description,
plugin_info[1].author, plugin_info.author,
cls.version_check(plugin_info[1], suc_plugin), cls.version_check(plugin_info, suc_plugin),
plugin_info[1].plugin_type_name, plugin_info.plugin_type_name,
] ]
for id, plugin_info in enumerate(data.items()) for id, plugin_info in enumerate(plugin_list)
] ]
return await ImageTemplate.table_page( return await ImageTemplate.table_page(
"插件列表", "插件列表",
@ -190,14 +212,14 @@ class ShopManage:
返回: 返回:
str: 返回消息 str: 返回消息
""" """
data: dict[str, StorePluginInfo] = await cls.get_data() plugin_list: list[StorePluginInfo] = await cls.get_data()
try: try:
plugin_key = await cls._resolve_plugin_key(plugin_id) plugin_key = await cls._resolve_plugin_key(plugin_id)
except ValueError as e: except ValueError as e:
return str(e) return str(e)
plugin_list = await cls.get_loaded_plugins("module") db_plugin_list = await cls.get_loaded_plugins("module")
plugin_info = data[plugin_key] plugin_info = next(p for p in plugin_list if p.module == plugin_key)
if plugin_info.module in [p[0] for p in plugin_list]: if plugin_info.module in [p[0] for p in db_plugin_list]:
return f"插件 {plugin_key} 已安装,无需重复安装" return f"插件 {plugin_key} 已安装,无需重复安装"
is_external = True is_external = True
if plugin_info.github_url is None: if plugin_info.github_url is None:
@ -207,34 +229,83 @@ class ShopManage:
if len(version_split) > 1: if len(version_split) > 1:
github_url_split = plugin_info.github_url.split("/tree/") github_url_split = plugin_info.github_url.split("/tree/")
plugin_info.github_url = f"{github_url_split[0]}/tree/{version_split[1]}" plugin_info.github_url = f"{github_url_split[0]}/tree/{version_split[1]}"
logger.info(f"正在安装插件 {plugin_key}...") logger.info(f"正在安装插件 {plugin_key}...", LOG_COMMAND)
await cls.install_plugin_with_repo( download_type = "GITHUB"
plugin_info.github_url, try:
plugin_info.module_path, # await cls.install_plugin_with_repo(
plugin_info.is_dir, # plugin_info.github_url,
is_external, # plugin_info.module_path,
) # plugin_info.is_dir,
return f"插件 {plugin_key} 安装成功! 重启后生效" # is_external,
# )
pass
except Exception as e:
download_type = "GITEE"
logger.error(f"GITHUB 插件 {plugin_key} 更新失败", LOG_COMMAND, e=e)
await cls.install_plugin_with_gitee(plugin_info.module_path, plugin_info.is_dir)
return f"插件 {download_type} {plugin_key} 安装成功! 重启后生效"
@classmethod
async def __get_download_files(cls, url: str, data_list: list[tuple[str, str]]):
response = await AsyncHttpx.get(url)
response.raise_for_status()
for item in [GiteeContents(**item) for item in response.json()]:
if item.type == "dir":
await cls.__get_download_files(item.url, data_list)
else:
data_list.append((item.path, item.download_url))
@classmethod
async def install_plugin_with_gitee(cls, module_path: str, is_dir: bool):
module_path = module_path.replace(".", "/")
data_list = []
if is_dir:
DIR_URL = f"{GITEE_CONTENTS_URL}/{module_path}"
await cls.__get_download_files(DIR_URL, data_list)
else:
FILE_URL = f"{GITEE_RAW_URL}/{module_path}.py"
data_list.append((f"{module_path}.py", FILE_URL))
if not data_list:
raise ValueError("获取插件文件失败(目录为空),请检查地址是否正确")
download_urls = []
download_paths = []
requirement_file = None
for item in data_list:
file_path = BASE_PATH / Path(item[0])
if file_path.is_file():
file_path.parent.mkdir(parents=True, exist_ok=True)
download_urls.append(item[1])
download_paths.append(file_path)
if "requirement" in item[0] and str(item[0]).endswith(".txt"):
requirement_file = file_path
await AsyncHttpx.gather_download_file(download_urls, download_paths)
if requirement_file:
VirtualEnvPackageManager.install_requirement(requirement_file)
@classmethod @classmethod
async def install_plugin_with_repo( async def install_plugin_with_repo(
cls, github_url: str, module_path: str, is_dir: bool, is_external: bool = False cls,
github_url: str,
module_path: str,
is_dir: bool,
is_external: bool = False,
): ):
files: list[str]
repo_api: RepoAPI repo_api: RepoAPI
repo_info = GithubUtils.parse_github_url(github_url) repo_info = GithubUtils.parse_github_url(github_url)
if await repo_info.update_repo_commit(): if await repo_info.update_repo_commit():
logger.info(f"获取最新提交: {repo_info.branch}", "插件管理") logger.info(f"获取最新提交: {repo_info.branch}", LOG_COMMAND)
else: else:
logger.warning(f"获取最新提交失败: {repo_info}", "插件管理") logger.warning(f"获取最新提交失败: {repo_info}", LOG_COMMAND)
logger.debug(f"成功获取仓库信息: {repo_info}", "插件管理") logger.debug(f"成功获取仓库信息: {repo_info}", LOG_COMMAND)
for repo_api in GithubUtils.iter_api_strategies(): for repo_api in GithubUtils.iter_api_strategies():
try: try:
await repo_api.parse_repo_info(repo_info) await repo_api.parse_repo_info(repo_info)
break break
except Exception as e: except Exception as e:
logger.warning( logger.warning(
f"获取插件文件失败: {e} | API类型: {repo_api.strategy}", "插件管理" f"获取插件文件失败 | API类型: {repo_api.strategy}",
LOG_COMMAND,
e=e,
) )
continue continue
else: else:
@ -250,7 +321,7 @@ class ShopManage:
base_path = BASE_PATH / "plugins" if is_external else BASE_PATH base_path = BASE_PATH / "plugins" if is_external else BASE_PATH
base_path = base_path if module_path else base_path / repo_info.repo base_path = base_path if module_path else base_path / repo_info.repo
download_paths: list[Path | str] = [base_path / file for file in files] download_paths: list[Path | str] = [base_path / file for file in files]
logger.debug(f"插件下载路径: {download_paths}", "插件管理") logger.debug(f"插件下载路径: {download_paths}", LOG_COMMAND)
result = await AsyncHttpx.gather_download_file(download_urls, download_paths) result = await AsyncHttpx.gather_download_file(download_urls, download_paths)
for _id, success in enumerate(result): for _id, success in enumerate(result):
if not success: if not success:
@ -265,12 +336,12 @@ class ShopManage:
req_files.extend( req_files.extend(
repo_api.get_files(f"{replace_module_path}/requirement.txt", False) repo_api.get_files(f"{replace_module_path}/requirement.txt", False)
) )
logger.debug(f"获取插件依赖文件列表: {req_files}", "插件管理") logger.debug(f"获取插件依赖文件列表: {req_files}", LOG_COMMAND)
req_download_urls = [ req_download_urls = [
await repo_info.get_raw_download_urls(file) for file in req_files await repo_info.get_raw_download_urls(file) for file in req_files
] ]
req_paths: list[Path | str] = [plugin_path / file for file in req_files] req_paths: list[Path | str] = [plugin_path / file for file in req_files]
logger.debug(f"插件依赖文件下载路径: {req_paths}", "插件管理") logger.debug(f"插件依赖文件下载路径: {req_paths}", LOG_COMMAND)
if req_files: if req_files:
result = await AsyncHttpx.gather_download_file( result = await AsyncHttpx.gather_download_file(
req_download_urls, req_paths req_download_urls, req_paths
@ -278,7 +349,7 @@ class ShopManage:
for success in result: for success in result:
if not success: if not success:
raise Exception("插件依赖文件下载失败") raise Exception("插件依赖文件下载失败")
logger.debug(f"插件依赖文件列表: {req_paths}", "插件管理") logger.debug(f"插件依赖文件列表: {req_paths}", LOG_COMMAND)
install_requirement(plugin_path) install_requirement(plugin_path)
except ValueError as e: except ValueError as e:
logger.warning("未获取到依赖文件路径...", e=e) logger.warning("未获取到依赖文件路径...", e=e)
@ -295,12 +366,12 @@ class ShopManage:
返回: 返回:
str: 返回消息 str: 返回消息
""" """
data: dict[str, StorePluginInfo] = await cls.get_data() plugin_list: list[StorePluginInfo] = await cls.get_data()
try: try:
plugin_key = await cls._resolve_plugin_key(plugin_id) plugin_key = await cls._resolve_plugin_key(plugin_id)
except ValueError as e: except ValueError as e:
return str(e) return str(e)
plugin_info = data[plugin_key] plugin_info = next(p for p in plugin_list if p.module == plugin_key)
path = BASE_PATH path = BASE_PATH
if plugin_info.github_url: if plugin_info.github_url:
path = BASE_PATH / "plugins" path = BASE_PATH / "plugins"
@ -310,7 +381,7 @@ class ShopManage:
path = Path(f"{path}.py") path = Path(f"{path}.py")
if not path.exists(): if not path.exists():
return f"插件 {plugin_key} 不存在..." return f"插件 {plugin_key} 不存在..."
logger.debug(f"尝试移除插件 {plugin_key} 文件: {path}", "插件管理") logger.debug(f"尝试移除插件 {plugin_key} 文件: {path}", LOG_COMMAND)
if plugin_info.is_dir: if plugin_info.is_dir:
shutil.rmtree(path) shutil.rmtree(path)
else: else:
@ -328,25 +399,25 @@ class ShopManage:
返回: 返回:
BuildImage | str: 返回消息 BuildImage | str: 返回消息
""" """
data: dict[str, StorePluginInfo] = await cls.get_data() plugin_list: list[StorePluginInfo] = await cls.get_data()
plugin_list = await cls.get_loaded_plugins("module", "version") db_plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "Unknown") for p in plugin_list} suc_plugin = {p[0]: (p[1] or "Unknown") for p in db_plugin_list}
filtered_data = [ filtered_data = [
(id, plugin_info) (id, plugin_info)
for id, plugin_info in enumerate(data.items()) for id, plugin_info in enumerate(plugin_list)
if plugin_name_or_author.lower() in plugin_info[0].lower() if plugin_name_or_author.lower() in plugin_info.name.lower()
or plugin_name_or_author.lower() in plugin_info[1].author.lower() or plugin_name_or_author.lower() in plugin_info.author.lower()
] ]
data_list = [ data_list = [
[ [
"已安装" if plugin_info[1].module in suc_plugin else "", "已安装" if plugin_info.module in suc_plugin else "",
id, id,
plugin_info[0], plugin_info.name,
plugin_info[1].description, plugin_info.description,
plugin_info[1].author, plugin_info.author,
cls.version_check(plugin_info[1], suc_plugin), cls.version_check(plugin_info, suc_plugin),
plugin_info[1].plugin_type_name, plugin_info.plugin_type_name,
] ]
for id, plugin_info in filtered_data for id, plugin_info in filtered_data
] ]
@ -354,7 +425,7 @@ class ShopManage:
return "未找到相关插件..." return "未找到相关插件..."
column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"] column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"]
return await ImageTemplate.table_page( return await ImageTemplate.table_page(
"插件列表", "商店列表",
"通过添加/移除插件 ID 来管理插件", "通过添加/移除插件 ID 来管理插件",
column_name, column_name,
data_list, data_list,
@ -376,26 +447,34 @@ class ShopManage:
plugin_key = await cls._resolve_plugin_key(plugin_id) plugin_key = await cls._resolve_plugin_key(plugin_id)
except ValueError as e: except ValueError as e:
return str(e) return str(e)
logger.info(f"尝试更新插件 {plugin_key}", "插件管理") logger.info(f"尝试更新插件 {plugin_key}", LOG_COMMAND)
plugin_info = data[plugin_key] plugin_info = data[plugin_key]
plugin_list = await cls.get_loaded_plugins("module", "version") plugin_list = await cls.get_loaded_plugins("module", "version")
suc_plugin = {p[0]: (p[1] or "Unknown") for p in plugin_list} suc_plugin = {p[0]: (p[1] or "Unknown") for p in plugin_list}
if plugin_info.module not in [p[0] for p in plugin_list]: if plugin_info.module not in [p[0] for p in plugin_list]:
return f"插件 {plugin_key} 未安装,无法更新" return f"插件 {plugin_key} 未安装,无法更新"
logger.debug(f"当前插件列表: {suc_plugin}", "插件管理") logger.debug(f"当前插件列表: {suc_plugin}", LOG_COMMAND)
if cls.check_version_is_new(plugin_info, suc_plugin): if cls.check_version_is_new(plugin_info, suc_plugin):
return f"插件 {plugin_key} 已是最新版本" return f"插件 {plugin_key} 已是最新版本"
is_external = True is_external = True
if plugin_info.github_url is None: if plugin_info.github_url is None:
plugin_info.github_url = DEFAULT_GITHUB_URL plugin_info.github_url = DEFAULT_GITHUB_URL
is_external = False is_external = False
download_type = "GITHUB"
try:
await cls.install_plugin_with_repo( await cls.install_plugin_with_repo(
plugin_info.github_url, plugin_info.github_url,
plugin_info.module_path, plugin_info.module_path,
plugin_info.is_dir, plugin_info.is_dir,
is_external, is_external,
) )
return f"插件 {plugin_key} 更新成功! 重启后生效" except Exception as e:
download_type = "GITEE"
logger.error(f"GITHUB 插件 {plugin_key} 更新失败", LOG_COMMAND, e=e)
await cls.install_plugin_with_gitee(
plugin_info.module_path, plugin_info.is_dir
)
return f"插件 {download_type} {plugin_key} 更新成功! 重启后生效"
@classmethod @classmethod
async def update_all_plugin(cls) -> str: async def update_all_plugin(cls) -> str:
@ -407,38 +486,61 @@ class ShopManage:
返回: 返回:
str: 返回消息 str: 返回消息
""" """
data: dict[str, StorePluginInfo] = await cls.get_data() plugin_list: list[StorePluginInfo] = await cls.get_data()
plugin_list = list(data.keys()) plugin_name_list = [p.name for p in plugin_list]
update_failed_list = [] update_failed_list = []
update_success_list = [] update_success_list = []
result = "--已更新{}个插件 {}个失败 {}个成功--" result = "--已更新{}个插件 {}个失败 {}个成功--"
logger.info(f"尝试更新全部插件 {plugin_list}", "插件管理") logger.info(f"尝试更新全部插件 {plugin_name_list}", LOG_COMMAND)
for plugin_key in plugin_list: for plugin_info in plugin_list:
try: try:
plugin_info = data[plugin_key] db_plugin_list = await cls.get_loaded_plugins("module", "version")
plugin_list = await cls.get_loaded_plugins("module", "version") suc_plugin = {p[0]: (p[1] or "Unknown") for p in db_plugin_list}
suc_plugin = {p[0]: (p[1] or "Unknown") for p in plugin_list} if plugin_info.module not in [p[0] for p in db_plugin_list]:
if plugin_info.module not in [p[0] for p in plugin_list]: logger.debug(
logger.debug(f"插件 {plugin_key} 未安装,跳过", "插件管理") f"插件 {plugin_info.name}({plugin_info.module}) 未安装,跳过",
LOG_COMMAND,
)
continue continue
if cls.check_version_is_new(plugin_info, suc_plugin): if cls.check_version_is_new(plugin_info, suc_plugin):
logger.debug(f"插件 {plugin_key} 已是最新版本,跳过", "插件管理") logger.debug(
f"插件 {plugin_info.name}({plugin_info.module}) 已是最新版本"
",跳过",
LOG_COMMAND,
)
continue continue
logger.info(f"正在更新插件 {plugin_key}", "插件管理") logger.info(
f"正在更新插件 {plugin_info.name}({plugin_info.module})",
LOG_COMMAND,
)
is_external = True is_external = True
if plugin_info.github_url is None: if plugin_info.github_url is None:
plugin_info.github_url = DEFAULT_GITHUB_URL plugin_info.github_url = DEFAULT_GITHUB_URL
is_external = False is_external = False
try:
await cls.install_plugin_with_repo( await cls.install_plugin_with_repo(
plugin_info.github_url, plugin_info.github_url,
plugin_info.module_path, plugin_info.module_path,
plugin_info.is_dir, plugin_info.is_dir,
is_external, is_external,
) )
update_success_list.append(plugin_key)
except Exception as e: except Exception as e:
logger.error(f"更新插件 {plugin_key} 失败: {e}", "插件管理") logger.error(
update_failed_list.append(plugin_key) f"GITHUB 插件 {plugin_info.name}({plugin_info.module}) 更新失败",
LOG_COMMAND,
e=e,
)
await cls.install_plugin_with_gitee(
plugin_info.module_path, plugin_info.is_dir
)
update_success_list.append(plugin_info.name)
except Exception as e:
logger.error(
f"更新插件 {plugin_info.name}({plugin_info.module}) 失败",
LOG_COMMAND,
e=e,
)
update_failed_list.append(plugin_info.name)
if not update_success_list and not update_failed_list: if not update_success_list and not update_failed_list:
return "全部插件已是最新版本" return "全部插件已是最新版本"
if update_success_list: if update_success_list:
@ -460,13 +562,13 @@ class ShopManage:
@classmethod @classmethod
async def _resolve_plugin_key(cls, plugin_id: str) -> str: async def _resolve_plugin_key(cls, plugin_id: str) -> str:
data: dict[str, StorePluginInfo] = await cls.get_data() plugin_list: list[StorePluginInfo] = await cls.get_data()
if is_number(plugin_id): if is_number(plugin_id):
idx = int(plugin_id) idx = int(plugin_id)
if idx < 0 or idx >= len(data): if idx < 0 or idx >= len(plugin_list):
raise ValueError("插件ID不存在...") raise ValueError("插件ID不存在...")
return list(data.keys())[idx] return plugin_list[idx].module
elif isinstance(plugin_id, str): elif isinstance(plugin_id, str):
if plugin_id not in [v.module for k, v in data.items()]: if plugin_id not in [v.module for v in plugin_list]:
raise ValueError("插件Module不存在...") raise ValueError("插件Module不存在...")
return {v.module: k for k, v in data.items()}[plugin_id] return plugin_id

View File

@ -1,3 +1,5 @@
from typing import Any, Literal
from nonebot.compat import model_dump from nonebot.compat import model_dump
from pydantic import BaseModel from pydantic import BaseModel
@ -13,9 +15,30 @@ type2name: dict[str, str] = {
} }
class GiteeContents(BaseModel):
"""Gitee Api内容"""
type: Literal["file", "dir"]
"""类型"""
size: Any
"""文件大小"""
name: str
"""文件名"""
path: str
"""文件路径"""
url: str
"""文件链接"""
html_url: str
"""文件html链接"""
download_url: str
"""文件raw链接"""
class StorePluginInfo(BaseModel): class StorePluginInfo(BaseModel):
"""插件信息""" """插件信息"""
name: str
"""插件名"""
module: str module: str
"""模块名""" """模块名"""
module_path: str module_path: str

View File

@ -456,7 +456,7 @@ class ConfigsManager:
except Exception as e: except Exception as e:
logger.warning( logger.warning(
f"配置项类型转换 MODULE: [<u><y>{module}</y></u>]" f"配置项类型转换 MODULE: [<u><y>{module}</y></u>]"
" | KEY: [<u><y>{key}</y></u>]", f" | KEY: [<u><y>{key}</y></u>]",
e=e, e=e,
) )
value = config.value or config.default_value value = config.value or config.default_value

View File

@ -13,7 +13,6 @@ from httpx import ConnectTimeout, HTTPStatusError, Response
from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_htmlrender import get_browser from nonebot_plugin_htmlrender import get_browser
from playwright.async_api import Page from playwright.async_api import Page
from retrying import retry
import rich import rich
from zhenxun.configs.config import BotConfig from zhenxun.configs.config import BotConfig
@ -31,7 +30,6 @@ class AsyncHttpx:
} }
@classmethod @classmethod
@retry(stop_max_attempt_number=3)
async def get( async def get(
cls, cls,
url: str | list[str], url: str | list[str],

View File

@ -0,0 +1,159 @@
from pathlib import Path
import subprocess
from subprocess import CalledProcessError
from typing import ClassVar
from zhenxun.services.log import logger
BAT_FILE = Path() / "win启动.bat"
LOG_COMMAND = "VirtualEnvPackageManager"
class VirtualEnvPackageManager:
WIN_COMMAND: ClassVar[list[str]] = [
"./Python310/python.exe",
"-m",
"pip",
]
DEFAULT_COMMAND: ClassVar[list[str]] = ["poetry", "run", "pip"]
@classmethod
def __get_command(cls) -> list[str]:
return cls.WIN_COMMAND if BAT_FILE.exists() else cls.DEFAULT_COMMAND
@classmethod
def install(cls, package: list[str] | str):
"""安装依赖包
参数:
package: 安装依赖包名称或列表
"""
if isinstance(package, str):
package = [package]
try:
command = cls.__get_command()
command.append("install")
command.append(" ".join(package))
logger.info(f"执行虚拟环境安装包指令: {command}", LOG_COMMAND)
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
)
logger.debug(
f"安装虚拟环境包指令执行完成: {result.stdout}",
LOG_COMMAND,
)
except CalledProcessError as e:
logger.error(f"安装虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND)
@classmethod
def uninstall(cls, package: list[str] | str):
"""卸载依赖包
参数:
package: 卸载依赖包名称或列表
"""
if isinstance(package, str):
package = [package]
try:
command = cls.__get_command()
command.append("uninstall")
command.append(" ".join(package))
logger.info(f"执行虚拟环境卸载包指令: {command}", LOG_COMMAND)
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
)
logger.debug(
f"卸载虚拟环境包指令执行完成: {result.stdout}",
LOG_COMMAND,
)
except CalledProcessError as e:
logger.error(f"卸载虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND)
@classmethod
def update(cls, package: list[str] | str):
"""更新依赖包
参数:
package: 更新依赖包名称或列表
"""
if isinstance(package, str):
package = [package]
try:
command = cls.__get_command()
command.append("install")
command.append("--upgrade")
command.append(" ".join(package))
logger.info(f"执行虚拟环境更新包指令: {command}", LOG_COMMAND)
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
)
logger.debug(f"更新虚拟环境包指令执行完成: {result.stdout}", LOG_COMMAND)
except CalledProcessError as e:
logger.error(f"更新虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND)
@classmethod
def install_requirement(cls, requirement_file: Path):
"""安装依赖文件
参数:
requirement_file: requirement文件路径
异常:
FileNotFoundError: 文件不存在
"""
if not requirement_file.exists():
raise FileNotFoundError(f"依赖文件 {requirement_file} 不存在", LOG_COMMAND)
try:
command = cls.__get_command()
command.append("install")
command.append("-r")
command.append(str(requirement_file.absolute()))
logger.info(f"执行虚拟环境安装依赖文件指令: {command}", LOG_COMMAND)
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
)
logger.debug(
f"安装虚拟环境依赖文件指令执行完成: {result.stdout}",
LOG_COMMAND,
)
except CalledProcessError as e:
logger.error(
f"安装虚拟环境依赖文件指令执行失败: {e.stderr}.",
LOG_COMMAND,
)
@classmethod
def list(cls) -> str:
"""列出已安装的依赖包"""
try:
command = cls.__get_command()
command.append("list")
logger.info(f"执行虚拟环境列出包指令: {command}", LOG_COMMAND)
result = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
)
logger.debug(
f"列出虚拟环境包指令执行完成: {result.stdout}",
LOG_COMMAND,
)
return result.stdout
except CalledProcessError as e:
logger.error(f"列出虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND)
return ""