新增路径验证功能,确保用户输入的路径安全并在项目根目录内

This commit is contained in:
HibiKier 2025-05-11 06:08:37 +08:00
parent 005cafb90d
commit 07ba035db6
2 changed files with 90 additions and 31 deletions

View File

@ -1,6 +1,5 @@
import os
from pathlib import Path
import re
import shutil
import aiofiles
@ -10,7 +9,7 @@ from fastapi.responses import JSONResponse
from zhenxun.utils._build_image import BuildImage
from ....base_model import Result, SystemFolderSize
from ....utils import authentication, get_system_disk
from ....utils import authentication, get_system_disk, validate_path
from .model import AddFile, DeleteFile, DirFile, RenameFile, SaveFile
router = APIRouter(prefix="/system")
@ -27,17 +26,11 @@ IMAGE_TYPE = ["jpg", "jpeg", "png", "gif", "bmp", "webp", "svg"]
)
async def _(path: str | None = None) -> Result[list[DirFile]]:
try:
# 清理和验证路径
if path:
# 移除任何可能的路径遍历尝试
path = re.sub(r"[\\/]\.\.[\\/]", "", path)
# 规范化路径
base_path = Path(path).resolve()
# 验证路径是否在项目根目录内
if not base_path.is_relative_to(Path().resolve()):
return Result.fail("访问路径超出允许范围")
else:
base_path = Path().resolve()
base_path, error = validate_path(path)
if error:
return Result.fail(error)
if not base_path:
return Result.fail("无效的路径")
data_list = []
for file in os.listdir(base_path):
@ -77,8 +70,12 @@ async def _(full_path: str | None = None) -> Result[list[SystemFolderSize]]:
description="删除文件",
)
async def _(param: DeleteFile) -> Result:
path = Path(param.full_path)
if not path or not path.exists():
path, error = validate_path(param.full_path)
if error:
return Result.fail(error)
if not path:
return Result.fail("无效的路径")
if not path.exists():
return Result.warning_("文件不存在...")
try:
path.unlink()
@ -95,8 +92,12 @@ async def _(param: DeleteFile) -> Result:
description="删除文件夹",
)
async def _(param: DeleteFile) -> Result:
path = Path(param.full_path)
if not path or not path.exists() or path.is_file():
path, error = validate_path(param.full_path)
if error:
return Result.fail(error)
if not path:
return Result.fail("无效的路径")
if not path.exists() or path.is_file():
return Result.warning_("文件夹不存在...")
try:
shutil.rmtree(path.absolute())
@ -113,10 +114,14 @@ async def _(param: DeleteFile) -> Result:
description="重命名文件",
)
async def _(param: RenameFile) -> Result:
path = (
(Path(param.parent) / param.old_name) if param.parent else Path(param.old_name)
)
if not path or not path.exists():
parent_path, error = validate_path(param.parent)
if error:
return Result.fail(error)
if not parent_path:
return Result.fail("无效的路径")
path = (parent_path / param.old_name) if param.parent else Path(param.old_name)
if not path.exists():
return Result.warning_("文件不存在...")
try:
path.rename(path.parent / param.name)
@ -133,10 +138,14 @@ async def _(param: RenameFile) -> Result:
description="重命名文件夹",
)
async def _(param: RenameFile) -> Result:
path = (
(Path(param.parent) / param.old_name) if param.parent else Path(param.old_name)
)
if not path or not path.exists() or path.is_file():
parent_path, error = validate_path(param.parent)
if error:
return Result.fail(error)
if not parent_path:
return Result.fail("无效的路径")
path = (parent_path / param.old_name) if param.parent else Path(param.old_name)
if not path.exists() or path.is_file():
return Result.warning_("文件夹不存在...")
try:
new_path = path.parent / param.name
@ -154,7 +163,13 @@ async def _(param: RenameFile) -> Result:
description="新建文件",
)
async def _(param: AddFile) -> Result:
path = (Path(param.parent) / param.name) if param.parent else Path(param.name)
parent_path, error = validate_path(param.parent)
if error:
return Result.fail(error)
if not parent_path:
return Result.fail("无效的路径")
path = (parent_path / param.name) if param.parent else Path(param.name)
if path.exists():
return Result.warning_("文件已存在...")
try:
@ -172,7 +187,13 @@ async def _(param: AddFile) -> Result:
description="新建文件夹",
)
async def _(param: AddFile) -> Result:
path = (Path(param.parent) / param.name) if param.parent else Path(param.name)
parent_path, error = validate_path(param.parent)
if error:
return Result.fail(error)
if not parent_path:
return Result.fail("无效的路径")
path = (parent_path / param.name) if param.parent else Path(param.name)
if path.exists():
return Result.warning_("文件夹已存在...")
try:
@ -190,7 +211,11 @@ async def _(param: AddFile) -> Result:
description="读取文件",
)
async def _(full_path: str) -> Result:
path = Path(full_path)
path, error = validate_path(full_path)
if error:
return Result.fail(error)
if not path:
return Result.fail("无效的路径")
if not path.exists():
return Result.warning_("文件不存在...")
try:
@ -208,9 +233,13 @@ async def _(full_path: str) -> Result:
description="读取文件",
)
async def _(param: SaveFile) -> Result[str]:
path = Path(param.full_path)
path, error = validate_path(param.full_path)
if error:
return Result.fail(error)
if not path:
return Result.fail("无效的路径")
try:
async with aiofiles.open(path, "w", encoding="utf-8") as f:
async with aiofiles.open(str(path), "w", encoding="utf-8") as f:
await f.write(param.content)
return Result.ok("更新成功!")
except Exception as e:
@ -225,7 +254,11 @@ async def _(param: SaveFile) -> Result[str]:
description="读取图片base64",
)
async def _(full_path: str) -> Result[str]:
path = Path(full_path)
path, error = validate_path(full_path)
if error:
return Result.fail(error)
if not path:
return Result.fail("无效的路径")
if not path.exists():
return Result.warning_("文件不存在...")
try:

View File

@ -2,6 +2,7 @@ import contextlib
from datetime import datetime, timedelta, timezone
import os
from pathlib import Path
import re
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
@ -28,6 +29,31 @@ if token_file.exists():
token_data = json.load(open(token_file, encoding="utf8"))
def validate_path(path_str: str | None) -> tuple[Path | None, str | None]:
"""验证路径是否安全
参数:
path_str: 用户输入的路径
返回:
tuple[Path | None, str | None]: (验证后的路径, 错误信息)
"""
try:
if not path_str:
return Path().resolve(), None
# 移除任何可能的路径遍历尝试
path_str = re.sub(r"[\\/]\.\.[\\/]", "", path_str)
# 规范化路径
path = Path(path_str).resolve()
# 验证路径是否在项目根目录内
if not path.is_relative_to(Path().resolve()):
return None, "访问路径超出允许范围"
return path, None
except Exception as e:
return None, f"路径验证失败: {e!s}"
def get_user(uname: str) -> User | None:
"""获取账号密码