zhenxun_bot/zhenxun/builtin_plugins/web_ui/utils.py

139 lines
3.7 KiB
Python
Raw Normal View History

import os
import secrets
from datetime import datetime, timedelta, timezone
from pathlib import Path

import psutil
import ujson as json
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from nonebot.utils import run_sync

from zhenxun.configs.config import Config
from zhenxun.configs.path_config import DATA_PATH

from .base_model import SystemFolderSize, SystemStatus, User

# Algorithm used to sign and verify the web UI JWTs.
ALGORITHM = "HS256"
# Not referenced in this part of the file; presumably the login route uses it
# as the token lifetime -- verify against the caller.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Extracts the bearer token from incoming requests.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/login")

# Token state lives under DATA_PATH/web_ui/token.json; make sure the
# directory exists at import time.
token_file = DATA_PATH / "web_ui" / "token.json"
token_file.parent.mkdir(parents=True, exist_ok=True)

token_data = {"token": []}
if token_file.exists():
    try:
        # Use a context manager so the file handle is closed deterministically
        # instead of being left to the garbage collector.
        with open(token_file, encoding="utf8") as fp:
            _loaded_token_data = json.load(fp)
    except json.JSONDecodeError:
        # A corrupt token file is not fatal: fall back to the empty default.
        pass
    else:
        # Only accept a JSON object; anything else (list, string, ...) would
        # break the ``.get`` / item access below.
        if isinstance(_loaded_token_data, dict):
            token_data = _loaded_token_data

# Make sure a signing secret is available. It is only set in memory here;
# nothing in this block writes it back to ``token_file``.
if not token_data.get("secret"):
    token_data["secret"] = secrets.token_hex(64)


def get_user(uname: str) -> User | None:
    """Look up the configured web UI account by user name.

    Args:
        uname: User name to look up.

    Returns:
        A ``User`` built from the ``web-ui`` ``username`` / ``password``
        config entries when both are set and ``uname`` equals the configured
        user name; otherwise ``None``.
    """
    configured_name = Config.get_config("web-ui", "username")
    configured_password = Config.get_config("web-ui", "password")

    # Without a complete credential pair nobody can be looked up.
    if not (configured_name and configured_password):
        return None
    if not (uname == configured_name):
        return None
    return User(username=configured_name, password=configured_password)


def create_token(user: User, expires_delta: timedelta | None = None) -> str:
    """Create a signed JWT for ``user``.

    Args:
        user: Account whose ``username`` is stored in the ``sub`` claim.
        expires_delta: Lifetime of the token. When ``None`` the token expires
            15 minutes after creation.

    Returns:
        The encoded JWT, signed with ``token_data["secret"]`` using
        ``ALGORITHM``.
    """
    # Compare against None explicitly: a zero timedelta is falsy, so
    # ``expires_delta or default`` would silently replace an explicit zero
    # lifetime with the 15 minute default.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta
    # Timezone-aware replacement for the deprecated ``datetime.utcnow()``;
    # it denotes the same instant in UTC.
    expire = datetime.now(timezone.utc) + lifetime
    return jwt.encode(
        claims={"sub": user.username, "exp": expire},
        key=token_data["secret"],
        algorithm=ALGORITHM,
    )


def authentication():
    """Build the FastAPI dependency that validates the bearer token.

    Returns:
        ``Depends(inner)``, where ``inner`` decodes the request's bearer token
        and checks that its ``sub`` claim names the configured user.

    Raises:
        HTTPException: Raised by the dependency (status 400) when the token
            cannot be decoded or verified, or when its subject does not
            resolve to a user via ``get_user``.
    """

    def inner(token: str = Depends(oauth2_scheme)) -> None:
        # The same rejection is used for every failure reason.
        rejection = HTTPException(
            status_code=400, detail="登录验证失败或已失效, 踢出房间!"
        )
        try:
            payload = jwt.decode(token, token_data["secret"], algorithms=[ALGORITHM])
        except JWTError as err:
            # Keep the decode error as the cause to ease debugging.
            raise rejection from err
        # A missing ``sub`` claim yields None, which get_user does not match.
        if get_user(payload.get("sub")) is None:
            raise rejection

    return Depends(inner)


def _get_dir_size(dir_path: Path) -> float:
"""获取文件夹大小
参数:
dir_path: 文件夹路径
"""
size = 0
for root, dirs, files in os.walk(dir_path):
size += sum([os.path.getsize(os.path.join(root, name)) for name in files])
return size
@run_sync
def get_system_status() -> SystemStatus:
    """Collect a snapshot of CPU, memory and disk usage.

    Returns:
        A ``SystemStatus`` holding the CPU percentage, the virtual memory
        percentage, the usage percentage of the ``/`` filesystem, and the
        current local time truncated to whole seconds.
    """
    return SystemStatus(
        cpu=psutil.cpu_percent(),
        memory=psutil.virtual_memory().percent,
        disk=psutil.disk_usage("/").percent,
        check_time=datetime.now().replace(microsecond=0),
    )


@run_sync
def get_system_disk(
    full_path: str | None,
) -> list[SystemFolderSize]:
    """List the sizes of the entries directly inside a directory.

    Each sub-directory gets its own ``SystemFolderSize`` entry with its
    recursive size. All non-directory entries are summed into a single
    ``"other_file"`` entry, which is only added when that sum is non-zero.
    Sizes are in MiB (bytes / 1024 / 1024).

    Args:
        full_path: Directory to inspect. A falsy value means the current
            working directory.

    Returns:
        One entry per sub-directory, optionally followed by the aggregated
        ``"other_file"`` entry.
    """
    base_path = Path(full_path) if full_path else Path()
    other_size = 0
    data_list: list[SystemFolderSize] = []
    for entry in base_path.iterdir():
        if entry.is_dir():
            dir_size = _get_dir_size(entry) / 1024 / 1024
            data_list.append(
                SystemFolderSize(
                    name=entry.name, size=dir_size, full_path=str(entry), is_dir=True
                )
            )
            continue
        try:
            other_size += entry.stat().st_size / 1024 / 1024
        except OSError:
            # Dangling symlink or a file removed since the listing: skip it
            # rather than failing the whole request.
            continue
    if other_size:
        data_list.append(
            SystemFolderSize(
                name="other_file", size=other_size, full_path=full_path, is_dir=False
            )
        )
    return data_list