zhenxun_bot/zhenxun/utils/repo_utils/utils.py
molanp ee699fb345
fix(plugin_store): 修复插件商店的安装与卸载逻辑 (#2050)
* fix(plugin_store): 修复插件商店的安装与卸载逻辑

- 优化了插件安装、更新和移除的逻辑
- 调整了插件路径的处理方式,支持更灵活的安装位置
- 重构了 `install_plugin_with_repo` 方法,使用 `StorePluginInfo` 对象作为参数
- 修复了一些潜在的路径问题和模块命名问题

* refactor(zhenxun): 优化插件信息获取逻辑

- 将 PluginInfo.get_or_none 替换为 get_plugin 方法,简化插件信息获取逻辑
- 优化了插件移除操作中的插件信息获取流程

* refactor(zhenxun): 优化 sparse_checkout_clone 函数的实现

- 将 git 操作移至临时目录中执行,避免影响目标目录中的现有内容
- 简化了稀疏检出的配置和执行过程
- 改进了错误处理和回退逻辑
- 优化了文件移动和目录清理的操作

* 🐛 添加移除插件时二次查询

*  plugin_info.get_plugin参数包含plugin_type时无效过滤

---------

Co-authored-by: HibiKier <45528451+HibiKier@users.noreply.github.com>
2025-09-12 17:38:24 +08:00

261 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
仓库管理工具的工具函数
"""
import asyncio
import base64
from pathlib import Path
import re
import shutil
import tempfile
from zhenxun.services.log import logger
from .config import LOG_COMMAND, RepoConfig
from .exceptions import GitUnavailableError
async def check_git() -> bool:
"""
检查环境变量中是否存在 git
返回:
bool: 是否存在git命令
"""
try:
process = await asyncio.create_subprocess_shell(
"git --version",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await process.communicate()
return bool(stdout)
except Exception as e:
logger.error("检查git命令失败", LOG_COMMAND, e=e)
return False
async def clean_git(cwd: Path):
"""
清理git仓库
参数:
cwd: 工作目录
"""
await run_git_command("reset --hard", cwd)
await run_git_command("clean -xdf", cwd)
async def run_git_command(
command: str, cwd: Path | None = None
) -> tuple[bool, str, str]:
"""
运行git命令
参数:
command: 命令
cwd: 工作目录
返回:
tuple[bool, str, str]: (是否成功, 标准输出, 标准错误)
"""
try:
full_command = f"git {command}"
# 将Path对象转换为字符串
cwd_str = str(cwd) if cwd else None
process = await asyncio.create_subprocess_shell(
full_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd_str,
)
stdout_bytes, stderr_bytes = await process.communicate()
stdout = stdout_bytes.decode("utf-8").strip()
stderr = stderr_bytes.decode("utf-8").strip()
return process.returncode == 0, stdout, stderr
except Exception as e:
logger.error(f"运行git命令失败: {command}, 错误: {e}")
return False, "", str(e)
def glob_to_regex(pattern: str) -> str:
"""
将glob模式转换为正则表达式
参数:
pattern: glob模式"*.py"
返回:
str: 正则表达式
"""
# 转义特殊字符
regex = re.escape(pattern)
# 替换glob通配符
regex = regex.replace(r"\*\*", ".*") # ** -> .*
regex = regex.replace(r"\*", "[^/]*") # * -> [^/]*
regex = regex.replace(r"\?", "[^/]") # ? -> [^/]
# 添加开始和结束标记
regex = f"^{regex}$"
return regex
def filter_files(
files: list[str],
include_patterns: list[str] | None = None,
exclude_patterns: list[str] | None = None,
) -> list[str]:
"""
过滤文件列表
参数:
files: 文件列表
include_patterns: 包含的文件模式列表,如 ["*.py", "docs/*.md"]
exclude_patterns: 排除的文件模式列表,如 ["__pycache__/*", "*.pyc"]
返回:
list[str]: 过滤后的文件列表
"""
result = files.copy()
# 应用包含模式
if include_patterns:
included = []
for pattern in include_patterns:
regex_pattern = glob_to_regex(pattern)
included.extend(file for file in result if re.match(regex_pattern, file))
result = included
# 应用排除模式
if exclude_patterns:
for pattern in exclude_patterns:
regex_pattern = glob_to_regex(pattern)
result = [file for file in result if not re.match(regex_pattern, file)]
return result
async def sparse_checkout_clone(
repo_url: str,
branch: str,
sparse_path: str,
target_dir: Path,
) -> None:
"""
使用 git 稀疏检出克隆指定路径到目标目录(在临时目录中操作)。
关键保障:
- 在临时目录中执行所有 git 操作,避免影响 target_dir 中的现有内容
- 只操作 target_dir/sparse_path 路径,不影响 target_dir 其他内容
"""
target_dir.mkdir(parents=True, exist_ok=True)
if not await check_git():
raise GitUnavailableError()
# 在临时目录中进行 git 操作
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# 初始化临时目录为 git 仓库
success, out, err = await run_git_command("init", temp_path)
if not success:
raise RuntimeError(f"git init 失败: {err or out}")
success, out, err = await run_git_command(
f"remote add origin {repo_url}", temp_path
)
if not success:
raise RuntimeError(f"添加远程失败: {err or out}")
# 启用稀疏检出(使用 --no-cone 模式以获得更精确的控制)
await run_git_command("config core.sparseCheckout true", temp_path)
await run_git_command("sparse-checkout init --no-cone", temp_path)
# 设置需要检出的路径(每次都覆盖配置)
if not sparse_path:
raise RuntimeError("sparse-checkout 路径不能为空")
# 使用 --no-cone 模式,直接指定要检出的具体路径
success, out, err = await run_git_command(
f"sparse-checkout set {sparse_path}/", temp_path
)
if not success:
raise RuntimeError(f"配置稀疏路径失败: {err or out}")
# 强制拉取并同步到远端
success, out, err = await run_git_command(
f"fetch --force --depth 1 origin {branch}", temp_path
)
if not success:
raise RuntimeError(f"fetch 失败: {err or out}")
# 使用远端强制更新本地分支并覆盖工作区
success, out, err = await run_git_command(
f"checkout -B {branch} origin/{branch}", temp_path
)
if not success:
# 回退方案
success2, out2, err2 = await run_git_command(
f"checkout {branch}", temp_path
)
if not success2:
raise RuntimeError(f"checkout 失败: {(err or out) or (err2 or out2)}")
# 强制对齐工作区
await run_git_command(f"reset --hard origin/{branch}", temp_path)
await run_git_command("clean -xdf", temp_path)
# 将检出的文件移动到目标位置
source_path = temp_path / sparse_path
if source_path.exists():
# 确保目标路径存在
target_path = target_dir / sparse_path
target_path.parent.mkdir(parents=True, exist_ok=True)
# 如果目标路径已存在,先清理
if target_path.exists():
if target_path.is_dir():
shutil.rmtree(target_path)
else:
target_path.unlink()
# 移动整个目录结构到目标位置
shutil.move(str(source_path), str(target_path))
def prepare_aliyun_url(repo_url: str) -> str:
"""解析阿里云CodeUp的仓库URL
参数:
repo_url: 仓库URL
返回:
str: 解析后的仓库URL
"""
config = RepoConfig.get_instance()
repo_name = repo_url.split("/tree/")[0].split("/")[-1].replace(".git", "")
# 构建仓库URL
# 阿里云CodeUp的仓库URL格式通常为
# https://codeup.aliyun.com/{organization_id}/{organization_name}/{repo_name}.git
url = f"https://codeup.aliyun.com/{config.aliyun_codeup.organization_id}/{config.aliyun_codeup.organization_name}/{repo_name}.git"
# 添加访问令牌 - 使用base64解码后的令牌
if config.aliyun_codeup.rdc_access_token_encrypted:
try:
# 解码RDC访问令牌
token = base64.b64decode(
config.aliyun_codeup.rdc_access_token_encrypted.encode()
).decode()
# 阿里云CodeUp使用oauth2:token的格式进行身份验证
url = url.replace("https://", f"https://oauth2:{token}@")
logger.debug(f"使用RDC令牌构建阿里云URL: {url.split('@')[0]}@***")
except Exception as e:
logger.error(f"解码RDC令牌失败: {e}")
return url