UPDATE WEB UI

This commit is contained in:
HibiKier 2023-04-02 22:57:36 +08:00
parent 3cc834d424
commit 62e29f389b
13 changed files with 583 additions and 493 deletions

View File

@ -1,32 +1,40 @@
import nonebot
from fastapi import APIRouter, FastAPI
from nonebot.adapters.onebot.v11 import Bot, MessageEvent
from nonebot.matcher import Matcher
from nonebot.message import IgnoredException, run_preprocessor
from nonebot.message import run_preprocessor
from nonebot.typing import T_State
from configs.config import Config as gConfig
from services.log import logger
from utils.manager import plugins2settings_manager
from .api import *
from .auth import *
from .api.group import router as group_routes
from .api.plugins import router as plugin_routes
from .api.request import router as request_routes
from .api.system import router as system_routes
# from .api.g import *
from .auth import router as auth_router
driver = nonebot.get_driver()
gConfig.add_plugin_config("web-ui", "username", "admin", name="web-ui", help_="前端管理用户名")
gConfig.add_plugin_config("web-ui", "password", None, name="web-ui", help_="前端管理密码")
# 使用webui访问api后plugins2settings中的cmd字段将从list变为str
# 暂时找不到原因
# 先使用hook修复
@run_preprocessor
async def _(matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State):
flag = False
for module in plugins2settings_manager.keys():
if plugins2settings_manager.get_plugin_data(module).cmd and isinstance(
plugins2settings_manager.get_plugin_data(module).cmd, str
):
plugins2settings_manager[module].cmd = plugins2settings_manager[
module
].cmd.split(",")
flag = True
if flag:
plugins2settings_manager.save()
BaseApiRouter = APIRouter(prefix="/zhenxun/api")
BaseApiRouter.include_router(auth_router)
BaseApiRouter.include_router(plugin_routes)
BaseApiRouter.include_router(group_routes)
BaseApiRouter.include_router(request_routes)
BaseApiRouter.include_router(system_routes)
@driver.on_startup
def _():
app: FastAPI = nonebot.get_app()
app.include_router(BaseApiRouter)
logger.info("<g>API启动成功</g>", "Web UI")

View File

@ -1,27 +1,31 @@
from fastapi import APIRouter
from pydantic.error_wrappers import ValidationError
from services.log import logger
from utils.manager import group_manager
from utils.utils import get_bot
from ..auth import Depends, User, token_to_user
from ..config import *
from ..models.model import Group, GroupResult, Result, Task
from ..models.params import UpdateGroup
from ..utils import authentication
router = APIRouter()
@router.get("/group", dependencies=[token_to_user()])
@router.get("/get_group", dependencies=[authentication()])
async def _() -> Result:
"""
获取群信息
"""
group_list_result = []
group_info = {}
if bot := get_bot():
group_list = await bot.get_group_list()
for g in group_list:
group_info[g["group_id"]] = Group(**g)
group_data = group_manager.get_data()
for group_id in group_data.group_manager:
try:
try:
group_info = {}
if bot := get_bot():
group_list = await bot.get_group_list()
for g in group_list:
group_info[g["group_id"]] = Group(**g)
group_data = group_manager.get_data()
for group_id in group_data.group_manager:
task_list = []
data = group_manager[group_id].dict()
for tn, status in data["group_task_status"].items():
@ -39,24 +43,27 @@ async def _() -> Result:
data["group"] = x
else:
continue
try:
group_list_result.append(GroupResult(**data))
except ValidationError:
pass
except Exception as e:
logger.error(f"WEB_UI /webui/group 发生错误 {type(e)}{e}")
return Result(code=200, data=group_list_result)
group_list_result.append(GroupResult(**data))
except Exception as e:
logger.error("调用API错误", "/get_group", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(group_list_result, "拿到了新鲜出炉的数据!")
@router.post("/group", dependencies=[token_to_user()])
async def _(group: GroupResult) -> Result:
@router.post("/update_group", dependencies=[authentication()])
async def _(group: UpdateGroup) -> Result:
"""
修改群信息
"""
group_id = group.group.group_id
group_manager.set_group_level(group_id, group.level)
if group.status:
group_manager.turn_on_group_bot_status(group_id)
else:
group_manager.shutdown_group_bot_status(group_id)
return Result(data="修改成功!")
try:
group_id = group.group_id
group_manager.set_group_level(group_id, group.level)
if group.status:
group_manager.turn_on_group_bot_status(group_id)
else:
group_manager.shutdown_group_bot_status(group_id)
group_manager.save()
except Exception as e:
logger.error("调用API错误", "/get_group", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(info="已完成记录!")

View File

@ -1,176 +1,115 @@
from pydantic import ValidationError
from typing import Optional
import cattrs
from fastapi import APIRouter
from configs.config import Config
from services.log import logger
from utils.manager import (
plugins2block_manager,
plugins2cd_manager,
plugins2count_manager,
plugins2settings_manager,
plugins_manager,
)
from utils.utils import get_matchers
from utils.manager import plugin_data_manager, plugins2settings_manager, plugins_manager
from utils.manager.models import PluginData, PluginType
from ..auth import Depends, User, token_to_user
from ..config import *
from ..models.model import Plugin, PluginConfig, Result
from ..models.params import UpdateConfig, UpdatePlugin
from ..utils import authentication
plugin_name_list = None
router = APIRouter()
@router.get("/plugins", dependencies=[token_to_user()])
def _(type_: Optional[str]) -> Result:
@router.get("/get_plugins", dependencies=[authentication()])
def _(
plugin_type: PluginType,
) -> Result:
"""
获取插件列表
:param type_: 类型 normal, superuser, hidden, admin
:param plugin_type: 类型 normal, superuser, hidden, admin
"""
global plugin_name_list
if not plugin_name_list:
plugin_name_list = [x.plugin_name for x in get_matchers(True)]
plugin_list = []
plugin_data = plugins_manager.get_data()
for model in plugin_data:
if model in plugin_name_list:
try:
data = plugin_data.get(model)
# data.model = model
plugin_name = data.plugin_name
if (
(type_ == "hidden" and "[hidden]" not in plugin_name.lower())
or (type_ == "admin" and "[admin]" not in plugin_name.lower())
or (
type_ == "superuser"
and "[superuser]" not in plugin_name.lower()
try:
plugin_list = []
for module in plugin_data_manager.keys():
plugin_data: Optional[PluginData] = plugin_data_manager[module]
if plugin_data and plugin_data.plugin_type == plugin_type:
plugin_config = None
if plugin_data.plugin_configs:
plugin_config = {}
for key in plugin_data.plugin_configs:
plugin_config[key] = PluginConfig(
key=key,
module=module,
has_type=bool(plugin_data.plugin_configs[key].type),
**plugin_data.plugin_configs[key].dict(),
)
plugin_list.append(
Plugin(
model=module,
plugin_settings=plugin_data.plugin_setting,
plugin_manager=plugin_data.plugin_status,
plugin_config=plugin_config,
cd_limit=plugin_data.plugin_cd,
block_limit=plugin_data.plugin_block,
count_limit=plugin_data.plugin_count,
)
):
continue
if type_ == "normal" and (
"[hidden]" in plugin_name.lower()
or "[admin]" in plugin_name.lower()
or "[superuser]" in plugin_name.lower()
):
continue
data = {"model": model}
if x := plugin_data.get(model):
if not x.status and x.block_type in [
"group",
"private",
"all",
]:
x.block_type = (
"群聊"
if x.block_type == "group"
else "私聊"
if x.block_type == "private"
else "全部"
)
data["plugin_manager"] = PluginManager(**x.dict())
if x := plugins2settings_manager.get(model):
if x.cmd and isinstance(x.cmd, list):
x.cmd = ",".join(x.cmd)
data["plugin_settings"] = PluginSettings(**x.dict())
if x := plugins2cd_manager.get(model):
data["cd_limit"] = CdLimit(**x.dict())
if x := plugins2block_manager.get(model):
data["block_limit"] = BlockLimit(**x.dict())
if x := plugins2count_manager.get(model):
data["count_limit"] = CountLimit(**x.dict())
if x := Config.get(model):
id_ = 0
tmp = []
for key in x.keys():
tmp.append(
PluginConfig(
**{
"key": key,
"help_": x[key].get("help"),
"id": id_,
**x[key],
}
)
)
id_ += 1
data["plugin_config"] = tmp
plugin_list.append(Plugin(**data))
except (AttributeError, ValidationError) as e:
logger.error(
f"WEB_UI GET /webui/plugins model{model} 发生错误 {type(e)}{e}"
)
except Exception as e:
logger.error(
f"WEB_UI GET /webui/plugins model{model} 发生错误 {type(e)}{e}"
)
return Result(
code=500,
data=f"WEB_UI GET /webui/plugins model{model} 发生错误 {type(e)}{e}",
)
return Result(code=200, data=plugin_list)
except Exception as e:
logger.error("调用API错误", "/get_plugins", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(plugin_list, "拿到了新鲜出炉的数据!")
@router.post("/plugins", dependencies=[token_to_user()])
def _(plugin: Plugin, user: User = Depends(token_to_user)) -> Result:
@router.post("/update_plugins", dependencies=[authentication()])
def _(plugin: UpdatePlugin) -> Result:
"""
修改插件信息
:param plugin: 插件内容
"""
try:
if plugin.plugin_config:
for c in plugin.plugin_config:
if not c.value:
Config.set_config(plugin.model, c.key, None)
continue
if str(c.value).lower() in ["true", "false"] and (
c.default_value is None or isinstance(c.default_value, bool)
):
c.value = str(c.value).lower() == "true"
elif isinstance(
Config.get_config(plugin.model, c.key, c.value), int
) or isinstance(c.default_value, int):
c.value = int(c.value)
elif isinstance(
Config.get_config(plugin.model, c.key, c.value), float
) or isinstance(c.default_value, float):
c.value = float(c.value)
elif isinstance(c.value, str) and (
isinstance(
Config.get_config(plugin.model, c.key, c.value), (list, tuple)
)
or isinstance(c.default_value, (list, tuple))
):
default_value = Config.get_config(plugin.model, c.key, c.value)
c.value = c.value.split(",")
if default_value and isinstance(default_value[0], int):
c.value = [int(x) for x in c.value]
elif default_value and isinstance(default_value[0], float):
c.value = [float(x) for x in c.value]
elif default_value and isinstance(default_value[0], bool):
temp = []
for x in c.value:
temp.append(x.lower() == "true")
c.value = temp
Config.set_config(plugin.model, c.key, c.value)
Config.save(None, True)
else:
if plugin.plugin_settings:
for key, value in plugin.plugin_settings:
if key == "cmd":
value = value.split(",")
setattr(plugins2settings_manager[plugin.model], key, value)
if plugin.plugin_manager:
for key, value in plugin.plugin_manager:
setattr(plugins_manager[plugin.model], key, value)
module = plugin.module
if p2s := plugins2settings_manager.get(module):
p2s.default_status = plugin.default_status
p2s.limit_superuser = plugin.limit_superuser
p2s.cost_gold = plugin.cost_gold
p2s.cmd = plugin.cmd
p2s.level = plugin.group_level
if pd := plugin_data_manager.get(module):
menu_lin = None
if len(pd.menu_type) > 1:
menu_lin = pd.menu_type[1]
if menu_lin is not None:
pd.menu_type = (plugin.menu_type, menu_lin)
else:
pd.menu_type = (plugin.menu_type,)
if pm := plugins_manager.get(module):
if plugin.block_type:
pm.block_type = plugin.block_type
pm.status = False
else:
pm.block_type = None
pm.status = True
plugins2settings_manager.save()
plugins_manager.save()
except Exception as e:
logger.error(
f"WEB_UI POST /webui/plugins model{plugin.model} 发生错误 {type(e)}{e}"
)
return Result(
code=500,
data=f"WEB_UI POST /webui/plugins model{plugin.model} 发生错误 {type(e)}{e}",
)
for key in plugins2settings_manager.keys():
if isinstance(plugins2settings_manager[key].cmd, str):
plugins2settings_manager[key].cmd = plugins2settings_manager[key].cmd.split(
","
)
plugins2settings_manager.save()
plugins_manager.save()
return Result(code=200, data="修改成功!")
logger.error("调用API错误", "/update_plugins", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(info="已经帮你写好啦!")
@router.post("/update_config", dependencies=[authentication()])
def _(config_list: List[UpdateConfig]) -> Result:
try:
for config in config_list:
if cg := Config.get(config.module):
if c := cg.configs.get(config.key):
if isinstance(c.value, (list, tuple)) or isinstance(
c.default_value, (list, tuple)
):
value = config.value.split(",")
else:
value = config.value
if c.type and value is not None:
value = cattrs.structure(value, c.type)
Config.set_config(config.module, config.key, value)
except Exception as e:
logger.error("调用API错误", "/update_config", e=e)
return Result.fail(f"{type(e)}: {e}")
Config.save(save_simple_data=True)
return Result.ok(info="写入配置项了哦!")

View File

@ -1,69 +1,87 @@
from typing import Optional
from fastapi import APIRouter
from configs.config import NICKNAME
from models.group_info import GroupInfo
from services.log import logger
from utils.manager import requests_manager
from utils.utils import get_bot
from ..auth import Depends, User, token_to_user
from ..config import *
from ..models.model import RequestResult, Result
from ..models.params import HandleRequest
from ..utils import authentication
router = APIRouter()
@router.get("/webui/request", dependencies=[token_to_user()])
def _(type_: Optional[str]) -> Result:
req_data = requests_manager.get_data()
req_list = []
if type_ in ["group", "private"]:
req_data = req_data[type_]
for x in req_data:
req_data[x]["oid"] = x
req_list.append(RequestResult(**req_data[x]))
req_list.reverse()
return Result(code=200, data=req_list)
@router.get("/get_request", dependencies=[authentication()])
def _(request_type: Optional[str]) -> Result:
try:
req_data = requests_manager.get_data()
req_list = []
if request_type in ["group", "private"]:
req_data = req_data[request_type]
for x in req_data:
req_data[x]["oid"] = x
req_list.append(RequestResult(**req_data[x]))
req_list.reverse()
except Exception as e:
logger.error("调用API错误", "/get_request", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(req_list, f"{NICKNAME}带来了最新的数据!")
@router.delete("/webui/request", dependencies=[token_to_user()])
def _(type_: Optional[str]) -> Result:
@router.delete("/clear_request", dependencies=[authentication()])
def _(request_type: Optional[str]) -> Result:
"""
清空请求
:param type_: 类型
"""
requests_manager.clear(type_)
return Result(code=200)
requests_manager.clear(request_type)
return Result.ok(info="成功清除了数据")
@router.post("/webui/request", dependencies=[token_to_user()])
async def _(parma: RequestParma) -> Result:
@router.post("/handle_request", dependencies=[authentication()])
async def _(parma: HandleRequest) -> Result:
"""
操作请求
:param parma: 参数
"""
result = "操作成功!"
flag = 3
if bot := get_bot():
if parma.handle == "approve":
if parma.type == "group":
if rid := requests_manager.get_group_id(parma.id):
# await GroupInfo.update_or_create(defaults={"group_flag": 1}, )
if group := await GroupInfo.filter(group_id=rid).first():
await group.update_or_create(group_flag=1)
else:
group_info = await bot.get_group_info(group_id=rid)
await GroupInfo.update_or_create(
group_id=group_info["group_id"],
defaults={
"group_name": group_info["group_name"],
"max_member_count": group_info["max_member_count"],
"member_count": group_info["member_count"],
"group_flag": 1,
},
)
flag = await requests_manager.approve(bot, parma.id, parma.type)
elif parma.handle == "refuse":
flag = await requests_manager.refused(bot, parma.id, parma.type)
elif parma.handle == "delete":
requests_manager.delete_request(parma.id, parma.type)
if parma.handle != "delete":
if flag == 1:
result = "该请求已失效"
try:
result = "操作成功!"
flag = 3
if bot := get_bot():
if parma.handle == "approve":
if parma.type == "group":
if rid := requests_manager.get_group_id(parma.id):
# await GroupInfo.update_or_create(defaults={"group_flag": 1}, )
if group := await GroupInfo.get_or_none(group_id=rid):
await group.update_or_create(group_flag=1)
else:
group_info = await bot.get_group_info(group_id=rid)
await GroupInfo.update_or_create(
group_id=group_info["group_id"],
defaults={
"group_name": group_info["group_name"],
"max_member_count": group_info["max_member_count"],
"member_count": group_info["member_count"],
"group_flag": 1,
},
)
flag = await requests_manager.approve(bot, parma.id, parma.type)
elif parma.handle == "refuse":
flag = await requests_manager.refused(bot, parma.id, parma.type)
elif parma.handle == "delete":
requests_manager.delete_request(parma.id, parma.type)
elif flag == 2:
result = "未找到此Id"
return Result(code=200, data=result)
if parma.handle != "delete":
if flag == 1:
result = "该请求已失效"
requests_manager.delete_request(parma.id, parma.type)
elif flag == 2:
result = "未找到此Id"
return Result.ok(result, "成功处理了请求!")
return Result.fail("Bot未连接")
except Exception as e:
logger.error("调用API错误", "/get_group", e=e)
return Result.fail(f"{type(e)}: {e}")

View File

@ -1,9 +1,12 @@
import asyncio
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Union
import psutil
import ujson as json
from fastapi import APIRouter
from configs.path_config import (
DATA_PATH,
@ -17,8 +20,15 @@ from configs.path_config import (
from services.log import logger
from utils.http_utils import AsyncHttpx
from ..auth import Depends, User, token_to_user
from ..config import *
from ..models.model import (
Result,
SystemFolderSize,
SystemNetwork,
SystemResult,
SystemStatus,
SystemStatusList,
)
from ..utils import authentication
CPU_DATA_PATH = DATA_PATH / "system" / "cpu.json"
MEMORY_DATA_PATH = DATA_PATH / "system" / "memory.json"
@ -28,31 +38,31 @@ cpu_data = {"data": []}
memory_data = {"data": []}
disk_data = {"data": []}
router = APIRouter()
@router.get("/system", dependencies=[token_to_user()])
@router.get("/system", dependencies=[authentication()])
async def _() -> Result:
return await get_system_data()
@router.get("/webui/system/status", dependencies=[token_to_user()])
@router.get("/status", dependencies=[authentication()])
async def _() -> Result:
return Result(
code=200,
data=await asyncio.get_event_loop().run_in_executor(None, _get_system_status),
return Result.ok(
await asyncio.get_event_loop().run_in_executor(None, _get_system_status),
)
@router.get("/webui/system/disk", dependencies=[token_to_user()])
@router.get("/system/disk", dependencies=[authentication()])
async def _(type_: Optional[str] = None) -> Result:
return Result(
code=200,
return Result.ok(
data=await asyncio.get_event_loop().run_in_executor(
None, _get_system_disk, type_
),
)
@router.get("/webui/system/statusList", dependencies=[token_to_user()])
@router.get("/system/statusList", dependencies=[authentication()])
async def _() -> Result:
global cpu_data, memory_data, disk_data
await asyncio.get_event_loop().run_in_executor(None, _get_system_status)
@ -65,9 +75,8 @@ async def _() -> Result:
disk_rst = (
disk_data["data"][-10:] if len(disk_data["data"]) > 10 else disk_data["data"]
)
return Result(
code=200,
data=SystemStatusList(
return Result.ok(
SystemStatusList(
cpu_data=cpu_rst,
memory_data=memory_rst,
disk_data=disk_rst,
@ -95,12 +104,11 @@ async def get_system_data():
network = SystemNetwork(baidu=baidu, google=google)
disk = await asyncio.get_event_loop().run_in_executor(None, _get_system_disk, None)
status = await asyncio.get_event_loop().run_in_executor(None, _get_system_status)
return Result(
code=200,
data=SystemResult(
return Result.ok(
SystemResult(
status=status,
network=network,
disk=disk,
disk=disk, # type: ignore
check_time=datetime.now().replace(microsecond=0),
),
)

View File

@ -1,74 +1,32 @@
import json
from datetime import datetime, timedelta
from typing import Optional
from datetime import timedelta
import nonebot
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from pydantic import BaseModel
from starlette import status
from fastapi import APIRouter, Depends
from fastapi.security import OAuth2PasswordRequestForm
from configs.config import Config
from configs.path_config import DATA_PATH
from ..config import Result, router
from ..models.model import Result
from ..utils import (
ACCESS_TOKEN_EXPIRE_MINUTES,
create_token,
get_user,
token_data,
token_file,
)
app = nonebot.get_app()
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/login")
token_file = DATA_PATH / "web_ui" / "token.json"
token_file.parent.mkdir(parents=True, exist_ok=True)
token_data = {"token": []}
if token_file.exists():
token_data = json.load(open(token_file, "r", encoding="utf8"))
class User(BaseModel):
username: str
password: str
class Token(BaseModel):
access_token: str
token_type: str
def get_user(uname: str) -> Optional[User]:
username = Config.get_config("web-ui", "username")
password = Config.get_config("web-ui", "password")
if username and password and uname == username:
return User(username=username, password=password)
form_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def create_token(user: User, expires_delta: Optional[timedelta] = None):
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
return jwt.encode(
claims={"sub": user.username, "exp": expire},
key=SECRET_KEY,
algorithm=ALGORITHM,
)
router = APIRouter()
@router.post("/login")
async def login_get_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = get_user(form_data.username)
if not user or user.password != form_data.password:
raise form_exception
if user := get_user(form_data.username):
if user.password != form_data.password:
return Result.fail("真笨, 密码都能记错!", 999)
else:
return Result.fail("你滴配置文件里用户名密码配置项为空", 998)
access_token = create_token(
user=user, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
@ -77,31 +35,6 @@ async def login_get_token(form_data: OAuth2PasswordRequestForm = Depends()):
token_data["token"] = token_data["token"][1:]
with open(token_file, "w", encoding="utf8") as f:
json.dump(token_data, f, ensure_ascii=False, indent=4)
return {"access_token": access_token, "token_type": "bearer"}
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
@app.post("/auth")
def token_to_user(token: str = Depends(oauth2_scheme)):
if token not in token_data["token"]:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username, expire = payload.get("sub"), payload.get("exp")
user = get_user(username) # type: ignore
if user is None:
raise JWTError
except JWTError:
return Result(code=401)
return Result(code=200, info="登录成功")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8080)
return Result.ok(
{"access_token": access_token, "token_type": "bearer"}, "欢迎回家, 欧尼酱!"
)

View File

@ -18,127 +18,6 @@ app.add_middleware(
allow_headers=["*"],
)
router = APIRouter(tags=["api"])
class CdLimit(BaseModel):
"""
Cd 限制
"""
cd: int
status: bool
check_type: str
limit_type: str
rst: Optional[str]
class BlockLimit(BaseModel):
"""
Block限制
"""
status: bool
check_type: str
limit_type: str
rst: Optional[str]
class CountLimit(BaseModel):
"""
Count限制
"""
max_count: int
status: bool
limit_type: str
rst: Optional[str]
class PluginManager(BaseModel):
"""
插件信息
"""
plugin_name: str # 插件名称
status: Optional[bool] # 插件状态
error: Optional[bool] # 加载状态
version: Optional[float] # 版本
author: Optional[str] # 作者
block_type: Optional[str] # 禁用类型
class PluginSettings(BaseModel):
"""
插件基本设置
"""
level: Optional[int] # 群权限等级
default_status: Optional[bool] # 默认开关
limit_superuser: Optional[bool] # 是否限制超级用户
cmd: Optional[str] # cmd别名
cost_gold: Optional[int] # 花费金币限制
plugin_type: Optional[List[Union[str, int]]] # 帮助类型
class PluginConfig(BaseModel):
"""
插件配置项
"""
id: int
key: str
value: Optional[Any]
help_: Optional[str]
default_value: Optional[Any]
class Plugin(BaseModel):
"""
插件
"""
model: str # 模块
plugin_settings: Optional[PluginSettings]
plugin_manager: Optional[PluginManager]
plugin_config: Optional[List[PluginConfig]]
cd_limit: Optional[CdLimit]
block_limit: Optional[BlockLimit]
count_limit: Optional[CountLimit]
class Group(BaseModel):
"""
群组信息
"""
group_id: int
group_name: str
member_count: int
max_member_count: int
class Task(BaseModel):
"""
被动技能
"""
name: str
nameZh: str
status: bool
class GroupResult(BaseModel):
"""
群组返回数据
"""
group: Group
level: int
status: bool
close_plugins: List[str]
task: List[Task]
class RequestResult(BaseModel):
"""
@ -222,13 +101,3 @@ class SystemResult(BaseModel):
network: SystemNetwork
disk: SystemFolderSize
check_time: datetime
class Result(BaseModel):
"""
总体返回
"""
code: int = 200
info: str = "操作成功"
data: Any = None

View File

@ -0,0 +1,182 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, TypeVar, Union
from pydantic import BaseModel
from configs.utils import Config
from utils.manager.models import Plugin as PluginManager
from utils.manager.models import PluginBlock, PluginCd, PluginCount, PluginSetting
class User(BaseModel):
username: str
password: str
class Token(BaseModel):
access_token: str
token_type: str
class Result(BaseModel):
"""
总体返回
"""
suc: bool
"""调用状态"""
code: int = 200
"""code"""
info: str = "操作成功"
"""info"""
data: Any = None
"""返回数据"""
@classmethod
def fail(cls, info: str = "异常错误", code: int = 500) -> "Result":
return cls(suc=False, info=info, code=code)
@classmethod
def ok(cls, data: Any = None, info: str = "操作成功", code: int = 200) -> "Result":
return cls(suc=True, info=info, code=code, data=data)
class PluginConfig(BaseModel):
"""
插件配置项
"""
module: str
key: str
value: Optional[Any]
help: Optional[str]
default_value: Optional[Any]
has_type: bool
class Plugin(BaseModel):
"""
插件
"""
model: str
"""模块名称"""
plugin_settings: Optional[PluginSetting]
"""settings"""
plugin_manager: Optional[PluginManager]
"""manager"""
plugin_config: Optional[Dict[str, PluginConfig]]
"""配置项"""
cd_limit: Optional[PluginCd]
"""cd限制"""
block_limit: Optional[PluginBlock]
"""阻断限制"""
count_limit: Optional[PluginCount]
"""次数限制"""
class Group(BaseModel):
"""
群组信息
"""
group_id: int
group_name: str
member_count: int
max_member_count: int
class Task(BaseModel):
"""
被动技能
"""
name: str
nameZh: str
status: bool
class GroupResult(BaseModel):
"""
群组返回数据
"""
group: Group
level: int
status: bool
close_plugins: List[str]
task: List[Task]
class RequestResult(BaseModel):
"""
好友/群组请求管理
"""
oid: str
id: int
flag: str
nickname: Optional[str]
level: Optional[int]
sex: Optional[str]
age: Optional[int]
from_: Optional[str]
comment: Optional[str]
invite_group: Optional[int]
group_name: Optional[str]
class SystemStatus(BaseModel):
"""
系统状态
"""
cpu: float
memory: float
disk: float
check_time: datetime
class SystemNetwork(BaseModel):
"""
系统网络状态
"""
baidu: int
google: int
class SystemFolderSize(BaseModel):
"""
资源文件占比
"""
font_dir_size: float
image_dir_size: float
text_dir_size: float
record_dir_size: float
temp_dir_size: float
data_dir_size: float
log_dir_size: float
check_time: datetime
class SystemStatusList(BaseModel):
"""
状态记录
"""
cpu_data: List[Dict[str, Union[float, str]]]
memory_data: List[Dict[str, Union[float, str]]]
disk_data: List[Dict[str, Union[float, str]]]
class SystemResult(BaseModel):
"""
系统api返回
"""
status: SystemStatus
network: SystemNetwork
disk: SystemFolderSize
check_time: datetime

View File

@ -0,0 +1,61 @@
from typing import Any, List
from pydantic import BaseModel
class UpdatePlugin(BaseModel):
"""
插件修改参数
"""
module: str
"""模块"""
default_status: bool
"""默认开关"""
limit_superuser: bool
"""限制超级用户"""
cost_gold: int
"""金币花费"""
cmd: List[str]
"""插件别名"""
menu_type: str
"""插件菜单类型"""
group_level: int
"""插件所需群权限"""
block_type: str
"""禁用类型"""
class UpdateConfig(BaseModel):
"""
配置项修改参数
"""
module: str
"""模块"""
key: str
"""配置项key"""
value: Any
"""配置项值"""
class UpdateGroup(BaseModel):
group_id: int
"""群号"""
status: bool
"""状态"""
level: int
"""群权限"""
class HandleRequest(BaseModel):
"""
操作请求接收数据
"""
id: int
handle: str
type: str

59
plugins/web_ui/utils.py Normal file
View File

@ -0,0 +1,59 @@
from datetime import datetime, timedelta
from typing import Any, Optional
import nonebot
import ujson as json
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from configs.config import Config
from configs.path_config import DATA_PATH
from .models.model import Result, User
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/login")
token_file = DATA_PATH / "web_ui" / "token.json"
token_file.parent.mkdir(parents=True, exist_ok=True)
token_data = {"token": []}
if token_file.exists():
try:
token_data = json.load(open(token_file, "r", encoding="utf8"))
except json.JSONDecodeError:
pass
def get_user(uname: str) -> Optional[User]:
username = Config.get_config("web-ui", "username")
password = Config.get_config("web-ui", "password")
if username and password and uname == username:
return User(username=username, password=password)
def create_token(user: User, expires_delta: Optional[timedelta] = None):
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
return jwt.encode(
claims={"sub": user.username, "exp": expire},
key=SECRET_KEY,
algorithm=ALGORITHM,
)
def authentication():
# if token not in token_data["token"]:
def inner(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username, expire = payload.get("sub"), payload.get("exp")
user = get_user(username) # type: ignore
if user is None:
raise JWTError
except JWTError:
raise HTTPException(status_code=400, detail="登录验证失败或已失效, 踢出房间!")
return Depends(inner)

View File

@ -67,18 +67,24 @@ class StaticData(Generic[T]):
temp[k] = copy.deepcopy(v)
return temp
def save(self, path: Union[str, Path] = None) -> NoReturn:
def save(self, path: Optional[Union[str, Path]] = None):
path = path or self.file
if isinstance(path, str):
path = Path(path)
if path:
with open(path, "w", encoding="utf8") as f:
if path.name.endswith("yaml"):
yaml.dump(self._data, f, indent=2, Dumper=yaml.RoundTripDumper, allow_unicode=True)
yaml.dump(
self._data,
f,
indent=2,
Dumper=yaml.RoundTripDumper,
allow_unicode=True,
)
else:
json.dump(self.dict(), f, ensure_ascii=False, indent=4)
def reload(self) -> NoReturn:
def reload(self):
if self.file.exists():
if self.file.name.endswith("json"):
self._data: dict = json.load(open(self.file, "r", encoding="utf8"))
@ -94,7 +100,7 @@ class StaticData(Generic[T]):
def __str__(self) -> str:
return str(self._data)
def __setitem__(self, key, value) -> NoReturn:
def __setitem__(self, key, value):
self._data[key] = value
def __getitem__(self, key) -> T:

View File

@ -140,4 +140,4 @@ class PluginData(BaseModel):
)
def __hash__(self):
return hash(self.name + self.menu_type[0])
return hash(self.name + str(self.menu_type[0]))

View File

@ -24,7 +24,7 @@ class PluginDataManager(StaticData[PluginData]):
raise ValueError(f"PluginInfoManager {info.model}:{info.name} 插件名称及类型已存在")
self._data[info.model] = info
def get(self, item: str, default: Any = None) -> PluginData:
def get(self, item: str, default: Any = None) -> Optional[PluginData]:
return self._data.get(item, default)
def __getitem__(self, item) -> Optional[PluginData]: