Compare commits

..

1 Commits

Author SHA1 Message Date
AkashiCoin
3ed4851b88 🎉 chore(version): Update version to v0.2.4-c7ef6fd
Some checks failed
Sequential Lint and Type Check / ruff-call (push) Has been cancelled
Sequential Lint and Type Check / pyright-call (push) Has been cancelled
2025-09-11 02:32:00 +00:00
9 changed files with 103 additions and 260 deletions

View File

@ -1 +1 @@
__version__: v0.2.4-3cc882b __version__: v0.2.4-c7ef6fd

View File

@ -84,16 +84,13 @@ async def _(
): ):
result = "" result = ""
await MessageUtils.build_message("正在进行检查更新...").send(reply_to=True) await MessageUtils.build_message("正在进行检查更新...").send(reply_to=True)
if not ver_type.available:
result += await UpdateManager.check_version()
logger.info("查看当前版本...", "检查更新", session=session)
await MessageUtils.build_message(result).finish()
return
ver_type_str = ver_type.result ver_type_str = ver_type.result
source_str = source.result source_str = source.result
if ver_type_str in {"main", "release"}: if ver_type_str in {"main", "release"}:
if not ver_type.available:
result += await UpdateManager.check_version()
logger.info("查看当前版本...", "检查更新", session=session)
await MessageUtils.build_message(result).finish()
try: try:
result += await UpdateManager.update_zhenxun( result += await UpdateManager.update_zhenxun(
bot, bot,

View File

@ -1,135 +1,37 @@
import asyncio
from typing import Literal from typing import Literal
from nonebot.adapters import Bot from nonebot.adapters import Bot
from packaging.specifiers import SpecifierSet
from packaging.version import InvalidVersion, Version
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager
from zhenxun.utils.manager.zhenxun_repo_manager import ( from zhenxun.utils.manager.zhenxun_repo_manager import (
ZhenxunRepoConfig, ZhenxunRepoConfig,
ZhenxunRepoManager, ZhenxunRepoManager,
) )
from zhenxun.utils.platform import PlatformUtils from zhenxun.utils.platform import PlatformUtils
from zhenxun.utils.repo_utils import RepoFileManager
LOG_COMMAND = "AutoUpdate" LOG_COMMAND = "AutoUpdate"
class UpdateManager: class UpdateManager:
@staticmethod
async def _get_latest_commit_date(owner: str, repo: str, path: str) -> str:
"""获取文件最新 commit 日期"""
api_url = f"https://api.github.com/repos/{owner}/{repo}/commits"
params = {"path": path, "page": 1, "per_page": 1}
try:
data = await AsyncHttpx.get_json(api_url, params=params)
if data and isinstance(data, list) and data[0]:
date_str = data[0]["commit"]["committer"]["date"]
return date_str.split("T")[0]
except Exception as e:
logger.warning(f"获取 {owner}/{repo}/{path} 的 commit 日期失败", e=e)
return "获取失败"
@classmethod @classmethod
async def check_version(cls) -> str: async def check_version(cls) -> str:
"""检查真寻和资源的版本""" """检查更新版本
bot_cur_version = cls.__get_version()
release_task = ZhenxunRepoManager.zhenxun_get_latest_releases_data() 返回:
dev_version_task = RepoFileManager.get_file_content( str: 更新信息
ZhenxunRepoConfig.ZHENXUN_BOT_GITHUB_URL, "__version__" """
cur_version = cls.__get_version()
release_data = await ZhenxunRepoManager.zhenxun_get_latest_releases_data()
if not release_data:
return "检查更新获取版本失败..."
return (
"检测到当前版本更新\n"
f"当前版本:{cur_version}\n"
f"最新版本:{release_data.get('name')}\n"
f"创建日期:{release_data.get('created_at')}\n"
f"更新内容:\n{release_data.get('body')}"
) )
bot_commit_date_task = cls._get_latest_commit_date(
"HibiKier", "zhenxun_bot", "__version__"
)
res_commit_date_task = cls._get_latest_commit_date(
"zhenxun-org", "zhenxun-bot-resources", "__version__"
)
(
release_data,
dev_version_text,
bot_commit_date,
res_commit_date,
) = await asyncio.gather(
release_task,
dev_version_task,
bot_commit_date_task,
res_commit_date_task,
return_exceptions=True,
)
if isinstance(release_data, dict):
bot_release_version = release_data.get("name", "获取失败")
bot_release_date = release_data.get("created_at", "").split("T")[0]
else:
bot_release_version = "获取失败"
bot_release_date = "获取失败"
logger.warning(f"获取 Bot release 信息失败: {release_data}")
if isinstance(dev_version_text, str):
bot_dev_version = dev_version_text.split(":")[-1].strip()
else:
bot_dev_version = "获取失败"
bot_commit_date = "获取失败"
logger.warning(f"获取 Bot dev 版本信息失败: {dev_version_text}")
bot_update_hint = ""
try:
cur_base_v = bot_cur_version.split("-")[0].lstrip("v")
dev_base_v = bot_dev_version.split("-")[0].lstrip("v")
if Version(cur_base_v) < Version(dev_base_v):
bot_update_hint = "\n-> 发现新开发版本, 可用 `检查更新 main` 更新"
elif (
Version(cur_base_v) == Version(dev_base_v)
and bot_cur_version != bot_dev_version
):
bot_update_hint = "\n-> 发现新开发版本, 可用 `检查更新 main` 更新"
except (InvalidVersion, TypeError, IndexError):
if bot_cur_version != bot_dev_version and bot_dev_version != "获取失败":
bot_update_hint = "\n-> 发现新开发版本, 可用 `检查更新 main` 更新"
bot_update_info = (
f"当前版本: {bot_cur_version}\n"
f"最新开发版: {bot_dev_version} (更新于: {bot_commit_date})\n"
f"最新正式版: {bot_release_version} (发布于: {bot_release_date})"
f"{bot_update_hint}"
)
res_version_file = ZhenxunRepoConfig.RESOURCE_PATH / "__version__"
res_cur_version = "未找到"
if res_version_file.exists():
if text := res_version_file.open(encoding="utf8").readline():
res_cur_version = text.split(":")[-1].strip()
res_latest_version = "获取失败"
try:
res_latest_version_text = await RepoFileManager.get_file_content(
ZhenxunRepoConfig.RESOURCE_GITHUB_URL, "__version__"
)
res_latest_version = res_latest_version_text.split(":")[-1].strip()
except Exception as e:
res_commit_date = "获取失败"
logger.warning(f"获取资源版本信息失败: {e}")
res_update_hint = ""
try:
if Version(res_cur_version) < Version(res_latest_version):
res_update_hint = "\n-> 发现新资源版本, 可用 `检查更新 resource` 更新"
except (InvalidVersion, TypeError):
pass
res_update_info = (
f"当前版本: {res_cur_version}\n"
f"最新版本: {res_latest_version} (更新于: {res_commit_date})"
f"{res_update_hint}"
)
return f"『绪山真寻 Bot』\n{bot_update_info}\n\n『真寻资源』\n{res_update_info}"
@classmethod @classmethod
async def update_webui( async def update_webui(
@ -223,7 +125,6 @@ class UpdateManager:
f"检测真寻已更新,当前版本:{cur_version}\n开始更新...", f"检测真寻已更新,当前版本:{cur_version}\n开始更新...",
user_id, user_id,
) )
result_message = ""
if zip: if zip:
new_version = await ZhenxunRepoManager.zhenxun_zip_update(version_type) new_version = await ZhenxunRepoManager.zhenxun_zip_update(version_type)
await PlatformUtils.send_superuser( await PlatformUtils.send_superuser(
@ -232,7 +133,7 @@ class UpdateManager:
await VirtualEnvPackageManager.install_requirement( await VirtualEnvPackageManager.install_requirement(
ZhenxunRepoConfig.REQUIREMENTS_FILE ZhenxunRepoConfig.REQUIREMENTS_FILE
) )
result_message = ( return (
f"版本更新完成!\n版本: {cur_version} -> {new_version}\n" f"版本更新完成!\n版本: {cur_version} -> {new_version}\n"
"请重新启动真寻以完成更新!" "请重新启动真寻以完成更新!"
) )
@ -254,54 +155,13 @@ class UpdateManager:
await VirtualEnvPackageManager.install_requirement( await VirtualEnvPackageManager.install_requirement(
ZhenxunRepoConfig.REQUIREMENTS_FILE ZhenxunRepoConfig.REQUIREMENTS_FILE
) )
result_message = ( return (
f"版本更新完成!\n" f"版本更新完成!\n"
f"版本: {cur_version} -> {result.new_version}\n" f"版本: {cur_version} -> {result.new_version}\n"
f"变更文件个数: {len(result.changed_files)}" f"变更文件个数: {len(result.changed_files)}"
f"{'' if source == 'git' else '(阿里云更新不支持查看变更文件)'}\n" f"{'' if source == 'git' else '(阿里云更新不支持查看变更文件)'}\n"
"请重新启动真寻以完成更新!" "请重新启动真寻以完成更新!"
) )
resource_warning = ""
if version_type == "main":
try:
spec_content = await RepoFileManager.get_file_content(
ZhenxunRepoConfig.ZHENXUN_BOT_GITHUB_URL, "resources.spec"
)
required_spec_str = None
for line in spec_content.splitlines():
if line.startswith("require_resources_version:"):
required_spec_str = line.split(":", 1)[1].strip().strip("\"'")
break
if required_spec_str:
res_version_file = ZhenxunRepoConfig.RESOURCE_PATH / "__version__"
local_res_version_str = "0.0.0"
if res_version_file.exists():
if text := res_version_file.open(encoding="utf8").readline():
local_res_version_str = text.split(":")[-1].strip()
spec = SpecifierSet(required_spec_str)
local_ver = Version(local_res_version_str)
if not spec.contains(local_ver):
warning_header = (
f"⚠️ **资源版本不兼容!**\n"
f"当前代码需要资源版本: `{required_spec_str}`\n"
f"您当前的资源版本是: `{local_res_version_str}`\n"
"**将自动为您更新资源文件...**"
)
await PlatformUtils.send_superuser(bot, warning_header, user_id)
resource_update_source = None if zip else source
resource_update_result = await cls.update_resources(
source=resource_update_source, force=force
)
resource_warning = (
f"\n\n{warning_header}\n{resource_update_result}"
)
except Exception as e:
logger.warning(f"检查资源版本兼容性时出错: {e}", LOG_COMMAND, e=e)
resource_warning = (
"\n\n⚠️ 检查资源版本兼容性时出错,建议手动运行 `检查更新 resource`"
)
return result_message + resource_warning
@classmethod @classmethod
def __get_version(cls) -> str: def __get_version(cls) -> str:

View File

@ -1,4 +1,3 @@
import os
from pathlib import Path from pathlib import Path
import random import random
import shutil import shutil
@ -11,7 +10,6 @@ from zhenxun.configs.path_config import TEMP_PATH
from zhenxun.models.plugin_info import PluginInfo from zhenxun.models.plugin_info import PluginInfo
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.services.plugin_init import PluginInitManager from zhenxun.services.plugin_init import PluginInitManager
from zhenxun.utils.enum import PluginType
from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle
from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager
from zhenxun.utils.repo_utils import RepoFileManager from zhenxun.utils.repo_utils import RepoFileManager
@ -185,8 +183,6 @@ class StoreManager:
StorePluginInfo: 插件信息 StorePluginInfo: 插件信息
bool: 是否是外部插件 bool: 是否是外部插件
""" """
plugin_list: list[StorePluginInfo]
extra_plugin_list: list[StorePluginInfo]
plugin_list, extra_plugin_list = await cls.get_data() plugin_list, extra_plugin_list = await cls.get_data()
plugin_info = None plugin_info = None
is_external = False is_external = False
@ -210,12 +206,6 @@ class StoreManager:
if is_remove: if is_remove:
if plugin_info.module not in modules: if plugin_info.module not in modules:
raise PluginStoreException(f"插件 {plugin_info.name} 未安装,无法移除") raise PluginStoreException(f"插件 {plugin_info.name} 未安装,无法移除")
if plugin_obj := await PluginInfo.get_plugin(
module=plugin_info.module, plugin_type=PluginType.PARENT
):
plugin_info.module_path = plugin_obj.module_path
elif plugin_obj := await PluginInfo.get_plugin(module=plugin_info.module):
plugin_info.module_path = plugin_obj.module_path
return plugin_info, is_external return plugin_info, is_external
if is_update: if is_update:
@ -247,7 +237,9 @@ class StoreManager:
plugin_info.github_url = f"{github_url_split[0]}/tree/{version_split[1]}" plugin_info.github_url = f"{github_url_split[0]}/tree/{version_split[1]}"
logger.info(f"正在安装插件 {plugin_info.name}...", LOG_COMMAND) logger.info(f"正在安装插件 {plugin_info.name}...", LOG_COMMAND)
await cls.install_plugin_with_repo( await cls.install_plugin_with_repo(
plugin_info, plugin_info.github_url,
plugin_info.module_path,
plugin_info.is_dir,
is_external, is_external,
source, source,
) )
@ -256,7 +248,9 @@ class StoreManager:
@classmethod @classmethod
async def install_plugin_with_repo( async def install_plugin_with_repo(
cls, cls,
plugin_info: StorePluginInfo, github_url: str,
module_path: str,
is_dir: bool,
is_external: bool = False, is_external: bool = False,
source: str | None = None, source: str | None = None,
): ):
@ -273,26 +267,18 @@ class StoreManager:
repo_type = RepoType.ALIYUN repo_type = RepoType.ALIYUN
elif source == "git": elif source == "git":
repo_type = RepoType.GITHUB repo_type = RepoType.GITHUB
module_path = plugin_info.module_path replace_module_path = module_path.replace(".", "/")
is_dir = plugin_info.is_dir plugin_name = module_path.split(".")[-1]
github_url = plugin_info.github_url
assert github_url
replace_module_path = module_path.replace(".", "/").lstrip("/")
plugin_name = module_path.split(".")[-1] or plugin_info.module
if is_dir: if is_dir:
files = await RepoFileManager.list_directory_files( files = await RepoFileManager.list_directory_files(
github_url, replace_module_path, repo_type=repo_type github_url, replace_module_path, repo_type=repo_type
) )
else: else:
files = [RepoFileInfo(path=f"{replace_module_path}.py", is_dir=False)] files = [RepoFileInfo(path=f"{replace_module_path}.py", is_dir=False)]
if not is_external: local_path = BASE_PATH / "plugins" if is_external else BASE_PATH
target_dir = BASE_PATH target_dir = BASE_PATH / "plugins" / plugin_name
elif is_dir:
target_dir = BASE_PATH / "plugins" / plugin_name
else:
target_dir = BASE_PATH / "plugins"
files = [file for file in files if not file.is_dir] files = [file for file in files if not file.is_dir]
download_files = [(file.path, target_dir / file.path) for file in files] download_files = [(file.path, local_path / file.path) for file in files]
result = await RepoFileManager.download_files( result = await RepoFileManager.download_files(
github_url, github_url,
download_files, download_files,
@ -312,7 +298,7 @@ class StoreManager:
is_install_req = False is_install_req = False
for requirement_path in requirement_paths: for requirement_path in requirement_paths:
requirement_file = target_dir / requirement_path.path requirement_file = local_path / requirement_path.path
if requirement_file.exists(): if requirement_file.exists():
is_install_req = True is_install_req = True
await VirtualEnvPackageManager.install_requirement(requirement_file) await VirtualEnvPackageManager.install_requirement(requirement_file)
@ -355,11 +341,13 @@ class StoreManager:
str: 返回消息 str: 返回消息
""" """
plugin_info, _ = await cls.get_plugin_by_value(index_or_module, is_remove=True) plugin_info, _ = await cls.get_plugin_by_value(index_or_module, is_remove=True)
module_path = plugin_info.module_path path = BASE_PATH
module = module_path.split(".")[-1] if plugin_info.github_url:
path = BASE_PATH.parent / Path(module_path.replace(".", os.sep)) path = BASE_PATH / "plugins"
for p in plugin_info.module_path.split("."):
path = path / p
if not plugin_info.is_dir: if not plugin_info.is_dir:
path = path.parent / f"{module}.py" path = Path(f"{path}.py")
if not path.exists(): if not path.exists():
return f"插件 {plugin_info.name} 不存在..." return f"插件 {plugin_info.name} 不存在..."
logger.debug(f"尝试移除插件 {plugin_info.name} 文件: {path}", LOG_COMMAND) logger.debug(f"尝试移除插件 {plugin_info.name} 文件: {path}", LOG_COMMAND)
@ -368,7 +356,7 @@ class StoreManager:
shutil.rmtree(path, onerror=win_on_rm_error) shutil.rmtree(path, onerror=win_on_rm_error)
else: else:
path.unlink() path.unlink()
await PluginInitManager.remove(module_path) await PluginInitManager.remove(f"zhenxun.{plugin_info.module_path}")
return f"插件 {plugin_info.name} 移除成功! 重启后生效" return f"插件 {plugin_info.name} 移除成功! 重启后生效"
@classmethod @classmethod
@ -435,7 +423,9 @@ class StoreManager:
if plugin_info.github_url is None: if plugin_info.github_url is None:
plugin_info.github_url = DEFAULT_GITHUB_URL plugin_info.github_url = DEFAULT_GITHUB_URL
await cls.install_plugin_with_repo( await cls.install_plugin_with_repo(
plugin_info, plugin_info.github_url,
plugin_info.module_path,
plugin_info.is_dir,
is_external, is_external,
) )
return f"插件 {plugin_info.name} 更新成功! 重启后生效" return f"插件 {plugin_info.name} 更新成功! 重启后生效"
@ -483,7 +473,9 @@ class StoreManager:
plugin_info.github_url = DEFAULT_GITHUB_URL plugin_info.github_url = DEFAULT_GITHUB_URL
is_external = False is_external = False
await cls.install_plugin_with_repo( await cls.install_plugin_with_repo(
plugin_info, plugin_info.github_url,
plugin_info.module_path,
plugin_info.is_dir,
is_external, is_external,
) )
update_success_list.append(plugin_info.name) update_success_list.append(plugin_info.name)

View File

@ -77,7 +77,7 @@ class PluginInfo(Model):
返回: 返回:
Self | None: 插件 Self | None: 插件
""" """
if not kwargs.get("plugin_type") and filter_parent: if filter_parent:
return await cls.get_or_none( return await cls.get_or_none(
load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs
) )
@ -96,7 +96,7 @@ class PluginInfo(Model):
返回: 返回:
list[Self]: 插件列表 list[Self]: 插件列表
""" """
if not kwargs.get("plugin_type") and filter_parent: if filter_parent:
return await cls.filter( return await cls.filter(
load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs
).all() ).all()

View File

@ -87,7 +87,7 @@ class PluginInitManager:
@classmethod @classmethod
async def remove(cls, module_path: str): async def remove(cls, module_path: str):
"""运行指定插件移除方法""" """运行指定插件安装方法"""
if model := cls.plugins.get(module_path): if model := cls.plugins.get(module_path):
if model.remove: if model.remove:
class_ = model.class_() class_ = model.class_()

View File

@ -4,7 +4,7 @@ from pathlib import Path
from typing import Any, Literal from typing import Any, Literal
from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_htmlrender.browser import get_browser from nonebot_plugin_htmlrender import get_browser
from playwright.async_api import Page from playwright.async_api import Page
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils

View File

@ -326,7 +326,7 @@ class RepoFileManager:
# 获取仓库树信息 # 获取仓库树信息
strategy = GitHubStrategy() strategy = GitHubStrategy()
strategy.body = await strategy.parse_repo_info(repo_info) strategy.body = await GitHubStrategy.parse_repo_info(repo_info)
# 处理目录路径,确保格式正确 # 处理目录路径,确保格式正确
if directory_path and not directory_path.endswith("/") and recursive: if directory_path and not directory_path.endswith("/") and recursive:
@ -480,7 +480,7 @@ class RepoFileManager:
target_dir: Path | None = None, target_dir: Path | None = None,
) -> FileDownloadResult: ) -> FileDownloadResult:
""" """
下载个文件 下载个文件
参数: 参数:
repo_url: 仓库URL repo_url: 仓库URL

View File

@ -7,7 +7,6 @@ import base64
from pathlib import Path from pathlib import Path
import re import re
import shutil import shutil
import tempfile
from zhenxun.services.log import logger from zhenxun.services.log import logger
@ -146,85 +145,80 @@ async def sparse_checkout_clone(
target_dir: Path, target_dir: Path,
) -> None: ) -> None:
""" """
使用 git 稀疏检出克隆指定路径到目标目录在临时目录中操作 使用 git 稀疏检出克隆指定路径到目标目录完全独立于主项目 git
关键保障: 关键保障:
- 在临时目录中执行所有 git 操作避免影响 target_dir 中的现有内容 - target_dir 下检测/初始化 .git所有 git 操作均以 cwd=target_dir 执行
- 只操作 target_dir/sparse_path 路径不影响 target_dir 其他内容 - 强制拉取与工作区覆盖: fetch --forcecheckout -Breset --hardclean -xdf
- 反复设置 sparse-checkout 路径确保路径更新生效
""" """
target_dir.mkdir(parents=True, exist_ok=True) target_dir.mkdir(parents=True, exist_ok=True)
if not await check_git(): if not await check_git():
raise GitUnavailableError() raise GitUnavailableError()
# 在临时目录中进行 git 操作 git_dir = target_dir / ".git"
with tempfile.TemporaryDirectory() as temp_dir: if not git_dir.exists():
temp_path = Path(temp_dir) success, out, err = await run_git_command("init", target_dir)
# 初始化临时目录为 git 仓库
success, out, err = await run_git_command("init", temp_path)
if not success: if not success:
raise RuntimeError(f"git init 失败: {err or out}") raise RuntimeError(f"git init 失败: {err or out}")
success, out, err = await run_git_command( success, out, err = await run_git_command(
f"remote add origin {repo_url}", temp_path f"remote add origin {repo_url}", target_dir
) )
if not success: if not success:
raise RuntimeError(f"添加远程失败: {err or out}") raise RuntimeError(f"添加远程失败: {err or out}")
else:
# 启用稀疏检出(使用 --no-cone 模式以获得更精确的控制)
await run_git_command("config core.sparseCheckout true", temp_path)
await run_git_command("sparse-checkout init --no-cone", temp_path)
# 设置需要检出的路径(每次都覆盖配置)
if not sparse_path:
raise RuntimeError("sparse-checkout 路径不能为空")
# 使用 --no-cone 模式,直接指定要检出的具体路径
success, out, err = await run_git_command( success, out, err = await run_git_command(
f"sparse-checkout set {sparse_path}/", temp_path f"remote set-url origin {repo_url}", target_dir
) )
if not success: if not success:
raise RuntimeError(f"配置稀疏路径失败: {err or out}") # 兜底尝试添加
await run_git_command(f"remote add origin {repo_url}", target_dir)
# 强制拉取并同步到远端 # 启用稀疏检出(使用 --no-cone 模式以获得更精确的控制)
success, out, err = await run_git_command( await run_git_command("config core.sparseCheckout true", target_dir)
f"fetch --force --depth 1 origin {branch}", temp_path await run_git_command("sparse-checkout init --no-cone", target_dir)
)
if not success:
raise RuntimeError(f"fetch 失败: {err or out}")
# 使用远端强制更新本地分支并覆盖工作区 # 设置需要检出的路径(每次都覆盖配置)
success, out, err = await run_git_command( if not sparse_path:
f"checkout -B {branch} origin/{branch}", temp_path raise RuntimeError("sparse-checkout 路径不能为空")
)
if not success:
# 回退方案
success2, out2, err2 = await run_git_command(
f"checkout {branch}", temp_path
)
if not success2:
raise RuntimeError(f"checkout 失败: {(err or out) or (err2 or out2)}")
# 强制对齐工作区 # 使用 --no-cone 模式,直接指定要检出的具体路径
await run_git_command(f"reset --hard origin/{branch}", temp_path) # 例如sparse_path="plugins/mahiro" -> 只检出 plugins/mahiro/ 下的内容
await run_git_command("clean -xdf", temp_path) success, out, err = await run_git_command(
f"sparse-checkout set {sparse_path}/", target_dir
)
if not success:
raise RuntimeError(f"配置稀疏路径失败: {err or out}")
# 将检出的文件移动到目标位置 # 强制拉取并同步到远端
source_path = temp_path / sparse_path success, out, err = await run_git_command(
if source_path.exists(): f"fetch --force --depth 1 origin {branch}", target_dir
# 确保目标路径存在 )
target_path = target_dir / sparse_path if not success:
target_path.parent.mkdir(parents=True, exist_ok=True) raise RuntimeError(f"fetch 失败: {err or out}")
# 如果目标路径已存在,先清理 # 使用远端强制更新本地分支并覆盖工作区
if target_path.exists(): success, out, err = await run_git_command(
if target_path.is_dir(): f"checkout -B {branch} origin/{branch}", target_dir
shutil.rmtree(target_path) )
else: if not success:
target_path.unlink() # 回退方案
success2, out2, err2 = await run_git_command(f"checkout {branch}", target_dir)
if not success2:
raise RuntimeError(f"checkout 失败: {(err or out) or (err2 or out2)}")
# 移动整个目录结构到目标位置 # 强制对齐工作区
shutil.move(str(source_path), str(target_path)) await run_git_command(f"reset --hard origin/{branch}", target_dir)
await run_git_command("clean -xdf", target_dir)
dir_path = target_dir / Path(sparse_path)
for f in dir_path.iterdir():
shutil.move(f, target_dir / f.name)
dir_name = sparse_path.split("/")[0]
rm_path = target_dir / dir_name
if rm_path.exists():
shutil.rmtree(rm_path)
def prepare_aliyun_url(repo_url: str) -> str: def prepare_aliyun_url(repo_url: str) -> str: