Merge pull request #1513 from HibiKier/dev

Dev
This commit is contained in:
HibiKier 2024-01-25 18:36:12 +08:00 committed by GitHub
commit 379dd4d598
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 2586 additions and 1199 deletions

View File

@ -335,6 +335,10 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
## 更新
### 2024/1/25
* 重构webui
### 2023/12/28
* 修复B站动态获取失败的时候会发送空消息

View File

@ -1,5 +1,5 @@
from nonebot import on_message
from nonebot.adapters.onebot.v11 import GroupMessageEvent, MessageEvent
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, MessageEvent
from configs.config import Config
from models.chat_history import ChatHistory
@ -32,7 +32,7 @@ TEMP_LIST = []
@chat_history.handle()
async def _(event: MessageEvent, msg: str = PlaintText()):
async def _(bot: Bot, event: MessageEvent, msg: str = PlaintText()):
group_id = None
if isinstance(event, GroupMessageEvent):
group_id = str(event.group_id)
@ -42,6 +42,7 @@ async def _(event: MessageEvent, msg: str = PlaintText()):
group_id=group_id,
text=str(event.get_message()),
plain_text=msg,
bot_id=str(bot.self_id),
)
)

View File

@ -1,8 +1,9 @@
from utils.manager import plugins2settings_manager, admin_manager, plugin_data_manager
import nonebot
from services.log import logger
from utils.manager import admin_manager, plugin_data_manager, plugins2settings_manager
from utils.manager.models import PluginType
from utils.utils import get_matchers
import nonebot
def init_plugins_settings():
@ -18,7 +19,10 @@ def init_plugins_settings():
# logger.warning(f"配置文件 模块:{x} 获取 plugin_name 失败...{e}")
for matcher in get_matchers(True):
try:
if matcher.plugin_name not in plugins2settings_manager.keys():
if (
matcher.plugin_name
and matcher.plugin_name not in plugins2settings_manager.keys()
):
if _plugin := matcher.plugin:
try:
_module = _plugin.module
@ -27,18 +31,26 @@ def init_plugins_settings():
else:
if plugin_data := plugin_data_manager.get(matcher.plugin_name):
if plugin_settings := plugin_data.plugin_setting:
if (name := _module.__getattribute__("__zx_plugin_name__")) not in plugin_settings.cmd:
if (
name := _module.__getattribute__(
"__zx_plugin_name__"
)
) not in plugin_settings.cmd:
plugin_settings.cmd.append(name)
# 管理员命令
if plugin_data.plugin_type == PluginType.ADMIN:
admin_manager.add_admin_plugin_settings(
matcher.plugin_name, plugin_settings.cmd, plugin_settings.level
matcher.plugin_name,
plugin_settings.cmd,
plugin_settings.level,
)
else:
plugins2settings_manager.add_plugin_settings(
matcher.plugin_name, plugin_settings
)
except Exception as e:
logger.error(f'{matcher.plugin_name} 初始化 plugin_settings 发生错误 {type(e)}{e}')
logger.error(
f"{matcher.plugin_name} 初始化 plugin_settings 发生错误 {type(e)}{e}"
)
plugins2settings_manager.save()
logger.info(f"已成功加载 {len(plugins2settings_manager.get_data())} 个非限制插件.")

View File

@ -58,9 +58,12 @@ async def _(bot: Bot, event: FriendRequestEvent):
if Config.get_config("invite_manager", "AUTO_ADD_FRIEND"):
logger.debug(f"已开启好友请求自动同意,成功通过该请求", "好友请求", target=event.user_id)
await bot.set_friend_add_request(flag=event.flag, approve=True)
await FriendUser.create(user_id=str(user["user_id"]), user_name=user["nickname"])
await FriendUser.create(
user_id=str(user["user_id"]), user_name=user["nickname"]
)
else:
requests_manager.add_request(
str(bot.self_id),
event.user_id,
"private",
event.flag,
@ -126,6 +129,7 @@ async def _(bot: Bot, event: GroupRequestEvent):
"等待管理员处理吧!",
)
requests_manager.add_request(
str(bot.self_id),
event.user_id,
"group",
event.flag,

View File

@ -21,6 +21,8 @@ class ChatHistory(Model):
"""纯文本"""
create_time = fields.DatetimeField(auto_now_add=True)
"""创建时间"""
bot_id = fields.CharField(255, null=True)
"""bot记录id"""
class Meta:
table = "chat_history"
@ -56,7 +58,9 @@ class ChatHistory(Model):
) # type: ignore
@classmethod
async def get_group_first_msg_datetime(cls, group_id: Union[int, str]) -> Optional[datetime]:
async def get_group_first_msg_datetime(
cls, group_id: Union[int, str]
) -> Optional[datetime]:
"""
说明:
获取群第一条记录消息时间
@ -117,4 +121,6 @@ class ChatHistory(Model):
"ALTER TABLE chat_history RENAME COLUMN user_qq TO user_id;", # 将user_id改为user_id
"ALTER TABLE chat_history ALTER COLUMN user_id TYPE character varying(255);",
"ALTER TABLE chat_history ALTER COLUMN group_id TYPE character varying(255);",
"ALTER TABLE chat_history ADD bot_id VARCHAR(255);", # 添加bot_id字段
"ALTER TABLE chat_history ALTER COLUMN bot_id TYPE character varying(255);",
]

View File

@ -0,0 +1,26 @@
from enum import Enum
from typing import NamedTuple
class SearchType(Enum):
"""
查询类型
"""
DAY = "day_statistics"
""""""
WEEK = "week_statistics"
""""""
MONTH = "month_statistics"
""""""
TOTAL = "total_statistics"
"""总数"""
class ParseData(NamedTuple):
global_search: bool
"""是否全局搜索"""
search_type: SearchType
"""搜索类型"""

View File

@ -1,13 +1,13 @@
import asyncio
import os
from typing import Tuple
from nonebot import on_command
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, Message, MessageEvent
from nonebot.params import Command, CommandArg
from nonebot.params import CommandArg
from configs.path_config import DATA_PATH, IMAGE_PATH
from models.group_info import GroupInfo
from utils.depends import OneCommand
from utils.image_utils import BuildMat
from utils.manager import plugins2settings_manager
from utils.message_builder import image
@ -91,7 +91,7 @@ statistics_user_file = DATA_PATH / "statistics" / "_prefix_user_count.json"
@statistics.handle()
async def _(bot: Bot, event: MessageEvent, cmd: Tuple[str, ...] = Command(), arg: Message = CommandArg()):
async def _(bot: Bot, event: MessageEvent, cmd: str = OneCommand(), arg: Message = CommandArg()):
msg = arg.extract_plain_text().strip()
if cmd[0][:2] == "全局":
if str(event.user_id) in bot.config.superusers:

View File

@ -6,11 +6,10 @@ from nonebot.message import run_postprocessor
from nonebot.typing import Optional, T_State
from configs.path_config import DATA_PATH
from models.statistics import Statistics
from utils.manager import plugins2settings_manager
from utils.utils import scheduler
from ._model import Statistics
try:
import ujson as json
except ModuleNotFoundError:
@ -21,69 +20,69 @@ __zx_plugin_name__ = "功能调用统计 [Hidden]"
__plugin_version__ = 0.1
__plugin_author__ = "HibiKier"
statistics_group_file = DATA_PATH / "statistics" / "_prefix_count.json"
statistics_user_file = DATA_PATH / "statistics" / "_prefix_user_count.json"
# statistics_group_file = DATA_PATH / "statistics" / "_prefix_count.json"
# statistics_user_file = DATA_PATH / "statistics" / "_prefix_user_count.json"
try:
with open(statistics_group_file, "r", encoding="utf8") as f:
_prefix_count_dict = json.load(f)
except (FileNotFoundError, ValueError):
_prefix_count_dict = {
"total_statistics": {
"total": {},
},
"day_statistics": {
"total": {},
},
"week_statistics": {
"total": {},
},
"month_statistics": {
"total": {},
},
"start_time": str(datetime.now().date()),
"day_index": 0,
}
# try:
# with open(statistics_group_file, "r", encoding="utf8") as f:
# _prefix_count_dict = json.load(f)
# except (FileNotFoundError, ValueError):
# _prefix_count_dict = {
# "total_statistics": {
# "total": {},
# },
# "day_statistics": {
# "total": {},
# },
# "week_statistics": {
# "total": {},
# },
# "month_statistics": {
# "total": {},
# },
# "start_time": str(datetime.now().date()),
# "day_index": 0,
# }
try:
with open(statistics_user_file, "r", encoding="utf8") as f:
_prefix_user_count_dict = json.load(f)
except (FileNotFoundError, ValueError):
_prefix_user_count_dict = {
"total_statistics": {
"total": {},
},
"day_statistics": {
"total": {},
},
"week_statistics": {
"total": {},
},
"month_statistics": {
"total": {},
},
"start_time": str(datetime.now().date()),
"day_index": 0,
}
# try:
# with open(statistics_user_file, "r", encoding="utf8") as f:
# _prefix_user_count_dict = json.load(f)
# except (FileNotFoundError, ValueError):
# _prefix_user_count_dict = {
# "total_statistics": {
# "total": {},
# },
# "day_statistics": {
# "total": {},
# },
# "week_statistics": {
# "total": {},
# },
# "month_statistics": {
# "total": {},
# },
# "start_time": str(datetime.now().date()),
# "day_index": 0,
# }
# 以前版本转换
if _prefix_count_dict.get("day_index") is None:
tmp = _prefix_count_dict.copy()
_prefix_count_dict = {
"total_statistics": tmp["total_statistics"],
"day_statistics": {
"total": {},
},
"week_statistics": {
"total": {},
},
"month_statistics": {
"total": {},
},
"start_time": tmp["start_time"],
"day_index": 0,
}
# # 以前版本转换
# if _prefix_count_dict.get("day_index") is None:
# tmp = _prefix_count_dict.copy()
# _prefix_count_dict = {
# "total_statistics": tmp["total_statistics"],
# "day_statistics": {
# "total": {},
# },
# "week_statistics": {
# "total": {},
# },
# "month_statistics": {
# "total": {},
# },
# "start_time": tmp["start_time"],
# "day_index": 0,
# }
# 添加命令次数
@ -95,7 +94,7 @@ async def _(
event: MessageEvent,
state: T_State,
):
global _prefix_count_dict
# global _prefix_count_dict
if (
matcher.type == "message"
and matcher.priority not in [1, 999]
@ -107,112 +106,112 @@ async def _(
plugin_name=matcher.plugin_name,
create_time=datetime.now(),
)
module = matcher.plugin_name
day_index = _prefix_count_dict["day_index"]
try:
group_id = str(event.group_id)
except AttributeError:
group_id = "total"
user_id = str(event.user_id)
plugin_name = plugins2settings_manager.get_plugin_data(module)
if plugin_name and plugin_name.cmd:
plugin_name = plugin_name.cmd[0]
check_exists_key(group_id, user_id, plugin_name)
for data in [_prefix_count_dict, _prefix_user_count_dict]:
data["total_statistics"]["total"][plugin_name] += 1
data["day_statistics"]["total"][plugin_name] += 1
data["week_statistics"]["total"][plugin_name] += 1
data["month_statistics"]["total"][plugin_name] += 1
# print(_prefix_count_dict)
if group_id != "total":
for data in [_prefix_count_dict, _prefix_user_count_dict]:
if data == _prefix_count_dict:
key = group_id
else:
key = user_id
data["total_statistics"][key][plugin_name] += 1
data["day_statistics"][key][plugin_name] += 1
data["week_statistics"][key][str(day_index % 7)][plugin_name] += 1
data["month_statistics"][key][str(day_index % 30)][plugin_name] += 1
with open(statistics_group_file, "w", encoding="utf8") as f:
json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)
with open(statistics_user_file, "w", encoding="utf8") as f:
json.dump(_prefix_user_count_dict, f, ensure_ascii=False, indent=4)
# module = matcher.plugin_name
# day_index = _prefix_count_dict["day_index"]
# try:
# group_id = str(event.group_id)
# except AttributeError:
# group_id = "total"
# user_id = str(event.user_id)
# plugin_name = plugins2settings_manager.get_plugin_data(module)
# if plugin_name and plugin_name.cmd:
# plugin_name = plugin_name.cmd[0]
# check_exists_key(group_id, user_id, plugin_name)
# for data in [_prefix_count_dict, _prefix_user_count_dict]:
# data["total_statistics"]["total"][plugin_name] += 1
# data["day_statistics"]["total"][plugin_name] += 1
# data["week_statistics"]["total"][plugin_name] += 1
# data["month_statistics"]["total"][plugin_name] += 1
# # print(_prefix_count_dict)
# if group_id != "total":
# for data in [_prefix_count_dict, _prefix_user_count_dict]:
# if data == _prefix_count_dict:
# key = group_id
# else:
# key = user_id
# data["total_statistics"][key][plugin_name] += 1
# data["day_statistics"][key][plugin_name] += 1
# data["week_statistics"][key][str(day_index % 7)][plugin_name] += 1
# data["month_statistics"][key][str(day_index % 30)][plugin_name] += 1
# with open(statistics_group_file, "w", encoding="utf8") as f:
# json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)
# with open(statistics_user_file, "w", encoding="utf8") as f:
# json.dump(_prefix_user_count_dict, f, ensure_ascii=False, indent=4)
def check_exists_key(group_id: str, user_id: str, plugin_name: str):
global _prefix_count_dict, _prefix_user_count_dict
for data in [_prefix_count_dict, _prefix_user_count_dict]:
if data == _prefix_count_dict:
key = group_id
else:
key = user_id
if not data["total_statistics"]["total"].get(plugin_name):
data["total_statistics"]["total"][plugin_name] = 0
if not data["day_statistics"]["total"].get(plugin_name):
data["day_statistics"]["total"][plugin_name] = 0
if not data["week_statistics"]["total"].get(plugin_name):
data["week_statistics"]["total"][plugin_name] = 0
if not data["month_statistics"]["total"].get(plugin_name):
data["month_statistics"]["total"][plugin_name] = 0
# def check_exists_key(group_id: str, user_id: str, plugin_name: str):
# global _prefix_count_dict, _prefix_user_count_dict
# for data in [_prefix_count_dict, _prefix_user_count_dict]:
# if data == _prefix_count_dict:
# key = group_id
# else:
# key = user_id
# if not data["total_statistics"]["total"].get(plugin_name):
# data["total_statistics"]["total"][plugin_name] = 0
# if not data["day_statistics"]["total"].get(plugin_name):
# data["day_statistics"]["total"][plugin_name] = 0
# if not data["week_statistics"]["total"].get(plugin_name):
# data["week_statistics"]["total"][plugin_name] = 0
# if not data["month_statistics"]["total"].get(plugin_name):
# data["month_statistics"]["total"][plugin_name] = 0
if not data["total_statistics"].get(key):
data["total_statistics"][key] = {}
if not data["total_statistics"][key].get(plugin_name):
data["total_statistics"][key][plugin_name] = 0
if not data["day_statistics"].get(key):
data["day_statistics"][key] = {}
if not data["day_statistics"][key].get(plugin_name):
data["day_statistics"][key][plugin_name] = 0
# if not data["total_statistics"].get(key):
# data["total_statistics"][key] = {}
# if not data["total_statistics"][key].get(plugin_name):
# data["total_statistics"][key][plugin_name] = 0
# if not data["day_statistics"].get(key):
# data["day_statistics"][key] = {}
# if not data["day_statistics"][key].get(plugin_name):
# data["day_statistics"][key][plugin_name] = 0
if key != "total":
if not data["week_statistics"].get(key):
data["week_statistics"][key] = {}
if data["week_statistics"][key].get("0") is None:
for i in range(7):
data["week_statistics"][key][str(i)] = {}
if data["week_statistics"][key]["0"].get(plugin_name) is None:
for i in range(7):
data["week_statistics"][key][str(i)][plugin_name] = 0
# if key != "total":
# if not data["week_statistics"].get(key):
# data["week_statistics"][key] = {}
# if data["week_statistics"][key].get("0") is None:
# for i in range(7):
# data["week_statistics"][key][str(i)] = {}
# if data["week_statistics"][key]["0"].get(plugin_name) is None:
# for i in range(7):
# data["week_statistics"][key][str(i)][plugin_name] = 0
if not data["month_statistics"].get(key):
data["month_statistics"][key] = {}
if data["month_statistics"][key].get("0") is None:
for i in range(30):
data["month_statistics"][key][str(i)] = {}
if data["month_statistics"][key]["0"].get(plugin_name) is None:
for i in range(30):
data["month_statistics"][key][str(i)][plugin_name] = 0
# if not data["month_statistics"].get(key):
# data["month_statistics"][key] = {}
# if data["month_statistics"][key].get("0") is None:
# for i in range(30):
# data["month_statistics"][key][str(i)] = {}
# if data["month_statistics"][key]["0"].get(plugin_name) is None:
# for i in range(30):
# data["month_statistics"][key][str(i)][plugin_name] = 0
# 天
@scheduler.scheduled_job(
"cron",
hour=0,
minute=1,
)
async def _():
for data in [_prefix_count_dict, _prefix_user_count_dict]:
data["day_index"] += 1
for x in data["day_statistics"].keys():
for key in data["day_statistics"][x].keys():
try:
data["day_statistics"][x][key] = 0
except KeyError:
pass
for type_ in ["week_statistics", "month_statistics"]:
index = str(
data["day_index"] % 7
if type_ == "week_statistics"
else data["day_index"] % 30
)
for x in data[type_].keys():
try:
for key in data[type_][x][index].keys():
data[type_][x][index][key] = 0
except KeyError:
pass
with open(statistics_group_file, "w", encoding="utf8") as f:
json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)
with open(statistics_user_file, "w", encoding="utf8") as f:
json.dump(_prefix_user_count_dict, f, indent=4, ensure_ascii=False)
# @scheduler.scheduled_job(
# "cron",
# hour=0,
# minute=1,
# )
# async def _():
# for data in [_prefix_count_dict, _prefix_user_count_dict]:
# data["day_index"] += 1
# for x in data["day_statistics"].keys():
# for key in data["day_statistics"][x].keys():
# try:
# data["day_statistics"][x][key] = 0
# except KeyError:
# pass
# for type_ in ["week_statistics", "month_statistics"]:
# index = str(
# data["day_index"] % 7
# if type_ == "week_statistics"
# else data["day_index"] % 30
# )
# for x in data[type_].keys():
# try:
# for key in data[type_][x][index].keys():
# data[type_][x][index][key] = 0
# except KeyError:
# pass
# with open(statistics_group_file, "w", encoding="utf8") as f:
# json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)
# with open(statistics_user_file, "w", encoding="utf8") as f:
# json.dump(_prefix_user_count_dict, f, indent=4, ensure_ascii=False)

View File

@ -0,0 +1,16 @@
from typing import List
from nonebot.adapters.onebot.v11 import MessageEvent
from ._config import SearchType
def parse_data(cmd: str, event: MessageEvent, superusers: List[str]):
search_type = SearchType.TOTAL
if cmd[:2] == "全局":
if str(event.user_id) in superusers:
if cmd[2] == '':
search_type = SearchType.DAY
elif cmd[2] == '':
_type = SearchType.WEEK
elif cmd[2] == '':
_type = SearchType.MONTH

View File

@ -1,56 +1,74 @@
# import asyncio
import asyncio
# import nonebot
# from fastapi import APIRouter, FastAPI
# from nonebot.adapters.onebot.v11 import Bot, MessageEvent
# from nonebot.log import default_filter, default_format
# from nonebot.matcher import Matcher
# from nonebot.message import run_preprocessor
# from nonebot.typing import T_State
import nonebot
from fastapi import APIRouter, FastAPI
from nonebot.adapters.onebot.v11 import Bot, MessageEvent
from nonebot.log import default_filter, default_format
from nonebot.matcher import Matcher
from nonebot.message import run_preprocessor
from nonebot.typing import T_State
# from configs.config import Config as gConfig
# from services.log import logger, logger_
# from utils.manager import plugins2settings_manager
from configs.config import Config as gConfig
from services.log import logger, logger_
from utils.manager import plugins2settings_manager
# from .api.base_info import router as base_info_routes
# from .api.group import router as group_routes
# from .api.logs import router as ws_routes
# from .api.logs.log_manager import LOG_STORAGE
# from .api.plugins import router as plugin_routes
# from .api.request import router as request_routes
# from .api.system import router as system_routes
from .api.logs import router as ws_log_routes
from .api.logs.log_manager import LOG_STORAGE
from .api.tabs.database import router as database_router
from .api.tabs.main import router as main_router
from .api.tabs.main import ws_router as status_routes
from .api.tabs.manage import router as manage_router
from .api.tabs.manage import ws_router as chat_routes
from .api.tabs.plugin_manage import router as plugin_router
from .api.tabs.system import router as system_router
from .auth import router as auth_router
# # from .api.g import *
# from .auth import router as auth_router
driver = nonebot.get_driver()
# driver = nonebot.get_driver()
gConfig.add_plugin_config("web-ui", "username", "admin", name="web-ui", help_="前端管理用户名")
# gConfig.add_plugin_config("web-ui", "username", "admin", name="web-ui", help_="前端管理用户名")
# gConfig.add_plugin_config("web-ui", "password", None, name="web-ui", help_="前端管理密码")
gConfig.add_plugin_config("web-ui", "password", None, name="web-ui", help_="前端管理密码")
# BaseApiRouter = APIRouter(prefix="/zhenxun/api")
# BaseApiRouter.include_router(auth_router)
# BaseApiRouter.include_router(plugin_routes)
# BaseApiRouter.include_router(group_routes)
# BaseApiRouter.include_router(request_routes)
# BaseApiRouter.include_router(system_routes)
# BaseApiRouter.include_router(base_info_routes)
BaseApiRouter = APIRouter(prefix="/zhenxun/api")
# @driver.on_startup
# def _():
BaseApiRouter.include_router(auth_router)
BaseApiRouter.include_router(main_router)
BaseApiRouter.include_router(manage_router)
BaseApiRouter.include_router(database_router)
BaseApiRouter.include_router(plugin_router)
BaseApiRouter.include_router(system_router)
# loop = asyncio.get_running_loop()
# def log_sink(message: str):
# loop.create_task(LOG_STORAGE.add(message.rstrip("\n")))
WsApiRouter = APIRouter(prefix="/zhenxun/socket")
# logger_.add(log_sink, colorize=True, filter=default_filter, format=default_format)
WsApiRouter.include_router(ws_log_routes)
WsApiRouter.include_router(status_routes)
WsApiRouter.include_router(chat_routes)
# app: FastAPI = nonebot.get_app()
# app.include_router(BaseApiRouter)
# app.include_router(ws_routes)
# logger.info("<g>API启动成功</g>", "Web UI")
@driver.on_startup
def _():
try:
async def log_sink(message: str):
loop = None
if not loop:
try:
loop = asyncio.get_running_loop()
except Exception as e:
logger.warning('Web Ui log_sink', e=e)
if not loop:
loop = asyncio.new_event_loop()
loop.create_task(LOG_STORAGE.add(message.rstrip("\n")))
logger_.add(
log_sink, colorize=True, filter=default_filter, format=default_format
)
app: FastAPI = nonebot.get_app()
app.include_router(BaseApiRouter)
app.include_router(WsApiRouter)
logger.info("<g>API启动成功</g>", "Web UI")
except Exception as e:
logger.error("<g>API启动失败</g>", "Web UI", e=e)

View File

@ -1,4 +1 @@
from .group import *
from .plugins import *
from .request import *
from .system import *
from .tabs import *

View File

@ -1,75 +0,0 @@
from datetime import datetime, timedelta
from typing import List, Optional
import nonebot
from fastapi import APIRouter
from configs.config import Config
from models.chat_history import ChatHistory
from services.log import logger
from utils.manager import plugin_data_manager, plugins2settings_manager, plugins_manager
from utils.manager.models import PluginData, PluginType
from ..models.model import BotInfo, Result
from ..models.params import UpdateConfig, UpdatePlugin
from ..utils import authentication
AVA_URL = "http://q1.qlogo.cn/g?b=qq&nk={}&s=160"
router = APIRouter()
@router.get("/get_bot_info", dependencies=[authentication()])
async def _(self_id: Optional[str] = None) -> Result:
"""
获取Bot基础信息
Args:
qq (Optional[str], optional): qq号. Defaults to None.
Returns:
Result: 获取指定bot信息与bot列表
"""
bot_list: List[BotInfo] = []
if bots := nonebot.get_bots():
select_bot: BotInfo
for key, bot in bots.items():
bot_list.append(
BotInfo(
bot=bot, # type: ignore
self_id=bot.self_id,
nickname="可爱的小真寻",
ava_url=AVA_URL.format(bot.self_id),
)
)
if _bl := [b for b in bot_list if b.self_id == self_id]:
select_bot = _bl[0]
else:
select_bot = bot_list[0]
select_bot.is_select = True
now = datetime.now()
select_bot.received_messages = await ChatHistory.filter(
bot_id=int(select_bot.self_id)
).count()
select_bot.received_messages_day = await ChatHistory.filter(
bot_id=int(select_bot.self_id),
create_time__gte=now - timedelta(hours=now.hour),
).count()
select_bot.received_messages_week = await ChatHistory.filter(
bot_id=int(select_bot.self_id),
create_time__gte=now - timedelta(days=7),
).count()
select_bot.group_count = len(await select_bot.bot.get_group_list())
select_bot.friend_count = len(await select_bot.bot.get_friend_list())
for bot in bot_list:
bot.bot = None # type: ignore
# 插件加载数量
select_bot.plugin_count = len(plugins2settings_manager)
pm_data = plugins_manager.get_data()
select_bot.fail_plugin_count = len([pd for pd in pm_data if pm_data[pd].error])
select_bot.success_plugin_count = (
select_bot.plugin_count - select_bot.fail_plugin_count
)
return Result.ok(bot_list, "已获取操作列表")
return Result.fail("无Bot连接")

View File

@ -1,69 +0,0 @@
from fastapi import APIRouter
from pydantic.error_wrappers import ValidationError
from services.log import logger
from utils.manager import group_manager
from utils.utils import get_bot
from ..models.model import Group, GroupResult, Result, Task
from ..models.params import UpdateGroup
from ..utils import authentication
router = APIRouter()
@router.get("/get_group", dependencies=[authentication()])
async def _() -> Result:
"""
获取群信息
"""
group_list_result = []
try:
group_info = {}
if bot := get_bot():
group_list = await bot.get_group_list()
for g in group_list:
group_info[g["group_id"]] = Group(**g)
group_data = group_manager.get_data()
for group_id in group_data.group_manager:
task_list = []
data = group_manager[group_id].dict()
for tn, status in data["group_task_status"].items():
task_list.append(
Task(
**{
"name": tn,
"nameZh": group_manager.get_task_data().get(tn) or tn,
"status": status,
}
)
)
data["task"] = task_list
if x := group_info.get(int(group_id)):
data["group"] = x
else:
continue
group_list_result.append(GroupResult(**data))
except Exception as e:
logger.error("调用API错误", "/get_group", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(group_list_result, "拿到了新鲜出炉的数据!")
@router.post("/update_group", dependencies=[authentication()])
async def _(group: UpdateGroup) -> Result:
"""
修改群信息
"""
try:
group_id = group.group_id
group_manager.set_group_level(group_id, group.level)
if group.status:
group_manager.turn_on_group_bot_status(group_id)
else:
group_manager.shutdown_group_bot_status(group_id)
group_manager.save()
except Exception as e:
logger.error("调用API错误", "/get_group", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(info="已完成记录!")

View File

@ -1,6 +1,6 @@
import asyncio
import re
from typing import Awaitable, Callable, ClassVar, Dict, Generic, List, Set, TypeVar
from typing import Awaitable, Callable, Dict, Generic, List, Set, TypeVar
from urllib.parse import urlparse
PATTERN = r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))"
@ -10,24 +10,17 @@ LogListener = Callable[[_T], Awaitable[None]]
class LogStorage(Generic[_T]):
"""
日志存储
"""
def __init__(self, rotation: float = 5 * 60):
self.count, self.rotation = 0, rotation
self.logs: Dict[int, str] = {}
self.listeners: Set[LogListener[str]] = set()
async def add(self, log: str):
log = re.sub(PATTERN, "", log)
log_split = log.split()
time = log_split[0] + " " + log_split[1]
level = log_split[2]
main = log_split[3]
type_ = None
log_ = " ".join(log_split[3:])
if "Calling API" in log_:
sp = log_.split("|")
type_ = sp[1]
log_ = "|".join(log_[1:])
data = {"time": time, "level": level, "main": main, "type": type_, "log": log_}
seq = self.count = self.count + 1
self.logs[seq] = log
asyncio.get_running_loop().call_later(self.rotation, self.remove, seq)
@ -42,4 +35,5 @@ class LogStorage(Generic[_T]):
return
LOG_STORAGE = LogStorage[str]()
LOG_STORAGE: LogStorage[str] = LogStorage[str]()

View File

@ -2,7 +2,7 @@ from typing import List
from fastapi import APIRouter, WebSocket
from loguru import logger
from nonebot.utils import escape_tag, run_sync
from nonebot.utils import escape_tag
from starlette.websockets import WebSocket, WebSocketDisconnect, WebSocketState
from .log_manager import LOG_STORAGE
@ -12,7 +12,12 @@ router = APIRouter()
@router.get("/logs", response_model=List[str])
async def system_logs_history(reverse: bool = False):
return LOG_STORAGE.list(reverse=reverse)
"""历史日志
参数:
reverse: 反转顺序.
"""
return LOG_STORAGE.list(reverse=reverse) # type: ignore
@router.websocket("/logs")

View File

@ -1,115 +0,0 @@
from typing import Optional
import cattrs
from fastapi import APIRouter
from configs.config import Config
from services.log import logger
from utils.manager import plugin_data_manager, plugins2settings_manager, plugins_manager
from utils.manager.models import PluginData, PluginType
from ..config import *
from ..models.model import Plugin, PluginConfig, Result
from ..models.params import UpdateConfig, UpdatePlugin
from ..utils import authentication
router = APIRouter()
@router.get("/get_plugins", dependencies=[authentication()])
def _(
plugin_type: PluginType,
) -> Result:
"""
获取插件列表
:param plugin_type: 类型 normal, superuser, hidden, admin
"""
try:
plugin_list = []
for module in plugin_data_manager.keys():
plugin_data: Optional[PluginData] = plugin_data_manager[module]
if plugin_data and plugin_data.plugin_type == plugin_type:
plugin_config = None
if plugin_data.plugin_configs:
plugin_config = {}
for key in plugin_data.plugin_configs:
plugin_config[key] = PluginConfig(
key=key,
module=module,
has_type=bool(plugin_data.plugin_configs[key].type),
**plugin_data.plugin_configs[key].dict(),
)
plugin_list.append(
Plugin(
model=module,
plugin_settings=plugin_data.plugin_setting,
plugin_manager=plugin_data.plugin_status,
plugin_config=plugin_config,
cd_limit=plugin_data.plugin_cd,
block_limit=plugin_data.plugin_block,
count_limit=plugin_data.plugin_count,
)
)
except Exception as e:
logger.error("调用API错误", "/get_plugins", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(plugin_list, "拿到了新鲜出炉的数据!")
@router.post("/update_plugins", dependencies=[authentication()])
def _(plugin: UpdatePlugin) -> Result:
"""
修改插件信息
:param plugin: 插件内容
"""
try:
module = plugin.module
if p2s := plugins2settings_manager.get(module):
p2s.default_status = plugin.default_status
p2s.limit_superuser = plugin.limit_superuser
p2s.cost_gold = plugin.cost_gold
p2s.cmd = plugin.cmd
p2s.level = plugin.group_level
if pd := plugin_data_manager.get(module):
menu_lin = None
if len(pd.menu_type) > 1:
menu_lin = pd.menu_type[1]
if menu_lin is not None:
pd.menu_type = (plugin.menu_type, menu_lin)
else:
pd.menu_type = (plugin.menu_type,)
if pm := plugins_manager.get(module):
if plugin.block_type:
pm.block_type = plugin.block_type
pm.status = False
else:
pm.block_type = None
pm.status = True
plugins2settings_manager.save()
plugins_manager.save()
except Exception as e:
logger.error("调用API错误", "/update_plugins", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(info="已经帮你写好啦!")
@router.post("/update_config", dependencies=[authentication()])
def _(config_list: List[UpdateConfig]) -> Result:
try:
for config in config_list:
if cg := Config.get(config.module):
if c := cg.configs.get(config.key):
if isinstance(c.value, (list, tuple)) or isinstance(
c.default_value, (list, tuple)
):
value = config.value.split(",")
else:
value = config.value
if c.type and value is not None:
value = cattrs.structure(value, c.type)
Config.set_config(config.module, config.key, value)
except Exception as e:
logger.error("调用API错误", "/update_config", e=e)
return Result.fail(f"{type(e)}: {e}")
Config.save(save_simple_data=True)
return Result.ok(info="写入配置项了哦!")

View File

@ -1,87 +0,0 @@
from typing import Optional
from fastapi import APIRouter
from configs.config import NICKNAME
from models.group_info import GroupInfo
from services.log import logger
from utils.manager import requests_manager
from utils.utils import get_bot
from ..models.model import RequestResult, Result
from ..models.params import HandleRequest
from ..utils import authentication
router = APIRouter()
@router.get("/get_request", dependencies=[authentication()])
def _(request_type: Optional[str]) -> Result:
try:
req_data = requests_manager.get_data()
req_list = []
if request_type in ["group", "private"]:
req_data = req_data[request_type]
for x in req_data:
req_data[x]["oid"] = x
req_list.append(RequestResult(**req_data[x]))
req_list.reverse()
except Exception as e:
logger.error("调用API错误", "/get_request", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(req_list, f"{NICKNAME}带来了最新的数据!")
@router.delete("/clear_request", dependencies=[authentication()])
def _(request_type: Optional[str]) -> Result:
"""
清空请求
:param type_: 类型
"""
requests_manager.clear(request_type)
return Result.ok(info="成功清除了数据")
@router.post("/handle_request", dependencies=[authentication()])
async def _(parma: HandleRequest) -> Result:
"""
操作请求
:param parma: 参数
"""
try:
result = "操作成功!"
flag = 3
if bot := get_bot():
if parma.handle == "approve":
if parma.type == "group":
if rid := requests_manager.get_group_id(parma.id):
# await GroupInfo.update_or_create(defaults={"group_flag": 1}, )
if group := await GroupInfo.get_or_none(group_id=str(rid)):
await group.update_or_create(group_flag=1)
else:
group_info = await bot.get_group_info(group_id=rid)
await GroupInfo.update_or_create(
group_id=str(group_info["group_id"]),
defaults={
"group_name": group_info["group_name"],
"max_member_count": group_info["max_member_count"],
"member_count": group_info["member_count"],
"group_flag": 1,
},
)
flag = await requests_manager.approve(bot, parma.id, parma.type)
elif parma.handle == "refuse":
flag = await requests_manager.refused(bot, parma.id, parma.type)
elif parma.handle == "delete":
requests_manager.delete_request(parma.id, parma.type)
if parma.handle != "delete":
if flag == 1:
result = "该请求已失效"
requests_manager.delete_request(parma.id, parma.type)
elif flag == 2:
result = "未找到此Id"
return Result.ok(result, "成功处理了请求!")
return Result.fail("Bot未连接")
except Exception as e:
logger.error("调用API错误", "/get_group", e=e)
return Result.fail(f"{type(e)}: {e}")

View File

@ -1,228 +0,0 @@
import asyncio
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Union
import psutil
import ujson as json
from fastapi import APIRouter
from configs.path_config import (
DATA_PATH,
FONT_PATH,
IMAGE_PATH,
LOG_PATH,
RECORD_PATH,
TEMP_PATH,
TEXT_PATH,
)
from services.log import logger
from utils.http_utils import AsyncHttpx
from ..models.model import (
Result,
SystemFolderSize,
SystemNetwork,
SystemResult,
SystemStatus,
SystemStatusList,
)
from ..utils import authentication
CPU_DATA_PATH = DATA_PATH / "system" / "cpu.json"
MEMORY_DATA_PATH = DATA_PATH / "system" / "memory.json"
DISK_DATA_PATH = DATA_PATH / "system" / "disk.json"
CPU_DATA_PATH.parent.mkdir(exist_ok=True, parents=True)
cpu_data = {"data": []}
memory_data = {"data": []}
disk_data = {"data": []}
router = APIRouter()
@router.get("/system", dependencies=[authentication()])
async def _() -> Result:
return await get_system_data()
@router.get("/status", dependencies=[authentication()])
async def _() -> Result:
return Result.ok(
await asyncio.get_event_loop().run_in_executor(None, _get_system_status),
)
@router.get("/system/disk", dependencies=[authentication()])
async def _(type_: Optional[str] = None) -> Result:
return Result.ok(
data=await asyncio.get_event_loop().run_in_executor(
None, _get_system_disk, type_
),
)
@router.get("/system/statusList", dependencies=[authentication()])
async def _() -> Result:
global cpu_data, memory_data, disk_data
await asyncio.get_event_loop().run_in_executor(None, _get_system_status)
cpu_rst = cpu_data["data"][-10:] if len(cpu_data["data"]) > 10 else cpu_data["data"]
memory_rst = (
memory_data["data"][-10:]
if len(memory_data["data"]) > 10
else memory_data["data"]
)
disk_rst = (
disk_data["data"][-10:] if len(disk_data["data"]) > 10 else disk_data["data"]
)
return Result.ok(
SystemStatusList(
cpu_data=cpu_rst,
memory_data=memory_rst,
disk_data=disk_rst,
),
)
async def get_system_data():
"""
说明:
获取系统信息资源文件大小网络状态等
"""
baidu = 200
google = 200
try:
await AsyncHttpx.get("https://www.baidu.com/", timeout=5)
except Exception as e:
logger.warning(f"访问BaiDu失败... {type(e)}: {e}")
baidu = 404
try:
await AsyncHttpx.get("https://www.google.com/", timeout=5)
except Exception as e:
logger.warning(f"访问Google失败... {type(e)}: {e}")
google = 404
network = SystemNetwork(baidu=baidu, google=google)
disk = await asyncio.get_event_loop().run_in_executor(None, _get_system_disk, None)
status = await asyncio.get_event_loop().run_in_executor(None, _get_system_status)
return Result.ok(
SystemResult(
status=status,
network=network,
disk=disk, # type: ignore
check_time=datetime.now().replace(microsecond=0),
),
)
def _get_system_status() -> SystemStatus:
"""
说明:
获取系统信息等
"""
cpu = psutil.cpu_percent()
memory = psutil.virtual_memory().percent
disk = psutil.disk_usage("/").percent
save_system_data(cpu, memory, disk)
return SystemStatus(
cpu=cpu,
memory=memory,
disk=disk,
check_time=datetime.now().replace(microsecond=0),
)
def _get_system_disk(
type_: Optional[str],
) -> Union[SystemFolderSize, Dict[str, Union[float, datetime]]]:
"""
说明:
获取资源文件大小等
"""
if not type_:
disk = SystemFolderSize(
font_dir_size=_get_dir_size(FONT_PATH) / 1024 / 1024,
image_dir_size=_get_dir_size(IMAGE_PATH) / 1024 / 1024,
text_dir_size=_get_dir_size(TEXT_PATH) / 1024 / 1024,
record_dir_size=_get_dir_size(RECORD_PATH) / 1024 / 1024,
temp_dir_size=_get_dir_size(TEMP_PATH) / 1024 / 102,
data_dir_size=_get_dir_size(DATA_PATH) / 1024 / 1024,
log_dir_size=_get_dir_size(LOG_PATH) / 1024 / 1024,
check_time=datetime.now().replace(microsecond=0),
)
return disk
else:
if type_ == "image":
dir_path = IMAGE_PATH
elif type_ == "font":
dir_path = FONT_PATH
elif type_ == "text":
dir_path = TEXT_PATH
elif type_ == "record":
dir_path = RECORD_PATH
elif type_ == "data":
dir_path = DATA_PATH
elif type_ == "temp":
dir_path = TEMP_PATH
else:
dir_path = LOG_PATH
dir_map = {}
other_file_size = 0
for file in os.listdir(dir_path):
file = Path(dir_path / file)
if file.is_dir():
dir_map[file.name] = _get_dir_size(file) / 1024 / 1024
else:
other_file_size += os.path.getsize(file) / 1024 / 1024
dir_map["其他文件"] = other_file_size
dir_map["check_time"] = datetime.now().replace(microsecond=0)
return dir_map
def _get_dir_size(dir_path: Path) -> float:
"""
说明:
获取文件夹大小
参数:
:param dir_path: 文件夹路径
"""
size = 0
for root, dirs, files in os.walk(dir_path):
size += sum([os.path.getsize(os.path.join(root, name)) for name in files])
return size
def save_system_data(cpu: float, memory: float, disk: float):
"""
说明:
保存一些系统信息
参数:
:param cpu: cpu
:param memory: memory
:param disk: disk
"""
global cpu_data, memory_data, disk_data
if CPU_DATA_PATH.exists() and not cpu_data["data"]:
with open(CPU_DATA_PATH, "r") as f:
cpu_data = json.load(f)
if MEMORY_DATA_PATH.exists() and not memory_data["data"]:
with open(MEMORY_DATA_PATH, "r") as f:
memory_data = json.load(f)
if DISK_DATA_PATH.exists() and not disk_data["data"]:
with open(DISK_DATA_PATH, "r") as f:
disk_data = json.load(f)
now = str(datetime.now().time().replace(microsecond=0))
cpu_data["data"].append({"time": now, "data": cpu})
memory_data["data"].append({"time": now, "data": memory})
disk_data["data"].append({"time": now, "data": disk})
if len(cpu_data["data"]) > 50:
cpu_data["data"] = cpu_data["data"][-50:]
if len(memory_data["data"]) > 50:
memory_data["data"] = memory_data["data"][-50:]
if len(disk_data["data"]) > 50:
disk_data["data"] = disk_data["data"][-50:]
with open(CPU_DATA_PATH, "w") as f:
json.dump(cpu_data, f, indent=4, ensure_ascii=False)
with open(MEMORY_DATA_PATH, "w") as f:
json.dump(memory_data, f, indent=4, ensure_ascii=False)
with open(DISK_DATA_PATH, "w") as f:
json.dump(disk_data, f, indent=4, ensure_ascii=False)

View File

@ -0,0 +1,5 @@
from .database import *
from .main import *
from .manage import *
from .plugin_manage import *
from .system import *

View File

@ -0,0 +1,104 @@
from os import name
from typing import Optional
import nonebot
from fastapi import APIRouter, Request
from nonebot.drivers import Driver
from tortoise import Tortoise
from tortoise.exceptions import OperationalError
from configs.config import NICKNAME
from services.db_context import TestSQL
from utils.utils import get_matchers
from ....base_model import BaseResultModel, QueryModel, Result
from ....config import QueryDateType
from ....utils import authentication
from .models.model import SqlModel, SqlText
from .models.sql_log import SqlLog
router = APIRouter(prefix="/database")
driver: Driver = nonebot.get_driver()
SQL_DICT = {}
SELECT_TABLE_SQL = """
select a.tablename as name,d.description as desc from pg_tables a
left join pg_class c on relname=tablename
left join pg_description d on oid=objoid and objsubid=0 where a.schemaname = 'public'
"""
SELECT_TABLE_COLUMN_SQL = """
SELECT column_name, data_type, character_maximum_length as max_length, is_nullable
FROM information_schema.columns
WHERE table_name = '{}';
"""
@driver.on_startup
async def _():
for matcher in get_matchers(True):
if _plugin := matcher.plugin:
try:
_module = _plugin.module
except AttributeError:
pass
else:
plugin_name = matcher.plugin_name
if plugin_name in SQL_DICT:
raise ValueError(f"{plugin_name} 常用SQL plugin_name 重复")
SqlModel(
name=getattr(_module, "__plugin_name__", None) or plugin_name or "",
plugin_name=plugin_name or "",
sql_list=getattr(_module, "sql_list", []),
)
SQL_DICT[plugin_name] = SqlModel
@router.get("/get_table_list", dependencies=[authentication()], description="获取数据库表")
async def _() -> Result:
db = Tortoise.get_connection("default")
query = await db.execute_query_dict(SELECT_TABLE_SQL)
return Result.ok(query)
@router.get("/get_table_column", dependencies=[authentication()], description="获取表字段")
async def _(table_name: str) -> Result:
db = Tortoise.get_connection("default")
print(SELECT_TABLE_COLUMN_SQL.format(table_name))
query = await db.execute_query_dict(SELECT_TABLE_COLUMN_SQL.format(table_name))
return Result.ok(query)
@router.post("/exec_sql", dependencies=[authentication()], description="执行sql")
async def _(sql: SqlText, request: Request) -> Result:
ip = request.client.host if request.client else "unknown"
try:
if sql.sql.lower().startswith("select"):
db = Tortoise.get_connection("default")
res = await db.execute_query_dict(sql.sql)
await SqlLog.add(ip or "0.0.0.0", sql.sql, "")
return Result.ok(res, "执行成功啦!")
else:
result = await TestSQL.raw(sql.sql)
await SqlLog.add(ip or "0.0.0.0", sql.sql, str(result))
return Result.ok(info="执行成功啦!")
except OperationalError as e:
await SqlLog.add(ip or "0.0.0.0", sql.sql, str(e), False)
return Result.warning_(f"sql执行错误: {e}")
@router.post("/get_sql_log", dependencies=[authentication()], description="sql日志列表")
async def _(query: QueryModel) -> Result:
total = await SqlLog.all().count()
if (total % query.size):
total += 1
data = await SqlLog.all().order_by("-id").offset((query.index - 1) * query.size).limit(query.size)
return Result.ok(BaseResultModel(total=total, data=data))
@router.get("/get_common_sql", dependencies=[authentication()], description="常用sql")
async def _(plugin_name: Optional[str] = None) -> Result:
if plugin_name:
return Result.ok(SQL_DICT.get(plugin_name))
return Result.ok(str(SQL_DICT))

View File

@ -0,0 +1,26 @@
from typing import List
from pydantic import BaseModel
from utils.models import CommonSql
class SqlText(BaseModel):
"""
sql语句
"""
sql: str
class SqlModel(BaseModel):
"""
常用sql
"""
name: str
"""插件中文名称"""
plugin_name: str
"""插件名称"""
sql_list: List[CommonSql]
"""插件列表"""

View File

@ -0,0 +1,40 @@
from typing import Optional, Union
from tortoise import fields
from services.db_context import Model
class SqlLog(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
ip = fields.CharField(255)
"""ip"""
sql = fields.CharField(255)
"""sql"""
result = fields.CharField(255, null=True)
"""结果"""
is_suc = fields.BooleanField(default=True)
"""是否成功"""
create_time = fields.DatetimeField(auto_now_add=True)
"""创建时间"""
class Meta:
table = "sql_log"
table_description = "sql执行日志"
@classmethod
async def add(
cls, ip: str, sql: str, result: Optional[str] = None, is_suc: bool = True
):
"""
说明:
获取用户在群内的等级
参数:
:param ip: ip
:param sql: sql
:param result: 返回结果
:param is_suc: 是否成功
"""
await cls.create(ip=ip, sql=sql, result=result, is_suc=is_suc)

View File

@ -0,0 +1,266 @@
import asyncio
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional
import nonebot
from fastapi import APIRouter, WebSocket
from starlette.websockets import WebSocket, WebSocketDisconnect, WebSocketState
from tortoise.functions import Count
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
from configs.config import NICKNAME
from models.chat_history import ChatHistory
from models.group_info import GroupInfo
from models.statistics import Statistics
from services.log import logger
from utils.manager import plugin_data_manager, plugins2settings_manager, plugins_manager
from ....base_model import Result
from ....config import AVA_URL, GROUP_AVA_URL, QueryDateType
from ....utils import authentication, get_system_status
from .data_source import bot_live
from .model import ActiveGroup, BaseInfo, ChatHistoryCount, HotPlugin
run_time = time.time()
ws_router = APIRouter()
router = APIRouter(prefix="/main")
@router.get("/get_base_info", dependencies=[authentication()], description="基础信息")
async def _(bot_id: Optional[str] = None) -> Result:
"""
获取Bot基础信息
Args:
bot_id (Optional[str], optional): bot_id. Defaults to None.
Returns:
Result: 获取指定bot信息与bot列表
"""
bot_list: List[BaseInfo] = []
if bots := nonebot.get_bots():
select_bot: BaseInfo
for key, bot in bots.items():
login_info = await bot.get_login_info()
bot_list.append(
BaseInfo(
bot=bot, # type: ignore
self_id=bot.self_id,
nickname=login_info["nickname"],
ava_url=AVA_URL.format(bot.self_id),
)
)
# 获取指定qq号的bot信息若无指定 则获取第一个
if _bl := [b for b in bot_list if b.self_id == bot_id]:
select_bot = _bl[0]
else:
select_bot = bot_list[0]
select_bot.is_select = True
select_bot.config = select_bot.bot.config
now = datetime.now()
# 今日累计接收消息
select_bot.received_messages = await ChatHistory.filter(
bot_id=select_bot.self_id,
create_time__gte=now - timedelta(hours=now.hour),
).count()
# 群聊数量
select_bot.group_count = len(await select_bot.bot.get_group_list())
# 好友数量
select_bot.friend_count = len(await select_bot.bot.get_friend_list())
for bot in bot_list:
bot.bot = None # type: ignore
# 插件加载数量
select_bot.plugin_count = len(plugins2settings_manager)
pm_data = plugins_manager.get_data()
select_bot.fail_plugin_count = len([pd for pd in pm_data if pm_data[pd].error])
select_bot.success_plugin_count = (
select_bot.plugin_count - select_bot.fail_plugin_count
)
# 连接时间
select_bot.connect_time = bot_live.get(select_bot.self_id) or 0
if select_bot.connect_time:
connect_date = datetime.fromtimestamp(select_bot.connect_time)
select_bot.connect_date = connect_date.strftime("%Y-%m-%d %H:%M:%S")
version_file = Path() / "__version__"
if version_file.exists():
if text := version_file.open().read():
if ver := text.replace("__version__: ", "").strip():
select_bot.version = ver
day_call = await Statistics.filter(create_time__gte=now - timedelta(hours=now.hour)).count()
select_bot.day_call = day_call
return Result.ok(bot_list, "拿到信息啦!")
return Result.warning_("无Bot连接...")
@router.get(
"/get_all_ch_count", dependencies=[authentication()], description="获取接收消息数量"
)
async def _(bot_id: str) -> Result:
now = datetime.now()
all_count = await ChatHistory.filter(bot_id=bot_id).count()
day_count = await ChatHistory.filter(
bot_id=bot_id, create_time__gte=now - timedelta(hours=now.hour)
).count()
week_count = await ChatHistory.filter(
bot_id=bot_id, create_time__gte=now - timedelta(days=7)
).count()
month_count = await ChatHistory.filter(
bot_id=bot_id, create_time__gte=now - timedelta(days=30)
).count()
year_count = await ChatHistory.filter(
bot_id=bot_id, create_time__gte=now - timedelta(days=365)
).count()
return Result.ok(
ChatHistoryCount(
num=all_count,
day=day_count,
week=week_count,
month=month_count,
year=year_count,
)
)
@router.get("/get_ch_count", dependencies=[authentication()], description="获取接收消息数量")
async def _(bot_id: str, query_type: Optional[QueryDateType] = None) -> Result:
if bots := nonebot.get_bots():
if not query_type:
return Result.ok(await ChatHistory.filter(bot_id=bot_id).count())
now = datetime.now()
if query_type == QueryDateType.DAY:
return Result.ok(
await ChatHistory.filter(
bot_id=bot_id, create_time__gte=now - timedelta(hours=now.hour)
).count()
)
if query_type == QueryDateType.WEEK:
return Result.ok(
await ChatHistory.filter(
bot_id=bot_id, create_time__gte=now - timedelta(days=7)
).count()
)
if query_type == QueryDateType.MONTH:
return Result.ok(
await ChatHistory.filter(
bot_id=bot_id, create_time__gte=now - timedelta(days=30)
).count()
)
if query_type == QueryDateType.YEAR:
return Result.ok(
await ChatHistory.filter(
bot_id=bot_id, create_time__gte=now - timedelta(days=365)
).count()
)
return Result.warning_("无Bot连接...")
@router.get("get_fg_count", dependencies=[authentication()], description="好友/群组数量")
async def _(bot_id: str) -> Result:
if bots := nonebot.get_bots():
if bot_id not in bots:
return Result.warning_("指定Bot未连接...")
bot = bots[bot_id]
data = {
"friend_count": len(await bot.get_friend_list()),
"group_count": len(await bot.get_group_list()),
}
return Result.ok(data)
return Result.warning_("无Bot连接...")
@router.get("/get_run_time", dependencies=[authentication()], description="获取nb运行时间")
async def _() -> Result:
return Result.ok(int(time.time() - run_time))
@router.get("/get_active_group", dependencies=[authentication()], description="获取活跃群聊")
async def _(date_type: Optional[QueryDateType] = None) -> Result:
query = ChatHistory
now = datetime.now()
if date_type == QueryDateType.DAY:
query = ChatHistory.filter(create_time__gte=now - timedelta(hours=now.hour))
if date_type == QueryDateType.WEEK:
query = ChatHistory.filter(create_time__gte=now - timedelta(days=7))
if date_type == QueryDateType.MONTH:
query = ChatHistory.filter(create_time__gte=now - timedelta(days=30))
if date_type == QueryDateType.YEAR:
query = ChatHistory.filter(create_time__gte=now - timedelta(days=365))
data_list = (
await query.annotate(count=Count("id")).filter(group_id__not_isnull=True)
.group_by("group_id").order_by("-count").limit(5)
.values_list("group_id", "count")
)
active_group_list = []
id2name = {}
if data_list:
if info_list := await GroupInfo.filter(group_id__in=[x[0] for x in data_list]).all():
for group_info in info_list:
id2name[group_info.group_id] = group_info.group_name
for data in data_list:
active_group_list.append(
ActiveGroup(
group_id=data[0],
name=id2name.get(data[0]) or data[0],
chat_num=data[1],
ava_img=GROUP_AVA_URL.format(data[0], data[0]),
)
)
active_group_list = sorted(
active_group_list, key=lambda x: x.chat_num, reverse=True
)
if len(active_group_list) > 5:
active_group_list = active_group_list[:5]
return Result.ok(active_group_list)
@router.get("/get_hot_plugin", dependencies=[authentication()], description="获取热门插件")
async def _(date_type: Optional[QueryDateType] = None) -> Result:
query = Statistics
now = datetime.now()
if date_type == QueryDateType.DAY:
query = Statistics.filter(create_time__gte=now - timedelta(hours=now.hour))
if date_type == QueryDateType.WEEK:
query = Statistics.filter(create_time__gte=now - timedelta(days=7))
if date_type == QueryDateType.MONTH:
query = Statistics.filter(create_time__gte=now - timedelta(days=30))
if date_type == QueryDateType.YEAR:
query = Statistics.filter(create_time__gte=now - timedelta(days=365))
data_list = (
await query.annotate(count=Count("id"))
.group_by("plugin_name").order_by("-count").limit(5)
.values_list("plugin_name", "count")
)
hot_plugin_list = []
for data in data_list:
name = data[0]
if plugin_data := plugin_data_manager.get(data[0]):
name = plugin_data.name
hot_plugin_list.append(
HotPlugin(
module=data[0],
name=name,
count=data[1],
)
)
hot_plugin_list = sorted(hot_plugin_list, key=lambda x: x.count, reverse=True)
if len(hot_plugin_list) > 5:
hot_plugin_list = hot_plugin_list[:5]
return Result.ok(hot_plugin_list)
@ws_router.websocket("/system_status")
async def system_logs_realtime(websocket: WebSocket, sleep: Optional[int] = 5):
await websocket.accept()
logger.debug("ws system_status is connect")
try:
while websocket.client_state == WebSocketState.CONNECTED:
system_status = await get_system_status()
await websocket.send_text(system_status.json())
await asyncio.sleep(sleep)
except (WebSocketDisconnect, ConnectionClosedError, ConnectionClosedOK):
pass
return

View File

@ -0,0 +1,36 @@
import time
from typing import Optional
import nonebot
from nonebot import Driver
from nonebot.adapters.onebot.v11 import Bot
driver: Driver = nonebot.get_driver()
class BotLive:
def __init__(self):
self._data = {}
def add(self, bot_id: str):
self._data[bot_id] = time.time()
def get(self, bot_id: str) -> Optional[int]:
return self._data.get(bot_id)
def remove(self, bot_id: str):
if bot_id in self._data:
del self._data[bot_id]
bot_live = BotLive()
@driver.on_bot_connect
async def _(bot: Bot):
bot_live.add(bot.self_id)
@driver.on_bot_disconnect
async def _(bot: Bot):
bot_live.remove(bot.self_id)

View File

@ -0,0 +1,107 @@
from datetime import datetime
from typing import Optional, Union
from nonebot.adapters.onebot.v11 import Bot
from nonebot.config import Config
from pydantic import BaseModel
class SystemStatus(BaseModel):
"""
系统状态
"""
cpu: float
memory: float
disk: float
class BaseInfo(BaseModel):
"""
基础信息
"""
bot: Bot
"""Bot"""
self_id: str
"""SELF ID"""
nickname: str
"""昵称"""
ava_url: str
"""头像url"""
friend_count: int = 0
"""好友数量"""
group_count: int = 0
"""群聊数量"""
received_messages: int = 0
"""今日 累计接收消息"""
connect_time: int = 0
"""连接时间"""
connect_date: Optional[datetime] = None
"""连接日期"""
plugin_count: int = 0
"""加载插件数量"""
success_plugin_count: int = 0
"""加载成功插件数量"""
fail_plugin_count: int = 0
"""加载失败插件数量"""
is_select: bool = False
"""当前选择"""
config: Optional[Config] = None
"""nb配置"""
day_call: int = 0
"""今日调用插件次数"""
version: str = "unknown"
"""真寻版本"""
class Config:
arbitrary_types_allowed = True
class ChatHistoryCount(BaseModel):
"""
聊天记录数量
"""
num: int
"""总数"""
day: int
"""一天内"""
week: int
"""一周内"""
month: int
"""一月内"""
year: int
"""一年内"""
class ActiveGroup(BaseModel):
"""
活跃群聊数据
"""
group_id: Union[str, int]
"""群组id"""
name: str
"""群组名称"""
chat_num: int
"""发言数量"""
ava_img: str
"""群组头像"""
class HotPlugin(BaseModel):
"""
热门插件
"""
module: str
"""模块名"""
name: str
"""插件名称"""
count: int
"""调用次数"""

View File

@ -0,0 +1,462 @@
import re
from typing import Literal, Optional
import nonebot
from fastapi import APIRouter
from nonebot.adapters.onebot.v11.exception import ActionFailed
from starlette.websockets import WebSocket, WebSocketDisconnect, WebSocketState
from tortoise.functions import Count
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
from configs.config import NICKNAME
from models.ban_user import BanUser
from models.chat_history import ChatHistory
from models.friend_user import FriendUser
from models.group_info import GroupInfo
from models.group_member_info import GroupInfoUser
from models.statistics import Statistics
from services.log import logger
from utils.manager import group_manager, plugin_data_manager, requests_manager
from utils.utils import get_bot
from ....base_model import Result
from ....config import AVA_URL, GROUP_AVA_URL
from ....utils import authentication
from ...logs.log_manager import LOG_STORAGE
from .model import (
DeleteFriend,
Friend,
FriendRequestResult,
GroupDetail,
GroupRequestResult,
GroupResult,
HandleRequest,
LeaveGroup,
Message,
MessageItem,
Plugin,
ReqResult,
SendMessage,
Task,
UpdateGroup,
UserDetail,
)
ws_router = APIRouter()
router = APIRouter(prefix="/manage")
SUB_PATTERN = r"\x1b(\[.*?[@-~]|\].*?(\x07|\x1b\\))"
GROUP_PATTERN = r'.*?Message (-?\d*) from (\d*)@\[群:(\d*)] "(.*)"'
PRIVATE_PATTERN = r'.*?Message (-?\d*) from (\d*) "(.*)"'
AT_PATTERN = r'\[CQ:at,qq=(.*)\]'
IMAGE_PATTERN = r'\[CQ:image,.*,url=(.*);.*?\]'
@router.get("/get_group_list", dependencies=[authentication()], description="获取群组列表")
async def _(bot_id: str) -> Result:
"""
获取群信息
"""
if bots := nonebot.get_bots():
if bot_id not in bots:
return Result.warning_("指定Bot未连接...")
group_list_result = []
try:
group_info = {}
group_list = await bots[bot_id].get_group_list()
for g in group_list:
gid = g['group_id']
g['ava_url'] = GROUP_AVA_URL.format(gid, gid)
group_list_result.append(GroupResult(**g))
except Exception as e:
logger.error("调用API错误", "/get_group_list", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(group_list_result, "拿到了新鲜出炉的数据!")
return Result.warning_("无Bot连接...")
@router.post("/update_group", dependencies=[authentication()], description="修改群组信息")
async def _(group: UpdateGroup) -> Result:
try:
group_id = group.group_id
group_manager.set_group_level(group_id, group.level)
if group.status:
group_manager.turn_on_group_bot_status(group_id)
else:
group_manager.shutdown_group_bot_status(group_id)
all_task = group_manager.get_task_data().keys()
if group.task:
for task in all_task:
if task in group.task:
group_manager.open_group_task(group_id, task)
else:
group_manager.close_group_task(group_id, task)
group_manager[group_id].close_plugins = group.close_plugins
group_manager.save()
except Exception as e:
logger.error("调用API错误", "/get_group", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(info="已完成记录!")
@router.get("/get_friend_list", dependencies=[authentication()], description="获取好友列表")
async def _(bot_id: str) -> Result:
"""
获取群信息
"""
if bots := nonebot.get_bots():
if bot_id not in bots:
return Result.warning_("指定Bot未连接...")
try:
friend_list = await bots[bot_id].get_friend_list()
for f in friend_list:
f['ava_url'] = AVA_URL.format(f['user_id'])
return Result.ok([Friend(**f) for f in friend_list if str(f['user_id']) != bot_id], "拿到了新鲜出炉的数据!")
except Exception as e:
logger.error("调用API错误", "/get_group_list", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.warning_("无Bot连接...")
@router.get("/get_request_count", dependencies=[authentication()], description="获取请求数量")
def _() -> Result:
data = {
"friend_count": len(requests_manager.get_data().get("private") or []),
"group_count": len(requests_manager.get_data().get("group") or []),
}
return Result.ok(data, f"{NICKNAME}带来了最新的数据!")
@router.get("/get_request_list", dependencies=[authentication()], description="获取请求列表")
def _() -> Result:
try:
req_result = ReqResult()
data = requests_manager.get_data()
for type_ in requests_manager.get_data():
for x in data[type_]:
data[type_][x]["oid"] = x
data[type_][x]['type'] = type_
if type_ == "private":
data[type_][x]['ava_url'] = AVA_URL.format(data[type_][x]['id'])
req_result.friend.append(FriendRequestResult(**data[type_][x]))
else:
gid = data[type_][x]['id']
data[type_][x]['ava_url'] = GROUP_AVA_URL.format(gid, gid)
req_result.group.append(GroupRequestResult(**data[type_][x]))
req_result.friend.reverse()
req_result.group.reverse()
except Exception as e:
logger.error("调用API错误", "/get_request", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(req_result, f"{NICKNAME}带来了最新的数据!")
@router.delete("/clear_request", dependencies=[authentication()], description="清空请求列表")
def _(request_type: Literal["private", "group"]) -> Result:
"""
清空请求
:param type_: 类型
"""
requests_manager.clear(request_type)
return Result.ok(info="成功清除了数据!")
@router.post("/refuse_request", dependencies=[authentication()], description="拒绝请求")
async def _(parma: HandleRequest) -> Result:
"""
操作请求
:param parma: 参数
"""
try:
if bots := nonebot.get_bots():
bot_id = parma.bot_id
if bot_id not in nonebot.get_bots():
return Result.warning_("指定Bot未连接...")
try:
flag = await requests_manager.refused(bots[bot_id], parma.flag, parma.request_type) # type: ignore
except ActionFailed as e:
requests_manager.delete_request(parma.flag, parma.request_type)
return Result.warning_("请求失败,可能该请求已失效或请求数据错误...")
if flag == 1:
requests_manager.delete_request(parma.flag, parma.request_type)
return Result.warning_("该请求已失效...")
elif flag == 2:
return Result.warning_("未找到此Id请求...")
return Result.ok(info="成功处理了请求!")
return Result.warning_("无Bot连接...")
except Exception as e:
logger.error("调用API错误", "/refuse_request", e=e)
return Result.fail(f"{type(e)}: {e}")
@router.post("/delete_request", dependencies=[authentication()], description="忽略请求")
async def _(parma: HandleRequest) -> Result:
"""
操作请求
:param parma: 参数
"""
requests_manager.delete_request(parma.flag, parma.request_type)
return Result.ok(info="成功处理了请求!")
@router.post("/approve_request", dependencies=[authentication()], description="同意请求")
async def _(parma: HandleRequest) -> Result:
"""
操作请求
:param parma: 参数
"""
try:
if bots := nonebot.get_bots():
bot_id = parma.bot_id
if bot_id not in nonebot.get_bots():
return Result.warning_("指定Bot未连接...")
if parma.request_type == "group":
if rid := requests_manager.get_group_id(parma.flag):
if group := await GroupInfo.get_or_none(group_id=str(rid)):
await group.update_or_create(group_flag=1)
else:
group_info = await bots[bot_id].get_group_info(group_id=rid)
await GroupInfo.update_or_create(
group_id=str(group_info["group_id"]),
defaults={
"group_name": group_info["group_name"],
"max_member_count": group_info["max_member_count"],
"member_count": group_info["member_count"],
"group_flag": 1,
},
)
try:
await requests_manager.approve(bots[bot_id], parma.flag, parma.request_type) # type: ignore
return Result.ok(info="成功处理了请求!")
except ActionFailed as e:
requests_manager.delete_request(parma.flag, parma.request_type)
return Result.warning_("请求失败,可能该请求已失效或请求数据错误...")
return Result.warning_("无Bot连接...")
except Exception as e:
logger.error("调用API错误", "/approve_request", e=e)
return Result.fail(f"{type(e)}: {e}")
@router.post("/leave_group", dependencies=[authentication()], description="退群")
async def _(param: LeaveGroup) -> Result:
try:
if bots := nonebot.get_bots():
bot_id = param.bot_id
group_list = await bots[bot_id].get_group_list()
if param.group_id not in [str(g["group_id"]) for g in group_list]:
return Result.warning_("Bot未在该群聊中...")
await bots[bot_id].set_group_leave(group_id=param.group_id)
return Result.ok(info="成功处理了请求!")
return Result.warning_("无Bot连接...")
except Exception as e:
logger.error("调用API错误", "/leave_group", e=e)
return Result.fail(f"{type(e)}: {e}")
@router.post("/delete_friend", dependencies=[authentication()], description="删除好友")
async def _(param: DeleteFriend) -> Result:
try:
if bots := nonebot.get_bots():
bot_id = param.bot_id
friend_list = await bots[bot_id].get_friend_list()
if param.user_id not in [str(g["user_id"]) for g in friend_list]:
return Result.warning_("Bot未有其好友...")
await bots[bot_id].delete_friend(user_id=param.user_id)
return Result.ok(info="成功处理了请求!")
return Result.warning_("Bot未连接...")
except Exception as e:
logger.error("调用API错误", "/delete_friend", e=e)
return Result.fail(f"{type(e)}: {e}")
@router.get("/get_friend_detail", dependencies=[authentication()], description="获取好友详情")
async def _(bot_id: str, user_id: str) -> Result:
if bots := nonebot.get_bots():
if bot_id in bots:
if fd := [x for x in await bots[bot_id].get_friend_list() if str(x['user_id']) == user_id]:
like_plugin_list = (
await Statistics.filter(user_id=user_id).annotate(count=Count("id"))
.group_by("plugin_name").order_by("-count").limit(5)
.values_list("plugin_name", "count")
)
like_plugin = {}
for data in like_plugin_list:
name = data[0]
if plugin_data := plugin_data_manager.get(data[0]):
name = plugin_data.name
like_plugin[name] = data[1]
user = fd[0]
user_detail = UserDetail(
user_id=user_id,
ava_url=AVA_URL.format(user_id),
nickname=user['nickname'],
remark=user['remark'],
is_ban=await BanUser.is_ban(user_id),
chat_count=await ChatHistory.filter(user_id=user_id).count(),
call_count=await Statistics.filter(user_id=user_id).count(),
like_plugin=like_plugin,
)
return Result.ok(user_detail)
else:
return Result.warning_("未添加指定好友...")
return Result.warning_("无Bot连接...")
@router.get("/get_group_detail", dependencies=[authentication()], description="获取群组详情")
async def _(bot_id: str, group_id: str) -> Result:
if bots := nonebot.get_bots():
if bot_id in bots:
group_info = await bots[bot_id].get_group_info(group_id=int(group_id))
g = group_manager[group_id]
if not g:
return Result.warning_("指定群组未被收录...")
if group_info:
like_plugin_list = (
await Statistics.filter(group_id=group_id).annotate(count=Count("id"))
.group_by("plugin_name").order_by("-count").limit(5)
.values_list("plugin_name", "count")
)
like_plugin = {}
for data in like_plugin_list:
name = data[0]
if plugin_data := plugin_data_manager.get(data[0]):
name = plugin_data.name
like_plugin[name] = data[1]
close_plugins = []
for module in g.close_plugins:
module_ = module.replace(":super", "")
is_super_block = module.endswith(":super")
plugin = Plugin(module=module_, plugin_name=module, is_super_block=is_super_block)
if plugin_data := plugin_data_manager.get(module_):
plugin.plugin_name = plugin_data.name
close_plugins.append(plugin)
task_list = []
task_data = group_manager.get_task_data()
for tn, status in g.group_task_status.items():
task_list.append(
Task(
name=tn,
zh_name=task_data.get(tn) or tn,
status=status
)
)
group_detail = GroupDetail(
group_id=group_id,
ava_url=GROUP_AVA_URL.format(group_id, group_id),
name=group_info['group_name'],
member_count=group_info['member_count'],
max_member_count=group_info['max_member_count'],
chat_count=await ChatHistory.filter(group_id=group_id).count(),
call_count=await Statistics.filter(group_id=group_id).count(),
like_plugin=like_plugin,
level=g.level,
status=g.status,
close_plugins=close_plugins,
task=task_list
)
return Result.ok(group_detail)
else:
return Result.warning_("未添加指定群组...")
return Result.warning_("无Bot连接...")
@router.post("/send_message", dependencies=[authentication()], description="获取群组详情")
async def _(param: SendMessage) -> Result:
if bots := nonebot.get_bots():
if param.bot_id in bots:
try:
if param.user_id:
await bots[param.bot_id].send_private_msg(user_id=str(param.user_id), message=param.message)
else:
await bots[param.bot_id].send_group_msg(group_id=str(param.group_id), message=param.message)
except Exception as e:
return Result.fail(str(e))
return Result.ok("发送成功!")
return Result.warning_("指定Bot未连接...")
return Result.warning_("无Bot连接...")
MSG_LIST = []
ID2NAME = {}
async def message_handle(sub_log: str, type: Literal["private", "group"]):
global MSG_LIST, ID2NAME
pattern = PRIVATE_PATTERN if type == 'private' else GROUP_PATTERN
msg_id = None
uid = None
gid = None
msg = None
img_list = re.findall(IMAGE_PATTERN, sub_log)
if r := re.search(pattern, sub_log):
if type == 'private':
msg_id = r.group(1)
uid = r.group(2)
msg = r.group(3)
if uid not in ID2NAME:
user = await FriendUser.filter(user_id=uid).first()
ID2NAME[uid] = user.user_name or user.nickname
else:
msg_id = r.group(1)
uid = r.group(2)
gid = r.group(3)
msg = r.group(4)
if gid not in ID2NAME:
user = await GroupInfoUser.filter(user_id=uid, group_id=gid).first()
ID2NAME[uid] = user.user_name or user.nickname
if at_list := re.findall(AT_PATTERN, msg):
user_list = await GroupInfoUser.filter(user_id__in=at_list, group_id=gid).all()
id2name = {u.user_id: (u.user_name or u.nickname) for u in user_list}
for qq in at_list:
msg = re.sub(rf'\[CQ:at,qq={qq}\]', f"@{id2name[qq] or ''}", msg)
if msg_id in MSG_LIST:
return
MSG_LIST.append(msg_id)
messages = []
rep = re.split(r'\[CQ:image.*\]', msg)
if img_list:
for i in range(len(rep)):
messages.append(MessageItem(type="text", msg=rep[i]))
if i < len(img_list):
messages.append(MessageItem(type="img", msg=img_list[i]))
else:
messages = [MessageItem(type="text", msg=x) for x in rep]
return Message(
object_id=uid if type == 'private' else gid,
user_id=uid,
group_id=gid,
message=messages,
name=ID2NAME.get(uid) or "",
ava_url=AVA_URL.format(uid),
)
@ws_router.websocket("/chat")
async def _(websocket: WebSocket):
await websocket.accept()
async def log_listener(log: str):
global MSG_LIST, ID2NAME
sub_log = re.sub(SUB_PATTERN, "", log)
img_list = re.findall(IMAGE_PATTERN, sub_log)
if "message.private.friend" in log:
if message := await message_handle(sub_log, 'private'):
await websocket.send_json(message.dict())
else:
if r := re.search(GROUP_PATTERN, sub_log):
if message := await message_handle(sub_log, 'group'):
await websocket.send_json(message.dict())
if len(MSG_LIST) > 30:
MSG_LIST = MSG_LIST[-1:]
LOG_STORAGE.listeners.add(log_listener)
try:
while websocket.client_state == WebSocketState.CONNECTED:
recv = await websocket.receive()
except WebSocketDisconnect:
pass
finally:
LOG_STORAGE.listeners.remove(log_listener)
return

View File

@ -0,0 +1,270 @@
from typing import Dict, List, Literal, Optional, Union
from matplotlib.dates import FR
from nonebot.adapters.onebot.v11 import Bot
from pydantic import BaseModel
class Group(BaseModel):
"""
群组信息
"""
group_id: Union[str, int]
"""群组id"""
group_name: str
"""群组名称"""
member_count: int
"""成员人数"""
max_member_count: int
"""群组最大人数"""
class Task(BaseModel):
"""
被动技能
"""
name: str
"""被动名称"""
zh_name: str
"""被动中文名称"""
status: bool
"""状态"""
class Plugin(BaseModel):
"""
插件
"""
module: str
"""模块名"""
plugin_name: str
"""中文名"""
is_super_block: bool
"""是否超级用户禁用"""
class GroupResult(BaseModel):
"""
群组返回数据
"""
group_id: Union[str, int]
"""群组id"""
group_name: str
"""群组名称"""
ava_url: str
"""群组头像"""
class Friend(BaseModel):
"""
好友数据
"""
user_id: Union[str, int]
"""用户id"""
nickname: str = ""
"""昵称"""
remark: str = ""
"""备注"""
ava_url: str = ""
"""头像url"""
class UpdateGroup(BaseModel):
"""
更新群组信息
"""
group_id: str
"""群号"""
status: bool
"""状态"""
level: int
"""群权限"""
task: List[str]
"""被动状态"""
close_plugins: List[str]
"""关闭插件"""
class FriendRequestResult(BaseModel):
"""
好友/群组请求管理
"""
bot_id: Union[str, int]
"""bot_id"""
oid: str
"""排序"""
id: int
"""id"""
flag: str
"""flag"""
nickname: Optional[str]
"""昵称"""
level: Optional[int]
"""等级"""
sex: Optional[str]
"""性别"""
age: Optional[int]
"""年龄"""
from_: Optional[str]
"""来自"""
comment: Optional[str]
"""备注信息"""
ava_url: str
"""头像"""
type: str
"""类型 private group"""
class GroupRequestResult(FriendRequestResult):
"""
群聊邀请请求
"""
invite_group: Union[int, str]
"""邀请群聊"""
group_name: Optional[str]
"""群聊名称"""
class HandleRequest(BaseModel):
"""
操作请求接收数据
"""
bot_id: Optional[str] = None
"""bot_id"""
flag: str
"""flag"""
request_type: Literal["private", "group"]
"""类型"""
class LeaveGroup(BaseModel):
"""
退出群聊
"""
bot_id: str
"""bot_id"""
group_id: str
"""群聊id"""
class DeleteFriend(BaseModel):
"""
删除好友
"""
bot_id: str
"""bot_id"""
user_id: str
"""用户id"""
class ReqResult(BaseModel):
"""
好友/群组请求列表
"""
friend: List[FriendRequestResult] = []
"""好友请求列表"""
group: List[GroupRequestResult] = []
"""群组请求列表"""
class UserDetail(BaseModel):
"""
用户详情
"""
user_id: str
"""用户id"""
ava_url: str
"""头像url"""
nickname: str
"""昵称"""
remark: str
"""备注"""
is_ban: bool
"""是否被ban"""
chat_count: int
"""发言次数"""
call_count: int
"""功能调用次数"""
like_plugin: Dict[str, int]
"""最喜爱的功能"""
class GroupDetail(BaseModel):
"""
用户详情
"""
group_id: str
"""群组id"""
ava_url: str
"""头像url"""
name: str
"""名称"""
member_count: int
"""成员数"""
max_member_count: int
"""最大成员数"""
chat_count: int
"""发言次数"""
call_count: int
"""功能调用次数"""
like_plugin: Dict[str, int]
"""最喜爱的功能"""
level: int
"""群权限"""
status: bool
"""状态(睡眠)"""
close_plugins: List[Plugin]
"""关闭的插件"""
task: List[Task]
"""被动列表"""
class MessageItem(BaseModel):
type: str
"""消息类型"""
msg: str
"""内容"""
class Message(BaseModel):
"""
消息
"""
object_id: str
"""主体id user_id 或 group_id"""
user_id: str
"""用户id"""
group_id: Optional[str] = None
"""群组id"""
message: List[MessageItem]
"""消息"""
name: str
"""用户名称"""
ava_url: str
"""用户头像"""
class SendMessage(BaseModel):
"""
发送消息
"""
bot_id: str
"""bot id"""
user_id: Optional[str] = None
"""用户id"""
group_id: Optional[str] = None
"""群组id"""
message: str
"""消息"""

View File

@ -0,0 +1,198 @@
import re
from typing import List, Optional
import cattrs
from fastapi import APIRouter, Query
from configs.config import Config
from services.log import logger
from utils.manager import plugin_data_manager, plugins2settings_manager, plugins_manager
from utils.manager.models import PluginData, PluginSetting, PluginType
from ....base_model import Result
from ....utils import authentication
from .model import (
PluginConfig,
PluginCount,
PluginDetail,
PluginInfo,
PluginSwitch,
UpdatePlugin,
)
router = APIRouter(prefix="/plugin")
@router.get("/get_plugin_list", dependencies=[authentication()], deprecated="获取插件列表")
def _(
plugin_type: List[PluginType] = Query(None), menu_type: Optional[str] = None
) -> Result:
"""
获取插件列表
:param plugin_type: 类型 normal, superuser, hidden, admin
:param menu_type: 菜单类型
"""
try:
plugin_list: List[PluginInfo] = []
for module in plugin_data_manager.keys():
plugin_data: Optional[PluginData] = plugin_data_manager[module]
if plugin_data and plugin_data.plugin_type in plugin_type:
setting = plugin_data.plugin_setting or PluginSetting()
plugin = plugin_data.plugin_status
menu_type_ = getattr(setting, "plugin_type", [""])[0]
if menu_type and menu_type != menu_type_:
continue
plugin_info = PluginInfo(
module=module,
plugin_name=plugin_data.name,
default_status=getattr(setting, "default_status", False),
limit_superuser=getattr(setting, "limit_superuser", False),
cost_gold=getattr(setting, "cost_gold", 0),
menu_type=menu_type_,
version=(plugin.version or 0) if plugin else 0,
level=getattr(setting, "level", 5),
status=plugin.status if plugin else False,
author=plugin.author if plugin else None
)
plugin_info.version = (plugin.version or 0) if plugin else 0
plugin_list.append(plugin_info)
except Exception as e:
logger.error("调用API错误", "/get_plugins", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(plugin_list, "拿到了新鲜出炉的数据!")
@router.get("/get_plugin_count", dependencies=[authentication()], deprecated="获取插件数量")
def _() -> Result:
plugin_count = PluginCount()
for module in plugin_data_manager.keys():
plugin_data: Optional[PluginData] = plugin_data_manager[module]
if plugin_data and plugin_data.plugin_type == PluginType.NORMAL:
plugin_count.normal += 1
elif plugin_data and plugin_data.plugin_type == PluginType.ADMIN:
plugin_count.admin += 1
elif plugin_data and plugin_data.plugin_type == PluginType.SUPERUSER:
plugin_count.superuser += 1
else:
plugin_count.other += 1
return Result.ok(plugin_count)
@router.post("/update_plugin", dependencies=[authentication()], description="更新插件参数")
def _(plugin: UpdatePlugin) -> Result:
"""
修改插件信息
:param plugin: 插件内容
"""
try:
module = plugin.module
if p2s := plugins2settings_manager.get(module):
p2s.default_status = plugin.default_status
p2s.limit_superuser = plugin.limit_superuser
p2s.cost_gold = plugin.cost_gold
# p2s.cmd = plugin.cmd.split(",") if plugin.cmd else []
p2s.level = plugin.level
menu_lin = None
if len(p2s.plugin_type) > 1:
menu_lin = p2s.menu_type[1]
if menu_lin is not None:
p2s.plugin_type = (plugin.menu_type, menu_lin)
else:
p2s.plugin_type = (plugin.menu_type,)
if pm := plugins_manager.get(module):
if plugin.block_type:
pm.block_type = plugin.block_type
pm.status = False
else:
pm.block_type = None
pm.status = True
plugins2settings_manager.save()
plugins_manager.save()
# 配置项
if plugin.configs and (configs := Config.get(module)):
for key in plugin.configs:
if c := configs.configs.get(key):
value = plugin.configs[key]
# if isinstance(c.value, (list, tuple)) or isinstance(
# c.default_value, (list, tuple)
# ):
# value = value.split(",")
if c.type and value is not None:
value = cattrs.structure(value, c.type)
Config.set_config(module, key, value)
plugin_data_manager.reload()
except Exception as e:
logger.error("调用API错误", "/update_plugins", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(info="已经帮你写好啦!")
@router.post("/change_switch", dependencies=[authentication()], description="开关插件")
def _(param: PluginSwitch) -> Result:
if pm := plugins_manager.get(param.module):
pm.block_type = None if param.status else 'all'
pm.status = param.status
plugins_manager.save()
return Result.ok(info="成功改变了开关状态!")
return Result.warning_("未获取该插件的配置!")
@router.get("/get_plugin_menu_type", dependencies=[authentication()], description="获取插件类型")
def _() -> Result:
menu_type_list = []
for module in plugin_data_manager.keys():
plugin_data: Optional[PluginData] = plugin_data_manager[module]
if plugin_data:
setting = plugin_data.plugin_setting or PluginSetting()
menu_type = getattr(setting, "plugin_type", [""])[0]
if menu_type not in menu_type_list:
menu_type_list.append(menu_type)
return Result.ok(menu_type_list)
@router.get("/get_plugin", dependencies=[authentication()], description="获取插件详情")
def _(module: str) -> Result:
if plugin_data := plugin_data_manager.get(module):
setting = plugin_data.plugin_setting or PluginSetting()
plugin = plugin_data.plugin_status
config_list = []
if config := Config.get(module):
for cfg in config.configs:
type_str = ""
type_inner = None
x = str(config.configs[cfg].type)
r = re.search(r"<class '(.*)'>",str(config.configs[cfg].type))
if r:
type_str = r.group(1)
else:
r = re.search(r"typing\.(.*)\[(.*)\]",str(config.configs[cfg].type))
if r:
type_str = r.group(1)
if type_str:
type_str = type_str.lower()
type_inner = r.group(2)
if type_inner:
type_inner = [x.strip() for x in type_inner.split(",")]
config_list.append(PluginConfig(
module=module,
key=cfg,
value=config.configs[cfg].value,
help=config.configs[cfg].help,
default_value=config.configs[cfg].default_value,
type=type_str,
type_inner=type_inner
))
plugin_info = PluginDetail(
module=module,
plugin_name=plugin_data.name,
default_status=getattr(setting, "default_status", False),
limit_superuser=getattr(setting, "limit_superuser", False),
cost_gold=getattr(setting, "cost_gold", 0),
menu_type=getattr(setting, "plugin_type", [""])[0],
version=(plugin.version or 0) if plugin else 0,
level=getattr(setting, "level", 5),
status=plugin.status if plugin else False,
author=plugin.author if plugin else None,
config_list=config_list,
block_type=getattr(plugin, "block_type", None)
)
return Result.ok(plugin_info)
return Result.warning_("未获取到插件详情...")

View File

@ -0,0 +1,148 @@
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
from utils.manager.models import Plugin as PluginManager
from utils.manager.models import PluginBlock, PluginCd, PluginCount, PluginSetting
from utils.typing import BLOCK_TYPE
class PluginSwitch(BaseModel):
"""
插件开关
"""
module: str
"""模块"""
status: bool
"""开关状态"""
class UpdateConfig(BaseModel):
"""
配置项修改参数
"""
module: str
"""模块"""
key: str
"""配置项key"""
value: Any
"""配置项值"""
class UpdatePlugin(BaseModel):
"""
插件修改参数
"""
module: str
"""模块"""
default_status: bool
"""默认开关"""
limit_superuser: bool
"""限制超级用户"""
cost_gold: int
"""金币花费"""
menu_type: str
"""插件菜单类型"""
level: int
"""插件所需群权限"""
block_type: Optional[BLOCK_TYPE] = None
"""禁用类型"""
configs: Optional[Dict[str, Any]] = None
"""配置项"""
class PluginInfo(BaseModel):
"""
基本插件信息
"""
module: str
"""插件名称"""
plugin_name: str
"""插件中文名称"""
default_status: bool
"""默认开关"""
limit_superuser: bool
"""限制超级用户"""
cost_gold: int
"""花费金币"""
menu_type: str
"""插件菜单类型"""
version: Union[int, str, float]
"""插件版本"""
level: int
"""群权限"""
status: bool
"""当前状态"""
author: Optional[str] = None
"""作者"""
block_type: BLOCK_TYPE = None
"""禁用类型"""
class PluginConfig(BaseModel):
"""
插件配置项
"""
module: str
"""模块"""
key: str
""""""
value: Any
""""""
help: Optional[str] = None
"""帮助"""
default_value: Any
"""默认值"""
type: Optional[Any] = None
"""值类型"""
type_inner: Optional[List[str]] = None
"""List Tuple等内部类型检验"""
class Plugin(BaseModel):
"""
插件
"""
module: str
"""模块名称"""
plugin_settings: Optional[PluginSetting]
"""settings"""
plugin_manager: Optional[PluginManager]
"""manager"""
plugin_config: Optional[Dict[str, PluginConfig]]
"""配置项"""
cd_limit: Optional[PluginCd]
"""cd限制"""
block_limit: Optional[PluginBlock]
"""阻断限制"""
count_limit: Optional[PluginCount]
"""次数限制"""
class PluginCount(BaseModel):
"""
插件数量
"""
normal: int = 0
"""普通插件"""
admin: int = 0
"""管理员插件"""
superuser: int = 0
"""超级用户插件"""
other: int = 0
"""其他插件"""
class PluginDetail(PluginInfo):
"""
插件详情
"""
config_list: List[PluginConfig]

View File

@ -0,0 +1,121 @@
import os
import shutil
from pathlib import Path
from typing import List, Optional
from fastapi import APIRouter
from ....base_model import Result
from ....utils import authentication, get_system_disk
from .model import AddFile, DeleteFile, DirFile, RenameFile, SaveFile
router = APIRouter(prefix="/system")
@router.get("/get_dir_list", dependencies=[authentication()], description="获取文件列表")
async def _(path: Optional[str] = None) -> Result:
base_path = Path(path) if path else Path()
data_list = []
for file in os.listdir(base_path):
data_list.append(DirFile(is_file=not (base_path / file).is_dir(), name=file, parent=path))
return Result.ok(data_list)
@router.get("/get_resources_size", dependencies=[authentication()], description="获取文件列表")
async def _(full_path: Optional[str] = None) -> Result:
return Result.ok(await get_system_disk(full_path))
@router.post("/delete_file", dependencies=[authentication()], description="删除文件")
async def _(param: DeleteFile) -> Result:
path = Path(param.full_path)
if not path or not path.exists():
return Result.warning_("文件不存在...")
try:
path.unlink()
return Result.ok('删除成功!')
except Exception as e:
return Result.warning_('删除失败: ' + str(e))
@router.post("/delete_folder", dependencies=[authentication()], description="删除文件夹")
async def _(param: DeleteFile) -> Result:
path = Path(param.full_path)
if not path or not path.exists() or path.is_file():
return Result.warning_("文件夹不存在...")
try:
shutil.rmtree(path.absolute())
return Result.ok('删除成功!')
except Exception as e:
return Result.warning_('删除失败: ' + str(e))
@router.post("/rename_file", dependencies=[authentication()], description="重命名文件")
async def _(param: RenameFile) -> Result:
path = (Path(param.parent) / param.old_name) if param.parent else Path(param.old_name)
if not path or not path.exists():
return Result.warning_("文件不存在...")
try:
path.rename(path.parent / param.name)
return Result.ok('重命名成功!')
except Exception as e:
return Result.warning_('重命名失败: ' + str(e))
@router.post("/rename_folder", dependencies=[authentication()], description="重命名文件夹")
async def _(param: RenameFile) -> Result:
path = (Path(param.parent) / param.old_name) if param.parent else Path(param.old_name)
if not path or not path.exists() or path.is_file():
return Result.warning_("文件夹不存在...")
try:
new_path = path.parent / param.name
shutil.move(path.absolute(), new_path.absolute())
return Result.ok('重命名成功!')
except Exception as e:
return Result.warning_('重命名失败: ' + str(e))
@router.post("/add_file", dependencies=[authentication()], description="新建文件")
async def _(param: AddFile) -> Result:
path = (Path(param.parent) / param.name) if param.parent else Path(param.name)
if path.exists():
return Result.warning_("文件已存在...")
try:
path.open('w')
return Result.ok('新建文件成功!')
except Exception as e:
return Result.warning_('新建文件失败: ' + str(e))
@router.post("/add_folder", dependencies=[authentication()], description="新建文件夹")
async def _(param: AddFile) -> Result:
path = (Path(param.parent) / param.name) if param.parent else Path(param.name)
if path.exists():
return Result.warning_("文件夹已存在...")
try:
path.mkdir()
return Result.ok('新建文件夹成功!')
except Exception as e:
return Result.warning_('新建文件夹失败: ' + str(e))
@router.get("/read_file", dependencies=[authentication()], description="读取文件")
async def _(full_path: str) -> Result:
path = Path(full_path)
if not path.exists():
return Result.warning_("文件不存在...")
try:
text = path.read_text(encoding='utf-8')
return Result.ok(text)
except Exception as e:
return Result.warning_('新建文件夹失败: ' + str(e))
@router.post("/save_file", dependencies=[authentication()], description="读取文件")
async def _(param: SaveFile) -> Result:
path = Path(param.full_path)
try:
with path.open('w') as f:
f.write(param.content)
return Result.ok("更新成功!")
except Exception as e:
return Result.warning_('新建文件夹失败: ' + str(e))

View File

@ -0,0 +1,64 @@
from datetime import datetime
from typing import Literal, Optional
from pydantic import BaseModel
class DirFile(BaseModel):
"""
文件或文件夹
"""
is_file: bool
"""是否为文件"""
name: str
"""文件夹或文件名称"""
parent: Optional[str] = None
"""父级"""
class DeleteFile(BaseModel):
"""
删除文件
"""
full_path: str
"""文件全路径"""
class RenameFile(BaseModel):
"""
删除文件
"""
parent: Optional[str]
"""父路径"""
old_name: str
"""旧名称"""
name: str
"""新名称"""
class AddFile(BaseModel):
"""
新建文件
"""
parent: Optional[str]
"""父路径"""
name: str
"""新名称"""
class SaveFile(BaseModel):
"""
保存文件
"""
full_path: str
"""全路径"""
content: str
"""内容"""

View File

@ -5,7 +5,9 @@ import nonebot
from fastapi import APIRouter, Depends
from fastapi.security import OAuth2PasswordRequestForm
from ..models.model import Result
from configs.config import Config
from ..base_model import Result
from ..utils import (
ACCESS_TOKEN_EXPIRE_MINUTES,
create_token,
@ -22,13 +24,14 @@ router = APIRouter()
@router.post("/login")
async def login_get_token(form_data: OAuth2PasswordRequestForm = Depends()):
if user := get_user(form_data.username):
if user.password != form_data.password:
return Result.fail("真笨, 密码都能记错!", 999)
else:
username = Config.get_config("web-ui", "username")
password = Config.get_config("web-ui", "password")
if not username or not password:
return Result.fail("你滴配置文件里用户名密码配置项为空", 998)
if username != form_data.username or password != form_data.password:
return Result.fail("真笨, 账号密码都能记错!", 999)
access_token = create_token(
user=user, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
user=get_user(form_data.username), expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
token_data["token"].append(access_token)
if len(token_data["token"]) > 3:

View File

@ -0,0 +1,117 @@
from datetime import datetime
from logging import warning
from typing import Any, Dict, Generic, List, Optional, TypeVar, Union
from nonebot.adapters.onebot.v11 import Bot
from pydantic import BaseModel, validator
from configs.utils import Config
from utils.manager.models import Plugin as PluginManager
from utils.manager.models import PluginBlock, PluginCd, PluginCount, PluginSetting
T = TypeVar("T")
class User(BaseModel):
username: str
password: str
class Token(BaseModel):
access_token: str
token_type: str
class Result(BaseModel):
"""
总体返回
"""
suc: bool
"""调用状态"""
code: int = 200
"""code"""
info: str = "操作成功"
"""info"""
warning: Optional[str] = None
"""警告信息"""
data: Any = None
"""返回数据"""
@classmethod
def warning_(cls, info: str, code: int = 200) -> "Result":
return cls(suc=True, warning=info, code=code)
@classmethod
def fail(cls, info: str = "异常错误", code: int = 500) -> "Result":
return cls(suc=False, info=info, code=code)
@classmethod
def ok(cls, data: Any = None, info: str = "操作成功", code: int = 200) -> "Result":
return cls(suc=True, info=info, code=code, data=data)
class QueryModel(BaseModel, Generic[T]):
"""
基本查询条件
"""
index: int
"""页数"""
size: int
"""每页数量"""
data: T
"""携带数据"""
@validator("index")
def index_validator(cls, index):
if index < 1:
raise ValueError("查询下标小于1...")
return index
@validator("size")
def size_validator(cls, size):
if size < 1:
raise ValueError("每页数量小于1...")
return size
class BaseResultModel(BaseModel):
"""
基础返回
"""
total: int
"""总页数"""
data: Any
"""数据"""
class SystemStatus(BaseModel):
"""
系统状态
"""
cpu: float
memory: float
disk: float
check_time: datetime
class SystemFolderSize(BaseModel):
"""
资源文件占比
"""
name: str
"""名称"""
size: float
"""大小"""
full_path: Optional[str]
"""完整路径"""
is_dir: bool
"""是否为文件夹"""

View File

@ -5,6 +5,7 @@ import nonebot
from fastapi import APIRouter
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from strenum import StrEnum
app = nonebot.get_app()
@ -19,85 +20,67 @@ app.add_middleware(
)
class RequestResult(BaseModel):
AVA_URL = "http://q1.qlogo.cn/g?b=qq&nk={}&s=160"
GROUP_AVA_URL = "http://p.qlogo.cn/gh/{}/{}/640/"
class QueryDateType(StrEnum):
"""
好友/群组请求管理
查询日期类型
"""
oid: str
id: int
flag: str
nickname: Optional[str]
level: Optional[int]
sex: Optional[str]
age: Optional[int]
from_: Optional[str]
comment: Optional[str]
invite_group: Optional[int]
group_name: Optional[str]
DAY = "day"
""""""
WEEK = "week"
""""""
MONTH = "month"
""""""
YEAR = "year"
""""""
class RequestParma(BaseModel):
"""
操作请求接收数据
"""
# class SystemNetwork(BaseModel):
# """
# 系统网络状态
# """
id: int
handle: str
type: str
# baidu: int
# google: int
class SystemStatus(BaseModel):
"""
系统状态
"""
# class SystemFolderSize(BaseModel):
# """
# 资源文件占比
# """
cpu: int
memory: int
disk: int
check_time: datetime
# font_dir_size: float
# image_dir_size: float
# text_dir_size: float
# record_dir_size: float
# temp_dir_size: float
# data_dir_size: float
# log_dir_size: float
# check_time: datetime
class SystemNetwork(BaseModel):
"""
系统网络状态
"""
# class SystemStatusList(BaseModel):
# """
# 状态记录
# """
baidu: int
google: int
# cpu_data: List[Dict[str, Union[float, str]]]
# memory_data: List[Dict[str, Union[float, str]]]
# disk_data: List[Dict[str, Union[float, str]]]
class SystemFolderSize(BaseModel):
"""
资源文件占比
"""
# class SystemResult(BaseModel):
# """
# 系统api返回
# """
font_dir_size: float
image_dir_size: float
text_dir_size: float
record_dir_size: float
temp_dir_size: float
data_dir_size: float
log_dir_size: float
check_time: datetime
class SystemStatusList(BaseModel):
"""
状态记录
"""
cpu_data: List[Dict[str, Union[float, str]]]
memory_data: List[Dict[str, Union[float, str]]]
disk_data: List[Dict[str, Union[float, str]]]
class SystemResult(BaseModel):
"""
系统api返回
"""
status: SystemStatus
network: SystemNetwork
disk: SystemFolderSize
check_time: datetime
# status: SystemStatus
# network: SystemNetwork
# disk: SystemFolderSize
# check_time: datetime

View File

@ -1,223 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, TypeVar, Union
from nonebot.adapters.onebot.v11 import Bot
from pydantic import BaseModel
from configs.utils import Config
from utils.manager.models import Plugin as PluginManager
from utils.manager.models import PluginBlock, PluginCd, PluginCount, PluginSetting
class User(BaseModel):
username: str
password: str
class Token(BaseModel):
access_token: str
token_type: str
class Result(BaseModel):
"""
总体返回
"""
suc: bool
"""调用状态"""
code: int = 200
"""code"""
info: str = "操作成功"
"""info"""
data: Any = None
"""返回数据"""
@classmethod
def fail(cls, info: str = "异常错误", code: int = 500) -> "Result":
return cls(suc=False, info=info, code=code)
@classmethod
def ok(cls, data: Any = None, info: str = "操作成功", code: int = 200) -> "Result":
return cls(suc=True, info=info, code=code, data=data)
class PluginConfig(BaseModel):
"""
插件配置项
"""
module: str
key: str
value: Optional[Any]
help: Optional[str]
default_value: Optional[Any]
has_type: bool
class Plugin(BaseModel):
"""
插件
"""
model: str
"""模块名称"""
plugin_settings: Optional[PluginSetting]
"""settings"""
plugin_manager: Optional[PluginManager]
"""manager"""
plugin_config: Optional[Dict[str, PluginConfig]]
"""配置项"""
cd_limit: Optional[PluginCd]
"""cd限制"""
block_limit: Optional[PluginBlock]
"""阻断限制"""
count_limit: Optional[PluginCount]
"""次数限制"""
class Group(BaseModel):
"""
群组信息
"""
group_id: int
group_name: str
member_count: int
max_member_count: int
class Task(BaseModel):
"""
被动技能
"""
name: str
nameZh: str
status: bool
class GroupResult(BaseModel):
"""
群组返回数据
"""
group: Group
level: int
status: bool
close_plugins: List[str]
task: List[Task]
class RequestResult(BaseModel):
"""
好友/群组请求管理
"""
oid: str
id: int
flag: str
nickname: Optional[str]
level: Optional[int]
sex: Optional[str]
age: Optional[int]
from_: Optional[str]
comment: Optional[str]
invite_group: Optional[int]
group_name: Optional[str]
class SystemStatus(BaseModel):
"""
系统状态
"""
cpu: float
memory: float
disk: float
check_time: datetime
class SystemNetwork(BaseModel):
"""
系统网络状态
"""
baidu: int
google: int
class SystemFolderSize(BaseModel):
"""
资源文件占比
"""
font_dir_size: float
image_dir_size: float
text_dir_size: float
record_dir_size: float
temp_dir_size: float
data_dir_size: float
log_dir_size: float
check_time: datetime
class SystemStatusList(BaseModel):
"""
状态记录
"""
cpu_data: List[Dict[str, Union[float, str]]]
memory_data: List[Dict[str, Union[float, str]]]
disk_data: List[Dict[str, Union[float, str]]]
class SystemResult(BaseModel):
"""
系统api返回
"""
status: SystemStatus
network: SystemNetwork
disk: SystemFolderSize
check_time: datetime
class BotInfo(BaseModel):
"""
Bot基础信息
"""
bot: Bot
"""Bot"""
self_id: str
"""SELF ID"""
nickname: str
"""昵称"""
ava_url: str
"""头像url"""
friend_count: int = 0
"""好友数量"""
group_count: int = 0
"""群聊数量"""
received_messages: int = 0
"""累计接收消息"""
received_messages_day: int = 0
"""今日累计接收消息"""
received_messages_week: int = 0
"""一周内累计接收消息"""
received_messages_month: int = 0
"""一月内累计接收消息"""
plugin_count: int = 0
"""加载插件数量"""
success_plugin_count: int = 0
"""加载成功插件数量"""
fail_plugin_count: int = 0
"""加载失败插件数量"""
is_select: bool = False
"""当前选择"""
class Config:
arbitrary_types_allowed = True

View File

@ -1,59 +0,0 @@
from typing import Any, List
from pydantic import BaseModel
class UpdatePlugin(BaseModel):
"""
插件修改参数
"""
module: str
"""模块"""
default_status: bool
"""默认开关"""
limit_superuser: bool
"""限制超级用户"""
cost_gold: int
"""金币花费"""
cmd: List[str]
"""插件别名"""
menu_type: str
"""插件菜单类型"""
group_level: int
"""插件所需群权限"""
block_type: str
"""禁用类型"""
class UpdateConfig(BaseModel):
"""
配置项修改参数
"""
module: str
"""模块"""
key: str
"""配置项key"""
value: Any
"""配置项值"""
class UpdateGroup(BaseModel):
group_id: str
"""群号"""
status: bool
"""状态"""
level: int
"""群权限"""
class HandleRequest(BaseModel):
"""
操作请求接收数据
"""
id: int
handle: str
type: str

View File

@ -1,16 +1,27 @@
import os
from datetime import datetime, timedelta
from typing import Any, Optional
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import nonebot
import psutil
import ujson as json
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from nonebot.utils import run_sync
from configs.config import Config
from configs.path_config import DATA_PATH
from configs.path_config import (
DATA_PATH,
FONT_PATH,
IMAGE_PATH,
LOG_PATH,
RECORD_PATH,
TEMP_PATH,
TEXT_PATH,
)
from .models.model import Result, User
from .base_model import SystemFolderSize, SystemStatus, User
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
@ -29,6 +40,14 @@ if token_file.exists():
def get_user(uname: str) -> Optional[User]:
"""获取账号密码
参数:
uname: uname
返回:
Optional[User]: 用户信息
"""
username = Config.get_config("web-ui", "username")
password = Config.get_config("web-ui", "password")
if username and password and uname == username:
@ -36,6 +55,12 @@ def get_user(uname: str) -> Optional[User]:
def create_token(user: User, expires_delta: Optional[timedelta] = None):
"""创建token
参数:
user: 用户信息
expires_delta: 过期时间.
"""
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
return jwt.encode(
claims={"sub": user.username, "exp": expire},
@ -45,6 +70,13 @@ def create_token(user: User, expires_delta: Optional[timedelta] = None):
def authentication():
"""权限验证
异常:
JWTError: JWTError
HTTPException: HTTPException
"""
# if token not in token_data["token"]:
def inner(token: str = Depends(oauth2_scheme)):
try:
@ -57,3 +89,82 @@ def authentication():
raise HTTPException(status_code=400, detail="登录验证失败或已失效, 踢出房间!")
return Depends(inner)
def _get_dir_size(dir_path: Path) -> float:
"""
说明:
获取文件夹大小
参数:
:param dir_path: 文件夹路径
"""
size = 0
for root, dirs, files in os.walk(dir_path):
size += sum([os.path.getsize(os.path.join(root, name)) for name in files])
return size
@run_sync
def get_system_status() -> SystemStatus:
"""
说明:
获取系统信息等
"""
cpu = psutil.cpu_percent()
memory = psutil.virtual_memory().percent
disk = psutil.disk_usage("/").percent
return SystemStatus(
cpu=cpu,
memory=memory,
disk=disk,
check_time=datetime.now().replace(microsecond=0),
)
@run_sync
def get_system_disk(
full_path: Optional[str],
) -> List[SystemFolderSize]:
"""
说明:
获取资源文件大小等
"""
base_path = Path(full_path) if full_path else Path()
other_size = 0
data_list = []
for file in os.listdir(base_path):
f = base_path / file
if f.is_dir():
size = _get_dir_size(f) / 1024 / 1024
data_list.append(SystemFolderSize(name=file, size=size, full_path=str(f), is_dir=True))
else:
other_size += f.stat().st_size / 1024 / 1024
if other_size:
data_list.append(SystemFolderSize(name='other_file', size=other_size, full_path=full_path, is_dir=False))
return data_list
# else:
# if type_ == "image":
# dir_path = IMAGE_PATH
# elif type_ == "font":
# dir_path = FONT_PATH
# elif type_ == "text":
# dir_path = TEXT_PATH
# elif type_ == "record":
# dir_path = RECORD_PATH
# elif type_ == "data":
# dir_path = DATA_PATH
# elif type_ == "temp":
# dir_path = TEMP_PATH
# else:
# dir_path = LOG_PATH
# dir_map = {}
# other_file_size = 0
# for file in os.listdir(dir_path):
# file = Path(dir_path / file)
# if file.is_dir():
# dir_map[file.name] = _get_dir_size(file) / 1024 / 1024
# else:
# other_file_size += os.path.getsize(file) / 1024 / 1024
# dir_map["其他文件"] = other_file_size
# dir_map["check_time"] = datetime.now().replace(microsecond=0)
# return dir_map

View File

@ -2,7 +2,6 @@ import asyncio
from typing import Optional
from nonebot import get_driver
from nonebot.log import logger
from playwright.async_api import Browser, Playwright, async_playwright
from services.log import logger
@ -26,7 +25,7 @@ async def shutdown_browser():
if _browser:
await _browser.close()
if _playwright:
_playwright.stop()
await _playwright.stop() # type: ignore
def get_browser() -> Browser:

View File

@ -30,7 +30,7 @@ class StaticData(Generic[T]):
try:
self._data: dict = json.load(f)
except ValueError:
if f.read().strip():
if not f.read().strip():
raise ValueError(f"{file} 文件加载错误,请检查文件内容格式.")
elif file.name.endswith("yaml"):
self._data = _yaml.load(f)

View File

@ -29,3 +29,6 @@ class PluginDataManager(StaticData[PluginData]):
def __getitem__(self, item) -> Optional[PluginData]:
return self._data.get(item)
def reload(self):
pass

View File

@ -1,11 +1,13 @@
from utils.manager.data_class import StaticData
from nonebot.adapters.onebot.v11 import Bot, ActionFailed
from services.log import logger
from typing import Optional
from utils.image_utils import BuildImage
from utils.utils import get_user_avatar
from pathlib import Path
from io import BytesIO
from pathlib import Path
from typing import Optional, Union, overload
from nonebot.adapters.onebot.v11 import ActionFailed, Bot
from services.log import logger
from utils.image_utils import BuildImage
from utils.manager.data_class import StaticData
from utils.utils import get_user_avatar
class RequestManager(StaticData):
@ -21,6 +23,7 @@ class RequestManager(StaticData):
def add_request(
self,
bot_id: str,
id_: int,
type_: str,
flag: str,
@ -36,6 +39,7 @@ class RequestManager(StaticData):
):
"""
添加一个请求
:param bot_id: bot_id
:param id_: id用户id或群id
:param type_: 类型private group
:param flag: event.flag
@ -49,6 +53,7 @@ class RequestManager(StaticData):
:param group_name: 群聊名称
"""
self._data[type_][str(len(self._data[type_].keys()))] = {
"bot_id": bot_id,
"id": id_,
"flag": flag,
"nickname": nickname,
@ -62,14 +67,24 @@ class RequestManager(StaticData):
}
self.save()
@overload
def remove_request(self, type_: str, flag: str):
...
@overload
def remove_request(self, type_: str, id_: int):
...
def remove_request(self, type_: str, id_: Union[int, str]):
"""
删除一个请求数据
:param type_: 类型
:param id_: iduser_id group_id
"""
for x in self._data[type_].keys():
if self._data[type_][x].get("id") == id_:
a_id = self._data[type_][x].get("id")
a_flag = self._data[type_][x].get("flag")
if a_id == id_ or a_flag == id_:
del self._data[type_][x]
break
self.save()
@ -83,8 +98,17 @@ class RequestManager(StaticData):
if data:
return data["invite_group"]
return None
@overload
async def approve(self, bot: Bot, id_: int, type_: str) -> int:
...
@overload
async def approve(self, bot: Bot, flag: str, type_: str) -> int:
...
async def approve(self, bot: Bot, id_: Union[int, str], type_: str) -> int:
"""
同意请求
:param bot: Bot
@ -92,8 +116,16 @@ class RequestManager(StaticData):
:param type_: 类型private group
"""
return await self._set_add_request(bot, id_, type_, True)
@overload
async def refused(self, bot: Bot, id_: int, type_: str) -> int:
...
async def refused(self, bot: Bot, id_: int, type_: str) -> Optional[int]:
@overload
async def refused(self, bot: Bot, flag: str, type_: str) -> int:
...
async def refused(self, bot: Bot, id_: Union[int, str], type_: str) -> Optional[int]:
"""
拒绝请求
:param bot: Bot
@ -102,7 +134,9 @@ class RequestManager(StaticData):
"""
return await self._set_add_request(bot, id_, type_, False)
def clear(self, type_: Optional[str] = None): # type_: Optional[Literal["group", "private"]] = None
def clear(
self, type_: Optional[str] = None
): # type_: Optional[Literal["group", "private"]] = None
"""
清空所有请求信息无视请求
:param type_: 类型
@ -113,16 +147,32 @@ class RequestManager(StaticData):
self._data = {"private": {}, "group": {}}
self.save()
def delete_request(self, id_: int, type_: str): # type_: Literal["group", "private"]
@overload
async def delete_request(self, id_: int, type_: str) -> int:
...
@overload
async def delete_request(self, flag: str, type_: str) -> int:
...
def delete_request(
self, id_: Union[str, int], type_: str
): # type_: Literal["group", "private"]
"""
删除请求
:param id_: id
:param type_: 类型
"""
id_ = str(id_)
if self._data[type_].get(id_):
del self._data[type_][id_]
self.save()
if type(id_) == int:
if self._data[type_].get(id_):
del self._data[type_][id_]
self.save()
else:
for k, item in self._data[type_].items():
if item['flag'] == id_:
del self._data[type_][k]
self.save()
break
def set_group_name(self, group_name: str, group_id: int):
"""
@ -230,7 +280,7 @@ class RequestManager(StaticData):
return bk.pic2bs4()
async def _set_add_request(
self, bot: Bot, id_: int, type_: str, approve: bool
self, bot: Bot, idx: Union[str, int], type_: str, approve: bool
) -> int:
"""
处理请求
@ -239,8 +289,13 @@ class RequestManager(StaticData):
:param type_: 类型private group
:param approve: 是否同意
"""
id_ = str(id_)
if id_ in self._data[type_].keys():
flag = None
id_ = None
if type(idx) == str:
flag = idx
else:
id_ = str(idx)
if id_ and id_ in self._data[type_].keys():
try:
if type_ == "private":
await bot.set_friend_add_request(
@ -259,7 +314,7 @@ class RequestManager(StaticData):
f"同意{self._data[type_][id_]['nickname']}({self._data[type_][id_]['id']})"
f"{'好友' if type_ == 'private' else '入群'}请求失败了..."
)
return 1 # flag失效
return 1 # flag失效
else:
logger.info(
f"{'同意' if approve else '拒绝'}{self._data[type_][id_]['nickname']}({self._data[type_][id_]['id']})"
@ -268,4 +323,25 @@ class RequestManager(StaticData):
del self._data[type_][id_]
self.save()
return rid
return 2 # 未找到id
if flag:
rm_id = None
for k, item in self._data[type_].items():
if item['flag'] == flag:
rm_id = k
if type_ == 'private':
await bot.set_friend_add_request(
flag=item['flag'], approve=approve
)
rid = item["id"]
else:
await bot.set_group_add_request(
flag=item['flag'],
sub_type="invite",
approve=approve,
)
rid = item["invite_group"]
if rm_id is not None:
del self._data[type_][rm_id]
self.save()
return rid
return 2 # 未找到id

View File

@ -1,16 +1,37 @@
from typing import Any
from nonebot.adapters.onebot.v11 import Message, MessageEvent
from pydantic import BaseModel
from typing import Any
class ShopParam(BaseModel):
goods_name: str
"""商品名称"""
user_id: int
"""用户id"""
group_id: int
"""群聊id"""
bot: Any
"""bot"""
event: MessageEvent
num: int # 道具单次使用数量
"""event"""
num: int
"""道具单次使用数量"""
message: Message
"""message"""
text: str
send_success_msg: bool = True # 是否发送使用成功信息
max_num_limit: int = 1 # 单次使用最大次数
"""text"""
send_success_msg: bool = True
"""是否发送使用成功信息"""
max_num_limit: int = 1
"""单次使用最大次数"""
class CommonSql(BaseModel):
sql: str
"""sql语句"""
remark: str
"""备注"""

View File

@ -1,3 +1,4 @@
from typing import Literal
BLOCK_TYPE = Literal["all", "private", "group"]
"""禁用类型"""