feat(update): 移除资源管理器,重构更新逻辑,支持通过ZhenxunRepoManager进行资源和Web UI的更新

This commit is contained in:
HibiKier 2025-08-04 23:32:11 +08:00
parent a3142ad065
commit 70f363c0ce
14 changed files with 562 additions and 1927 deletions

View File

@ -1,89 +0,0 @@
SUPERUSERS=[""]
COMMAND_START=[""]
SESSION_RUNNING_EXPRESSION="别急呀,小真寻要宕机了!QAQ"
NICKNAME=["真寻", "小真寻", "绪山真寻", "小寻子"]
SESSION_EXPIRE_TIMEOUT=00:00:30
ALCONNA_USE_COMMAND_START=True
# 全局图片统一使用bytes发送当真寻与协议端不在同一服务器上时为True
IMAGE_TO_BYTES = True
# 回复消息时自称
SELF_NICKNAME="小真寻"
# 官bot appid:bot账号
QBOT_ID_DATA = '{
}'
# 数据库配置
# 示例: "postgres://user:password@127.0.0.1:5432/database"
# 示例: "mysql://user:password@127.0.0.1:3306/database"
# 示例: "sqlite:data/db/zhenxun.db" 在data目录下建立db文件夹
DB_URL = ""
# NONE: 不使用缓存, MEMORY: 使用内存缓存, REDIS: 使用Redis缓存
CACHE_MODE = NONE
# REDIS配置使用REDIS替换Cache内存缓存
# REDIS地址
# REDIS_HOST = "127.0.0.1"
# REDIS端口
# REDIS_PORT = 6379
# REDIS密码
# REDIS_PASSWORD = ""
# REDIS过期时间
# REDIS_EXPIRE = 600
# 系统代理
# SYSTEM_PROXY = "http://127.0.0.1:7890"
PLATFORM_SUPERUSERS = '
{
"qq": [""],
"dodo": [""]
}
'
DRIVER=~fastapi+~httpx+~websockets
# LOG_LEVEL = DEBUG
# 服务器和端口
HOST = 127.0.0.1
PORT = 8080
# kook adapter toekn
# kaiheila_bots =[{"token": ""}]
# # discode adapter
# DISCORD_BOTS='
# [
# {
# "token": "",
# "intent": {
# "guild_messages": true,
# "direct_messages": true
# },
# "application_commands": {"*": ["*"]}
# }
# ]
# '
# DISCORD_PROXY=''
# # dodo adapter
# DODO_BOTS='
# [
# {
# "client_id": "",
# "token": ""
# }
# ]
# '
# application_commands的{"*": ["*"]}代表将全部应用命令注册为全局应用命令
# {"admin": ["123", "456"]}则代表将admin命令注册为id是123、456服务器的局部命令其余命令不注册

View File

@ -17,7 +17,7 @@ from zhenxun.models.user_console import UserConsole
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils.decorator.shop import shop_register from zhenxun.utils.decorator.shop import shop_register
from zhenxun.utils.manager.priority_manager import PriorityLifecycle from zhenxun.utils.manager.priority_manager import PriorityLifecycle
from zhenxun.utils.manager.resource_manager import ResourceManager from zhenxun.utils.manager.zhenxun_repo_manager import ZhenxunRepoManager
from zhenxun.utils.platform import PlatformUtils from zhenxun.utils.platform import PlatformUtils
driver: Driver = nonebot.get_driver() driver: Driver = nonebot.get_driver()
@ -85,7 +85,8 @@ from bag_users t1
@PriorityLifecycle.on_startup(priority=5) @PriorityLifecycle.on_startup(priority=5)
async def _(): async def _():
await ResourceManager.init_resources() if not ZhenxunRepoManager.check_resources_exists():
await ZhenxunRepoManager.resources_update(branch="test")
"""签到与用户的数据迁移""" """签到与用户的数据迁移"""
if goods_list := await GoodsInfo.filter(uuid__isnull=True).all(): if goods_list := await GoodsInfo.filter(uuid__isnull=True).all():
for goods in goods_list: for goods in goods_list:

View File

@ -16,10 +16,6 @@ from nonebot_plugin_uninfo import Uninfo
from zhenxun.configs.utils import PluginExtraData from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType from zhenxun.utils.enum import PluginType
from zhenxun.utils.manager.resource_manager import (
DownloadResourceException,
ResourceManager,
)
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils
from ._data_source import UpdateManager from ._data_source import UpdateManager
@ -68,7 +64,6 @@ _matcher = on_alconna(
Option("-f|--force", action=store_true, help_text="强制更新"), Option("-f|--force", action=store_true, help_text="强制更新"),
Option("-s", Args["source?", ["git", "ali"]], help_text="更新源"), Option("-s", Args["source?", ["git", "ali"]], help_text="更新源"),
Option("-z|--zip", action=store_true, help_text="下载zip文件"), Option("-z|--zip", action=store_true, help_text="下载zip文件"),
Option("-t", Args["update_type?", ["git", "download"]], help_text="更新方式"),
), ),
priority=1, priority=1,
block=True, block=True,
@ -86,39 +81,52 @@ async def _(
force: Query[bool] = Query("force", False), force: Query[bool] = Query("force", False),
source: Query[str] = Query("source", "ali"), source: Query[str] = Query("source", "ali"),
zip: Query[bool] = Query("zip", False), zip: Query[bool] = Query("zip", False),
update_type: Query[str] = Query("update_type", "git"),
): ):
result = "" result = ""
await MessageUtils.build_message("正在进行检查更新...").send(reply_to=True) await MessageUtils.build_message("正在进行检查更新...").send(reply_to=True)
if ver_type.result in {"main", "release"}: ver_type_str = ver_type.result
source_str = source.result
if ver_type_str in {"main", "release"}:
if not ver_type.available: if not ver_type.available:
result = await UpdateManager.check_version() result += await UpdateManager.check_version()
logger.info("查看当前版本...", "检查更新", session=session) logger.info("查看当前版本...", "检查更新", session=session)
await MessageUtils.build_message(result).finish() await MessageUtils.build_message(result).finish()
try: try:
result = await UpdateManager.update( result += await UpdateManager.update_zhenxun(
bot, bot,
session.user.id, session.user.id,
ver_type.result, ver_type_str, # type: ignore
force.result, force.result,
source.result, source_str, # type: ignore
zip.result, zip.result,
update_type.result,
) )
await MessageUtils.build_message(result).finish(reply_to=True)
except Exception as e: except Exception as e:
logger.error("版本更新失败...", "检查更新", session=session, e=e) logger.error("版本更新失败...", "检查更新", session=session, e=e)
await MessageUtils.build_message(f"更新版本失败...e: {e}").finish() await MessageUtils.build_message(f"更新版本失败...e: {e}").finish()
elif ver_type.result == "webui": elif ver_type.result == "webui":
result = await UpdateManager.update_webui(zip.result, source.result) if zip.result:
source_str = None
try:
result += await UpdateManager.update_webui(
source_str, # type: ignore
"dist",
)
except Exception as e:
logger.error("WebUI更新失败...", "检查更新", session=session, e=e)
result += "\nWebUI更新错误..."
if resource.result or ver_type.result == "resource": if resource.result or ver_type.result == "resource":
try: try:
await ResourceManager.init_resources(True, zip.result, source.result) if zip.result:
result += "\n资源文件更新成功!" source_str = None
except DownloadResourceException: result += await UpdateManager.update_resources(
result += "\n资源更新下载失败..." source_str, # type: ignore
"main",
force.result,
)
except Exception as e: except Exception as e:
logger.error("资源更新下载失败...", "检查更新", session=session, e=e) logger.error("资源更新下载失败...", "检查更新", session=session, e=e)
result += "\n资源更新未知错误..." result += "\n资源更新错误..."
if result: if result:
await MessageUtils.build_message(result.strip()).finish() await MessageUtils.build_message(result.strip()).finish()
await MessageUtils.build_message("更新版本失败...").finish() await MessageUtils.build_message("更新版本失败...").finish()

View File

@ -1,167 +1,16 @@
import os from typing import Literal
from pathlib import Path
import shutil
import tarfile
import zipfile
from nonebot.adapters import Bot from nonebot.adapters import Bot
from nonebot.utils import run_sync
from zhenxun.configs.path_config import DATA_PATH
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils.github_utils import GithubUtils
from zhenxun.utils.github_utils.models import RepoInfo
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager from zhenxun.utils.manager.virtual_env_package_manager import VirtualEnvPackageManager
from zhenxun.utils.manager.zhenxun_repo_manager import ZhenxunRepoManager
from zhenxun.utils.platform import PlatformUtils from zhenxun.utils.platform import PlatformUtils
from zhenxun.utils.repo_utils import AliyunRepoManager, GithubRepoManager
from .config import ( from .config import LOG_COMMAND, REQUIREMENTS_FILE, VERSION_FILE
BACKUP_PATH,
BASE_PATH,
BASE_PATH_STRING,
COMMAND,
DEFAULT_GITHUB_URL,
DOWNLOAD_GZ_FILE,
DOWNLOAD_ZIP_FILE,
GIT_GITHUB_URL,
GIT_WEBUI_UI_URL,
PYPROJECT_FILE,
PYPROJECT_FILE_STRING,
PYPROJECT_LOCK_FILE,
PYPROJECT_LOCK_FILE_STRING,
RELEASE_URL,
REPLACE_FOLDERS,
REQ_TXT_FILE,
REQ_TXT_FILE_STRING,
TMP_PATH,
VERSION_FILE,
)
@run_sync
def _file_handle(latest_version: str | None):
"""文件移动操作
参数:
latest_version: 版本号
"""
BACKUP_PATH.mkdir(exist_ok=True, parents=True)
logger.debug("开始解压文件压缩包...", COMMAND)
download_file = DOWNLOAD_GZ_FILE
if DOWNLOAD_GZ_FILE.exists():
tf = tarfile.open(DOWNLOAD_GZ_FILE)
else:
download_file = DOWNLOAD_ZIP_FILE
tf = zipfile.ZipFile(DOWNLOAD_ZIP_FILE)
tf.extractall(TMP_PATH)
logger.debug("解压文件压缩包完成...", COMMAND)
download_file_path = TMP_PATH / next(
x for x in os.listdir(TMP_PATH) if (TMP_PATH / x).is_dir()
)
_pyproject = download_file_path / PYPROJECT_FILE_STRING
_lock_file = download_file_path / PYPROJECT_LOCK_FILE_STRING
_req_file = download_file_path / REQ_TXT_FILE_STRING
extract_path = download_file_path / BASE_PATH_STRING
target_path = BASE_PATH
if PYPROJECT_FILE.exists():
logger.debug(f"移除备份文件: {PYPROJECT_FILE}", COMMAND)
shutil.move(PYPROJECT_FILE, BACKUP_PATH / PYPROJECT_FILE_STRING)
if PYPROJECT_LOCK_FILE.exists():
logger.debug(f"移除备份文件: {PYPROJECT_LOCK_FILE}", COMMAND)
shutil.move(PYPROJECT_LOCK_FILE, BACKUP_PATH / PYPROJECT_LOCK_FILE_STRING)
if REQ_TXT_FILE.exists():
logger.debug(f"移除备份文件: {REQ_TXT_FILE}", COMMAND)
shutil.move(REQ_TXT_FILE, BACKUP_PATH / REQ_TXT_FILE_STRING)
if _pyproject.exists():
logger.debug("移动文件: pyproject.toml", COMMAND)
shutil.move(_pyproject, PYPROJECT_FILE)
if _lock_file.exists():
logger.debug("移动文件: poetry.lock", COMMAND)
shutil.move(_lock_file, PYPROJECT_LOCK_FILE)
if _req_file.exists():
logger.debug("移动文件: requirements.txt", COMMAND)
shutil.move(_req_file, REQ_TXT_FILE)
for folder in REPLACE_FOLDERS:
"""移动指定文件夹"""
_dir = BASE_PATH / folder
_backup_dir = BACKUP_PATH / folder
if _backup_dir.exists():
logger.debug(f"删除备份文件夹 {_backup_dir}", COMMAND)
shutil.rmtree(_backup_dir)
if _dir.exists():
logger.debug(f"移动旧文件夹 {_dir}", COMMAND)
shutil.move(_dir, _backup_dir)
else:
logger.warning(f"文件夹 {_dir} 不存在,跳过删除", COMMAND)
for folder in REPLACE_FOLDERS:
src_folder_path = extract_path / folder
dest_folder_path = target_path / folder
if src_folder_path.exists():
logger.debug(
f"移动文件夹: {src_folder_path} -> {dest_folder_path}", COMMAND
)
shutil.move(src_folder_path, dest_folder_path)
else:
logger.debug(f"源文件夹不存在: {src_folder_path}", COMMAND)
if tf:
tf.close()
if download_file.exists():
logger.debug(f"删除下载文件: {download_file}", COMMAND)
download_file.unlink()
if extract_path.exists():
logger.debug(f"删除解压文件夹: {extract_path}", COMMAND)
shutil.rmtree(extract_path)
if TMP_PATH.exists():
shutil.rmtree(TMP_PATH)
if latest_version:
with open(VERSION_FILE, "w", encoding="utf8") as f:
f.write(f"__version__: {latest_version}")
class UpdateManager: class UpdateManager:
@classmethod
async def update_webui(cls, is_zip: bool, source: str) -> str:
from zhenxun.builtin_plugins.web_ui.public.data_source import (
update_webui_assets,
)
WEBUI_PATH = DATA_PATH / "web_ui" / "public"
BACKUP_PATH = DATA_PATH / "web_ui" / "backup_public"
GIT_WEBUI_PATH = DATA_PATH / "web_ui" / "git_web_ui"
if WEBUI_PATH.exists():
if BACKUP_PATH.exists():
logger.debug(f"删除旧的备份webui文件夹 {BACKUP_PATH}", COMMAND)
shutil.rmtree(BACKUP_PATH)
WEBUI_PATH.rename(BACKUP_PATH)
try:
if is_zip:
await update_webui_assets()
logger.info("更新webui成功...", COMMAND)
else:
if source == "ali":
result = await AliyunRepoManager.update(
GIT_WEBUI_UI_URL, GIT_WEBUI_PATH, "dist", force=True
)
else:
result = await GithubRepoManager.update(
GIT_WEBUI_UI_URL, GIT_WEBUI_PATH, "dist", force=True
)
if not result.success:
return f"Webui更新失败...错误: {result.error_message}"
shutil.rmtree(WEBUI_PATH, ignore_errors=True)
shutil.copytree(GIT_WEBUI_PATH / "dist", WEBUI_PATH)
if BACKUP_PATH.exists():
logger.debug(f"删除旧的webui文件夹 {BACKUP_PATH}", COMMAND)
shutil.rmtree(BACKUP_PATH)
return "Webui更新成功"
except Exception as e:
logger.error("更新webui失败...", COMMAND, e=e)
if BACKUP_PATH.exists():
logger.debug(f"恢复旧的webui文件夹 {BACKUP_PATH}", COMMAND)
BACKUP_PATH.rename(WEBUI_PATH)
raise e
@classmethod @classmethod
async def check_version(cls) -> str: async def check_version(cls) -> str:
"""检查更新版本 """检查更新版本
@ -170,71 +19,88 @@ class UpdateManager:
str: 更新信息 str: 更新信息
""" """
cur_version = cls.__get_version() cur_version = cls.__get_version()
data = await cls.__get_latest_data() release_data = await ZhenxunRepoManager.zhenxun_get_latest_releases_data()
if not data: if not release_data:
return "检查更新获取版本失败..." return "检查更新获取版本失败..."
return ( return (
"检测到当前版本更新\n" "检测到当前版本更新\n"
f"当前版本:{cur_version}\n" f"当前版本:{cur_version}\n"
f"最新版本:{data.get('name')}\n" f"最新版本:{release_data.get('name')}\n"
f"创建日期:{data.get('created_at')}\n" f"创建日期:{release_data.get('created_at')}\n"
f"更新内容:\n{data.get('body')}" f"更新内容:\n{release_data.get('body')}"
) )
@classmethod @classmethod
async def __zip_update(cls, version_type: str): async def update_webui(
logger.info("开始下载真寻最新版文件....", COMMAND) cls,
cur_version = cls.__get_version() source: Literal["git", "ali"] | None,
url = None branch: str = "main",
new_version = None force: bool = False,
repo_info = GithubUtils.parse_github_url(DEFAULT_GITHUB_URL) ):
if version_type in {"main"}: """更新WebUI
repo_info.branch = version_type
new_version = await cls.__get_version_from_repo(repo_info) 参数:
if new_version: source: 更新源
new_version = new_version.split(":")[-1].strip() branch: 分支
url = await repo_info.get_archive_download_urls() force: 是否强制更新
elif version_type == "release":
data = await cls.__get_latest_data() 返回:
if not data: str: 返回消息
return "获取更新版本失败..." """
new_version = data.get("name", "") if not source:
url = await repo_info.get_release_source_download_urls_tgz(new_version) await ZhenxunRepoManager.webui_zip_update()
if not url: return "WebUI更新完成!"
return "获取版本下载链接失败..." result = await ZhenxunRepoManager.webui_git_update(
if TMP_PATH.exists(): source,
logger.debug(f"删除临时文件夹 {TMP_PATH}", COMMAND) branch=branch,
shutil.rmtree(TMP_PATH) force=force,
logger.debug(
f"开始更新版本:{cur_version} -> {new_version} | 下载链接:{url}",
COMMAND,
) )
download_file = ( if not result.success:
DOWNLOAD_GZ_FILE if version_type == "release" else DOWNLOAD_ZIP_FILE logger.error(f"WebUI更新失败...错误: {result.error_message}", LOG_COMMAND)
return f"WebUI更新失败...错误: {result.error_message}"
return "WebUI更新完成!"
@classmethod
async def update_resources(
cls,
source: Literal["git", "ali"] | None,
branch: str = "main",
force: bool = False,
) -> str:
"""更新资源
参数:
source: 更新源
branch: 分支
force: 是否强制更新
返回:
str: 返回消息
"""
if not source:
await ZhenxunRepoManager.resources_zip_update()
return "真寻资源更新完成!"
result = await ZhenxunRepoManager.resources_git_update(
source,
branch=branch,
force=force,
) )
if await AsyncHttpx.download_file(url, download_file, stream=True): if not result.success:
logger.debug("下载真寻最新版文件完成...", COMMAND) logger.error(
await _file_handle(new_version) f"真寻资源更新失败...错误: {result.error_message}", LOG_COMMAND
result = "版本更新完成"
return (
f"{result}\n"
f"版本: {cur_version} -> {new_version}\n"
"请重新启动真寻以完成更新!"
) )
else: return f"真寻资源更新失败...错误: {result.error_message}"
logger.debug("下载真寻最新版文件失败...", COMMAND) return "真寻资源更新完成!"
return ""
@classmethod @classmethod
async def update( async def update_zhenxun(
cls, cls,
bot: Bot, bot: Bot,
user_id: str, user_id: str,
version_type: str, version_type: Literal["main", "release"],
force: bool, force: bool,
source: str, source: Literal["git", "ali"],
zip: bool, zip: bool,
update_type: str,
) -> str: ) -> str:
"""更新操作 """更新操作
@ -257,33 +123,38 @@ class UpdateManager:
user_id, user_id,
) )
if zip: if zip:
return await cls.__zip_update(version_type) new_version = await ZhenxunRepoManager.zhenxun_zip_update(version_type)
elif source == "git": await PlatformUtils.send_superuser(
result = await GithubRepoManager.update( bot, "真寻更新完成,开始安装依赖...", user_id
GIT_GITHUB_URL, )
Path(), await VirtualEnvPackageManager.install_requirement(REQUIREMENTS_FILE)
use_git=update_type == "git", return (
force=force, f"版本更新完成!\n版本: {cur_version} -> {new_version}\n"
"请重新启动真寻以完成更新!"
) )
else: else:
result = await AliyunRepoManager.update( result = await ZhenxunRepoManager.zhenxun_git_update(
GIT_GITHUB_URL, source,
Path(), branch=version_type,
force=force, force=force,
) )
if not result.success: if not result.success:
return f"版本更新失败...错误: {result.error_message}" logger.error(
await PlatformUtils.send_superuser( f"真寻版本更新失败...错误: {result.error_message}",
bot, "真寻更新完成,开始安装依赖...", user_id LOG_COMMAND,
) )
await VirtualEnvPackageManager.install_requirement(REQ_TXT_FILE) return f"版本更新失败...错误: {result.error_message}"
return ( await PlatformUtils.send_superuser(
f"版本更新完成!\n" bot, "真寻更新完成,开始安装依赖...", user_id
f"版本: {cur_version} -> {result.new_version}\n" )
f"变更文件个数: {len(result.changed_files)}" await VirtualEnvPackageManager.install_requirement(REQUIREMENTS_FILE)
f"{'' if source == 'git' else '(阿里云更新不支持查看变更文件)'}\n" return (
"请重新启动真寻以完成更新!" f"版本更新完成!\n"
) f"版本: {cur_version} -> {result.new_version}\n"
f"变更文件个数: {len(result.changed_files)}"
f"{'' if source == 'git' else '(阿里云更新不支持查看变更文件)'}\n"
"请重新启动真寻以完成更新!"
)
@classmethod @classmethod
def __get_version(cls) -> str: def __get_version(cls) -> str:
@ -297,40 +168,3 @@ class UpdateManager:
if text := VERSION_FILE.open(encoding="utf8").readline(): if text := VERSION_FILE.open(encoding="utf8").readline():
_version = text.split(":")[-1].strip() _version = text.split(":")[-1].strip()
return _version return _version
@classmethod
async def __get_latest_data(cls) -> dict:
"""获取最新版本信息
返回:
dict: 最新版本数据
"""
for _ in range(3):
try:
res = await AsyncHttpx.get(RELEASE_URL)
if res.status_code == 200:
return res.json()
except TimeoutError:
pass
except Exception as e:
logger.error("检查更新真寻获取版本失败", e=e)
return {}
@classmethod
async def __get_version_from_repo(cls, repo_info: RepoInfo) -> str:
"""从指定分支获取版本号
参数:
branch: 分支名称
返回:
str: 版本号
"""
version_url = await repo_info.get_raw_download_urls(path="__version__")
try:
res = await AsyncHttpx.get(version_url)
if res.status_code == 200:
return res.text.strip()
except Exception as e:
logger.error(f"获取 {repo_info.branch} 分支版本失败", e=e)
return "未知版本"

View File

@ -1,42 +1,7 @@
from pathlib import Path from pathlib import Path
from zhenxun.configs.path_config import TEMP_PATH LOG_COMMAND = "AutoUpdate"
GIT_GITHUB_URL = "https://github.com/zhenxun-org/zhenxun_bot.git" VERSION_FILE = Path() / "__version__"
DEFAULT_GITHUB_URL = "https://github.com/HibiKier/zhenxun_bot/tree/main" REQUIREMENTS_FILE = Path() / "requirements.txt"
RELEASE_URL = "https://api.github.com/repos/HibiKier/zhenxun_bot/releases/latest"
GIT_WEBUI_UI_URL = "https://github.com/HibiKier/zhenxun_bot_webui.git"
VERSION_FILE_STRING = "__version__"
VERSION_FILE = Path() / VERSION_FILE_STRING
PYPROJECT_FILE_STRING = "pyproject.toml"
PYPROJECT_FILE = Path() / PYPROJECT_FILE_STRING
PYPROJECT_LOCK_FILE_STRING = "poetry.lock"
PYPROJECT_LOCK_FILE = Path() / PYPROJECT_LOCK_FILE_STRING
REQ_TXT_FILE_STRING = "requirements.txt"
REQ_TXT_FILE = Path() / REQ_TXT_FILE_STRING
BASE_PATH_STRING = "zhenxun"
BASE_PATH = Path() / BASE_PATH_STRING
TMP_PATH = TEMP_PATH / "auto_update"
BACKUP_PATH = Path() / "backup"
DOWNLOAD_GZ_FILE_STRING = "download_latest_file.tar.gz"
DOWNLOAD_ZIP_FILE_STRING = "download_latest_file.zip"
DOWNLOAD_GZ_FILE = TMP_PATH / DOWNLOAD_GZ_FILE_STRING
DOWNLOAD_ZIP_FILE = TMP_PATH / DOWNLOAD_ZIP_FILE_STRING
REPLACE_FOLDERS = [
"builtin_plugins",
"services",
"utils",
"models",
"configs",
]
COMMAND = "检查更新"

View File

@ -1,101 +0,0 @@
import os
from pathlib import Path
import shutil
import zipfile
from zhenxun.configs.path_config import FONT_PATH, TEMP_PATH
from zhenxun.services.log import logger
from zhenxun.utils.github_utils import GithubUtils
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.repo_utils import AliyunRepoManager, GithubRepoManager
from zhenxun.utils.repo_utils.utils import clean_git
LOG_COMMAND = "ResourceManager"
class DownloadResourceException(Exception):
pass
class ResourceManager:
GITHUB_URL = "https://github.com/zhenxun-org/zhenxun-bot-resources/tree/main"
RESOURCE_PATH = Path() / "resources"
TMP_PATH = TEMP_PATH / "_resource_tmp"
ZIP_FILE = TMP_PATH / "resources.zip"
UNZIP_PATH = None
@classmethod
async def init_resources(
cls, force: bool = False, is_zip: bool = False, git_source: str = "ali"
):
if (FONT_PATH.exists() and os.listdir(FONT_PATH)) and not force:
return
if is_zip:
if cls.TMP_PATH.exists():
logger.debug(
"resources临时文件夹已存在移除resources临时文件夹", LOG_COMMAND
)
await clean_git(cls.TMP_PATH)
shutil.rmtree(cls.TMP_PATH, ignore_errors=True)
cls.TMP_PATH.mkdir(parents=True, exist_ok=True)
try:
await cls.__download_resources()
cls.file_handle()
except Exception as e:
logger.error("获取resources资源包失败", LOG_COMMAND, e=e)
else:
if git_source == "ali":
await AliyunRepoManager.update(cls.GITHUB_URL, cls.RESOURCE_PATH)
else:
await GithubRepoManager.update(cls.GITHUB_URL, cls.RESOURCE_PATH)
cls.UNZIP_PATH = cls.TMP_PATH / "resources"
cls.file_handle()
if cls.TMP_PATH.exists():
logger.debug("移除resources临时文件夹", LOG_COMMAND)
await clean_git(cls.TMP_PATH)
shutil.rmtree(cls.TMP_PATH)
@classmethod
def file_handle(cls):
if not cls.UNZIP_PATH:
return
cls.__recursive_folder(cls.UNZIP_PATH, ".")
@classmethod
def __recursive_folder(cls, dir: Path, parent_path: str):
for file in dir.iterdir():
if file.is_dir():
cls.__recursive_folder(file, f"{parent_path}/{file.name}")
else:
res_file = Path(parent_path) / file.name
if res_file.exists():
res_file.unlink()
res_file.parent.mkdir(parents=True, exist_ok=True)
file.rename(res_file)
@classmethod
async def __download_resources(cls):
"""获取resources文件夹"""
repo_info = GithubUtils.parse_github_url(cls.GITHUB_URL)
url = await repo_info.get_archive_download_urls()
logger.debug("开始下载resources资源包...", LOG_COMMAND)
if not await AsyncHttpx.download_file(url, cls.ZIP_FILE, stream=True):
logger.error(
"下载resources资源包失败请尝试重启重新下载或前往 "
"https://github.com/zhenxun-org/zhenxun-bot-resources 手动下载..."
)
raise DownloadResourceException("下载resources资源包失败...")
logger.debug("下载resources资源文件压缩包完成...", LOG_COMMAND)
tf = zipfile.ZipFile(cls.ZIP_FILE)
tf.extractall(cls.TMP_PATH)
logger.debug("解压文件压缩包完成...", LOG_COMMAND)
download_file_path = cls.TMP_PATH / next(
x for x in os.listdir(cls.TMP_PATH) if (cls.TMP_PATH / x).is_dir()
)
cls.UNZIP_PATH = download_file_path / "resources"
if tf:
tf.close()

File diff suppressed because it is too large Load Diff

View File

@ -24,9 +24,6 @@ from .models import (
RepoFileInfo, RepoFileInfo,
RepoType, RepoType,
RepoUpdateResult, RepoUpdateResult,
SubmoduleConfig,
SubmoduleInfo,
SubmoduleUpdateResult,
) )
from .utils import check_git, filter_files, glob_to_regex, run_git_command from .utils import check_git, filter_files, glob_to_regex, run_git_command
@ -56,9 +53,6 @@ __all__ = [
"RepoType", "RepoType",
"RepoUpdateError", "RepoUpdateError",
"RepoUpdateResult", "RepoUpdateResult",
"SubmoduleConfig",
"SubmoduleInfo",
"SubmoduleUpdateResult",
"check_git", "check_git",
"filter_files", "filter_files",
"glob_to_regex", "glob_to_regex",

View File

@ -283,10 +283,11 @@ class BaseRepoManager(ABC):
return result return result
# 如果目录存在检查是否是Git仓库 # 如果目录存在检查是否是Git仓库
success, _, _ = await run_git_command( # 首先检查目录本身是否有.git文件夹
"rev-parse --is-inside-work-tree", cwd=local_path git_dir = local_path / ".git"
) is_git_repo = git_dir.exists() and git_dir.is_dir()
if not success:
if not is_git_repo:
# 如果不是Git仓库尝试初始化它 # 如果不是Git仓库尝试初始化它
logger.info(f"目录 {local_path} 不是Git仓库尝试初始化", LOG_COMMAND) logger.info(f"目录 {local_path} 不是Git仓库尝试初始化", LOG_COMMAND)
init_success, _, init_stderr = await run_git_command( init_success, _, init_stderr = await run_git_command(
@ -338,7 +339,7 @@ class BaseRepoManager(ABC):
) )
# 获取远程更新 # 获取远程更新
logger.info("获取远程更新", LOG_COMMAND) logger.info(f"获取远程更新: {repo_url}", LOG_COMMAND)
success, _, stderr = await run_git_command("fetch origin", cwd=local_path) success, _, stderr = await run_git_command("fetch origin", cwd=local_path)
if not success: if not success:
return RepoUpdateResult( return RepoUpdateResult(
@ -373,7 +374,7 @@ class BaseRepoManager(ABC):
) )
# 拉取最新代码 # 拉取最新代码
logger.info("拉取最新代码", LOG_COMMAND) logger.info(f"拉取最新代码: {repo_url}", LOG_COMMAND)
pull_cmd = f"pull origin {branch}" pull_cmd = f"pull origin {branch}"
if force: if force:
pull_cmd = f"fetch --all && git reset --hard origin/{branch}" pull_cmd = f"fetch --all && git reset --hard origin/{branch}"

View File

@ -29,11 +29,7 @@ from .models import (
RepoFileInfo, RepoFileInfo,
RepoType, RepoType,
RepoUpdateResult, RepoUpdateResult,
SubmoduleConfig,
SubmoduleInfo,
SubmoduleUpdateResult,
) )
from .submodule_manager import SubmoduleManager
class GithubManager(BaseRepoManager): class GithubManager(BaseRepoManager):
@ -47,7 +43,6 @@ class GithubManager(BaseRepoManager):
config: 配置如果为None则使用默认配置 config: 配置如果为None则使用默认配置
""" """
super().__init__(config) super().__init__(config)
self.submodule_manager = SubmoduleManager(self)
async def update_repo( async def update_repo(
self, self,
@ -529,158 +524,3 @@ class GithubManager(BaseRepoManager):
raise RepoDownloadError("下载文件失败") raise RepoDownloadError("下载文件失败")
raise RepoDownloadError("下载文件失败: 超过最大重试次数") raise RepoDownloadError("下载文件失败: 超过最大重试次数")
# 子模块相关方法
async def init_submodules(
self,
main_repo_path: Path,
submodule_configs: list[SubmoduleConfig],
) -> bool:
"""
初始化子模块
参数:
main_repo_path: 主仓库路径
submodule_configs: 子模块配置列表
返回:
bool: 是否成功
"""
return await self.submodule_manager.init_submodules(
main_repo_path, submodule_configs
)
async def update_submodules(
self,
main_repo_path: Path,
submodule_configs: list[SubmoduleConfig],
) -> list[SubmoduleUpdateResult]:
"""
更新子模块
参数:
main_repo_path: 主仓库路径
submodule_configs: 子模块配置列表
返回:
list[SubmoduleUpdateResult]: 更新结果列表
"""
return await self.submodule_manager.update_submodules(
main_repo_path, submodule_configs
)
async def get_submodule_info(
self,
main_repo_path: Path,
submodule_configs: list[SubmoduleConfig],
) -> list[SubmoduleInfo]:
"""
获取子模块信息
参数:
main_repo_path: 主仓库路径
submodule_configs: 子模块配置列表
返回:
list[SubmoduleInfo]: 子模块信息列表
"""
return await self.submodule_manager.get_submodule_info(
main_repo_path, submodule_configs
)
def save_submodule_configs(
self,
main_repo_path: Path,
submodule_configs: list[SubmoduleConfig],
) -> bool:
"""
保存子模块配置到文件
参数:
main_repo_path: 主仓库路径
submodule_configs: 子模块配置列表
返回:
bool: 是否成功
"""
return self.submodule_manager.save_submodule_configs(
main_repo_path, submodule_configs
)
async def load_submodule_configs(
self, main_repo_path: Path
) -> list[SubmoduleConfig]:
"""
从文件加载子模块配置
参数:
main_repo_path: 主仓库路径
返回:
list[SubmoduleConfig]: 子模块配置列表
"""
return await self.submodule_manager.load_submodule_configs(main_repo_path)
async def update_with_submodules(
self,
repo_url: str,
local_path: Path,
branch: str = "main",
submodule_configs: list[SubmoduleConfig] | None = None,
use_git: bool = True,
force: bool = False,
include_patterns: list[str] | None = None,
exclude_patterns: list[str] | None = None,
) -> RepoUpdateResult:
"""
更新仓库并处理子模块
参数:
repo_url: 仓库URL格式为 https://github.com/owner/repo
local_path: 本地保存路径
branch: 分支名称
submodule_configs: 子模块配置列表
use_git: 是否使用Git命令更新
force: 是否强制更新
include_patterns: 包含的文件模式列表
exclude_patterns: 排除的文件模式列表
返回:
RepoUpdateResult: 更新结果
"""
# 更新主仓库
result = await self.update(
repo_url,
local_path,
branch,
use_git,
force,
include_patterns,
exclude_patterns,
)
# 如果没有子模块配置,直接返回结果
if not submodule_configs:
return result
# 处理子模块
try:
submodule_results = await self.update_submodules(
local_path, submodule_configs
)
result.submodule_results = submodule_results
# 检查子模块更新是否成功
failed_submodules = [r for r in submodule_results if not r.success]
if failed_submodules:
logger.warning(
"部分子模块更新失败:"
f" {[r.submodule_name for r in failed_submodules]}",
LOG_COMMAND,
)
except Exception as e:
logger.error(f"处理子模块时发生错误: {e}", LOG_COMMAND)
result.error_message += f"; 子模块处理失败: {e}"
return result

View File

@ -15,62 +15,6 @@ class RepoType(str, Enum):
ALIYUN = "aliyun" ALIYUN = "aliyun"
@dataclass
class SubmoduleConfig:
"""子模块配置"""
# 子模块名称
name: str
# 子模块路径(相对于主仓库)
path: str
# 子模块仓库URL
repo_url: str
# 分支名称
branch: str = "main"
# 是否启用
enabled: bool = True
# 包含的文件模式列表
include_patterns: list[str] | None = None
# 排除的文件模式列表
exclude_patterns: list[str] | None = None
@dataclass
class SubmoduleInfo:
"""子模块信息"""
# 子模块配置
config: SubmoduleConfig
# 当前版本
current_version: str = ""
# 最新版本
latest_version: str = ""
# 最后更新时间
last_update: datetime | None = None
# 更新状态
update_status: str = "unknown" # unknown, up_to_date, outdated, error
@dataclass
class SubmoduleUpdateResult:
"""子模块更新结果"""
# 子模块名称
submodule_name: str
# 子模块路径
submodule_path: str
# 旧版本
old_version: str
# 新版本
new_version: str
# 是否成功
success: bool = False
# 错误消息
error_message: str = ""
# 变更的文件列表
changed_files: list[str] = field(default_factory=list)
@dataclass @dataclass
class RepoFileInfo: class RepoFileInfo:
"""仓库文件信息""" """仓库文件信息"""
@ -123,8 +67,6 @@ class RepoUpdateResult:
error_message: str = "" error_message: str = ""
# 变更的文件列表 # 变更的文件列表
changed_files: list[str] = field(default_factory=list) changed_files: list[str] = field(default_factory=list)
# 子模块更新结果
submodule_results: list[SubmoduleUpdateResult] = field(default_factory=list)
@dataclass @dataclass

View File

@ -1,408 +0,0 @@
"""
子模块管理工具
"""
import json
from pathlib import Path
from zhenxun.services.log import logger
from .config import LOG_COMMAND
from .github_manager import GithubManager
from .models import SubmoduleConfig, SubmoduleInfo, SubmoduleUpdateResult
from .utils import run_git_command
class SubmoduleManager:
"""子模块管理器"""
def __init__(self, github_manager: GithubManager):
"""
初始化子模块管理器
参数:
github_manager: GitHub管理器实例
"""
self.github_manager = github_manager
async def init_submodules(
self, main_repo_path: Path, submodule_configs: list[SubmoduleConfig]
) -> bool:
"""
初始化子模块
参数:
main_repo_path: 主仓库路径
submodule_configs: 子模块配置列表
返回:
bool: 是否成功
"""
try:
# 检查是否在Git仓库中
success, stdout, stderr = await run_git_command("status", main_repo_path)
if not success:
logger.error(f"路径 {main_repo_path} 不是有效的Git仓库", LOG_COMMAND)
return False
# 初始化每个子模块
for config in submodule_configs:
if not config.enabled:
continue
await self._init_single_submodule(main_repo_path, config)
# 更新子模块
await self._update_submodules(main_repo_path)
return True
except Exception as e:
logger.error(f"初始化子模块失败: {e}", LOG_COMMAND)
return False
async def _init_single_submodule(
self, main_repo_path: Path, config: SubmoduleConfig
) -> bool:
"""
初始化单个子模块
参数:
main_repo_path: 主仓库路径
config: 子模块配置
返回:
bool: 是否成功
"""
try:
submodule_path = main_repo_path / config.path
# 检查子模块是否已存在
if submodule_path.exists() and (submodule_path / ".git").exists():
logger.info(f"子模块 {config.name} 已存在,跳过初始化", LOG_COMMAND)
return True
# 添加子模块
success, stdout, stderr = await run_git_command(
f"submodule add -b {config.branch} {config.repo_url} {config.path}",
main_repo_path,
)
if not success:
logger.error(f"添加子模块 {config.name} 失败: {stderr}", LOG_COMMAND)
return False
logger.info(f"成功添加子模块 {config.name}", LOG_COMMAND)
return True
except Exception as e:
logger.error(f"初始化子模块 {config.name} 失败: {e}", LOG_COMMAND)
return False
async def _update_submodules(self, main_repo_path: Path) -> bool:
"""
更新所有子模块
参数:
main_repo_path: 主仓库路径
返回:
bool: 是否成功
"""
try:
# 更新子模块
success, stdout, stderr = await run_git_command(
"submodule update --init --recursive", main_repo_path
)
if not success:
logger.error(f"更新子模块失败: {stderr}", LOG_COMMAND)
return False
logger.info("成功更新所有子模块", LOG_COMMAND)
return True
except Exception as e:
logger.error(f"更新子模块失败: {e}", LOG_COMMAND)
return False
async def update_submodules(
self, main_repo_path: Path, submodule_configs: list[SubmoduleConfig]
) -> list[SubmoduleUpdateResult]:
"""
更新子模块
参数:
main_repo_path: 主仓库路径
submodule_configs: 子模块配置列表
返回:
List[SubmoduleUpdateResult]: 更新结果列表
"""
results = []
for config in submodule_configs:
if not config.enabled:
continue
result = await self._update_single_submodule(main_repo_path, config)
results.append(result)
return results
async def _update_single_submodule(
self, main_repo_path: Path, config: SubmoduleConfig
) -> SubmoduleUpdateResult:
"""
更新单个子模块
参数:
main_repo_path: 主仓库路径
config: 子模块配置
返回:
SubmoduleUpdateResult: 更新结果
"""
result = SubmoduleUpdateResult(
submodule_name=config.name,
submodule_path=config.path,
old_version="",
new_version="",
)
try:
submodule_path = main_repo_path / config.path
# 检查子模块是否存在
if not submodule_path.exists():
result.error_message = f"子模块路径不存在: {submodule_path}"
return result
# 获取当前版本
success, stdout, stderr = await run_git_command(
"rev-parse HEAD", submodule_path
)
if not success:
result.error_message = f"获取当前版本失败: {stderr}"
return result
old_version = stdout.strip()
result.old_version = old_version
# 获取远程最新版本
success, stdout, stderr = await run_git_command(
f"ls-remote origin {config.branch}", submodule_path
)
if not success:
result.error_message = f"获取远程版本失败: {stderr}"
return result
# 解析最新版本
lines = stdout.strip().split("\n")
if not lines or not lines[0]:
result.error_message = "无法获取远程版本信息"
return result
latest_version = lines[0].split("\t")[0]
result.new_version = latest_version
# 检查是否需要更新
if old_version == latest_version:
result.success = True
logger.info(f"子模块 {config.name} 已是最新版本", LOG_COMMAND)
return result
# 更新子模块
success, stdout, stderr = await run_git_command(
f"pull origin {config.branch}", submodule_path
)
if not success:
result.error_message = f"更新子模块失败: {stderr}"
return result
# 更新主仓库中的子模块引用
success, stdout, stderr = await run_git_command(
f"add {config.path}", main_repo_path
)
if not success:
result.error_message = f"更新主仓库引用失败: {stderr}"
return result
result.success = True
logger.info(
f"成功更新子模块 {config.name}: {old_version} -> {latest_version}",
LOG_COMMAND,
)
except Exception as e:
result.error_message = f"更新子模块时发生错误: {e}"
logger.error(f"更新子模块 {config.name} 失败: {e}", LOG_COMMAND)
return result
async def get_submodule_info(
self, main_repo_path: Path, submodule_configs: list[SubmoduleConfig]
) -> list[SubmoduleInfo]:
"""
获取子模块信息
参数:
main_repo_path: 主仓库路径
submodule_configs: 子模块配置列表
返回:
List[SubmoduleInfo]: 子模块信息列表
"""
submodule_infos = []
for config in submodule_configs:
if not config.enabled:
continue
info = await self._get_single_submodule_info(main_repo_path, config)
submodule_infos.append(info)
return submodule_infos
async def _get_single_submodule_info(
self, main_repo_path: Path, config: SubmoduleConfig
) -> SubmoduleInfo:
"""
获取单个子模块信息
参数:
main_repo_path: 主仓库路径
config: 子模块配置
返回:
SubmoduleInfo: 子模块信息
"""
info = SubmoduleInfo(config=config)
try:
submodule_path = main_repo_path / config.path
if not submodule_path.exists():
info.update_status = "error"
return info
# 获取当前版本
success, stdout, stderr = await run_git_command(
"rev-parse HEAD", submodule_path
)
if success:
info.current_version = stdout.strip()
# 获取远程最新版本
success, stdout, stderr = await run_git_command(
f"ls-remote origin {config.branch}", submodule_path
)
if success and stdout.strip():
lines = stdout.strip().split("\n")
if lines and lines[0]:
info.latest_version = lines[0].split("\t")[0]
# 确定更新状态
if info.current_version and info.latest_version:
if info.current_version == info.latest_version:
info.update_status = "up_to_date"
else:
info.update_status = "outdated"
else:
info.update_status = "unknown"
except Exception as e:
info.update_status = "error"
logger.error(f"获取子模块 {config.name} 信息失败: {e}", LOG_COMMAND)
return info
def save_submodule_configs(
self, main_repo_path: Path, submodule_configs: list[SubmoduleConfig]
) -> bool:
"""
保存子模块配置到文件
参数:
main_repo_path: 主仓库路径
submodule_configs: 子模块配置列表
返回:
bool: 是否成功
"""
try:
config_file = main_repo_path / ".submodules.json"
# 转换为字典格式
configs_dict = []
for config in submodule_configs:
config_dict = {
"name": config.name,
"path": config.path,
"repo_url": config.repo_url,
"branch": config.branch,
"enabled": config.enabled,
"include_patterns": config.include_patterns,
"exclude_patterns": config.exclude_patterns,
}
configs_dict.append(config_dict)
# 保存到文件
with open(config_file, "w", encoding="utf-8") as f:
json.dump(configs_dict, f, indent=2, ensure_ascii=False)
logger.info(f"子模块配置已保存到 {config_file}", LOG_COMMAND)
return True
except Exception as e:
logger.error(f"保存子模块配置失败: {e}", LOG_COMMAND)
return False
def load_submodule_configs(self, main_repo_path: Path) -> list[SubmoduleConfig]:
"""
从文件加载子模块配置
参数:
main_repo_path: 主仓库路径
返回:
List[SubmoduleConfig]: 子模块配置列表
"""
try:
config_file = main_repo_path / ".submodules.json"
if not config_file.exists():
logger.warning(f"子模块配置文件不存在: {config_file}", LOG_COMMAND)
return []
with open(config_file, encoding="utf-8") as f:
configs_dict = json.load(f)
# 转换为SubmoduleConfig对象
configs = []
for config_dict in configs_dict:
config = SubmoduleConfig(
name=config_dict["name"],
path=config_dict["path"],
repo_url=config_dict["repo_url"],
branch=config_dict.get("branch", "main"),
enabled=config_dict.get("enabled", True),
include_patterns=config_dict.get("include_patterns"),
exclude_patterns=config_dict.get("exclude_patterns"),
)
configs.append(config)
logger.info(
f"{config_file} 加载了 {len(configs)} 个子模块配置", LOG_COMMAND
)
return configs
except Exception as e:
logger.error(f"加载子模块配置失败: {e}", LOG_COMMAND)
return []

View File

@ -1,210 +0,0 @@
#!/usr/bin/env python3
"""
GitHub子模块快速设置脚本
"""
import asyncio
from pathlib import Path
import sys
from zhenxun.services.log import logger
from zhenxun.utils.repo_utils import (
GithubRepoManager,
SubmoduleConfig,
)
def create_sample_configs():
"""创建示例子模块配置"""
return [
SubmoduleConfig(
name="frontend-ui",
path="frontend/ui",
repo_url="https://github.com/your-org/frontend-ui",
branch="main",
enabled=True,
include_patterns=["*.js", "*.css", "*.html", "*.vue", "*.ts"],
exclude_patterns=["node_modules/*", "*.log", "dist/*", "coverage/*"],
),
SubmoduleConfig(
name="backend-api",
path="backend/api",
repo_url="https://github.com/your-org/backend-api",
branch="develop",
enabled=True,
include_patterns=["*.py", "*.json", "requirements.txt", "*.yml"],
exclude_patterns=["__pycache__/*", "*.pyc", "venv/*", ".pytest_cache/*"],
),
SubmoduleConfig(
name="shared-lib",
path="libs/shared",
repo_url="https://github.com/your-org/shared-lib",
branch="main",
enabled=True,
include_patterns=["*.py", "*.js", "*.ts", "*.json"],
exclude_patterns=["tests/*", "docs/*", "examples/*"],
),
]
async def setup_submodules(project_path: str, configs: list[SubmoduleConfig]):
"""设置子模块"""
main_repo_path = Path(project_path)
logger.info(f"正在为项目 {project_path} 设置子模块...")
# 检查路径是否存在
if not main_repo_path.exists():
logger.info(f"错误: 项目路径 {project_path} 不存在")
return False
# 检查是否是Git仓库
git_dir = main_repo_path / ".git"
if not git_dir.exists():
logger.info(f"错误: {project_path} 不是Git仓库")
logger.info("请先执行: git init")
return False
# 初始化子模块
logger.info("正在初始化子模块...")
success = await GithubRepoManager.init_submodules(main_repo_path, configs)
if not success:
logger.info("子模块初始化失败!")
return False
# 保存配置
logger.info("正在保存子模块配置...")
await GithubRepoManager.save_submodule_configs(main_repo_path, configs)
logger.info("✓ 子模块设置完成!")
logger.info(f"配置文件已保存到: {main_repo_path / '.submodules.json'}")
return True
async def update_submodules(project_path: str):
"""更新子模块"""
main_repo_path = Path(project_path)
logger.info(f"正在更新项目 {project_path} 的子模块...")
# 加载配置
configs = await GithubRepoManager.load_submodule_configs(main_repo_path)
if not configs:
logger.info("未找到子模块配置")
return False
logger.info(f"找到 {len(configs)} 个子模块配置")
# 获取子模块信息
infos = await GithubRepoManager.get_submodule_info(main_repo_path, configs)
logger.info("\n子模块状态:")
for info in infos:
status_icon = (
""
if info.update_status == "up_to_date"
else ""
if info.update_status == "outdated"
else ""
)
logger.info(
f"{status_icon} {info.config.name}"
f"({info.config.path}) - {info.update_status}"
)
# 更新子模块
logger.info("\n正在更新子模块...")
results = await GithubRepoManager.update_submodules(main_repo_path, configs)
success_count = 0
for result in results:
if result.success:
success_count += 1
if result.old_version != result.new_version:
logger.info(f"{result.submodule_name} 已更新")
else:
logger.info(f"{result.submodule_name} 已是最新版本")
else:
logger.info(f"{result.submodule_name} 更新失败: {result.error_message}")
logger.info(f"\n更新完成: {success_count}/{len(results)} 个子模块更新成功")
return success_count == len(results)
async def show_submodule_info(project_path: str):
"""显示子模块信息"""
main_repo_path = Path(project_path)
logger.info(f"项目 {project_path} 的子模块信息:")
# 加载配置
configs = await GithubRepoManager.load_submodule_configs(main_repo_path)
if not configs:
logger.info("未找到子模块配置")
return
# 获取详细信息
infos = await GithubRepoManager.get_submodule_info(main_repo_path, configs)
for info in infos:
logger.info(f"\n子模块: {info.config.name}")
logger.info(f" 路径: {info.config.path}")
logger.info(f" 仓库: {info.config.repo_url}")
logger.info(f" 分支: {info.config.branch}")
logger.info(f" 状态: {info.update_status}")
logger.info(f" 启用: {info.config.enabled}")
if info.current_version:
logger.info(f" 当前版本: {info.current_version[:8]}")
if info.latest_version:
logger.info(f" 最新版本: {info.latest_version[:8]}")
if info.config.include_patterns:
logger.info(f" 包含文件: {', '.join(info.config.include_patterns)}")
if info.config.exclude_patterns:
logger.info(f" 排除文件: {', '.join(info.config.exclude_patterns)}")
def print_info_usage():
"""打印使用说明"""
logger.info("GitHub子模块管理工具")
logger.info("用法:")
logger.info(" python submodule_setup.py setup <项目路径>")
logger.info(" python submodule_setup.py update <项目路径>")
logger.info(" python submodule_setup.py info <项目路径>")
logger.info("示例:")
logger.info(" python submodule_setup.py setup ./my_project")
logger.info(" python submodule_setup.py update ./my_project")
logger.info(" python submodule_setup.py info ./my_project")
async def main():
"""主函数"""
if len(sys.argv) < 3:
print_info_usage()
return
command = sys.argv[1]
project_path = sys.argv[2]
if command == "setup":
configs = create_sample_configs()
await setup_submodules(project_path, configs)
elif command == "update":
await update_submodules(project_path)
elif command == "info":
await show_submodule_info(project_path)
else:
logger.info(f"未知命令: {command}")
print_info_usage()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -57,11 +57,13 @@ async def run_git_command(
""" """
try: try:
full_command = f"git {command}" full_command = f"git {command}"
# 将Path对象转换为字符串
cwd_str = str(cwd) if cwd else None
process = await asyncio.create_subprocess_shell( process = await asyncio.create_subprocess_shell(
full_command, full_command,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
cwd=cwd, cwd=cwd_str,
) )
stdout_bytes, stderr_bytes = await process.communicate() stdout_bytes, stderr_bytes = await process.communicate()