This commit is contained in:
HibiKier 2024-08-24 12:31:04 +08:00
commit a9ed5d144d
5 changed files with 113 additions and 4 deletions

View File

@ -20,6 +20,7 @@ from .api.tabs.manage.chat import ws_router as chat_routes
from .api.tabs.plugin_manage import router as plugin_router
from .api.tabs.system import router as system_router
from .auth import router as auth_router
from .public import init_public
__plugin_meta__ = PluginMetadata(
name="WebUi",
@ -59,7 +60,7 @@ WsApiRouter.include_router(chat_routes)
@driver.on_startup
def _():
async def _():
try:
async def log_sink(message: str):
@ -80,6 +81,7 @@ def _():
app: FastAPI = nonebot.get_app()
app.include_router(BaseApiRouter)
app.include_router(WsApiRouter)
await init_public(app)
logger.info("<g>API启动成功</g>", "Web UI")
except Exception as e:
logger.error("<g>API启动失败</g>", "Web UI", e=e)

View File

@ -0,0 +1,35 @@
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi import APIRouter, FastAPI
from zhenxun.services.log import logger
from .config import PUBLIC_PATH
from .data_source import update_webui_assets
router = APIRouter()
@router.get("/")
async def index():
return FileResponse(PUBLIC_PATH / "index.html")
@router.get("/favicon.ico")
async def favicon():
return FileResponse(PUBLIC_PATH / "favicon.ico")
async def init_public(app: FastAPI):
try:
if not PUBLIC_PATH.exists():
await update_webui_assets()
app.include_router(router)
for pathname in ["css", "js", "fonts", "img"]:
app.mount(
f"/{pathname}",
StaticFiles(directory=PUBLIC_PATH / pathname, check_dir=True),
name=f"public_{pathname}",
)
except Exception as e:
logger.error(f"初始化 web ui assets 失败 e: {e}", "Web UI assets")

View File

@ -0,0 +1,20 @@
from datetime import datetime
from pydantic import BaseModel
from zhenxun.configs.path_config import DATA_PATH, TEMP_PATH
class PublicData(BaseModel):
etag: str
update_time: datetime
COMMAND_NAME = "webui_update_assets"
WEBUI_DATA_PATH = DATA_PATH / "web_ui"
PUBLIC_PATH = WEBUI_DATA_PATH / "public"
TMP_PATH = TEMP_PATH / "web_ui"
GITHUB_API_COMMITS = "https://api.github.com/repos/HibiKier/zhenxun_bot_webui/commits"
WEBUI_ASSETS_DOWNLOAD_URL = (
"https://github.com/HibiKier/zhenxun_bot_webui/archive/refs/heads/dist.zip"
)

View File

@ -0,0 +1,50 @@
import os
import shutil
import zipfile
from pathlib import Path
from nonebot.utils import run_sync
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from .config import (
WEBUI_ASSETS_DOWNLOAD_URL,
WEBUI_DATA_PATH,
TMP_PATH,
COMMAND_NAME,
PUBLIC_PATH,
)
async def update_webui_assets():
webui_assets_path = TMP_PATH / "webui_assets.zip"
if await AsyncHttpx.download_file(
WEBUI_ASSETS_DOWNLOAD_URL, webui_assets_path, follow_redirects=True
):
logger.info("下载 webui_assets 成功...", COMMAND_NAME)
else:
logger.error("下载 webui_assets 失败...", COMMAND_NAME)
await _file_handle(webui_assets_path)
logger.info("更新 webui_assets 成功...", COMMAND_NAME)
return True
@run_sync
def _file_handle(webui_assets_path: Path):
logger.debug("开始解压 webui_assets...", COMMAND_NAME)
if webui_assets_path.exists():
tf = zipfile.ZipFile(webui_assets_path)
tf.extractall(TMP_PATH)
logger.debug("解压 webui_assets 成功...", COMMAND_NAME)
else:
logger.error("解压 webui_assets 失败...", COMMAND_NAME)
return
download_file_path = (
TMP_PATH / [x for x in os.listdir(TMP_PATH) if (TMP_PATH / x).is_dir()][0]
)
shutil.rmtree(PUBLIC_PATH, ignore_errors=True)
shutil.copytree(download_file_path / "dist", PUBLIC_PATH, dirs_exist_ok=True)
logger.debug("复制 webui_assets 成功...", COMMAND_NAME)
shutil.rmtree(TMP_PATH, ignore_errors=True)

View File

@ -1,6 +1,7 @@
import os
from datetime import datetime, timedelta
from pathlib import Path
import secrets
import psutil
import ujson as json
@ -14,7 +15,6 @@ from zhenxun.configs.path_config import DATA_PATH
from .base_model import SystemFolderSize, SystemStatus, User
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
@ -28,6 +28,8 @@ if token_file.exists():
token_data = json.load(open(token_file, "r", encoding="utf8"))
except json.JSONDecodeError:
pass
if not token_data.get("secret"):
token_data["secret"] = secrets.token_hex(64)
def get_user(uname: str) -> User | None:
@ -55,7 +57,7 @@ def create_token(user: User, expires_delta: timedelta | None = None):
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
return jwt.encode(
claims={"sub": user.username, "exp": expire},
key=SECRET_KEY,
key=token_data["secret"],
algorithm=ALGORITHM,
)
@ -71,7 +73,7 @@ def authentication():
# if token not in token_data["token"]:
def inner(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
payload = jwt.decode(token, token_data["secret"], algorithms=[ALGORITHM])
username, expire = payload.get("sub"), payload.get("exp")
user = get_user(username) # type: ignore
if user is None: