Compare commits

...

4 Commits

Author SHA1 Message Date
AkashiCoin
34e24cdee9 🎉 chore(version): Update version to v0.2.4-3cc882b
Some checks failed
Sequential Lint and Type Check / ruff-call (push) Has been cancelled
Sequential Lint and Type Check / pyright-call (push) Has been cancelled
2025-09-12 09:38:51 +00:00
Rumio
3cc882b116
feat(auto_update): 增强自动更新与版本检查 (#2042)
Some checks failed
检查bot是否运行正常 / bot check (push) Has been cancelled
CodeQL Code Security Analysis / Analyze (${{ matrix.language }}) (none, javascript-typescript) (push) Has been cancelled
CodeQL Code Security Analysis / Analyze (${{ matrix.language }}) (none, python) (push) Has been cancelled
Sequential Lint and Type Check / ruff-call (push) Has been cancelled
Release Drafter / Update Release Draft (push) Has been cancelled
Force Sync to Aliyun / sync (push) Has been cancelled
Update Version / update-version (push) Has been cancelled
Sequential Lint and Type Check / pyright-call (push) Has been cancelled
- 优化 `检查更新` 默认行为,未指定类型时直接显示版本信息
- 扩展版本详情显示:当前版本、最新开发版/正式版(含日期)、资源版本及更新提示
- 新增更新后资源兼容性检查,自动读取 `resources.spec` 并提示更新
- 使用 `asyncio.gather` 并发获取版本信息,引入 `packaging` 库提高比较准确性
- 优化错误处理与日志记录

Co-authored-by: webjoin111 <455457521@qq.com>
2025-09-12 17:38:41 +08:00
molanp
ee699fb345
fix(plugin_store): 修复插件商店的安装与卸载逻辑 (#2050)
* fix(plugin_store): 修复插件商店的安装与卸载逻辑

- 优化了插件安装、更新和移除的逻辑
- 调整了插件路径的处理方式,支持更灵活的安装位置
- 重构了 `install_plugin_with_repo` 方法,使用 `StorePluginInfo` 对象作为参数
- 修复了一些潜在的路径问题和模块命名问题

* refactor(zhenxun): 优化插件信息获取逻辑

- 将 PluginInfo.get_or_none 替换为 get_plugin 方法,简化插件信息获取逻辑
- 优化了插件移除操作中的插件信息获取流程

* refactor(zhenxun): 优化 sparse_checkout_clone 函数的实现

- 将 git 操作移至临时目录中执行,避免影响目标目录中的现有内容
- 简化了稀疏检出的配置和执行过程
- 改进了错误处理和回退逻辑
- 优化了文件移动和目录清理的操作

* 🐛 添加移除插件时二次查询

*  plugin_info.get_plugin参数包含plugin_type时无效过滤

---------

Co-authored-by: HibiKier <45528451+HibiKier@users.noreply.github.com>
2025-09-12 17:38:24 +08:00
molanp
631e66d54f
fix(htmlrender): 更新htmlrender 导入 路径 (#2051)
- 将 get_browser 的导入路径从 nonebot_plugin_htmlrender 更新为 nonebot_plugin_htmlrender.browser
2025-09-12 16:41:43 +08:00
9 changed files with 260 additions and 103 deletions

View File

@ -1 +1 @@
__version__: v0.2.4-da6d5b4 __version__: v0.2.4-3cc882b

View File

@ -84,13 +84,16 @@ async def _(
): ):
result = "" result = ""
await MessageUtils.build_message("正在进行检查更新...").send(reply_to=True) await MessageUtils.build_message("正在进行检查更新...").send(reply_to=True)
if not ver_type.available:
result += await UpdateManager.check_version()
logger.info("查看当前版本...", "检查更新", session=session)
await MessageUtils.build_message(result).finish()
return
ver_type_str = ver_type.result ver_type_str = ver_type.result
source_str = source.result source_str = source.result
if ver_type_str in {"main", "release"}: if ver_type_str in {"main", "release"}:
if not ver_type.available:
result += await UpdateManager.check_version()
logger.info("查看当前版本...", "检查更新", session=session)
await MessageUtils.build_message(result).finish()
try: try:
result += await UpdateManager.update_zhenxun( result += await UpdateManager.update_zhenxun(
bot, bot,

View File

@ -1,37 +1,135 @@
import asyncio
from typing import Literal from typing import Literal
from nonebot.adapters import Bot from nonebot.adapters import Bot
from packaging.specifiers import SpecifierSet
from packaging.version import InvalidVersion, Version
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager
from zhenxun.utils.manager.zhenxun_repo_manager import ( from zhenxun.utils.manager.zhenxun_repo_manager import (
ZhenxunRepoConfig, ZhenxunRepoConfig,
ZhenxunRepoManager, ZhenxunRepoManager,
) )
from zhenxun.utils.platform import PlatformUtils from zhenxun.utils.platform import PlatformUtils
from zhenxun.utils.repo_utils import RepoFileManager
LOG_COMMAND = "AutoUpdate" LOG_COMMAND = "AutoUpdate"
class UpdateManager: class UpdateManager:
@staticmethod
async def _get_latest_commit_date(owner: str, repo: str, path: str) -> str:
"""获取文件最新 commit 日期"""
api_url = f"https://api.github.com/repos/{owner}/{repo}/commits"
params = {"path": path, "page": 1, "per_page": 1}
try:
data = await AsyncHttpx.get_json(api_url, params=params)
if data and isinstance(data, list) and data[0]:
date_str = data[0]["commit"]["committer"]["date"]
return date_str.split("T")[0]
except Exception as e:
logger.warning(f"获取 {owner}/{repo}/{path} 的 commit 日期失败", e=e)
return "获取失败"
@classmethod @classmethod
async def check_version(cls) -> str: async def check_version(cls) -> str:
"""检查更新版本 """检查真寻和资源的版本"""
bot_cur_version = cls.__get_version()
返回: release_task = ZhenxunRepoManager.zhenxun_get_latest_releases_data()
str: 更新信息 dev_version_task = RepoFileManager.get_file_content(
""" ZhenxunRepoConfig.ZHENXUN_BOT_GITHUB_URL, "__version__"
cur_version = cls.__get_version()
release_data = await ZhenxunRepoManager.zhenxun_get_latest_releases_data()
if not release_data:
return "检查更新获取版本失败..."
return (
"检测到当前版本更新\n"
f"当前版本:{cur_version}\n"
f"最新版本:{release_data.get('name')}\n"
f"创建日期:{release_data.get('created_at')}\n"
f"更新内容:\n{release_data.get('body')}"
) )
bot_commit_date_task = cls._get_latest_commit_date(
"HibiKier", "zhenxun_bot", "__version__"
)
res_commit_date_task = cls._get_latest_commit_date(
"zhenxun-org", "zhenxun-bot-resources", "__version__"
)
(
release_data,
dev_version_text,
bot_commit_date,
res_commit_date,
) = await asyncio.gather(
release_task,
dev_version_task,
bot_commit_date_task,
res_commit_date_task,
return_exceptions=True,
)
if isinstance(release_data, dict):
bot_release_version = release_data.get("name", "获取失败")
bot_release_date = release_data.get("created_at", "").split("T")[0]
else:
bot_release_version = "获取失败"
bot_release_date = "获取失败"
logger.warning(f"获取 Bot release 信息失败: {release_data}")
if isinstance(dev_version_text, str):
bot_dev_version = dev_version_text.split(":")[-1].strip()
else:
bot_dev_version = "获取失败"
bot_commit_date = "获取失败"
logger.warning(f"获取 Bot dev 版本信息失败: {dev_version_text}")
bot_update_hint = ""
try:
cur_base_v = bot_cur_version.split("-")[0].lstrip("v")
dev_base_v = bot_dev_version.split("-")[0].lstrip("v")
if Version(cur_base_v) < Version(dev_base_v):
bot_update_hint = "\n-> 发现新开发版本, 可用 `检查更新 main` 更新"
elif (
Version(cur_base_v) == Version(dev_base_v)
and bot_cur_version != bot_dev_version
):
bot_update_hint = "\n-> 发现新开发版本, 可用 `检查更新 main` 更新"
except (InvalidVersion, TypeError, IndexError):
if bot_cur_version != bot_dev_version and bot_dev_version != "获取失败":
bot_update_hint = "\n-> 发现新开发版本, 可用 `检查更新 main` 更新"
bot_update_info = (
f"当前版本: {bot_cur_version}\n"
f"最新开发版: {bot_dev_version} (更新于: {bot_commit_date})\n"
f"最新正式版: {bot_release_version} (发布于: {bot_release_date})"
f"{bot_update_hint}"
)
res_version_file = ZhenxunRepoConfig.RESOURCE_PATH / "__version__"
res_cur_version = "未找到"
if res_version_file.exists():
if text := res_version_file.open(encoding="utf8").readline():
res_cur_version = text.split(":")[-1].strip()
res_latest_version = "获取失败"
try:
res_latest_version_text = await RepoFileManager.get_file_content(
ZhenxunRepoConfig.RESOURCE_GITHUB_URL, "__version__"
)
res_latest_version = res_latest_version_text.split(":")[-1].strip()
except Exception as e:
res_commit_date = "获取失败"
logger.warning(f"获取资源版本信息失败: {e}")
res_update_hint = ""
try:
if Version(res_cur_version) < Version(res_latest_version):
res_update_hint = "\n-> 发现新资源版本, 可用 `检查更新 resource` 更新"
except (InvalidVersion, TypeError):
pass
res_update_info = (
f"当前版本: {res_cur_version}\n"
f"最新版本: {res_latest_version} (更新于: {res_commit_date})"
f"{res_update_hint}"
)
return f"『绪山真寻 Bot』\n{bot_update_info}\n\n『真寻资源』\n{res_update_info}"
@classmethod @classmethod
async def update_webui( async def update_webui(
@ -125,6 +223,7 @@ class UpdateManager:
f"检测真寻已更新,当前版本:{cur_version}\n开始更新...", f"检测真寻已更新,当前版本:{cur_version}\n开始更新...",
user_id, user_id,
) )
result_message = ""
if zip: if zip:
new_version = await ZhenxunRepoManager.zhenxun_zip_update(version_type) new_version = await ZhenxunRepoManager.zhenxun_zip_update(version_type)
await PlatformUtils.send_superuser( await PlatformUtils.send_superuser(
@ -133,7 +232,7 @@ class UpdateManager:
await VirtualEnvPackageManager.install_requirement( await VirtualEnvPackageManager.install_requirement(
ZhenxunRepoConfig.REQUIREMENTS_FILE ZhenxunRepoConfig.REQUIREMENTS_FILE
) )
return ( result_message = (
f"版本更新完成!\n版本: {cur_version} -> {new_version}\n" f"版本更新完成!\n版本: {cur_version} -> {new_version}\n"
"请重新启动真寻以完成更新!" "请重新启动真寻以完成更新!"
) )
@ -155,13 +254,54 @@ class UpdateManager:
await VirtualEnvPackageManager.install_requirement( await VirtualEnvPackageManager.install_requirement(
ZhenxunRepoConfig.REQUIREMENTS_FILE ZhenxunRepoConfig.REQUIREMENTS_FILE
) )
return ( result_message = (
f"版本更新完成!\n" f"版本更新完成!\n"
f"版本: {cur_version} -> {result.new_version}\n" f"版本: {cur_version} -> {result.new_version}\n"
f"变更文件个数: {len(result.changed_files)}" f"变更文件个数: {len(result.changed_files)}"
f"{'' if source == 'git' else '(阿里云更新不支持查看变更文件)'}\n" f"{'' if source == 'git' else '(阿里云更新不支持查看变更文件)'}\n"
"请重新启动真寻以完成更新!" "请重新启动真寻以完成更新!"
) )
resource_warning = ""
if version_type == "main":
try:
spec_content = await RepoFileManager.get_file_content(
ZhenxunRepoConfig.ZHENXUN_BOT_GITHUB_URL, "resources.spec"
)
required_spec_str = None
for line in spec_content.splitlines():
if line.startswith("require_resources_version:"):
required_spec_str = line.split(":", 1)[1].strip().strip("\"'")
break
if required_spec_str:
res_version_file = ZhenxunRepoConfig.RESOURCE_PATH / "__version__"
local_res_version_str = "0.0.0"
if res_version_file.exists():
if text := res_version_file.open(encoding="utf8").readline():
local_res_version_str = text.split(":")[-1].strip()
spec = SpecifierSet(required_spec_str)
local_ver = Version(local_res_version_str)
if not spec.contains(local_ver):
warning_header = (
f"⚠️ **资源版本不兼容!**\n"
f"当前代码需要资源版本: `{required_spec_str}`\n"
f"您当前的资源版本是: `{local_res_version_str}`\n"
"**将自动为您更新资源文件...**"
)
await PlatformUtils.send_superuser(bot, warning_header, user_id)
resource_update_source = None if zip else source
resource_update_result = await cls.update_resources(
source=resource_update_source, force=force
)
resource_warning = (
f"\n\n{warning_header}\n{resource_update_result}"
)
except Exception as e:
logger.warning(f"检查资源版本兼容性时出错: {e}", LOG_COMMAND, e=e)
resource_warning = (
"\n\n⚠️ 检查资源版本兼容性时出错,建议手动运行 `检查更新 resource`"
)
return result_message + resource_warning
@classmethod @classmethod
def __get_version(cls) -> str: def __get_version(cls) -> str:

View File

@ -1,3 +1,4 @@
import os
from pathlib import Path from pathlib import Path
import random import random
import shutil import shutil
@ -10,6 +11,7 @@ from zhenxun.configs.path_config import TEMP_PATH
from zhenxun.models.plugin_info import PluginInfo from zhenxun.models.plugin_info import PluginInfo
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.services.plugin_init import PluginInitManager from zhenxun.services.plugin_init import PluginInitManager
from zhenxun.utils.enum import PluginType
from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle
from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager
from zhenxun.utils.repo_utils import RepoFileManager from zhenxun.utils.repo_utils import RepoFileManager
@ -183,6 +185,8 @@ class StoreManager:
StorePluginInfo: 插件信息 StorePluginInfo: 插件信息
bool: 是否是外部插件 bool: 是否是外部插件
""" """
plugin_list: list[StorePluginInfo]
extra_plugin_list: list[StorePluginInfo]
plugin_list, extra_plugin_list = await cls.get_data() plugin_list, extra_plugin_list = await cls.get_data()
plugin_info = None plugin_info = None
is_external = False is_external = False
@ -206,6 +210,12 @@ class StoreManager:
if is_remove: if is_remove:
if plugin_info.module not in modules: if plugin_info.module not in modules:
raise PluginStoreException(f"插件 {plugin_info.name} 未安装,无法移除") raise PluginStoreException(f"插件 {plugin_info.name} 未安装,无法移除")
if plugin_obj := await PluginInfo.get_plugin(
module=plugin_info.module, plugin_type=PluginType.PARENT
):
plugin_info.module_path = plugin_obj.module_path
elif plugin_obj := await PluginInfo.get_plugin(module=plugin_info.module):
plugin_info.module_path = plugin_obj.module_path
return plugin_info, is_external return plugin_info, is_external
if is_update: if is_update:
@ -237,9 +247,7 @@ class StoreManager:
plugin_info.github_url = f"{github_url_split[0]}/tree/{version_split[1]}" plugin_info.github_url = f"{github_url_split[0]}/tree/{version_split[1]}"
logger.info(f"正在安装插件 {plugin_info.name}...", LOG_COMMAND) logger.info(f"正在安装插件 {plugin_info.name}...", LOG_COMMAND)
await cls.install_plugin_with_repo( await cls.install_plugin_with_repo(
plugin_info.github_url, plugin_info,
plugin_info.module_path,
plugin_info.is_dir,
is_external, is_external,
source, source,
) )
@ -248,9 +256,7 @@ class StoreManager:
@classmethod @classmethod
async def install_plugin_with_repo( async def install_plugin_with_repo(
cls, cls,
github_url: str, plugin_info: StorePluginInfo,
module_path: str,
is_dir: bool,
is_external: bool = False, is_external: bool = False,
source: str | None = None, source: str | None = None,
): ):
@ -267,18 +273,26 @@ class StoreManager:
repo_type = RepoType.ALIYUN repo_type = RepoType.ALIYUN
elif source == "git": elif source == "git":
repo_type = RepoType.GITHUB repo_type = RepoType.GITHUB
replace_module_path = module_path.replace(".", "/") module_path = plugin_info.module_path
plugin_name = module_path.split(".")[-1] is_dir = plugin_info.is_dir
github_url = plugin_info.github_url
assert github_url
replace_module_path = module_path.replace(".", "/").lstrip("/")
plugin_name = module_path.split(".")[-1] or plugin_info.module
if is_dir: if is_dir:
files = await RepoFileManager.list_directory_files( files = await RepoFileManager.list_directory_files(
github_url, replace_module_path, repo_type=repo_type github_url, replace_module_path, repo_type=repo_type
) )
else: else:
files = [RepoFileInfo(path=f"{replace_module_path}.py", is_dir=False)] files = [RepoFileInfo(path=f"{replace_module_path}.py", is_dir=False)]
local_path = BASE_PATH / "plugins" if is_external else BASE_PATH if not is_external:
target_dir = BASE_PATH / "plugins" / plugin_name target_dir = BASE_PATH
elif is_dir:
target_dir = BASE_PATH / "plugins" / plugin_name
else:
target_dir = BASE_PATH / "plugins"
files = [file for file in files if not file.is_dir] files = [file for file in files if not file.is_dir]
download_files = [(file.path, local_path / file.path) for file in files] download_files = [(file.path, target_dir / file.path) for file in files]
result = await RepoFileManager.download_files( result = await RepoFileManager.download_files(
github_url, github_url,
download_files, download_files,
@ -298,7 +312,7 @@ class StoreManager:
is_install_req = False is_install_req = False
for requirement_path in requirement_paths: for requirement_path in requirement_paths:
requirement_file = local_path / requirement_path.path requirement_file = target_dir / requirement_path.path
if requirement_file.exists(): if requirement_file.exists():
is_install_req = True is_install_req = True
await VirtualEnvPackageManager.install_requirement(requirement_file) await VirtualEnvPackageManager.install_requirement(requirement_file)
@ -341,13 +355,11 @@ class StoreManager:
str: 返回消息 str: 返回消息
""" """
plugin_info, _ = await cls.get_plugin_by_value(index_or_module, is_remove=True) plugin_info, _ = await cls.get_plugin_by_value(index_or_module, is_remove=True)
path = BASE_PATH module_path = plugin_info.module_path
if plugin_info.github_url: module = module_path.split(".")[-1]
path = BASE_PATH / "plugins" path = BASE_PATH.parent / Path(module_path.replace(".", os.sep))
for p in plugin_info.module_path.split("."):
path = path / p
if not plugin_info.is_dir: if not plugin_info.is_dir:
path = Path(f"{path}.py") path = path.parent / f"{module}.py"
if not path.exists(): if not path.exists():
return f"插件 {plugin_info.name} 不存在..." return f"插件 {plugin_info.name} 不存在..."
logger.debug(f"尝试移除插件 {plugin_info.name} 文件: {path}", LOG_COMMAND) logger.debug(f"尝试移除插件 {plugin_info.name} 文件: {path}", LOG_COMMAND)
@ -356,7 +368,7 @@ class StoreManager:
shutil.rmtree(path, onerror=win_on_rm_error) shutil.rmtree(path, onerror=win_on_rm_error)
else: else:
path.unlink() path.unlink()
await PluginInitManager.remove(f"zhenxun.{plugin_info.module_path}") await PluginInitManager.remove(module_path)
return f"插件 {plugin_info.name} 移除成功! 重启后生效" return f"插件 {plugin_info.name} 移除成功! 重启后生效"
@classmethod @classmethod
@ -423,9 +435,7 @@ class StoreManager:
if plugin_info.github_url is None: if plugin_info.github_url is None:
plugin_info.github_url = DEFAULT_GITHUB_URL plugin_info.github_url = DEFAULT_GITHUB_URL
await cls.install_plugin_with_repo( await cls.install_plugin_with_repo(
plugin_info.github_url, plugin_info,
plugin_info.module_path,
plugin_info.is_dir,
is_external, is_external,
) )
return f"插件 {plugin_info.name} 更新成功! 重启后生效" return f"插件 {plugin_info.name} 更新成功! 重启后生效"
@ -473,9 +483,7 @@ class StoreManager:
plugin_info.github_url = DEFAULT_GITHUB_URL plugin_info.github_url = DEFAULT_GITHUB_URL
is_external = False is_external = False
await cls.install_plugin_with_repo( await cls.install_plugin_with_repo(
plugin_info.github_url, plugin_info,
plugin_info.module_path,
plugin_info.is_dir,
is_external, is_external,
) )
update_success_list.append(plugin_info.name) update_success_list.append(plugin_info.name)

View File

@ -77,7 +77,7 @@ class PluginInfo(Model):
返回: 返回:
Self | None: 插件 Self | None: 插件
""" """
if filter_parent: if not kwargs.get("plugin_type") and filter_parent:
return await cls.get_or_none( return await cls.get_or_none(
load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs
) )
@ -96,7 +96,7 @@ class PluginInfo(Model):
返回: 返回:
list[Self]: 插件列表 list[Self]: 插件列表
""" """
if filter_parent: if not kwargs.get("plugin_type") and filter_parent:
return await cls.filter( return await cls.filter(
load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs
).all() ).all()

View File

@ -87,7 +87,7 @@ class PluginInitManager:
@classmethod @classmethod
async def remove(cls, module_path: str): async def remove(cls, module_path: str):
"""运行指定插件安装方法""" """运行指定插件移除方法"""
if model := cls.plugins.get(module_path): if model := cls.plugins.get(module_path):
if model.remove: if model.remove:
class_ = model.class_() class_ = model.class_()

View File

@ -4,7 +4,7 @@ from pathlib import Path
from typing import Any, Literal from typing import Any, Literal
from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_htmlrender import get_browser from nonebot_plugin_htmlrender.browser import get_browser
from playwright.async_api import Page from playwright.async_api import Page
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils

View File

@ -326,7 +326,7 @@ class RepoFileManager:
# 获取仓库树信息 # 获取仓库树信息
strategy = GitHubStrategy() strategy = GitHubStrategy()
strategy.body = await GitHubStrategy.parse_repo_info(repo_info) strategy.body = await strategy.parse_repo_info(repo_info)
# 处理目录路径,确保格式正确 # 处理目录路径,确保格式正确
if directory_path and not directory_path.endswith("/") and recursive: if directory_path and not directory_path.endswith("/") and recursive:
@ -480,7 +480,7 @@ class RepoFileManager:
target_dir: Path | None = None, target_dir: Path | None = None,
) -> FileDownloadResult: ) -> FileDownloadResult:
""" """
下载个文件 下载个文件
参数: 参数:
repo_url: 仓库URL repo_url: 仓库URL

View File

@ -7,6 +7,7 @@ import base64
from pathlib import Path from pathlib import Path
import re import re
import shutil import shutil
import tempfile
from zhenxun.services.log import logger from zhenxun.services.log import logger
@ -145,80 +146,85 @@ async def sparse_checkout_clone(
target_dir: Path, target_dir: Path,
) -> None: ) -> None:
""" """
使用 git 稀疏检出克隆指定路径到目标目录完全独立于主项目 git 使用 git 稀疏检出克隆指定路径到目标目录在临时目录中操作
关键保障: 关键保障:
- target_dir 下检测/初始化 .git所有 git 操作均以 cwd=target_dir 执行 - 在临时目录中执行所有 git 操作避免影响 target_dir 中的现有内容
- 强制拉取与工作区覆盖: fetch --forcecheckout -Breset --hardclean -xdf - 只操作 target_dir/sparse_path 路径不影响 target_dir 其他内容
- 反复设置 sparse-checkout 路径确保路径更新生效
""" """
target_dir.mkdir(parents=True, exist_ok=True) target_dir.mkdir(parents=True, exist_ok=True)
if not await check_git(): if not await check_git():
raise GitUnavailableError() raise GitUnavailableError()
git_dir = target_dir / ".git" # 在临时目录中进行 git 操作
if not git_dir.exists(): with tempfile.TemporaryDirectory() as temp_dir:
success, out, err = await run_git_command("init", target_dir) temp_path = Path(temp_dir)
# 初始化临时目录为 git 仓库
success, out, err = await run_git_command("init", temp_path)
if not success: if not success:
raise RuntimeError(f"git init 失败: {err or out}") raise RuntimeError(f"git init 失败: {err or out}")
success, out, err = await run_git_command( success, out, err = await run_git_command(
f"remote add origin {repo_url}", target_dir f"remote add origin {repo_url}", temp_path
) )
if not success: if not success:
raise RuntimeError(f"添加远程失败: {err or out}") raise RuntimeError(f"添加远程失败: {err or out}")
else:
# 启用稀疏检出(使用 --no-cone 模式以获得更精确的控制)
await run_git_command("config core.sparseCheckout true", temp_path)
await run_git_command("sparse-checkout init --no-cone", temp_path)
# 设置需要检出的路径(每次都覆盖配置)
if not sparse_path:
raise RuntimeError("sparse-checkout 路径不能为空")
# 使用 --no-cone 模式,直接指定要检出的具体路径
success, out, err = await run_git_command( success, out, err = await run_git_command(
f"remote set-url origin {repo_url}", target_dir f"sparse-checkout set {sparse_path}/", temp_path
) )
if not success: if not success:
# 兜底尝试添加 raise RuntimeError(f"配置稀疏路径失败: {err or out}")
await run_git_command(f"remote add origin {repo_url}", target_dir)
# 启用稀疏检出(使用 --no-cone 模式以获得更精确的控制) # 强制拉取并同步到远端
await run_git_command("config core.sparseCheckout true", target_dir) success, out, err = await run_git_command(
await run_git_command("sparse-checkout init --no-cone", target_dir) f"fetch --force --depth 1 origin {branch}", temp_path
)
if not success:
raise RuntimeError(f"fetch 失败: {err or out}")
# 设置需要检出的路径(每次都覆盖配置) # 使用远端强制更新本地分支并覆盖工作区
if not sparse_path: success, out, err = await run_git_command(
raise RuntimeError("sparse-checkout 路径不能为空") f"checkout -B {branch} origin/{branch}", temp_path
)
if not success:
# 回退方案
success2, out2, err2 = await run_git_command(
f"checkout {branch}", temp_path
)
if not success2:
raise RuntimeError(f"checkout 失败: {(err or out) or (err2 or out2)}")
# 使用 --no-cone 模式,直接指定要检出的具体路径 # 强制对齐工作区
# 例如sparse_path="plugins/mahiro" -> 只检出 plugins/mahiro/ 下的内容 await run_git_command(f"reset --hard origin/{branch}", temp_path)
success, out, err = await run_git_command( await run_git_command("clean -xdf", temp_path)
f"sparse-checkout set {sparse_path}/", target_dir
)
if not success:
raise RuntimeError(f"配置稀疏路径失败: {err or out}")
# 强制拉取并同步到远端 # 将检出的文件移动到目标位置
success, out, err = await run_git_command( source_path = temp_path / sparse_path
f"fetch --force --depth 1 origin {branch}", target_dir if source_path.exists():
) # 确保目标路径存在
if not success: target_path = target_dir / sparse_path
raise RuntimeError(f"fetch 失败: {err or out}") target_path.parent.mkdir(parents=True, exist_ok=True)
# 使用远端强制更新本地分支并覆盖工作区 # 如果目标路径已存在,先清理
success, out, err = await run_git_command( if target_path.exists():
f"checkout -B {branch} origin/{branch}", target_dir if target_path.is_dir():
) shutil.rmtree(target_path)
if not success: else:
# 回退方案 target_path.unlink()
success2, out2, err2 = await run_git_command(f"checkout {branch}", target_dir)
if not success2:
raise RuntimeError(f"checkout 失败: {(err or out) or (err2 or out2)}")
# 强制对齐工作区 # 移动整个目录结构到目标位置
await run_git_command(f"reset --hard origin/{branch}", target_dir) shutil.move(str(source_path), str(target_path))
await run_git_command("clean -xdf", target_dir)
dir_path = target_dir / Path(sparse_path)
for f in dir_path.iterdir():
shutil.move(f, target_dir / f.name)
dir_name = sparse_path.split("/")[0]
rm_path = target_dir / dir_name
if rm_path.exists():
shutil.rmtree(rm_path)
def prepare_aliyun_url(repo_url: str) -> str: def prepare_aliyun_url(repo_url: str) -> str: