mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 14:22:55 +08:00
Compare commits
5 Commits
d48724a9ea
...
7b7c8ac0c1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b7c8ac0c1 | ||
|
|
34e24cdee9 | ||
|
|
3cc882b116 | ||
|
|
ee699fb345 | ||
|
|
631e66d54f |
@ -1 +1 @@
|
||||
__version__: v0.2.4-da6d5b4
|
||||
__version__: v0.2.4-3cc882b
|
||||
|
||||
@ -84,13 +84,16 @@ async def _(
|
||||
):
|
||||
result = ""
|
||||
await MessageUtils.build_message("正在进行检查更新...").send(reply_to=True)
|
||||
|
||||
if not ver_type.available:
|
||||
result += await UpdateManager.check_version()
|
||||
logger.info("查看当前版本...", "检查更新", session=session)
|
||||
await MessageUtils.build_message(result).finish()
|
||||
return
|
||||
|
||||
ver_type_str = ver_type.result
|
||||
source_str = source.result
|
||||
if ver_type_str in {"main", "release"}:
|
||||
if not ver_type.available:
|
||||
result += await UpdateManager.check_version()
|
||||
logger.info("查看当前版本...", "检查更新", session=session)
|
||||
await MessageUtils.build_message(result).finish()
|
||||
try:
|
||||
result += await UpdateManager.update_zhenxun(
|
||||
bot,
|
||||
|
||||
@ -1,37 +1,135 @@
|
||||
import asyncio
|
||||
from typing import Literal
|
||||
|
||||
from nonebot.adapters import Bot
|
||||
from packaging.specifiers import SpecifierSet
|
||||
from packaging.version import InvalidVersion, Version
|
||||
|
||||
from zhenxun.services.log import logger
|
||||
from zhenxun.utils.http_utils import AsyncHttpx
|
||||
from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager
|
||||
from zhenxun.utils.manager.zhenxun_repo_manager import (
|
||||
ZhenxunRepoConfig,
|
||||
ZhenxunRepoManager,
|
||||
)
|
||||
from zhenxun.utils.platform import PlatformUtils
|
||||
from zhenxun.utils.repo_utils import RepoFileManager
|
||||
|
||||
LOG_COMMAND = "AutoUpdate"
|
||||
|
||||
|
||||
class UpdateManager:
|
||||
@staticmethod
|
||||
async def _get_latest_commit_date(owner: str, repo: str, path: str) -> str:
|
||||
"""获取文件最新 commit 日期"""
|
||||
api_url = f"https://api.github.com/repos/{owner}/{repo}/commits"
|
||||
params = {"path": path, "page": 1, "per_page": 1}
|
||||
try:
|
||||
data = await AsyncHttpx.get_json(api_url, params=params)
|
||||
if data and isinstance(data, list) and data[0]:
|
||||
date_str = data[0]["commit"]["committer"]["date"]
|
||||
return date_str.split("T")[0]
|
||||
except Exception as e:
|
||||
logger.warning(f"获取 {owner}/{repo}/{path} 的 commit 日期失败", e=e)
|
||||
return "获取失败"
|
||||
|
||||
@classmethod
|
||||
async def check_version(cls) -> str:
|
||||
"""检查更新版本
|
||||
"""检查真寻和资源的版本"""
|
||||
bot_cur_version = cls.__get_version()
|
||||
|
||||
返回:
|
||||
str: 更新信息
|
||||
"""
|
||||
cur_version = cls.__get_version()
|
||||
release_data = await ZhenxunRepoManager.zhenxun_get_latest_releases_data()
|
||||
if not release_data:
|
||||
return "检查更新获取版本失败..."
|
||||
return (
|
||||
"检测到当前版本更新\n"
|
||||
f"当前版本:{cur_version}\n"
|
||||
f"最新版本:{release_data.get('name')}\n"
|
||||
f"创建日期:{release_data.get('created_at')}\n"
|
||||
f"更新内容:\n{release_data.get('body')}"
|
||||
release_task = ZhenxunRepoManager.zhenxun_get_latest_releases_data()
|
||||
dev_version_task = RepoFileManager.get_file_content(
|
||||
ZhenxunRepoConfig.ZHENXUN_BOT_GITHUB_URL, "__version__"
|
||||
)
|
||||
bot_commit_date_task = cls._get_latest_commit_date(
|
||||
"HibiKier", "zhenxun_bot", "__version__"
|
||||
)
|
||||
res_commit_date_task = cls._get_latest_commit_date(
|
||||
"zhenxun-org", "zhenxun-bot-resources", "__version__"
|
||||
)
|
||||
|
||||
(
|
||||
release_data,
|
||||
dev_version_text,
|
||||
bot_commit_date,
|
||||
res_commit_date,
|
||||
) = await asyncio.gather(
|
||||
release_task,
|
||||
dev_version_task,
|
||||
bot_commit_date_task,
|
||||
res_commit_date_task,
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
if isinstance(release_data, dict):
|
||||
bot_release_version = release_data.get("name", "获取失败")
|
||||
bot_release_date = release_data.get("created_at", "").split("T")[0]
|
||||
else:
|
||||
bot_release_version = "获取失败"
|
||||
bot_release_date = "获取失败"
|
||||
logger.warning(f"获取 Bot release 信息失败: {release_data}")
|
||||
|
||||
if isinstance(dev_version_text, str):
|
||||
bot_dev_version = dev_version_text.split(":")[-1].strip()
|
||||
else:
|
||||
bot_dev_version = "获取失败"
|
||||
bot_commit_date = "获取失败"
|
||||
logger.warning(f"获取 Bot dev 版本信息失败: {dev_version_text}")
|
||||
|
||||
bot_update_hint = ""
|
||||
try:
|
||||
cur_base_v = bot_cur_version.split("-")[0].lstrip("v")
|
||||
dev_base_v = bot_dev_version.split("-")[0].lstrip("v")
|
||||
|
||||
if Version(cur_base_v) < Version(dev_base_v):
|
||||
bot_update_hint = "\n-> 发现新开发版本, 可用 `检查更新 main` 更新"
|
||||
elif (
|
||||
Version(cur_base_v) == Version(dev_base_v)
|
||||
and bot_cur_version != bot_dev_version
|
||||
):
|
||||
bot_update_hint = "\n-> 发现新开发版本, 可用 `检查更新 main` 更新"
|
||||
except (InvalidVersion, TypeError, IndexError):
|
||||
if bot_cur_version != bot_dev_version and bot_dev_version != "获取失败":
|
||||
bot_update_hint = "\n-> 发现新开发版本, 可用 `检查更新 main` 更新"
|
||||
|
||||
bot_update_info = (
|
||||
f"当前版本: {bot_cur_version}\n"
|
||||
f"最新开发版: {bot_dev_version} (更新于: {bot_commit_date})\n"
|
||||
f"最新正式版: {bot_release_version} (发布于: {bot_release_date})"
|
||||
f"{bot_update_hint}"
|
||||
)
|
||||
|
||||
res_version_file = ZhenxunRepoConfig.RESOURCE_PATH / "__version__"
|
||||
res_cur_version = "未找到"
|
||||
if res_version_file.exists():
|
||||
if text := res_version_file.open(encoding="utf8").readline():
|
||||
res_cur_version = text.split(":")[-1].strip()
|
||||
|
||||
res_latest_version = "获取失败"
|
||||
try:
|
||||
res_latest_version_text = await RepoFileManager.get_file_content(
|
||||
ZhenxunRepoConfig.RESOURCE_GITHUB_URL, "__version__"
|
||||
)
|
||||
res_latest_version = res_latest_version_text.split(":")[-1].strip()
|
||||
except Exception as e:
|
||||
res_commit_date = "获取失败"
|
||||
logger.warning(f"获取资源版本信息失败: {e}")
|
||||
|
||||
res_update_hint = ""
|
||||
try:
|
||||
if Version(res_cur_version) < Version(res_latest_version):
|
||||
res_update_hint = "\n-> 发现新资源版本, 可用 `检查更新 resource` 更新"
|
||||
except (InvalidVersion, TypeError):
|
||||
pass
|
||||
|
||||
res_update_info = (
|
||||
f"当前版本: {res_cur_version}\n"
|
||||
f"最新版本: {res_latest_version} (更新于: {res_commit_date})"
|
||||
f"{res_update_hint}"
|
||||
)
|
||||
|
||||
return f"『绪山真寻 Bot』\n{bot_update_info}\n\n『真寻资源』\n{res_update_info}"
|
||||
|
||||
@classmethod
|
||||
async def update_webui(
|
||||
@ -125,6 +223,7 @@ class UpdateManager:
|
||||
f"检测真寻已更新,当前版本:{cur_version}\n开始更新...",
|
||||
user_id,
|
||||
)
|
||||
result_message = ""
|
||||
if zip:
|
||||
new_version = await ZhenxunRepoManager.zhenxun_zip_update(version_type)
|
||||
await PlatformUtils.send_superuser(
|
||||
@ -133,7 +232,7 @@ class UpdateManager:
|
||||
await VirtualEnvPackageManager.install_requirement(
|
||||
ZhenxunRepoConfig.REQUIREMENTS_FILE
|
||||
)
|
||||
return (
|
||||
result_message = (
|
||||
f"版本更新完成!\n版本: {cur_version} -> {new_version}\n"
|
||||
"请重新启动真寻以完成更新!"
|
||||
)
|
||||
@ -155,13 +254,54 @@ class UpdateManager:
|
||||
await VirtualEnvPackageManager.install_requirement(
|
||||
ZhenxunRepoConfig.REQUIREMENTS_FILE
|
||||
)
|
||||
return (
|
||||
result_message = (
|
||||
f"版本更新完成!\n"
|
||||
f"版本: {cur_version} -> {result.new_version}\n"
|
||||
f"变更文件个数: {len(result.changed_files)}"
|
||||
f"{'' if source == 'git' else '(阿里云更新不支持查看变更文件)'}\n"
|
||||
"请重新启动真寻以完成更新!"
|
||||
)
|
||||
resource_warning = ""
|
||||
if version_type == "main":
|
||||
try:
|
||||
spec_content = await RepoFileManager.get_file_content(
|
||||
ZhenxunRepoConfig.ZHENXUN_BOT_GITHUB_URL, "resources.spec"
|
||||
)
|
||||
required_spec_str = None
|
||||
for line in spec_content.splitlines():
|
||||
if line.startswith("require_resources_version:"):
|
||||
required_spec_str = line.split(":", 1)[1].strip().strip("\"'")
|
||||
break
|
||||
if required_spec_str:
|
||||
res_version_file = ZhenxunRepoConfig.RESOURCE_PATH / "__version__"
|
||||
local_res_version_str = "0.0.0"
|
||||
if res_version_file.exists():
|
||||
if text := res_version_file.open(encoding="utf8").readline():
|
||||
local_res_version_str = text.split(":")[-1].strip()
|
||||
|
||||
spec = SpecifierSet(required_spec_str)
|
||||
local_ver = Version(local_res_version_str)
|
||||
if not spec.contains(local_ver):
|
||||
warning_header = (
|
||||
f"⚠️ **资源版本不兼容!**\n"
|
||||
f"当前代码需要资源版本: `{required_spec_str}`\n"
|
||||
f"您当前的资源版本是: `{local_res_version_str}`\n"
|
||||
"**将自动为您更新资源文件...**"
|
||||
)
|
||||
await PlatformUtils.send_superuser(bot, warning_header, user_id)
|
||||
resource_update_source = None if zip else source
|
||||
resource_update_result = await cls.update_resources(
|
||||
source=resource_update_source, force=force
|
||||
)
|
||||
resource_warning = (
|
||||
f"\n\n{warning_header}\n{resource_update_result}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"检查资源版本兼容性时出错: {e}", LOG_COMMAND, e=e)
|
||||
resource_warning = (
|
||||
"\n\n⚠️ 检查资源版本兼容性时出错,建议手动运行 `检查更新 resource`"
|
||||
)
|
||||
return result_message + resource_warning
|
||||
|
||||
@classmethod
|
||||
def __get_version(cls) -> str:
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
import random
|
||||
import shutil
|
||||
@ -10,6 +11,7 @@ from zhenxun.configs.path_config import TEMP_PATH
|
||||
from zhenxun.models.plugin_info import PluginInfo
|
||||
from zhenxun.services.log import logger
|
||||
from zhenxun.services.plugin_init import PluginInitManager
|
||||
from zhenxun.utils.enum import PluginType
|
||||
from zhenxun.utils.image_utils import BuildImage, ImageTemplate, RowStyle
|
||||
from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager
|
||||
from zhenxun.utils.repo_utils import RepoFileManager
|
||||
@ -183,6 +185,8 @@ class StoreManager:
|
||||
StorePluginInfo: 插件信息
|
||||
bool: 是否是外部插件
|
||||
"""
|
||||
plugin_list: list[StorePluginInfo]
|
||||
extra_plugin_list: list[StorePluginInfo]
|
||||
plugin_list, extra_plugin_list = await cls.get_data()
|
||||
plugin_info = None
|
||||
is_external = False
|
||||
@ -206,6 +210,12 @@ class StoreManager:
|
||||
if is_remove:
|
||||
if plugin_info.module not in modules:
|
||||
raise PluginStoreException(f"插件 {plugin_info.name} 未安装,无法移除")
|
||||
if plugin_obj := await PluginInfo.get_plugin(
|
||||
module=plugin_info.module, plugin_type=PluginType.PARENT
|
||||
):
|
||||
plugin_info.module_path = plugin_obj.module_path
|
||||
elif plugin_obj := await PluginInfo.get_plugin(module=plugin_info.module):
|
||||
plugin_info.module_path = plugin_obj.module_path
|
||||
return plugin_info, is_external
|
||||
|
||||
if is_update:
|
||||
@ -237,9 +247,7 @@ class StoreManager:
|
||||
plugin_info.github_url = f"{github_url_split[0]}/tree/{version_split[1]}"
|
||||
logger.info(f"正在安装插件 {plugin_info.name}...", LOG_COMMAND)
|
||||
await cls.install_plugin_with_repo(
|
||||
plugin_info.github_url,
|
||||
plugin_info.module_path,
|
||||
plugin_info.is_dir,
|
||||
plugin_info,
|
||||
is_external,
|
||||
source,
|
||||
)
|
||||
@ -248,9 +256,7 @@ class StoreManager:
|
||||
@classmethod
|
||||
async def install_plugin_with_repo(
|
||||
cls,
|
||||
github_url: str,
|
||||
module_path: str,
|
||||
is_dir: bool,
|
||||
plugin_info: StorePluginInfo,
|
||||
is_external: bool = False,
|
||||
source: str | None = None,
|
||||
):
|
||||
@ -267,18 +273,26 @@ class StoreManager:
|
||||
repo_type = RepoType.ALIYUN
|
||||
elif source == "git":
|
||||
repo_type = RepoType.GITHUB
|
||||
replace_module_path = module_path.replace(".", "/")
|
||||
plugin_name = module_path.split(".")[-1]
|
||||
module_path = plugin_info.module_path
|
||||
is_dir = plugin_info.is_dir
|
||||
github_url = plugin_info.github_url
|
||||
assert github_url
|
||||
replace_module_path = module_path.replace(".", "/").lstrip("/")
|
||||
plugin_name = module_path.split(".")[-1] or plugin_info.module
|
||||
if is_dir:
|
||||
files = await RepoFileManager.list_directory_files(
|
||||
github_url, replace_module_path, repo_type=repo_type
|
||||
)
|
||||
else:
|
||||
files = [RepoFileInfo(path=f"{replace_module_path}.py", is_dir=False)]
|
||||
local_path = BASE_PATH / "plugins" if is_external else BASE_PATH
|
||||
target_dir = BASE_PATH / "plugins" / plugin_name
|
||||
if not is_external:
|
||||
target_dir = BASE_PATH
|
||||
elif is_dir:
|
||||
target_dir = BASE_PATH / "plugins" / plugin_name
|
||||
else:
|
||||
target_dir = BASE_PATH / "plugins"
|
||||
files = [file for file in files if not file.is_dir]
|
||||
download_files = [(file.path, local_path / file.path) for file in files]
|
||||
download_files = [(file.path, target_dir / file.path) for file in files]
|
||||
result = await RepoFileManager.download_files(
|
||||
github_url,
|
||||
download_files,
|
||||
@ -298,7 +312,7 @@ class StoreManager:
|
||||
|
||||
is_install_req = False
|
||||
for requirement_path in requirement_paths:
|
||||
requirement_file = local_path / requirement_path.path
|
||||
requirement_file = target_dir / requirement_path.path
|
||||
if requirement_file.exists():
|
||||
is_install_req = True
|
||||
await VirtualEnvPackageManager.install_requirement(requirement_file)
|
||||
@ -341,13 +355,11 @@ class StoreManager:
|
||||
str: 返回消息
|
||||
"""
|
||||
plugin_info, _ = await cls.get_plugin_by_value(index_or_module, is_remove=True)
|
||||
path = BASE_PATH
|
||||
if plugin_info.github_url:
|
||||
path = BASE_PATH / "plugins"
|
||||
for p in plugin_info.module_path.split("."):
|
||||
path = path / p
|
||||
module_path = plugin_info.module_path
|
||||
module = module_path.split(".")[-1]
|
||||
path = BASE_PATH.parent / Path(module_path.replace(".", os.sep))
|
||||
if not plugin_info.is_dir:
|
||||
path = Path(f"{path}.py")
|
||||
path = path.parent / f"{module}.py"
|
||||
if not path.exists():
|
||||
return f"插件 {plugin_info.name} 不存在..."
|
||||
logger.debug(f"尝试移除插件 {plugin_info.name} 文件: {path}", LOG_COMMAND)
|
||||
@ -356,7 +368,7 @@ class StoreManager:
|
||||
shutil.rmtree(path, onerror=win_on_rm_error)
|
||||
else:
|
||||
path.unlink()
|
||||
await PluginInitManager.remove(f"zhenxun.{plugin_info.module_path}")
|
||||
await PluginInitManager.remove(module_path)
|
||||
return f"插件 {plugin_info.name} 移除成功! 重启后生效"
|
||||
|
||||
@classmethod
|
||||
@ -423,9 +435,7 @@ class StoreManager:
|
||||
if plugin_info.github_url is None:
|
||||
plugin_info.github_url = DEFAULT_GITHUB_URL
|
||||
await cls.install_plugin_with_repo(
|
||||
plugin_info.github_url,
|
||||
plugin_info.module_path,
|
||||
plugin_info.is_dir,
|
||||
plugin_info,
|
||||
is_external,
|
||||
)
|
||||
return f"插件 {plugin_info.name} 更新成功! 重启后生效"
|
||||
@ -473,9 +483,7 @@ class StoreManager:
|
||||
plugin_info.github_url = DEFAULT_GITHUB_URL
|
||||
is_external = False
|
||||
await cls.install_plugin_with_repo(
|
||||
plugin_info.github_url,
|
||||
plugin_info.module_path,
|
||||
plugin_info.is_dir,
|
||||
plugin_info,
|
||||
is_external,
|
||||
)
|
||||
update_success_list.append(plugin_info.name)
|
||||
|
||||
@ -77,7 +77,7 @@ class PluginInfo(Model):
|
||||
返回:
|
||||
Self | None: 插件
|
||||
"""
|
||||
if filter_parent:
|
||||
if not kwargs.get("plugin_type") and filter_parent:
|
||||
return await cls.get_or_none(
|
||||
load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs
|
||||
)
|
||||
@ -96,7 +96,7 @@ class PluginInfo(Model):
|
||||
返回:
|
||||
list[Self]: 插件列表
|
||||
"""
|
||||
if filter_parent:
|
||||
if not kwargs.get("plugin_type") and filter_parent:
|
||||
return await cls.filter(
|
||||
load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs
|
||||
).all()
|
||||
|
||||
@ -87,7 +87,7 @@ class PluginInitManager:
|
||||
|
||||
@classmethod
|
||||
async def remove(cls, module_path: str):
|
||||
"""运行指定插件安装方法"""
|
||||
"""运行指定插件移除方法"""
|
||||
if model := cls.plugins.get(module_path):
|
||||
if model.remove:
|
||||
class_ = model.class_()
|
||||
|
||||
@ -4,7 +4,7 @@ from pathlib import Path
|
||||
from typing import Any, Literal
|
||||
|
||||
from nonebot_plugin_alconna import UniMessage
|
||||
from nonebot_plugin_htmlrender import get_browser
|
||||
from nonebot_plugin_htmlrender.browser import get_browser
|
||||
from playwright.async_api import Page
|
||||
|
||||
from zhenxun.utils.message import MessageUtils
|
||||
|
||||
@ -326,7 +326,7 @@ class RepoFileManager:
|
||||
|
||||
# 获取仓库树信息
|
||||
strategy = GitHubStrategy()
|
||||
strategy.body = await GitHubStrategy.parse_repo_info(repo_info)
|
||||
strategy.body = await strategy.parse_repo_info(repo_info)
|
||||
|
||||
# 处理目录路径,确保格式正确
|
||||
if directory_path and not directory_path.endswith("/") and recursive:
|
||||
@ -480,7 +480,7 @@ class RepoFileManager:
|
||||
target_dir: Path | None = None,
|
||||
) -> FileDownloadResult:
|
||||
"""
|
||||
下载单个文件
|
||||
下载多个文件
|
||||
|
||||
参数:
|
||||
repo_url: 仓库URL
|
||||
|
||||
@ -7,6 +7,7 @@ import base64
|
||||
from pathlib import Path
|
||||
import re
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
from zhenxun.services.log import logger
|
||||
|
||||
@ -145,80 +146,85 @@ async def sparse_checkout_clone(
|
||||
target_dir: Path,
|
||||
) -> None:
|
||||
"""
|
||||
使用 git 稀疏检出克隆指定路径到目标目录(完全独立于主项目 git)。
|
||||
使用 git 稀疏检出克隆指定路径到目标目录(在临时目录中操作)。
|
||||
|
||||
关键保障:
|
||||
- 在 target_dir 下检测/初始化 .git,所有 git 操作均以 cwd=target_dir 执行
|
||||
- 强制拉取与工作区覆盖: fetch --force、checkout -B、reset --hard、clean -xdf
|
||||
- 反复设置 sparse-checkout 路径,确保路径更新生效
|
||||
- 在临时目录中执行所有 git 操作,避免影响 target_dir 中的现有内容
|
||||
- 只操作 target_dir/sparse_path 路径,不影响 target_dir 其他内容
|
||||
"""
|
||||
target_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if not await check_git():
|
||||
raise GitUnavailableError()
|
||||
|
||||
git_dir = target_dir / ".git"
|
||||
if not git_dir.exists():
|
||||
success, out, err = await run_git_command("init", target_dir)
|
||||
# 在临时目录中进行 git 操作
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
temp_path = Path(temp_dir)
|
||||
|
||||
# 初始化临时目录为 git 仓库
|
||||
success, out, err = await run_git_command("init", temp_path)
|
||||
if not success:
|
||||
raise RuntimeError(f"git init 失败: {err or out}")
|
||||
success, out, err = await run_git_command(
|
||||
f"remote add origin {repo_url}", target_dir
|
||||
f"remote add origin {repo_url}", temp_path
|
||||
)
|
||||
if not success:
|
||||
raise RuntimeError(f"添加远程失败: {err or out}")
|
||||
else:
|
||||
|
||||
# 启用稀疏检出(使用 --no-cone 模式以获得更精确的控制)
|
||||
await run_git_command("config core.sparseCheckout true", temp_path)
|
||||
await run_git_command("sparse-checkout init --no-cone", temp_path)
|
||||
|
||||
# 设置需要检出的路径(每次都覆盖配置)
|
||||
if not sparse_path:
|
||||
raise RuntimeError("sparse-checkout 路径不能为空")
|
||||
|
||||
# 使用 --no-cone 模式,直接指定要检出的具体路径
|
||||
success, out, err = await run_git_command(
|
||||
f"remote set-url origin {repo_url}", target_dir
|
||||
f"sparse-checkout set {sparse_path}/", temp_path
|
||||
)
|
||||
if not success:
|
||||
# 兜底尝试添加
|
||||
await run_git_command(f"remote add origin {repo_url}", target_dir)
|
||||
raise RuntimeError(f"配置稀疏路径失败: {err or out}")
|
||||
|
||||
# 启用稀疏检出(使用 --no-cone 模式以获得更精确的控制)
|
||||
await run_git_command("config core.sparseCheckout true", target_dir)
|
||||
await run_git_command("sparse-checkout init --no-cone", target_dir)
|
||||
# 强制拉取并同步到远端
|
||||
success, out, err = await run_git_command(
|
||||
f"fetch --force --depth 1 origin {branch}", temp_path
|
||||
)
|
||||
if not success:
|
||||
raise RuntimeError(f"fetch 失败: {err or out}")
|
||||
|
||||
# 设置需要检出的路径(每次都覆盖配置)
|
||||
if not sparse_path:
|
||||
raise RuntimeError("sparse-checkout 路径不能为空")
|
||||
# 使用远端强制更新本地分支并覆盖工作区
|
||||
success, out, err = await run_git_command(
|
||||
f"checkout -B {branch} origin/{branch}", temp_path
|
||||
)
|
||||
if not success:
|
||||
# 回退方案
|
||||
success2, out2, err2 = await run_git_command(
|
||||
f"checkout {branch}", temp_path
|
||||
)
|
||||
if not success2:
|
||||
raise RuntimeError(f"checkout 失败: {(err or out) or (err2 or out2)}")
|
||||
|
||||
# 使用 --no-cone 模式,直接指定要检出的具体路径
|
||||
# 例如:sparse_path="plugins/mahiro" -> 只检出 plugins/mahiro/ 下的内容
|
||||
success, out, err = await run_git_command(
|
||||
f"sparse-checkout set {sparse_path}/", target_dir
|
||||
)
|
||||
if not success:
|
||||
raise RuntimeError(f"配置稀疏路径失败: {err or out}")
|
||||
# 强制对齐工作区
|
||||
await run_git_command(f"reset --hard origin/{branch}", temp_path)
|
||||
await run_git_command("clean -xdf", temp_path)
|
||||
|
||||
# 强制拉取并同步到远端
|
||||
success, out, err = await run_git_command(
|
||||
f"fetch --force --depth 1 origin {branch}", target_dir
|
||||
)
|
||||
if not success:
|
||||
raise RuntimeError(f"fetch 失败: {err or out}")
|
||||
# 将检出的文件移动到目标位置
|
||||
source_path = temp_path / sparse_path
|
||||
if source_path.exists():
|
||||
# 确保目标路径存在
|
||||
target_path = target_dir / sparse_path
|
||||
target_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 使用远端强制更新本地分支并覆盖工作区
|
||||
success, out, err = await run_git_command(
|
||||
f"checkout -B {branch} origin/{branch}", target_dir
|
||||
)
|
||||
if not success:
|
||||
# 回退方案
|
||||
success2, out2, err2 = await run_git_command(f"checkout {branch}", target_dir)
|
||||
if not success2:
|
||||
raise RuntimeError(f"checkout 失败: {(err or out) or (err2 or out2)}")
|
||||
# 如果目标路径已存在,先清理
|
||||
if target_path.exists():
|
||||
if target_path.is_dir():
|
||||
shutil.rmtree(target_path)
|
||||
else:
|
||||
target_path.unlink()
|
||||
|
||||
# 强制对齐工作区
|
||||
await run_git_command(f"reset --hard origin/{branch}", target_dir)
|
||||
await run_git_command("clean -xdf", target_dir)
|
||||
|
||||
dir_path = target_dir / Path(sparse_path)
|
||||
for f in dir_path.iterdir():
|
||||
shutil.move(f, target_dir / f.name)
|
||||
dir_name = sparse_path.split("/")[0]
|
||||
rm_path = target_dir / dir_name
|
||||
if rm_path.exists():
|
||||
shutil.rmtree(rm_path)
|
||||
# 移动整个目录结构到目标位置
|
||||
shutil.move(str(source_path), str(target_path))
|
||||
|
||||
|
||||
def prepare_aliyun_url(repo_url: str) -> str:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user