增强插件商店功能,支持在下载文件时指定稀疏检出路径和目标目录。优化二进制文件处理逻辑,提升文件下载的准确性和效率。

This commit is contained in:
HibiKier 2025-08-26 18:01:32 +08:00
parent a89d87a0b4
commit f3ff5a3404
4 changed files with 191 additions and 23 deletions

View File

@ -268,6 +268,7 @@ class StoreManager:
elif source == "git":
repo_type = RepoType.GITHUB
replace_module_path = module_path.replace(".", "/")
plugin_name = module_path.split(".")[-1]
if is_dir:
files = await RepoFileManager.list_directory_files(
github_url, replace_module_path, repo_type=repo_type
@ -278,7 +279,11 @@ class StoreManager:
files = [file for file in files if not file.is_dir]
download_files = [(file.path, local_path / file.path) for file in files]
await RepoFileManager.download_files(
github_url, download_files, repo_type=repo_type
github_url,
download_files,
repo_type=repo_type,
sparse_path=replace_module_path,
target_dir=local_path / plugin_name,
)
requirement_paths = [

View File

@ -12,10 +12,12 @@ from zhenxun.services.log import logger
from zhenxun.utils.github_utils import GithubUtils
from zhenxun.utils.github_utils.models import AliyunTreeType, GitHubStrategy, TreeType
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.utils import is_binary_file
from .config import LOG_COMMAND, RepoConfig
from .exceptions import FileNotFoundError, NetworkError, RepoManagerError
from .models import FileDownloadResult, RepoFileInfo, RepoType
from .utils import sparse_checkout_clone
class RepoFileManager:
@ -466,6 +468,8 @@ class RepoFileManager:
branch: str = "main",
repo_type: RepoType | None = None,
ignore_error: bool = False,
sparse_path: str | None = None,
target_dir: Path | None = None,
) -> FileDownloadResult:
"""
下载单个文件
@ -476,10 +480,19 @@ class RepoFileManager:
branch: 分支名称
repo_type: 仓库类型如果为None则自动判断
ignore_error: 是否忽略错误
sparse_path: 稀疏检出路径
target_dir: 稀疏目标目录
返回:
FileDownloadResult: 下载结果
"""
# 参数一致性校验sparse_path 与 target_dir 必须同时有值或同时为 None
if (sparse_path is None) ^ (target_dir is None):
raise RepoManagerError(
"参数错误: sparse_path 与 target_dir 必须同时提供或同时为 None"
)
# 确定仓库类型和所有者
repo_name = (
repo_url.split("/tree/")[0].split("/")[-1].replace(".git", "").strip()
@ -497,12 +510,43 @@ class RepoFileManager:
file_path=file_path,
version=branch,
)
if (
any(is_binary_file(file_name) for file_name in file_path_mapping)
and repo_type == RepoType.ALIYUN
and sparse_path
and target_dir
):
return await self._handle_binary_with_sparse_checkout(
repo_url=repo_url,
branch=branch,
sparse_path=sparse_path,
target_dir=target_dir,
result=result,
)
else:
# 不包含二进制时
return await self._download_and_write_files(
repo_url=repo_url,
file_paths=[f[0] for f in file_path],
file_path_mapping=file_path_mapping,
branch=branch,
repo_type=repo_type,
ignore_error=ignore_error,
result=result,
)
async def _download_and_write_files(
self,
repo_url: str,
file_paths: list[str],
file_path_mapping: dict[str, Path],
branch: str,
repo_type: RepoType | None,
ignore_error: bool,
result: FileDownloadResult,
) -> FileDownloadResult:
try:
# 由于我们传入的是列表,所以这里一定返回列表
file_paths = [f[0] for f in file_path]
if len(file_paths) == 1:
# 如果只有一个文件,可能返回单个元组
file_contents_result = await self.get_file_content(
repo_url, file_paths[0], branch, repo_type, ignore_error
)
@ -513,7 +557,6 @@ class RepoFileManager:
else:
file_contents = cast(list[tuple[str, str]], file_contents_result)
else:
# 多个文件一定返回列表
file_contents = cast(
list[tuple[str, str]],
await self.get_file_content(
@ -524,7 +567,6 @@ class RepoFileManager:
for repo_file_path, content in file_contents:
local_path = file_path_mapping[repo_file_path]
local_path.parent.mkdir(parents=True, exist_ok=True)
# 使用二进制模式写入文件,避免编码问题
if isinstance(content, str):
content_bytes = content.encode("utf-8")
else:
@ -533,7 +575,6 @@ class RepoFileManager:
async with aiofiles.open(local_path, "wb") as f:
await f.write(content_bytes)
result.success = True
# 计算文件大小
result.file_size = sum(
len(content.encode("utf-8") if isinstance(content, str) else content)
for _, content in file_contents
@ -545,3 +586,36 @@ class RepoFileManager:
result.success = False
result.error_message = str(e)
return result
async def _handle_binary_with_sparse_checkout(
    self,
    repo_url: str,
    branch: str,
    sparse_path: str,
    target_dir: Path,
    result: FileDownloadResult,
) -> FileDownloadResult:
    """Fetch ``sparse_path`` with a git sparse checkout and fill in ``result``.

    Parameters:
        repo_url: Repository URL handed to ``sparse_checkout_clone``.
        branch: Branch name handed to ``sparse_checkout_clone``.
        sparse_path: Repository path to check out.
        target_dir: Directory that receives the checkout.
        result: Result object updated in place and returned.

    Returns:
        FileDownloadResult: ``result`` with ``success`` set, plus ``file_size``
        on success or ``error_message`` on failure. Exceptions are logged and
        reported through ``result`` instead of being re-raised.
    """
    try:
        await sparse_checkout_clone(
            repo_url=repo_url,
            branch=branch,
            sparse_path=sparse_path,
            target_dir=target_dir,
        )
        total_size = 0
        if target_dir.exists():
            # The clone keeps its repository metadata in target_dir/.git;
            # those files are not downloaded content, so leave them out of
            # the reported size.
            git_dir = target_dir / ".git"
            for entry in target_dir.rglob("*"):
                if git_dir in entry.parents:
                    continue
                if entry.is_file():
                    try:
                        total_size += entry.stat().st_size
                    except OSError:
                        # Best effort: a file that vanished or cannot be
                        # stat'ed only affects the size figure.
                        continue
        result.success = True
        result.file_size = total_size
        logger.info(f"sparse-checkout 克隆成功: {target_dir}")
        return result
    except Exception as e:
        logger.error(f"sparse-checkout 克隆失败: {e}")
        result.success = False
        result.error_message = str(e)
        return result

View File

@ -133,3 +133,75 @@ def filter_files(
result = [file for file in result if not re.match(regex_pattern, file)]
return result
async def sparse_checkout_clone(
    repo_url: str,
    branch: str,
    sparse_path: str,
    target_dir: Path,
) -> None:
    """Sparse-checkout ``sparse_path`` of ``repo_url`` into ``target_dir``.

    ``target_dir`` gets its own repository (``git init`` is run there when no
    ``.git`` entry exists), and every git command is issued through
    ``run_git_command`` with ``target_dir`` as its second argument. The
    sequence is: configure ``origin``, enable sparse checkout, set the sparse
    path, ``fetch --force --depth 1``, ``checkout -B``, then ``reset --hard``
    and ``clean -xdf``.

    Parameters:
        repo_url: URL used for the ``origin`` remote.
        branch: Branch to fetch and check out.
        sparse_path: Repository path to check out; must not be empty.
        target_dir: Directory holding the independent checkout.

    Raises:
        RuntimeError: If ``sparse_path`` is empty, git is unavailable, or the
            init / remote / sparse-checkout set / fetch / checkout step fails.
    """
    # Validate before touching the filesystem so that an invalid call does not
    # leave an empty directory or a half-initialised repository behind.
    if not sparse_path:
        raise RuntimeError("sparse-checkout 路径不能为空")
    if not await check_git():
        raise RuntimeError("未检测到可用的 git 命令")

    target_dir.mkdir(parents=True, exist_ok=True)

    git_dir = target_dir / ".git"
    if not git_dir.exists():
        success, out, err = await run_git_command("init", target_dir)
        if not success:
            raise RuntimeError(f"git init 失败: {err or out}")
        success, out, err = await run_git_command(
            f"remote add origin {repo_url}", target_dir
        )
        if not success:
            raise RuntimeError(f"添加远程失败: {err or out}")
    else:
        success, out, err = await run_git_command(
            f"remote set-url origin {repo_url}", target_dir
        )
        if not success:
            # Fallback: origin may not exist yet, so try to add it.
            success, out, err = await run_git_command(
                f"remote add origin {repo_url}", target_dir
            )
            if not success:
                # Neither command took effect; continuing would fetch from a
                # missing or stale origin URL.
                raise RuntimeError(f"添加远程失败: {err or out}")

    # Enable sparse checkout; both commands are re-run on every call and their
    # results are intentionally not checked (the `set` step below is).
    await run_git_command("config core.sparseCheckout true", target_dir)
    await run_git_command("sparse-checkout init --cone", target_dir)

    # Overwrite the sparse path on every call so a changed path takes effect.
    success, out, err = await run_git_command(
        f"sparse-checkout set {sparse_path}", target_dir
    )
    if not success:
        raise RuntimeError(f"配置稀疏路径失败: {err or out}")

    # Force-fetch only the tip of the requested branch.
    success, out, err = await run_git_command(
        f"fetch --force --depth 1 origin {branch}", target_dir
    )
    if not success:
        raise RuntimeError(f"fetch 失败: {err or out}")

    # Point the local branch at the fetched remote branch.
    success, out, err = await run_git_command(
        f"checkout -B {branch} origin/{branch}", target_dir
    )
    if not success:
        # Fallback: plain checkout of the branch name.
        success2, out2, err2 = await run_git_command(f"checkout {branch}", target_dir)
        if not success2:
            raise RuntimeError(f"checkout 失败: {(err or out) or (err2 or out2)}")

    # Best-effort alignment of the work tree with the remote branch; removes
    # local modifications and untracked/ignored files.
    await run_git_command(f"reset --hard origin/{branch}", target_dir)
    await run_git_command("clean -xdf", target_dir)

View File

@ -65,22 +65,39 @@ class ResourceDirManager:
# Extensions (lower-case, without the leading dot) that are treated as binary.
# Built once at import time instead of on every call.
_BINARY_EXTENSIONS = frozenset(
    {
        # Images
        "jpg", "jpeg", "png", "gif", "bmp", "ico", "webp", "tiff", "tif", "svg",
        # Fonts
        "ttf", "otf", "woff", "woff2", "eot",
        # Archives
        "zip", "rar", "7z", "tar", "gz", "bz2", "xz",
        # Executables and libraries
        "exe", "dll", "so", "dylib",
        # Documents
        "pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx",
        # Audio / video
        "mp3", "mp4", "avi", "mov", "wmv", "flv",
        # Other common binary formats
        "bin", "dat", "db", "class", "pyc",
    }
)


def is_binary_file(file_path: str) -> bool:
    """Return whether ``file_path`` has a known binary file extension.

    Only the final extension of the last path component is considered, and the
    comparison is case-insensitive. The file itself is never opened.

    Parameters:
        file_path: File path or file name.

    Returns:
        bool: True when the extension is in the binary extension set.
    """
    # splitext only looks at the last path component and treats a leading dot
    # (e.g. ".gitignore") as part of the name, not as an extension.
    _, ext = os.path.splitext(file_path)
    return ext.lstrip(".").lower() in _BINARY_EXTENSIONS
def cn2py(word: str) -> str: