From 07ba035db65a09b1c2398977602d5c37f037e2e8 Mon Sep 17 00:00:00 2001 From: HibiKier <775757368@qq.com> Date: Sun, 11 May 2025 06:08:37 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20=E6=96=B0=E5=A2=9E=E8=B7=AF?= =?UTF-8?q?=E5=BE=84=E9=AA=8C=E8=AF=81=E5=8A=9F=E8=83=BD=EF=BC=8C=E7=A1=AE?= =?UTF-8?q?=E4=BF=9D=E7=94=A8=E6=88=B7=E8=BE=93=E5=85=A5=E7=9A=84=E8=B7=AF?= =?UTF-8?q?=E5=BE=84=E5=AE=89=E5=85=A8=E5=B9=B6=E5=9C=A8=E9=A1=B9=E7=9B=AE?= =?UTF-8?q?=E6=A0=B9=E7=9B=AE=E5=BD=95=E5=86=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../web_ui/api/tabs/system/__init__.py | 95 +++++++++++++------ zhenxun/builtin_plugins/web_ui/utils.py | 26 +++++ 2 files changed, 90 insertions(+), 31 deletions(-) diff --git a/zhenxun/builtin_plugins/web_ui/api/tabs/system/__init__.py b/zhenxun/builtin_plugins/web_ui/api/tabs/system/__init__.py index 923b3086..6b08fa00 100644 --- a/zhenxun/builtin_plugins/web_ui/api/tabs/system/__init__.py +++ b/zhenxun/builtin_plugins/web_ui/api/tabs/system/__init__.py @@ -1,6 +1,5 @@ import os from pathlib import Path -import re import shutil import aiofiles @@ -10,7 +9,7 @@ from fastapi.responses import JSONResponse from zhenxun.utils._build_image import BuildImage from ....base_model import Result, SystemFolderSize -from ....utils import authentication, get_system_disk +from ....utils import authentication, get_system_disk, validate_path from .model import AddFile, DeleteFile, DirFile, RenameFile, SaveFile router = APIRouter(prefix="/system") @@ -27,17 +26,11 @@ IMAGE_TYPE = ["jpg", "jpeg", "png", "gif", "bmp", "webp", "svg"] ) async def _(path: str | None = None) -> Result[list[DirFile]]: try: - # 清理和验证路径 - if path: - # 移除任何可能的路径遍历尝试 - path = re.sub(r"[\\/]\.\.[\\/]", "", path) - # 规范化路径 - base_path = Path(path).resolve() - # 验证路径是否在项目根目录内 - if not base_path.is_relative_to(Path().resolve()): - return Result.fail("访问路径超出允许范围") - else: - base_path = Path().resolve() + base_path, error = validate_path(path) + if error: + return Result.fail(error) + if not base_path: + return Result.fail("无效的路径") data_list = [] for file in os.listdir(base_path): @@ -77,8 +70,12 @@ async def _(full_path: str | None = None) -> Result[list[SystemFolderSize]]: description="删除文件", ) async def _(param: DeleteFile) -> Result: - path = Path(param.full_path) - if not path or not path.exists(): + path, error = validate_path(param.full_path) + if error: + return Result.fail(error) + if not path: + return Result.fail("无效的路径") + if not path.exists(): return Result.warning_("文件不存在...") try: path.unlink() @@ -95,8 +92,12 @@ async def _(param: DeleteFile) -> Result: description="删除文件夹", ) async def _(param: DeleteFile) -> Result: - path = Path(param.full_path) - if not path or not path.exists() or path.is_file(): + path, error = validate_path(param.full_path) + if error: + return Result.fail(error) + if not path: + return Result.fail("无效的路径") + if not path.exists() or path.is_file(): return Result.warning_("文件夹不存在...") try: shutil.rmtree(path.absolute()) @@ -113,10 +114,14 @@ async def _(param: DeleteFile) -> Result: description="重命名文件", ) async def _(param: RenameFile) -> Result: - path = ( - (Path(param.parent) / param.old_name) if param.parent else Path(param.old_name) - ) - if not path or not path.exists(): + parent_path, error = validate_path(param.parent) + if error: + return Result.fail(error) + if not parent_path: + return Result.fail("无效的路径") + + path = (parent_path / param.old_name) if param.parent else Path(param.old_name) + if not path.exists(): return Result.warning_("文件不存在...") try: path.rename(path.parent / param.name) @@ -133,10 +138,14 @@ async def _(param: RenameFile) -> Result: description="重命名文件夹", ) async def _(param: RenameFile) -> Result: - path = ( - (Path(param.parent) / param.old_name) if param.parent else Path(param.old_name) - ) - if not path or not path.exists() or path.is_file(): + parent_path, error = validate_path(param.parent) + if error: + return Result.fail(error) + if not parent_path: + return Result.fail("无效的路径") + + path = (parent_path / param.old_name) if param.parent else Path(param.old_name) + if not path.exists() or path.is_file(): return Result.warning_("文件夹不存在...") try: new_path = path.parent / param.name @@ -154,7 +163,13 @@ async def _(param: RenameFile) -> Result: description="新建文件", ) async def _(param: AddFile) -> Result: - path = (Path(param.parent) / param.name) if param.parent else Path(param.name) + parent_path, error = validate_path(param.parent) + if error: + return Result.fail(error) + if not parent_path: + return Result.fail("无效的路径") + + path = (parent_path / param.name) if param.parent else Path(param.name) if path.exists(): return Result.warning_("文件已存在...") try: @@ -172,7 +187,13 @@ async def _(param: AddFile) -> Result: description="新建文件夹", ) async def _(param: AddFile) -> Result: - path = (Path(param.parent) / param.name) if param.parent else Path(param.name) + parent_path, error = validate_path(param.parent) + if error: + return Result.fail(error) + if not parent_path: + return Result.fail("无效的路径") + + path = (parent_path / param.name) if param.parent else Path(param.name) if path.exists(): return Result.warning_("文件夹已存在...") try: @@ -190,7 +211,11 @@ async def _(param: AddFile) -> Result: description="读取文件", ) async def _(full_path: str) -> Result: - path = Path(full_path) + path, error = validate_path(full_path) + if error: + return Result.fail(error) + if not path: + return Result.fail("无效的路径") if not path.exists(): return Result.warning_("文件不存在...") try: @@ -208,9 +233,13 @@ async def _(full_path: str) -> Result: description="读取文件", ) async def _(param: SaveFile) -> Result[str]: - path = Path(param.full_path) + path, error = validate_path(param.full_path) + if error: + return Result.fail(error) + if not path: + return Result.fail("无效的路径") try: - async with aiofiles.open(path, "w", encoding="utf-8") as f: + async with aiofiles.open(str(path), "w", encoding="utf-8") as f: await f.write(param.content) return Result.ok("更新成功!") except Exception as e: @@ -225,7 +254,11 @@ async def _(param: SaveFile) -> Result[str]: description="读取图片base64", ) async def _(full_path: str) -> Result[str]: - path = Path(full_path) + path, error = validate_path(full_path) + if error: + return Result.fail(error) + if not path: + return Result.fail("无效的路径") if not path.exists(): return Result.warning_("文件不存在...") try: diff --git a/zhenxun/builtin_plugins/web_ui/utils.py b/zhenxun/builtin_plugins/web_ui/utils.py index df2fdd35..8e076dc0 100644 --- a/zhenxun/builtin_plugins/web_ui/utils.py +++ b/zhenxun/builtin_plugins/web_ui/utils.py @@ -2,6 +2,7 @@ import contextlib from datetime import datetime, timedelta, timezone import os from pathlib import Path +import re from fastapi import Depends, HTTPException from fastapi.security import OAuth2PasswordBearer @@ -28,6 +29,31 @@ if token_file.exists(): token_data = json.load(open(token_file, encoding="utf8")) +def validate_path(path_str: str | None) -> tuple[Path | None, str | None]: + """验证路径是否安全 + + 参数: + path_str: 用户输入的路径 + + 返回: + tuple[Path | None, str | None]: (验证后的路径, 错误信息) + """ + try: + if not path_str: + return Path().resolve(), None + + # 移除任何可能的路径遍历尝试 + path_str = re.sub(r"[\\/]\.\.[\\/]", "", path_str) + # 规范化路径 + path = Path(path_str).resolve() + # 验证路径是否在项目根目录内 + if not path.is_relative_to(Path().resolve()): + return None, "访问路径超出允许范围" + return path, None + except Exception as e: + return None, f"路径验证失败: {e!s}" + + def get_user(uname: str) -> User | None: """获取账号密码