diff --git a/.github/workflows/publish-docker.yml b/.github/workflows/publish-docker.yml new file mode 100644 index 00000000..816eb25d --- /dev/null +++ b/.github/workflows/publish-docker.yml @@ -0,0 +1,58 @@ +# +name: Create and publish a Docker image + +# Configures this workflow to run on demand via workflow_dispatch. +on: + workflow_dispatch: + +# Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds. +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +# There is a single job in this workflow. It's configured to run on the latest available version of Ubuntu. +jobs: + build-and-push-image: + runs-on: ubuntu-latest + # Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job. + permissions: + contents: read + packages: write + attestations: write + id-token: write + # + steps: + - name: Checkout repository + uses: actions/checkout@v4 + # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. + - name: Log in to the Container registry + uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + # This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels. + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + # This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages. + # It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see [Usage](https://github.com/docker/build-push-action#usage) in the README of the `docker/build-push-action` repository. + # It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step. + - name: Build and push Docker image + id: push + uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4 + with: + context: . + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + + # This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see [Using artifact attestations to establish provenance for builds](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds). + - name: Generate artifact attestation + uses: actions/attest-build-provenance@v2 + with: + subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME}} + subject-digest: ${{ steps.push.outputs.digest }} + push-to-registry: true diff --git a/.vscode/settings.json b/.vscode/settings.json index 1b227fb6..e6830243 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,6 +11,8 @@ "displayname", "flmt", "getbbox", + "gitcode", + "GITEE", "hibiapi", "httpx", "jsdelivr", diff --git a/tests/builtin_plugins/plugin_store/test_add_plugin.py b/tests/builtin_plugins/plugin_store/test_add_plugin.py index 3dd2ebbb..5a0edab8 100644 --- a/tests/builtin_plugins/plugin_store/test_add_plugin.py +++ b/tests/builtin_plugins/plugin_store/test_add_plugin.py @@ -359,7 +359,7 @@ async def test_add_plugin_exist( init_mocked_api(mocked_api=mocked_api) mocker.patch( - "zhenxun.builtin_plugins.plugin_store.data_source.ShopManage.get_loaded_plugins", + "zhenxun.builtin_plugins.plugin_store.data_source.StoreManager.get_loaded_plugins", return_value=[("search_image", "0.1")], ) plugin_id = 1 diff --git a/tests/builtin_plugins/plugin_store/test_search_plugin.py b/tests/builtin_plugins/plugin_store/test_search_plugin.py index 8bc6876e..404fee5e 100644 --- a/tests/builtin_plugins/plugin_store/test_search_plugin.py +++ b/tests/builtin_plugins/plugin_store/test_search_plugin.py @@ -57,7 +57,7 @@ async def test_search_plugin_name( ) ctx.receive_event(bot=bot, event=event) mock_table_page.assert_awaited_once_with( - "插件列表", + "商店插件列表", "通过添加/移除插件 ID 来管理插件", ["-", "ID", "名称", "简介", "作者", "版本", "类型"], [ @@ -123,7 +123,7 @@ async def test_search_plugin_author( ) ctx.receive_event(bot=bot, event=event) mock_table_page.assert_awaited_once_with( - "插件列表", + "商店插件列表", "通过添加/移除插件 ID 来管理插件", ["-", "ID", "名称", "简介", "作者", "版本", "类型"], [ diff --git a/tests/builtin_plugins/plugin_store/test_update_all_plugin.py b/tests/builtin_plugins/plugin_store/test_update_all_plugin.py index 2a490da7..95360f6b 100644 --- a/tests/builtin_plugins/plugin_store/test_update_all_plugin.py +++ b/tests/builtin_plugins/plugin_store/test_update_all_plugin.py @@ -32,7 +32,7 @@ async def test_update_all_plugin_basic_need_update( new=tmp_path / "zhenxun", ) mocker.patch( - "zhenxun.builtin_plugins.plugin_store.data_source.ShopManage.get_loaded_plugins", + "zhenxun.builtin_plugins.plugin_store.data_source.StoreManager.get_loaded_plugins", return_value=[("search_image", "0.0")], ) @@ -87,7 +87,7 @@ async def test_update_all_plugin_basic_is_new( new=tmp_path / "zhenxun", ) mocker.patch( - "zhenxun.builtin_plugins.plugin_store.data_source.ShopManage.get_loaded_plugins", + "zhenxun.builtin_plugins.plugin_store.data_source.StoreManager.get_loaded_plugins", return_value=[("search_image", "0.1")], ) diff --git a/tests/builtin_plugins/plugin_store/test_update_plugin.py b/tests/builtin_plugins/plugin_store/test_update_plugin.py index 952191d6..2cb88d1b 100644 --- a/tests/builtin_plugins/plugin_store/test_update_plugin.py +++ b/tests/builtin_plugins/plugin_store/test_update_plugin.py @@ -32,7 +32,7 @@ async def test_update_plugin_basic_need_update( new=tmp_path / "zhenxun", ) mocker.patch( - "zhenxun.builtin_plugins.plugin_store.data_source.ShopManage.get_loaded_plugins", + "zhenxun.builtin_plugins.plugin_store.data_source.StoreManager.get_loaded_plugins", return_value=[("search_image", "0.0")], ) @@ -87,7 +87,7 @@ async def test_update_plugin_basic_is_new( new=tmp_path / "zhenxun", ) mocker.patch( - "zhenxun.builtin_plugins.plugin_store.data_source.ShopManage.get_loaded_plugins", + "zhenxun.builtin_plugins.plugin_store.data_source.StoreManager.get_loaded_plugins", return_value=[("search_image", "0.1")], ) diff --git a/tests/response/plugin_store/basic_plugins.json b/tests/response/plugin_store/basic_plugins.json index 7459e2ec..f0306836 100644 --- a/tests/response/plugin_store/basic_plugins.json +++ b/tests/response/plugin_store/basic_plugins.json @@ -1,5 +1,6 @@ -{ - "鸡汤": { +[ + { + "name": "鸡汤", "module": "jitang", "module_path": "plugins.alapi.jitang", "description": "喏,亲手为你煮的鸡汤", @@ -9,7 +10,8 @@ "plugin_type": "NORMAL", "is_dir": false }, - "识图": { + { + "name": "识图", "module": "search_image", "module_path": "plugins.search_image", "description": "以图搜图,看破本源", @@ -19,7 +21,8 @@ "plugin_type": "NORMAL", "is_dir": true }, - "网易云热评": { + { + "name": "网易云热评", "module": "comments_163", "module_path": "plugins.alapi.comments_163", "description": "生了个人,我很抱歉", @@ -29,7 +32,8 @@ "plugin_type": "NORMAL", "is_dir": false }, - "B站订阅": { + { + "name": "B站订阅", "module": "bilibili_sub", "module_path": "plugins.bilibili_sub", "description": "非常便利的B站订阅通知", @@ -39,4 +43,4 @@ "plugin_type": "NORMAL", "is_dir": true } -} +] diff --git a/tests/response/plugin_store/extra_plugins.json b/tests/response/plugin_store/extra_plugins.json index 9d92f859..ca5e7f0a 100644 --- a/tests/response/plugin_store/extra_plugins.json +++ b/tests/response/plugin_store/extra_plugins.json @@ -1,5 +1,6 @@ -{ - "github订阅": { +[ + { + "name": "github订阅", "module": "github_sub", "module_path": "github_sub", "description": "订阅github用户或仓库", @@ -10,7 +11,8 @@ "is_dir": true, "github_url": "https://github.com/xuanerwa/zhenxun_github_sub" }, - "Minecraft查服": { + { + "name": "Minecraft查服", "module": "mc_check", "module_path": "mc_check", "description": "Minecraft服务器状态查询,支持IPv6", @@ -21,4 +23,4 @@ "is_dir": true, "github_url": "https://github.com/molanp/zhenxun_check_Minecraft" } -} +] diff --git a/zhenxun/builtin_plugins/plugin_store/__init__.py b/zhenxun/builtin_plugins/plugin_store/__init__.py index 7e9f52a0..72d6d7dd 100644 --- a/zhenxun/builtin_plugins/plugin_store/__init__.py +++ b/zhenxun/builtin_plugins/plugin_store/__init__.py @@ -9,7 +9,7 @@ from zhenxun.utils.enum import PluginType from zhenxun.utils.message import MessageUtils from zhenxun.utils.utils import is_number -from .data_source import ShopManage +from .data_source import StoreManager __plugin_meta__ = PluginMetadata( name="插件商店", @@ -82,7 +82,7 @@ _matcher.shortcut( @_matcher.assign("$main") async def _(session: EventSession): try: - result = await ShopManage.get_plugins_info() + result = await StoreManager.get_plugins_info() logger.info("查看插件列表", "插件商店", session=session) await MessageUtils.build_message(result).send() except Exception as e: @@ -97,7 +97,7 @@ async def _(session: EventSession, plugin_id: str): await MessageUtils.build_message(f"正在添加插件 Id: {plugin_id}").send() else: await MessageUtils.build_message(f"正在添加插件 Module: {plugin_id}").send() - result = await ShopManage.add_plugin(plugin_id) + result = await StoreManager.add_plugin(plugin_id) except Exception as e: logger.error(f"添加插件 Id: {plugin_id}失败", "插件商店", session=session, e=e) await MessageUtils.build_message( @@ -110,7 +110,7 @@ async def _(session: EventSession, plugin_id: str): @_matcher.assign("remove") async def _(session: EventSession, plugin_id: str): try: - result = await ShopManage.remove_plugin(plugin_id) + result = await StoreManager.remove_plugin(plugin_id) except Exception as e: logger.error(f"移除插件 Id: {plugin_id}失败", "插件商店", session=session, e=e) await MessageUtils.build_message( @@ -123,7 +123,7 @@ async def _(session: EventSession, plugin_id: str): @_matcher.assign("search") async def _(session: EventSession, plugin_name_or_author: str): try: - result = await ShopManage.search_plugin(plugin_name_or_author) + result = await StoreManager.search_plugin(plugin_name_or_author) except Exception as e: logger.error( f"搜索插件 name: {plugin_name_or_author}失败", @@ -145,7 +145,7 @@ async def _(session: EventSession, plugin_id: str): await MessageUtils.build_message(f"正在更新插件 Id: {plugin_id}").send() else: await MessageUtils.build_message(f"正在更新插件 Module: {plugin_id}").send() - result = await ShopManage.update_plugin(plugin_id) + result = await StoreManager.update_plugin(plugin_id) except Exception as e: logger.error(f"更新插件 Id: {plugin_id}失败", "插件商店", session=session, e=e) await MessageUtils.build_message( @@ -159,7 +159,7 @@ async def _(session: EventSession, plugin_id: str): async def _(session: EventSession): try: await MessageUtils.build_message("正在更新全部插件").send() - result = await ShopManage.update_all_plugin() + result = await StoreManager.update_all_plugin() except Exception as e: logger.error("更新全部插件失败", "插件商店", session=session, e=e) await MessageUtils.build_message(f"更新全部插件失败 e: {e}").finish() diff --git a/zhenxun/builtin_plugins/plugin_store/config.py b/zhenxun/builtin_plugins/plugin_store/config.py index dacaffec..dd48a5c7 100644 --- a/zhenxun/builtin_plugins/plugin_store/config.py +++ b/zhenxun/builtin_plugins/plugin_store/config.py @@ -9,3 +9,5 @@ DEFAULT_GITHUB_URL = "https://github.com/zhenxun-org/zhenxun_bot_plugins/tree/ma EXTRA_GITHUB_URL = "https://github.com/zhenxun-org/zhenxun_bot_plugins_index/tree/index" """插件库索引github仓库地址""" + +LOG_COMMAND = "插件商店" diff --git a/zhenxun/builtin_plugins/plugin_store/data_source.py b/zhenxun/builtin_plugins/plugin_store/data_source.py index 1df053f9..6633d404 100644 --- a/zhenxun/builtin_plugins/plugin_store/data_source.py +++ b/zhenxun/builtin_plugins/plugin_store/data_source.py @@ -1,6 +1,5 @@ from pathlib import Path import shutil -import subprocess from aiocache import cached import ujson as json @@ -14,9 +13,15 @@ from zhenxun.utils.github_utils import GithubUtils from zhenxun.utils.github_utils.models import RepoAPI from zhenxun.utils.http_utils import AsyncHttpx from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle +from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager from zhenxun.utils.utils import is_number -from .config import BASE_PATH, DEFAULT_GITHUB_URL, EXTRA_GITHUB_URL +from .config import ( + BASE_PATH, + DEFAULT_GITHUB_URL, + EXTRA_GITHUB_URL, + LOG_COMMAND, +) BAT_FILE = Path() / "win启动.bat" @@ -45,74 +50,69 @@ def install_requirement(plugin_path: Path): requirement_files = ["requirement.txt", "requirements.txt"] requirement_paths = [plugin_path / file for file in requirement_files] - existing_requirements = next( + if existing_requirements := next( (path for path in requirement_paths if path.exists()), None - ) - - if not existing_requirements: - logger.debug( - f"No requirement.txt found for plugin: {plugin_path.name}", "插件管理" - ) - return - - try: - command = WIN_COMMAND if BAT_FILE.exists() else DEFAULT_COMMAND - command.append(str(existing_requirements)) - result = subprocess.run( - command, - check=True, - capture_output=True, - text=True, - ) - logger.debug( - "Successfully installed dependencies for" - f" plugin: {plugin_path.name}. Output:\n{result.stdout}", - "插件管理", - ) - except subprocess.CalledProcessError: - logger.error( - f"Failed to install dependencies for plugin: {plugin_path.name}. " - " Error:\n{e.stderr}" - ) + ): + VirtualEnvPackageManager.install_requirement(existing_requirements) -class ShopManage: +class StoreManager: @classmethod - @cached(60) - async def get_data(cls) -> dict[str, StorePluginInfo]: - """获取插件信息数据 - - 异常: - ValueError: 访问请求失败 + async def get_github_plugins(cls) -> list[StorePluginInfo]: + """获取github插件列表信息 返回: - dict: 插件信息数据 + list[StorePluginInfo]: 插件列表数据 """ - default_github_repo = GithubUtils.parse_github_url(DEFAULT_GITHUB_URL) - extra_github_repo = GithubUtils.parse_github_url(EXTRA_GITHUB_URL) - for repo_info in [default_github_repo, extra_github_repo]: - if await repo_info.update_repo_commit(): - logger.info(f"获取最新提交: {repo_info.branch}", "插件管理") - else: - logger.warning(f"获取最新提交失败: {repo_info}", "插件管理") - default_github_url = await default_github_repo.get_raw_download_urls( - "plugins.json" - ) - extra_github_url = await extra_github_repo.get_raw_download_urls("plugins.json") - res = await AsyncHttpx.get(default_github_url, check_status_code=200) - res2 = await AsyncHttpx.get(extra_github_url, check_status_code=200) + repo_info = GithubUtils.parse_github_url(DEFAULT_GITHUB_URL) + if await repo_info.update_repo_commit(): + logger.info(f"获取最新提交: {repo_info.branch}", LOG_COMMAND) + else: + logger.warning(f"获取最新提交失败: {repo_info}", LOG_COMMAND) + default_github_url = await repo_info.get_raw_download_urls("plugins.json") + response = await AsyncHttpx.get(default_github_url, check_status_code=200) + if response.status_code == 200: + logger.info("获取github插件列表成功", LOG_COMMAND) + return [StorePluginInfo(**detail) for detail in json.loads(response.text)] + else: + logger.warning( + f"获取github插件列表失败: {response.status_code}", LOG_COMMAND + ) + return [] - # 检查请求结果 - if res.status_code != 200 or res2.status_code != 200: - raise ValueError(f"下载错误, code: {res.status_code}, {res2.status_code}") + @classmethod + async def get_extra_plugins(cls) -> list[StorePluginInfo]: + """获取额外插件列表信息 - # 解析并合并返回的 JSON 数据 - data1 = json.loads(res.text) - data2 = json.loads(res2.text) - return { - name: StorePluginInfo(**detail) - for name, detail in {**data1, **data2}.items() - } + 返回: + list[StorePluginInfo]: 插件列表数据 + """ + repo_info = GithubUtils.parse_github_url(EXTRA_GITHUB_URL) + if await repo_info.update_repo_commit(): + logger.info(f"获取最新提交: {repo_info.branch}", LOG_COMMAND) + else: + logger.warning(f"获取最新提交失败: {repo_info}", LOG_COMMAND) + extra_github_url = await repo_info.get_raw_download_urls("plugins.json") + response = await AsyncHttpx.get(extra_github_url, check_status_code=200) + if response.status_code == 200: + return [StorePluginInfo(**detail) for detail in json.loads(response.text)] + else: + logger.warning( + f"获取github扩展插件列表失败: {response.status_code}", LOG_COMMAND + ) + return [] + + @classmethod + @cached(60) + async def get_data(cls) -> list[StorePluginInfo]: + """获取插件信息数据 + + 返回: + list[StorePluginInfo]: 插件信息数据 + """ + plugins = await cls.get_github_plugins() + extra_plugins = await cls.get_extra_plugins() + return [*plugins, *extra_plugins] @classmethod def version_check(cls, plugin_info: StorePluginInfo, suc_plugin: dict[str, str]): @@ -120,7 +120,7 @@ class ShopManage: 参数: plugin_info: StorePluginInfo - suc_plugin: dict[str, str] + suc_plugin: 模块名: 版本号 返回: str: 版本号 @@ -140,7 +140,7 @@ class ShopManage: 参数: plugin_info: StorePluginInfo - suc_plugin: dict[str, str] + suc_plugin: 模块名: 版本号 返回: bool: 是否有更新 @@ -164,21 +164,21 @@ class ShopManage: 返回: BuildImage | str: 返回消息 """ - data: dict[str, StorePluginInfo] = await cls.get_data() + plugin_list: list[StorePluginInfo] = await cls.get_data() column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"] - plugin_list = await cls.get_loaded_plugins("module", "version") - suc_plugin = {p[0]: (p[1] or "0.1") for p in plugin_list} + db_plugin_list = await cls.get_loaded_plugins("module", "version") + suc_plugin = {p[0]: (p[1] or "0.1") for p in db_plugin_list} data_list = [ [ - "已安装" if plugin_info[1].module in suc_plugin else "", + "已安装" if plugin_info.module in suc_plugin else "", id, - plugin_info[0], - plugin_info[1].description, - plugin_info[1].author, - cls.version_check(plugin_info[1], suc_plugin), - plugin_info[1].plugin_type_name, + plugin_info.name, + plugin_info.description, + plugin_info.author, + cls.version_check(plugin_info, suc_plugin), + plugin_info.plugin_type_name, ] - for id, plugin_info in enumerate(data.items()) + for id, plugin_info in enumerate(plugin_list) ] return await ImageTemplate.table_page( "插件列表", @@ -198,15 +198,15 @@ class ShopManage: 返回: str: 返回消息 """ - data: dict[str, StorePluginInfo] = await cls.get_data() + plugin_list: list[StorePluginInfo] = await cls.get_data() try: plugin_key = await cls._resolve_plugin_key(plugin_id) except ValueError as e: return str(e) - plugin_list = await cls.get_loaded_plugins("module") - plugin_info = data[plugin_key] - if plugin_info.module in [p[0] for p in plugin_list]: - return f"插件 {plugin_key} 已安装,无需重复安装" + db_plugin_list = await cls.get_loaded_plugins("module") + plugin_info = next(p for p in plugin_list if p.module == plugin_key) + if plugin_info.module in [p[0] for p in db_plugin_list]: + return f"插件 {plugin_info.name} 已安装,无需重复安装" is_external = True if plugin_info.github_url is None: plugin_info.github_url = DEFAULT_GITHUB_URL @@ -215,34 +215,39 @@ class ShopManage: if len(version_split) > 1: github_url_split = plugin_info.github_url.split("/tree/") plugin_info.github_url = f"{github_url_split[0]}/tree/{version_split[1]}" - logger.info(f"正在安装插件 {plugin_key}...") + logger.info(f"正在安装插件 {plugin_info.name}...", LOG_COMMAND) await cls.install_plugin_with_repo( plugin_info.github_url, plugin_info.module_path, plugin_info.is_dir, is_external, ) - return f"插件 {plugin_key} 安装成功! 重启后生效" + return f"插件 {plugin_info.name} 安装成功! 重启后生效" @classmethod async def install_plugin_with_repo( - cls, github_url: str, module_path: str, is_dir: bool, is_external: bool = False + cls, + github_url: str, + module_path: str, + is_dir: bool, + is_external: bool = False, ): - files: list[str] repo_api: RepoAPI repo_info = GithubUtils.parse_github_url(github_url) if await repo_info.update_repo_commit(): - logger.info(f"获取最新提交: {repo_info.branch}", "插件管理") + logger.info(f"获取最新提交: {repo_info.branch}", LOG_COMMAND) else: - logger.warning(f"获取最新提交失败: {repo_info}", "插件管理") - logger.debug(f"成功获取仓库信息: {repo_info}", "插件管理") + logger.warning(f"获取最新提交失败: {repo_info}", LOG_COMMAND) + logger.debug(f"成功获取仓库信息: {repo_info}", LOG_COMMAND) for repo_api in GithubUtils.iter_api_strategies(): try: await repo_api.parse_repo_info(repo_info) break except Exception as e: logger.warning( - f"获取插件文件失败: {e} | API类型: {repo_api.strategy}", "插件管理" + f"获取插件文件失败 | API类型: {repo_api.strategy}", + LOG_COMMAND, + e=e, ) continue else: @@ -258,7 +263,7 @@ class ShopManage: base_path = BASE_PATH / "plugins" if is_external else BASE_PATH base_path = base_path if module_path else base_path / repo_info.repo download_paths: list[Path | str] = [base_path / file for file in files] - logger.debug(f"插件下载路径: {download_paths}", "插件管理") + logger.debug(f"插件下载路径: {download_paths}", LOG_COMMAND) result = await AsyncHttpx.gather_download_file(download_urls, download_paths) for _id, success in enumerate(result): if not success: @@ -273,12 +278,12 @@ class ShopManage: req_files.extend( repo_api.get_files(f"{replace_module_path}/requirement.txt", False) ) - logger.debug(f"获取插件依赖文件列表: {req_files}", "插件管理") + logger.debug(f"获取插件依赖文件列表: {req_files}", LOG_COMMAND) req_download_urls = [ await repo_info.get_raw_download_urls(file) for file in req_files ] req_paths: list[Path | str] = [plugin_path / file for file in req_files] - logger.debug(f"插件依赖文件下载路径: {req_paths}", "插件管理") + logger.debug(f"插件依赖文件下载路径: {req_paths}", LOG_COMMAND) if req_files: result = await AsyncHttpx.gather_download_file( req_download_urls, req_paths @@ -286,7 +291,7 @@ class ShopManage: for success in result: if not success: raise Exception("插件依赖文件下载失败") - logger.debug(f"插件依赖文件列表: {req_paths}", "插件管理") + logger.debug(f"插件依赖文件列表: {req_paths}", LOG_COMMAND) install_requirement(plugin_path) except ValueError as e: logger.warning("未获取到依赖文件路径...", e=e) @@ -303,12 +308,12 @@ class ShopManage: 返回: str: 返回消息 """ - data: dict[str, StorePluginInfo] = await cls.get_data() + plugin_list: list[StorePluginInfo] = await cls.get_data() try: plugin_key = await cls._resolve_plugin_key(plugin_id) except ValueError as e: return str(e) - plugin_info = data[plugin_key] + plugin_info = next(p for p in plugin_list if p.module == plugin_key) path = BASE_PATH if plugin_info.github_url: path = BASE_PATH / "plugins" @@ -317,14 +322,14 @@ class ShopManage: if not plugin_info.is_dir: path = Path(f"{path}.py") if not path.exists(): - return f"插件 {plugin_key} 不存在..." - logger.debug(f"尝试移除插件 {plugin_key} 文件: {path}", "插件管理") + return f"插件 {plugin_info.name} 不存在..." + logger.debug(f"尝试移除插件 {plugin_info.name} 文件: {path}", LOG_COMMAND) if plugin_info.is_dir: shutil.rmtree(path) else: path.unlink() await PluginInitManager.remove(f"zhenxun.{plugin_info.module_path}") - return f"插件 {plugin_key} 移除成功! 重启后生效" + return f"插件 {plugin_info.name} 移除成功! 重启后生效" @classmethod async def search_plugin(cls, plugin_name_or_author: str) -> BuildImage | str: @@ -336,25 +341,25 @@ class ShopManage: 返回: BuildImage | str: 返回消息 """ - data: dict[str, StorePluginInfo] = await cls.get_data() - plugin_list = await cls.get_loaded_plugins("module", "version") - suc_plugin = {p[0]: (p[1] or "Unknown") for p in plugin_list} + plugin_list: list[StorePluginInfo] = await cls.get_data() + db_plugin_list = await cls.get_loaded_plugins("module", "version") + suc_plugin = {p[0]: (p[1] or "Unknown") for p in db_plugin_list} filtered_data = [ (id, plugin_info) - for id, plugin_info in enumerate(data.items()) - if plugin_name_or_author.lower() in plugin_info[0].lower() - or plugin_name_or_author.lower() in plugin_info[1].author.lower() + for id, plugin_info in enumerate(plugin_list) + if plugin_name_or_author.lower() in plugin_info.name.lower() + or plugin_name_or_author.lower() in plugin_info.author.lower() ] data_list = [ [ - "已安装" if plugin_info[1].module in suc_plugin else "", + "已安装" if plugin_info.module in suc_plugin else "", id, - plugin_info[0], - plugin_info[1].description, - plugin_info[1].author, - cls.version_check(plugin_info[1], suc_plugin), - plugin_info[1].plugin_type_name, + plugin_info.name, + plugin_info.description, + plugin_info.author, + cls.version_check(plugin_info, suc_plugin), + plugin_info.plugin_type_name, ] for id, plugin_info in filtered_data ] @@ -362,7 +367,7 @@ class ShopManage: return "未找到相关插件..." column_name = ["-", "ID", "名称", "简介", "作者", "版本", "类型"] return await ImageTemplate.table_page( - "插件列表", + "商店插件列表", "通过添加/移除插件 ID 来管理插件", column_name, data_list, @@ -379,20 +384,20 @@ class ShopManage: 返回: str: 返回消息 """ - data: dict[str, StorePluginInfo] = await cls.get_data() + plugin_list: list[StorePluginInfo] = await cls.get_data() try: plugin_key = await cls._resolve_plugin_key(plugin_id) except ValueError as e: return str(e) - logger.info(f"尝试更新插件 {plugin_key}", "插件管理") - plugin_info = data[plugin_key] - plugin_list = await cls.get_loaded_plugins("module", "version") - suc_plugin = {p[0]: (p[1] or "Unknown") for p in plugin_list} - if plugin_info.module not in [p[0] for p in plugin_list]: - return f"插件 {plugin_key} 未安装,无法更新" - logger.debug(f"当前插件列表: {suc_plugin}", "插件管理") + plugin_info = next(p for p in plugin_list if p.module == plugin_key) + logger.info(f"尝试更新插件 {plugin_info.name}", LOG_COMMAND) + db_plugin_list = await cls.get_loaded_plugins("module", "version") + suc_plugin = {p[0]: (p[1] or "Unknown") for p in db_plugin_list} + if plugin_info.module not in [p[0] for p in db_plugin_list]: + return f"插件 {plugin_info.name} 未安装,无法更新" + logger.debug(f"当前插件列表: {suc_plugin}", LOG_COMMAND) if cls.check_version_is_new(plugin_info, suc_plugin): - return f"插件 {plugin_key} 已是最新版本" + return f"插件 {plugin_info.name} 已是最新版本" is_external = True if plugin_info.github_url is None: plugin_info.github_url = DEFAULT_GITHUB_URL @@ -403,7 +408,7 @@ class ShopManage: plugin_info.is_dir, is_external, ) - return f"插件 {plugin_key} 更新成功! 重启后生效" + return f"插件 {plugin_info.name} 更新成功! 重启后生效" @classmethod async def update_all_plugin(cls) -> str: @@ -415,24 +420,33 @@ class ShopManage: 返回: str: 返回消息 """ - data: dict[str, StorePluginInfo] = await cls.get_data() - plugin_list = list(data.keys()) + plugin_list: list[StorePluginInfo] = await cls.get_data() + plugin_name_list = [p.name for p in plugin_list] update_failed_list = [] update_success_list = [] result = "--已更新{}个插件 {}个失败 {}个成功--" - logger.info(f"尝试更新全部插件 {plugin_list}", "插件管理") - for plugin_key in plugin_list: + logger.info(f"尝试更新全部插件 {plugin_name_list}", LOG_COMMAND) + for plugin_info in plugin_list: try: - plugin_info = data[plugin_key] - plugin_list = await cls.get_loaded_plugins("module", "version") - suc_plugin = {p[0]: (p[1] or "Unknown") for p in plugin_list} - if plugin_info.module not in [p[0] for p in plugin_list]: - logger.debug(f"插件 {plugin_key} 未安装,跳过", "插件管理") + db_plugin_list = await cls.get_loaded_plugins("module", "version") + suc_plugin = {p[0]: (p[1] or "Unknown") for p in db_plugin_list} + if plugin_info.module not in [p[0] for p in db_plugin_list]: + logger.debug( + f"插件 {plugin_info.name}({plugin_info.module}) 未安装,跳过", + LOG_COMMAND, + ) continue if cls.check_version_is_new(plugin_info, suc_plugin): - logger.debug(f"插件 {plugin_key} 已是最新版本,跳过", "插件管理") + logger.debug( + f"插件 {plugin_info.name}({plugin_info.module}) 已是最新版本" + ",跳过", + LOG_COMMAND, + ) continue - logger.info(f"正在更新插件 {plugin_key}", "插件管理") + logger.info( + f"正在更新插件 {plugin_info.name}({plugin_info.module})", + LOG_COMMAND, + ) is_external = True if plugin_info.github_url is None: plugin_info.github_url = DEFAULT_GITHUB_URL @@ -443,10 +457,14 @@ class ShopManage: plugin_info.is_dir, is_external, ) - update_success_list.append(plugin_key) + update_success_list.append(plugin_info.name) except Exception as e: - logger.error(f"更新插件 {plugin_key} 失败: {e}", "插件管理") - update_failed_list.append(plugin_key) + logger.error( + f"更新插件 {plugin_info.name}({plugin_info.module}) 失败", + LOG_COMMAND, + e=e, + ) + update_failed_list.append(plugin_info.name) if not update_success_list and not update_failed_list: return "全部插件已是最新版本" if update_success_list: @@ -468,13 +486,13 @@ class ShopManage: @classmethod async def _resolve_plugin_key(cls, plugin_id: str) -> str: - data: dict[str, StorePluginInfo] = await cls.get_data() + plugin_list: list[StorePluginInfo] = await cls.get_data() if is_number(plugin_id): idx = int(plugin_id) - if idx < 0 or idx >= len(data): + if idx < 0 or idx >= len(plugin_list): raise ValueError("插件ID不存在...") - return list(data.keys())[idx] + return plugin_list[idx].module elif isinstance(plugin_id, str): - if plugin_id not in [v.module for k, v in data.items()]: + if plugin_id not in [v.module for v in plugin_list]: raise ValueError("插件Module不存在...") - return {v.module: k for k, v in data.items()}[plugin_id] + return plugin_id diff --git a/zhenxun/builtin_plugins/plugin_store/models.py b/zhenxun/builtin_plugins/plugin_store/models.py index df65dd56..2bea1315 100644 --- a/zhenxun/builtin_plugins/plugin_store/models.py +++ b/zhenxun/builtin_plugins/plugin_store/models.py @@ -1,3 +1,5 @@ +from typing import Any, Literal + from nonebot.compat import model_dump from pydantic import BaseModel @@ -13,9 +15,30 @@ type2name: dict[str, str] = { } +class GiteeContents(BaseModel): + """Gitee Api内容""" + + type: Literal["file", "dir"] + """类型""" + size: Any + """文件大小""" + name: str + """文件名""" + path: str + """文件路径""" + url: str + """文件链接""" + html_url: str + """文件html链接""" + download_url: str + """文件raw链接""" + + class StorePluginInfo(BaseModel): """插件信息""" + name: str + """插件名""" module: str """模块名""" module_path: str diff --git a/zhenxun/builtin_plugins/web_ui/api/tabs/plugin_manage/store.py b/zhenxun/builtin_plugins/web_ui/api/tabs/plugin_manage/store.py index acff6356..9ee6ff41 100644 --- a/zhenxun/builtin_plugins/web_ui/api/tabs/plugin_manage/store.py +++ b/zhenxun/builtin_plugins/web_ui/api/tabs/plugin_manage/store.py @@ -1,6 +1,7 @@ from fastapi import APIRouter from fastapi.responses import JSONResponse from nonebot import require +from nonebot.compat import model_dump from zhenxun.models.plugin_info import PluginInfo from zhenxun.services.log import logger @@ -22,12 +23,12 @@ router = APIRouter(prefix="/store") async def _() -> Result[dict]: try: require("plugin_store") - from zhenxun.builtin_plugins.plugin_store import ShopManage + from zhenxun.builtin_plugins.plugin_store import StoreManager - data = await ShopManage.get_data() + data = await StoreManager.get_data() plugin_list = [ - {**data[name].to_dict(), "name": name, "id": idx} - for idx, name in enumerate(data) + {**model_dump(plugin), "name": plugin.name, "id": idx} + for idx, plugin in enumerate(data) ] modules = await PluginInfo.filter(load_status=True).values_list( "module", flat=True @@ -48,9 +49,9 @@ async def _() -> Result[dict]: async def _(param: PluginIr) -> Result: try: require("plugin_store") - from zhenxun.builtin_plugins.plugin_store import ShopManage + from zhenxun.builtin_plugins.plugin_store import StoreManager - result = await ShopManage.add_plugin(param.id) # type: ignore + result = await StoreManager.add_plugin(param.id) # type: ignore return Result.ok(info=result) except Exception as e: return Result.fail(f"安装插件失败: {type(e)}: {e}") @@ -66,9 +67,9 @@ async def _(param: PluginIr) -> Result: async def _(param: PluginIr) -> Result: try: require("plugin_store") - from zhenxun.builtin_plugins.plugin_store import ShopManage + from zhenxun.builtin_plugins.plugin_store import StoreManager - result = await ShopManage.update_plugin(param.id) # type: ignore + result = await StoreManager.update_plugin(param.id) # type: ignore return Result.ok(info=result) except Exception as e: return Result.fail(f"更新插件失败: {type(e)}: {e}") @@ -84,9 +85,9 @@ async def _(param: PluginIr) -> Result: async def _(param: PluginIr) -> Result: try: require("plugin_store") - from zhenxun.builtin_plugins.plugin_store import ShopManage + from zhenxun.builtin_plugins.plugin_store import StoreManager - result = await ShopManage.remove_plugin(param.id) # type: ignore + result = await StoreManager.remove_plugin(param.id) # type: ignore return Result.ok(info=result) except Exception as e: return Result.fail(f"移除插件失败: {type(e)}: {e}") diff --git a/zhenxun/configs/utils/__init__.py b/zhenxun/configs/utils/__init__.py index 189c1891..bd84d9b1 100644 --- a/zhenxun/configs/utils/__init__.py +++ b/zhenxun/configs/utils/__init__.py @@ -287,8 +287,10 @@ class ConfigsManager: try: return config.arg_parser(value_to_process) except Exception as e: - logger.warning( - f"arg_parser 执行失败 (key: {key}),将尝试其他方法。", e=e + logger.debug( + f"配置项类型转换 MODULE: [{module}]" + f" | KEY: [{key}] 将使用原始值", + e=e, ) if config.type: diff --git a/zhenxun/utils/github_utils/const.py b/zhenxun/utils/github_utils/const.py index 23effa4c..68fffad9 100644 --- a/zhenxun/utils/github_utils/const.py +++ b/zhenxun/utils/github_utils/const.py @@ -21,6 +21,9 @@ CACHED_API_TTL = 300 RAW_CONTENT_FORMAT = "https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{path}" """raw content格式""" +GITEE_RAW_CONTENT_FORMAT = "https://gitee.com/{owner}/{repo}/raw/main/{path}" +"""gitee raw content格式""" + ARCHIVE_URL_FORMAT = "https://github.com/{owner}/{repo}/archive/refs/heads/{branch}.zip" """archive url格式""" diff --git a/zhenxun/utils/github_utils/func.py b/zhenxun/utils/github_utils/func.py index b3f9a6f9..db3afa03 100644 --- a/zhenxun/utils/github_utils/func.py +++ b/zhenxun/utils/github_utils/func.py @@ -4,6 +4,7 @@ from zhenxun.utils.http_utils import AsyncHttpx from .const import ( ARCHIVE_URL_FORMAT, + GITEE_RAW_CONTENT_FORMAT, RAW_CONTENT_FORMAT, RELEASE_ASSETS_FORMAT, RELEASE_SOURCE_FORMAT, @@ -21,6 +22,7 @@ async def __get_fastest_formats(formats: dict[str, str]) -> list[str]: async def get_fastest_raw_formats() -> list[str]: """获取最快的raw下载地址格式""" formats: dict[str, str] = { + "https://gitee.com/": GITEE_RAW_CONTENT_FORMAT, "https://raw.githubusercontent.com/": RAW_CONTENT_FORMAT, "https://ghproxy.cc/": f"https://ghproxy.cc/{RAW_CONTENT_FORMAT}", "https://gh-proxy.com/": f"https://gh-proxy.com/{RAW_CONTENT_FORMAT}", diff --git a/zhenxun/utils/html_template/__init__.py b/zhenxun/utils/html_template/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/zhenxun/utils/html_template/__init__.py @@ -0,0 +1 @@ + diff --git a/zhenxun/utils/html_template/component.py b/zhenxun/utils/html_template/component.py new file mode 100644 index 00000000..c23ed503 --- /dev/null +++ b/zhenxun/utils/html_template/component.py @@ -0,0 +1,36 @@ +from abc import ABC +from typing import Literal + +from pydantic import BaseModel + + +class Style(BaseModel): + """常用样式""" + + padding: str = "0px" + margin: str = "0px" + border: str = "0px" + border_radius: str = "0px" + text_align: Literal["left", "right", "center"] = "left" + color: str = "#000" + font_size: str = "16px" + + +class Component(ABC): + def __init__(self, background_color: str = "#fff", is_container: bool = False): + self.extra_style = [] + self.style = Style() + self.background_color = background_color + self.is_container = is_container + self.children = [] + + def add_child(self, child: "Component | str"): + self.children.append(child) + + def set_style(self, style: Style): + self.style = style + + def add_style(self, style: str): + self.extra_style.append(style) + + def to_html(self) -> str: ... diff --git a/zhenxun/utils/html_template/components/title.py b/zhenxun/utils/html_template/components/title.py new file mode 100644 index 00000000..860ad17e --- /dev/null +++ b/zhenxun/utils/html_template/components/title.py @@ -0,0 +1,15 @@ +from ..component import Component, Style +from ..container import Row + + +class Title(Component): + def __init__(self, text: str, color: str = "#000"): + self.text = text + self.color = color + + def build(self): + row = Row() + style = Style(font_size="36px", color=self.color) + row.set_style(style) + + # def diff --git a/zhenxun/utils/html_template/container.py b/zhenxun/utils/html_template/container.py new file mode 100644 index 00000000..3d5341c0 --- /dev/null +++ b/zhenxun/utils/html_template/container.py @@ -0,0 +1,31 @@ +from .component import Component + + +class Row(Component): + def __init__(self, background_color: str = "#fff"): + super().__init__(background_color, True) + + +class Col(Component): + def __init__(self, background_color: str = "#fff"): + super().__init__(background_color, True) + + +class Container(Component): + def __init__(self, background_color: str = "#fff"): + super().__init__(background_color, True) + self.children = [] + + +class GlobalOverview: + def __init__(self, name: str): + self.name = name + self.class_name: dict[str, list[str]] = {} + self.content = None + + def set_content(self, content: Container): + self.content = content + + def add_class(self, class_name: str, contents: list[str]): + """全局样式""" + self.class_name[class_name] = contents diff --git a/zhenxun/utils/http_utils.py b/zhenxun/utils/http_utils.py index b41cc331..0ccf777f 100644 --- a/zhenxun/utils/http_utils.py +++ b/zhenxun/utils/http_utils.py @@ -7,10 +7,9 @@ from typing import Any, ClassVar, Literal, cast import aiofiles import httpx -from httpx import AsyncHTTPTransport, HTTPStatusError, Response +from httpx import AsyncHTTPTransport, HTTPStatusError, Proxy, Response from nonebot_plugin_alconna import UniMessage from nonebot_plugin_htmlrender import get_browser -from packaging.version import parse as parse_version from playwright.async_api import Page from rich.progress import ( BarColumn, @@ -25,23 +24,41 @@ from zhenxun.services.log import logger from zhenxun.utils.message import MessageUtils from zhenxun.utils.user_agent import get_user_agent -CLIENT_KEY = ["use_proxy", "proxy", "verify", "headers"] +CLIENT_KEY = ["use_proxy", "proxies", "proxy", "verify", "headers"] def get_async_client( - proxies: dict[str, str] | None = None, verify: bool = False, **kwargs + proxies: dict[str, str] | None = None, + proxy: str | None = None, + verify: bool = False, + **kwargs, ) -> httpx.AsyncClient: - check_httpx_version = parse_version(httpx.__version__) >= parse_version("0.28.0") transport = kwargs.pop("transport", None) or AsyncHTTPTransport(verify=verify) - - if not check_httpx_version: - return httpx.AsyncClient(proxies=proxies, transport=transport, **kwargs) # type: ignore - proxy_str = None if proxies: - proxy_str = proxies.get("http://") or proxies.get("https://") - if not proxy_str: - logger.warning(f"代理字典 {proxies} 中未能提取出有效的URL,代理已被忽略。") - return httpx.AsyncClient(proxy=proxy_str, transport=transport, **kwargs) # type: ignore + http_proxy = proxies.get("http://") + https_proxy = proxies.get("https://") + return httpx.AsyncClient( + mounts={ + "http://": AsyncHTTPTransport( + proxy=Proxy(http_proxy) if http_proxy else None + ), + "https://": AsyncHTTPTransport( + proxy=Proxy(https_proxy) if https_proxy else None + ), + }, + transport=transport, + **kwargs, + ) + elif proxy: + return httpx.AsyncClient( + mounts={ + "http://": AsyncHTTPTransport(proxy=Proxy(proxy)), + "https://": AsyncHTTPTransport(proxy=Proxy(proxy)), + }, + transport=transport, + **kwargs, + ) + return httpx.AsyncClient(transport=transport, **kwargs) class AsyncHttpx: @@ -60,7 +77,8 @@ class AsyncHttpx: cls, *, use_proxy: bool = True, - proxy: dict[str, str] | None = None, + proxies: dict[str, str] | None = None, + proxy: str | None = None, headers: dict[str, str] | None = None, verify: bool = False, **kwargs, @@ -72,7 +90,8 @@ class AsyncHttpx: 参数: use_proxy: 是否使用在类中定义的默认代理。 - proxy: 手动指定的代理,会覆盖默认代理。 + proxies: 手动指定的代理,会覆盖默认代理。 + proxy: 单个代理,用于兼容旧版本,不再使用 headers: 需要合并到客户端的自定义请求头。 verify: 是否验证 SSL 证书。 **kwargs: 其他所有传递给 httpx.AsyncClient 的参数。 @@ -80,14 +99,18 @@ class AsyncHttpx: 返回: AsyncGenerator[httpx.AsyncClient, None]: 生成器。 """ - proxies_to_use = proxy or (cls.default_proxy if use_proxy else None) + proxies_to_use = proxies or (cls.default_proxy if use_proxy else None) final_headers = get_user_agent() if headers: final_headers.update(headers) async with get_async_client( - proxies=proxies_to_use, verify=verify, headers=final_headers, **kwargs + proxies=proxies_to_use, + proxy=proxy, + verify=verify, + headers=final_headers, + **kwargs, ) as client: yield client diff --git a/zhenxun/utils/manager/virtual_env_package_manager.py b/zhenxun/utils/manager/virtual_env_package_manager.py new file mode 100644 index 00000000..5c4b16e5 --- /dev/null +++ b/zhenxun/utils/manager/virtual_env_package_manager.py @@ -0,0 +1,159 @@ +from pathlib import Path +import subprocess +from subprocess import CalledProcessError +from typing import ClassVar + +from zhenxun.services.log import logger + +BAT_FILE = Path() / "win启动.bat" + +LOG_COMMAND = "VirtualEnvPackageManager" + + +class VirtualEnvPackageManager: + WIN_COMMAND: ClassVar[list[str]] = [ + "./Python310/python.exe", + "-m", + "pip", + ] + + DEFAULT_COMMAND: ClassVar[list[str]] = ["poetry", "run", "pip"] + + @classmethod + def __get_command(cls) -> list[str]: + return cls.WIN_COMMAND if BAT_FILE.exists() else cls.DEFAULT_COMMAND + + @classmethod + def install(cls, package: list[str] | str): + """安装依赖包 + + 参数: + package: 安装依赖包名称或列表 + """ + if isinstance(package, str): + package = [package] + try: + command = cls.__get_command() + command.append("install") + command.append(" ".join(package)) + logger.info(f"执行虚拟环境安装包指令: {command}", LOG_COMMAND) + result = subprocess.run( + command, + check=True, + capture_output=True, + text=True, + ) + logger.debug( + f"安装虚拟环境包指令执行完成: {result.stdout}", + LOG_COMMAND, + ) + except CalledProcessError as e: + logger.error(f"安装虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND) + + @classmethod + def uninstall(cls, package: list[str] | str): + """卸载依赖包 + + 参数: + package: 卸载依赖包名称或列表 + """ + if isinstance(package, str): + package = [package] + try: + command = cls.__get_command() + command.append("uninstall") + command.append(" ".join(package)) + logger.info(f"执行虚拟环境卸载包指令: {command}", LOG_COMMAND) + result = subprocess.run( + command, + check=True, + capture_output=True, + text=True, + ) + logger.debug( + f"卸载虚拟环境包指令执行完成: {result.stdout}", + LOG_COMMAND, + ) + except CalledProcessError as e: + logger.error(f"卸载虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND) + + @classmethod + def update(cls, package: list[str] | str): + """更新依赖包 + + 参数: + package: 更新依赖包名称或列表 + """ + if isinstance(package, str): + package = [package] + try: + command = cls.__get_command() + command.append("install") + command.append("--upgrade") + command.append(" ".join(package)) + logger.info(f"执行虚拟环境更新包指令: {command}", LOG_COMMAND) + result = subprocess.run( + command, + check=True, + capture_output=True, + text=True, + ) + logger.debug(f"更新虚拟环境包指令执行完成: {result.stdout}", LOG_COMMAND) + except CalledProcessError as e: + logger.error(f"更新虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND) + + @classmethod + def install_requirement(cls, requirement_file: Path): + """安装依赖文件 + + 参数: + requirement_file: requirement文件路径 + + 异常: + FileNotFoundError: 文件不存在 + """ + if not requirement_file.exists(): + raise FileNotFoundError(f"依赖文件 {requirement_file} 不存在", LOG_COMMAND) + try: + command = cls.__get_command() + command.append("install") + command.append("-r") + command.append(str(requirement_file.absolute())) + logger.info(f"执行虚拟环境安装依赖文件指令: {command}", LOG_COMMAND) + result = subprocess.run( + command, + check=True, + capture_output=True, + text=True, + ) + logger.debug( + f"安装虚拟环境依赖文件指令执行完成: {result.stdout}", + LOG_COMMAND, + ) + except CalledProcessError as e: + logger.error( + f"安装虚拟环境依赖文件指令执行失败: {e.stderr}.", + LOG_COMMAND, + ) + + @classmethod + def list(cls) -> str: + """列出已安装的依赖包""" + try: + command = cls.__get_command() + command.append("list") + logger.info(f"执行虚拟环境列出包指令: {command}", LOG_COMMAND) + result = subprocess.run( + command, + check=True, + capture_output=True, + text=True, + ) + logger.debug( + f"列出虚拟环境包指令执行完成: {result.stdout}", + LOG_COMMAND, + ) + return result.stdout + except CalledProcessError as e: + logger.error(f"列出虚拟环境包指令执行失败: {e.stderr}.", LOG_COMMAND) + return ""