🎨 优化Web UI代码结构,修改target方法

This commit is contained in:
HibiKier 2024-12-23 17:33:49 +08:00
parent ae03ad1665
commit 1ae9e689fe
18 changed files with 1389 additions and 926 deletions

View File

@ -2,20 +2,12 @@ from datetime import datetime
import uuid
import nonebot
from nonebot import require
from nonebot.adapters import Bot
from nonebot.drivers import Driver
from tortoise import Tortoise
from tortoise.exceptions import OperationalError
import ujson as json
require("nonebot_plugin_apscheduler")
require("nonebot_plugin_alconna")
require("nonebot_plugin_session")
require("nonebot_plugin_userinfo")
require("nonebot_plugin_htmlrender")
# require("nonebot_plugin_uninfo")
from zhenxun.models.bot_connect_log import BotConnectLog
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.goods_info import GoodsInfo

View File

@ -89,7 +89,7 @@ async def _(bot: Bot):
async with aiofiles.open(RESTART_MARK, encoding="utf8") as f:
bot_id, user_id = (await f.read()).split()
if bot := nonebot.get_bot(bot_id):
if target := PlatformUtils.get_target(bot, user_id):
if target := PlatformUtils.get_target(user_id=user_id):
await MessageUtils.build_message(
f"{BotConfig.self_nickname}已成功重启!"
).send(target, bot=bot)

View File

@ -47,7 +47,7 @@ class BroadcastManage:
group.group_id,
):
target = PlatformUtils.get_target(
bot, None, group.channel_id or group.group_id
group_id=group.group_id, channel_id=group.channel_id
)
if target:
await MessageUtils.build_message(message_list).send(

View File

@ -112,6 +112,6 @@ async def _():
app.include_router(BaseApiRouter)
app.include_router(WsApiRouter)
await init_public(app)
logger.info("<g>API启动成功</g>", "Web UI")
logger.info("<g>API启动成功</g>", "WebUi")
except Exception as e:
logger.error("<g>API启动失败</g>", "Web UI", e=e)
logger.error("<g>API启动失败</g>", "WebUi", e=e)

View File

@ -1,20 +1,14 @@
from datetime import datetime, timedelta
from fastapi import APIRouter
from fastapi.responses import JSONResponse
import nonebot
from nonebot import require
from nonebot.config import Config
from tortoise.expressions import RawSQL
from tortoise.functions import Count
from zhenxun.models.bot_connect_log import BotConnectLog
from zhenxun.models.chat_history import ChatHistory
from zhenxun.models.statistics import Statistics
from zhenxun.services.log import logger
from ....base_model import BaseResultModel, QueryModel, Result
from ....utils import authentication
from .data_source import BotManage
from .data_source import ApiDataSource
from .model import AllChatAndCallCount, BotInfo, ChatCallMonthCount, QueryChatCallCount
require("plugin_store")
@ -33,8 +27,9 @@ driver = nonebot.get_driver()
)
async def _() -> Result[list[BotInfo]]:
try:
return Result.ok(await BotManage.get_bot_list(), "拿到信息啦!")
return Result.ok(await ApiDataSource.get_bot_list(), "拿到信息啦!")
except Exception as e:
logger.error(f"{router.prefix}/get_bot_list 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@ -46,29 +41,13 @@ async def _() -> Result[list[BotInfo]]:
description="获取聊天/调用记录的全部和今日数量",
)
async def _(bot_id: str | None = None) -> Result[QueryChatCallCount]:
now = datetime.now()
query = ChatHistory
if bot_id:
query = query.filter(bot_id=bot_id)
chat_all_count = await query.annotate().count()
chat_day_count = await query.filter(
create_time__gte=now - timedelta(hours=now.hour, minutes=now.minute)
).count()
query = Statistics
if bot_id:
query = query.filter(bot_id=bot_id)
call_all_count = await query.annotate().count()
call_day_count = await query.filter(
create_time__gte=now - timedelta(hours=now.hour, minutes=now.minute)
).count()
return Result.ok(
QueryChatCallCount(
chat_num=chat_all_count,
chat_day=chat_day_count,
call_num=call_all_count,
call_day=call_day_count,
try:
return Result.ok(
await ApiDataSource.get_chat_and_call_count(bot_id), "拿到信息啦!"
)
)
except Exception as e:
logger.error(f"{router.prefix}/get_chat_and_call_count 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(
@ -79,41 +58,15 @@ async def _(bot_id: str | None = None) -> Result[QueryChatCallCount]:
description="获取聊天/调用记录的全部数据次数",
)
async def _(bot_id: str | None = None) -> Result[AllChatAndCallCount]:
now = datetime.now()
query = ChatHistory
if bot_id:
query = query.filter(bot_id=bot_id)
chat_week_count = await query.filter(
create_time__gte=now - timedelta(days=7, hours=now.hour, minutes=now.minute)
).count()
chat_month_count = await query.filter(
create_time__gte=now - timedelta(days=30, hours=now.hour, minutes=now.minute)
).count()
chat_year_count = await query.filter(
create_time__gte=now - timedelta(days=365, hours=now.hour, minutes=now.minute)
).count()
query = Statistics
if bot_id:
query = query.filter(bot_id=bot_id)
call_week_count = await query.filter(
create_time__gte=now - timedelta(days=7, hours=now.hour, minutes=now.minute)
).count()
call_month_count = await query.filter(
create_time__gte=now - timedelta(days=30, hours=now.hour, minutes=now.minute)
).count()
call_year_count = await query.filter(
create_time__gte=now - timedelta(days=365, hours=now.hour, minutes=now.minute)
).count()
return Result.ok(
AllChatAndCallCount(
chat_week=chat_week_count,
chat_month=chat_month_count,
chat_year=chat_year_count,
call_week=call_week_count,
call_month=call_month_count,
call_year=call_year_count,
try:
return Result.ok(
await ApiDataSource.get_all_chat_and_call_count(bot_id), "拿到信息啦!"
)
)
except Exception as e:
logger.error(
f"{router.prefix}/get_all_chat_and_call_count 调用错误", "WebUi", e=e
)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(
@ -124,48 +77,13 @@ async def _(bot_id: str | None = None) -> Result[AllChatAndCallCount]:
deprecated="获取聊天/调用记录的一个月数量", # type: ignore
)
async def _(bot_id: str | None = None) -> Result[ChatCallMonthCount]:
now = datetime.now()
filter_date = now - timedelta(days=30, hours=now.hour, minutes=now.minute)
chat_query = ChatHistory
call_query = Statistics
if bot_id:
chat_query = chat_query.filter(bot_id=bot_id)
call_query = call_query.filter(bot_id=bot_id)
chat_date_list = (
await chat_query.filter(create_time__gte=filter_date)
.annotate(date=RawSQL("DATE(create_time)"), count=Count("id"))
.group_by("date")
.values("date", "count")
)
call_date_list = (
await call_query.filter(create_time__gte=filter_date)
.annotate(date=RawSQL("DATE(create_time)"), count=Count("id"))
.group_by("date")
.values("date", "count")
)
date_list = []
chat_count_list = []
call_count_list = []
chat_date2cnt = {str(date["date"]): date["count"] for date in chat_date_list}
call_date2cnt = {str(date["date"]): date["count"] for date in call_date_list}
date = now.date()
for _ in range(30):
if str(date) in chat_date2cnt:
chat_count_list.append(chat_date2cnt[str(date)])
else:
chat_count_list.append(0)
if str(date) in call_date2cnt:
call_count_list.append(call_date2cnt[str(date)])
else:
call_count_list.append(0)
date_list.append(str(date)[5:])
date -= timedelta(days=1)
chat_count_list.reverse()
call_count_list.reverse()
date_list.reverse()
return Result.ok(
ChatCallMonthCount(chat=chat_count_list, call=call_count_list, date=date_list)
)
try:
return Result.ok(
await ApiDataSource.get_chat_and_call_month(bot_id), "拿到信息啦!"
)
except Exception as e:
logger.error(f"{router.prefix}/get_chat_and_call_month 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.post(
@ -176,18 +94,11 @@ async def _(bot_id: str | None = None) -> Result[ChatCallMonthCount]:
deprecated="获取Bot连接记录", # type: ignore
)
async def _(query: QueryModel) -> Result[BaseResultModel]:
total = await BotConnectLog.all().count()
if total % query.size:
total += 1
data = (
await BotConnectLog.all()
.order_by("-id")
.offset((query.index - 1) * query.size)
.limit(query.size)
)
for v in data:
v.connect_time = v.connect_time.replace(tzinfo=None).replace(microsecond=0)
return Result.ok(BaseResultModel(total=total, data=data))
try:
return Result.ok(await ApiDataSource.get_connect_log(query), "拿到信息啦!")
except Exception as e:
logger.error(f"{router.prefix}/get_connect_log 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(

View File

@ -4,13 +4,17 @@ import time
import nonebot
from nonebot.adapters import Bot
from nonebot.drivers import Driver
from tortoise.expressions import RawSQL
from tortoise.functions import Count
from zhenxun.models.bot_connect_log import BotConnectLog
from zhenxun.models.chat_history import ChatHistory
from zhenxun.models.statistics import Statistics
from zhenxun.utils.platform import PlatformUtils
from ....base_model import BaseResultModel, QueryModel
from ..main.data_source import bot_live
from .model import BotInfo
from .model import AllChatAndCallCount, BotInfo, ChatCallMonthCount, QueryChatCallCount
driver: Driver = nonebot.get_driver()
@ -24,7 +28,7 @@ async def _():
CONNECT_TIME = int(time.time())
class BotManage:
class ApiDataSource:
@classmethod
async def __build_bot_info(cls, bot: Bot) -> BotInfo:
"""构建Bot信息
@ -76,3 +80,161 @@ class BotManage:
for _, bot in nonebot.get_bots().items():
bot_list.append(await cls.__build_bot_info(bot))
return bot_list
@classmethod
async def get_chat_and_call_count(cls, bot_id: str | None) -> QueryChatCallCount:
"""获取今日聊天和调用次数
参数:
bot_id: bot id
返回:
QueryChatCallCount: 数据内容
"""
now = datetime.now()
query = ChatHistory
if bot_id:
query = query.filter(bot_id=bot_id)
chat_all_count = await query.annotate().count()
chat_day_count = await query.filter(
create_time__gte=now - timedelta(hours=now.hour, minutes=now.minute)
).count()
query = Statistics
if bot_id:
query = query.filter(bot_id=bot_id)
call_all_count = await query.annotate().count()
call_day_count = await query.filter(
create_time__gte=now - timedelta(hours=now.hour, minutes=now.minute)
).count()
return QueryChatCallCount(
chat_num=chat_all_count,
chat_day=chat_day_count,
call_num=call_all_count,
call_day=call_day_count,
)
@classmethod
async def get_all_chat_and_call_count(
cls, bot_id: str | None
) -> AllChatAndCallCount:
"""获取全部聊天和调用记录
参数:
bot_id: bot id
返回:
AllChatAndCallCount: 数据内容
"""
now = datetime.now()
query = ChatHistory
if bot_id:
query = query.filter(bot_id=bot_id)
chat_week_count = await query.filter(
create_time__gte=now - timedelta(days=7, hours=now.hour, minutes=now.minute)
).count()
chat_month_count = await query.filter(
create_time__gte=now
- timedelta(days=30, hours=now.hour, minutes=now.minute)
).count()
chat_year_count = await query.filter(
create_time__gte=now
- timedelta(days=365, hours=now.hour, minutes=now.minute)
).count()
query = Statistics
if bot_id:
query = query.filter(bot_id=bot_id)
call_week_count = await query.filter(
create_time__gte=now - timedelta(days=7, hours=now.hour, minutes=now.minute)
).count()
call_month_count = await query.filter(
create_time__gte=now
- timedelta(days=30, hours=now.hour, minutes=now.minute)
).count()
call_year_count = await query.filter(
create_time__gte=now
- timedelta(days=365, hours=now.hour, minutes=now.minute)
).count()
return AllChatAndCallCount(
chat_week=chat_week_count,
chat_month=chat_month_count,
chat_year=chat_year_count,
call_week=call_week_count,
call_month=call_month_count,
call_year=call_year_count,
)
@classmethod
async def get_chat_and_call_month(cls, bot_id: str | None) -> ChatCallMonthCount:
"""获取一个月内的调用/消息记录次数并根据日期对数据填充0
参数:
bot_id: bot id
返回:
ChatCallMonthCount: 数据内容
"""
now = datetime.now()
filter_date = now - timedelta(days=30, hours=now.hour, minutes=now.minute)
chat_query = ChatHistory
call_query = Statistics
if bot_id:
chat_query = chat_query.filter(bot_id=bot_id)
call_query = call_query.filter(bot_id=bot_id)
chat_date_list = (
await chat_query.filter(create_time__gte=filter_date)
.annotate(date=RawSQL("DATE(create_time)"), count=Count("id"))
.group_by("date")
.values("date", "count")
)
call_date_list = (
await call_query.filter(create_time__gte=filter_date)
.annotate(date=RawSQL("DATE(create_time)"), count=Count("id"))
.group_by("date")
.values("date", "count")
)
date_list = []
chat_count_list = []
call_count_list = []
chat_date2cnt = {str(date["date"]): date["count"] for date in chat_date_list}
call_date2cnt = {str(date["date"]): date["count"] for date in call_date_list}
date = now.date()
for _ in range(30):
if str(date) in chat_date2cnt:
chat_count_list.append(chat_date2cnt[str(date)])
else:
chat_count_list.append(0)
if str(date) in call_date2cnt:
call_count_list.append(call_date2cnt[str(date)])
else:
call_count_list.append(0)
date_list.append(str(date)[5:])
date -= timedelta(days=1)
chat_count_list.reverse()
call_count_list.reverse()
date_list.reverse()
return ChatCallMonthCount(
chat=chat_count_list, call=call_count_list, date=date_list
)
@classmethod
async def get_connect_log(cls, query: QueryModel) -> BaseResultModel:
"""获取bot连接日志
参数:
query: 查询模型
返回:
BaseResultModel: 数据内容
"""
total = await BotConnectLog.all().count()
if total % query.size:
total += 1
data = (
await BotConnectLog.all()
.order_by("-id")
.offset((query.index - 1) * query.size)
.limit(query.size)
)
for v in data:
v.connect_time = v.connect_time.replace(tzinfo=None).replace(microsecond=0)
return BaseResultModel(total=total, data=data)

View File

@ -8,9 +8,11 @@ from tortoise.exceptions import OperationalError
from zhenxun.configs.config import BotConfig
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.task_info import TaskInfo
from zhenxun.services.log import logger
from ....base_model import BaseResultModel, QueryModel, Result
from ....utils import authentication
from .data_source import ApiDataSource, type2sql
from .models.model import Column, SqlModel, SqlText
from .models.sql_log import SqlLog
@ -20,52 +22,6 @@ router = APIRouter(prefix="/database")
driver: Driver = nonebot.get_driver()
SQL_DICT = {}
SELECT_TABLE_MYSQL_SQL = """
SELECT table_name AS name, table_comment AS `desc`
FROM information_schema.tables
WHERE table_schema = DATABASE();
"""
SELECT_TABLE_SQLITE_SQL = """
SELECT name FROM sqlite_master WHERE type='table';
"""
SELECT_TABLE_PSQL_SQL = """
select a.tablename as name,d.description as desc from pg_tables a
left join pg_class c on relname=tablename
left join pg_description d on oid=objoid and objsubid=0 where a.schemaname='public'
"""
SELECT_TABLE_COLUMN_PSQL_SQL = """
SELECT column_name, data_type, character_maximum_length as max_length, is_nullable
FROM information_schema.columns
WHERE table_name = '{}';
"""
SELECT_TABLE_COLUMN_MYSQL_SQL = """
SHOW COLUMNS FROM {};
"""
SELECT_TABLE_COLUMN_SQLITE_SQL = """
PRAGMA table_info({});
"""
type2sql = {
"mysql": SELECT_TABLE_MYSQL_SQL,
"sqlite": SELECT_TABLE_SQLITE_SQL,
"postgres": SELECT_TABLE_PSQL_SQL,
}
type2sql_column = {
"mysql": SELECT_TABLE_COLUMN_MYSQL_SQL,
"sqlite": SELECT_TABLE_COLUMN_SQLITE_SQL,
"postgres": SELECT_TABLE_COLUMN_PSQL_SQL,
}
@driver.on_startup
async def _():
for plugin in nonebot.get_loaded_plugins():
@ -73,7 +29,7 @@ async def _():
sql_list = []
if plugin.metadata and plugin.metadata.extra:
sql_list = plugin.metadata.extra.get("sql_list")
if module in SQL_DICT:
if module in ApiDataSource.SQL_DICT:
raise ValueError(f"{module} 常用SQL module 重复")
if sql_list:
SqlModel(
@ -81,15 +37,15 @@ async def _():
module=module,
sql_list=sql_list,
)
SQL_DICT[module] = SqlModel
if SQL_DICT:
result = await PluginInfo.filter(module__in=SQL_DICT.keys()).values_list(
"module", "name"
)
ApiDataSource.SQL_DICT[module] = SqlModel
if ApiDataSource.SQL_DICT:
result = await PluginInfo.filter(
module__in=ApiDataSource.SQL_DICT.keys()
).values_list("module", "name")
module2name = {r[0]: r[1] for r in result}
for s in SQL_DICT:
module = SQL_DICT[s].module
SQL_DICT[s].name = module2name.get(module, module)
for s in ApiDataSource.SQL_DICT:
module = ApiDataSource.SQL_DICT[s].module
ApiDataSource.SQL_DICT[s].name = module2name.get(module, module)
@router.get(
@ -100,10 +56,14 @@ async def _():
description="获取数据库表",
)
async def _() -> Result[list[dict]]:
db = Tortoise.get_connection("default")
sql_type = BotConfig.get_sql_type()
query = await db.execute_query_dict(type2sql[sql_type])
return Result.ok(query)
try:
db = Tortoise.get_connection("default")
sql_type = BotConfig.get_sql_type()
query = await db.execute_query_dict(type2sql[sql_type])
return Result.ok(query)
except Exception as e:
logger.error(f"{router.prefix}/get_table_list 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(
@ -114,34 +74,13 @@ async def _() -> Result[list[dict]]:
description="获取表字段",
)
async def _(table_name: str) -> Result[list[Column]]:
db = Tortoise.get_connection("default")
sql_type = BotConfig.get_sql_type()
sql = type2sql_column[sql_type]
query = await db.execute_query_dict(sql.format(table_name))
result_list = []
if sql_type == "sqlite":
result_list.extend(
Column(
column_name=result["name"],
data_type=result["type"],
max_length=-1,
is_nullable="YES" if result["notnull"] == 1 else "NO",
)
for result in query
try:
return Result.ok(
await ApiDataSource.get_table_column(table_name), "拿到信息啦!"
)
elif sql_type == "mysql":
result_list.extend(
Column(
column_name=result["Field"],
data_type=result["Type"],
max_length=-1,
is_nullable=result["Null"],
)
for result in query
)
else:
result_list.extend(Column(**result) for result in query)
return Result.ok(result_list)
except Exception as e:
logger.error(f"{router.prefix}/get_table_column 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.post(
@ -163,7 +102,8 @@ async def _(sql: SqlText, request: Request) -> Result[list[dict]]:
result = await TaskInfo.raw(sql.sql)
await SqlLog.add(ip or "0.0.0.0", sql.sql, str(result))
return Result.ok(info="执行成功啦!")
except OperationalError as e:
except Exception as e:
logger.error(f"{router.prefix}/exec_sql 调用错误", "WebUi", e=e)
await SqlLog.add(ip or "0.0.0.0", sql.sql, str(e), False)
return Result.warning_(f"sql执行错误: {e}")
@ -176,16 +116,20 @@ async def _(sql: SqlText, request: Request) -> Result[list[dict]]:
description="sql日志列表",
)
async def _(query: QueryModel) -> Result[BaseResultModel]:
total = await SqlLog.all().count()
if total % query.size:
total += 1
data = (
await SqlLog.all()
.order_by("-id")
.offset((query.index - 1) * query.size)
.limit(query.size)
)
return Result.ok(BaseResultModel(total=total, data=data))
try:
total = await SqlLog.all().count()
if total % query.size:
total += 1
data = (
await SqlLog.all()
.order_by("-id")
.offset((query.index - 1) * query.size)
.limit(query.size)
)
return Result.ok(BaseResultModel(total=total, data=data))
except Exception as e:
logger.error(f"{router.prefix}/get_sql_log 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(

View File

@ -0,0 +1,90 @@
from tortoise import Tortoise
from zhenxun.configs.config import BotConfig
from .models.model import Column
SELECT_TABLE_MYSQL_SQL = """
SELECT table_name AS name, table_comment AS `desc`
FROM information_schema.tables
WHERE table_schema = DATABASE();
"""
SELECT_TABLE_SQLITE_SQL = """
SELECT name FROM sqlite_master WHERE type='table';
"""
SELECT_TABLE_PSQL_SQL = """
select a.tablename as name,d.description as desc from pg_tables a
left join pg_class c on relname=tablename
left join pg_description d on oid=objoid and objsubid=0 where a.schemaname='public'
"""
SELECT_TABLE_COLUMN_PSQL_SQL = """
SELECT column_name, data_type, character_maximum_length as max_length, is_nullable
FROM information_schema.columns
WHERE table_name = '{}';
"""
SELECT_TABLE_COLUMN_MYSQL_SQL = """
SHOW COLUMNS FROM {};
"""
SELECT_TABLE_COLUMN_SQLITE_SQL = """
PRAGMA table_info({});
"""
type2sql = {
"mysql": SELECT_TABLE_MYSQL_SQL,
"sqlite": SELECT_TABLE_SQLITE_SQL,
"postgres": SELECT_TABLE_PSQL_SQL,
}
type2sql_column = {
"mysql": SELECT_TABLE_COLUMN_MYSQL_SQL,
"sqlite": SELECT_TABLE_COLUMN_SQLITE_SQL,
"postgres": SELECT_TABLE_COLUMN_PSQL_SQL,
}
class ApiDataSource:
SQL_DICT = {} # noqa: RUF012
@classmethod
async def get_table_column(cls, table_name: str) -> list[Column]:
"""获取表字段信息
参数:
table_name: 表名
返回:
list[Column]: 字段数据
"""
db = Tortoise.get_connection("default")
sql_type = BotConfig.get_sql_type()
sql = type2sql_column[sql_type]
query = await db.execute_query_dict(sql.format(table_name))
result_list = []
if sql_type == "sqlite":
result_list.extend(
Column(
column_name=result["name"],
data_type=result["type"],
max_length=-1,
is_nullable="YES" if result["notnull"] == 1 else "NO",
)
for result in query
)
elif sql_type == "mysql":
result_list.extend(
Column(
column_name=result["Field"],
data_type=result["Type"],
max_length=-1,
is_nullable=result["Null"],
)
for result in query
)
else:
result_list.extend(Column(**result) for result in query)
return result_list

View File

@ -1,7 +1,5 @@
import asyncio
import contextlib
from datetime import datetime, timedelta
from pathlib import Path
import time
from fastapi import APIRouter
@ -9,25 +7,17 @@ from fastapi.responses import JSONResponse
import nonebot
from nonebot.config import Config
from starlette.websockets import WebSocket, WebSocketDisconnect, WebSocketState
from tortoise.functions import Count
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
from zhenxun.models.bot_connect_log import BotConnectLog
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.chat_history import ChatHistory
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.statistics import Statistics
from zhenxun.models.task_info import TaskInfo
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.enum import PluginType
from zhenxun.utils.platform import PlatformUtils
from ....base_model import Result
from ....config import AVA_URL, GROUP_AVA_URL, QueryDateType
from ....config import QueryDateType
from ....utils import authentication, get_system_status
from .data_source import bot_live
from .data_source import ApiDataSource
from .model import (
ActiveGroup,
BaseInfo,
@ -37,7 +27,6 @@ from .model import (
HotPlugin,
NonebotData,
QueryCount,
TemplateBaseInfo,
)
driver = nonebot.get_driver()
@ -63,64 +52,14 @@ async def _(bot_id: str | None = None) -> Result[list[BaseInfo]]:
返回:
Result: 获取指定bot信息与bot列表
"""
global run_time
bot_list: list[TemplateBaseInfo] = []
if bots := nonebot.get_bots():
select_bot: BaseInfo
for _, bot in bots.items():
login_info = None
try:
login_info = await bot.get_login_info()
except Exception as e:
logger.warning("调用接口get_login_info失败", "webui", e=e)
bot_list.append(
TemplateBaseInfo(
bot=bot, # type: ignore
self_id=bot.self_id,
nickname=login_info["nickname"] if login_info else bot.self_id,
ava_url=AVA_URL.format(bot.self_id),
)
)
# 获取指定qq号的bot信息若无指定 则获取第一个
if _bl := [b for b in bot_list if b.self_id == bot_id]:
select_bot = _bl[0]
else:
select_bot = bot_list[0]
select_bot.is_select = True
now = datetime.now()
# 今日累计接收消息
select_bot.received_messages = await ChatHistory.filter(
bot_id=select_bot.self_id,
create_time__gte=now - timedelta(hours=now.hour),
).count()
# 群聊数量
select_bot.group_count = len(await PlatformUtils.get_group_list(select_bot.bot))
# 好友数量
select_bot.friend_count = len(
await PlatformUtils.get_friend_list(select_bot.bot)
)
for bot in bot_list:
bot.bot = None # type: ignore
select_bot.status = await BotConsole.get_bot_status(select_bot.self_id)
# 连接时间
select_bot.connect_time = bot_live.get(select_bot.self_id) or 0
if select_bot.connect_time:
connect_date = datetime.fromtimestamp(select_bot.connect_time)
select_bot.connect_date = connect_date.strftime("%Y-%m-%d %H:%M:%S")
version_file = Path() / "__version__"
if version_file.exists():
if text := version_file.open().read():
if ver := text.replace("__version__: ", "").strip():
select_bot.version = ver
day_call = await Statistics.filter(
create_time__gte=now - timedelta(hours=now.hour)
).count()
select_bot.day_call = day_call
select_bot.connect_count = await BotConnectLog.filter(
bot_id=select_bot.self_id
).count()
return Result.ok([BaseInfo(**e.dict()) for e in bot_list], "拿到信息啦!")
return Result.warning_("无Bot连接...")
try:
result = await ApiDataSource.get_base_info(bot_id)
if not result:
Result.warning_("无Bot连接...")
return Result.ok(result, "拿到信息啦!")
except Exception as e:
logger.error(f"{router.prefix}/get_base_info 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(
@ -131,32 +70,11 @@ async def _(bot_id: str | None = None) -> Result[list[BaseInfo]]:
description="获取接收消息数量",
)
async def _(bot_id: str | None = None) -> Result[QueryCount]:
now = datetime.now()
query = ChatHistory
if bot_id:
query = query.filter(bot_id=bot_id)
all_count = await query.annotate().count()
day_count = await query.filter(
create_time__gte=now - timedelta(hours=now.hour, minutes=now.minute)
).count()
week_count = await query.filter(
create_time__gte=now - timedelta(days=7, hours=now.hour, minutes=now.minute)
).count()
month_count = await query.filter(
create_time__gte=now - timedelta(days=30, hours=now.hour, minutes=now.minute)
).count()
year_count = await query.filter(
create_time__gte=now - timedelta(days=365, hours=now.hour, minutes=now.minute)
).count()
return Result.ok(
QueryCount(
num=all_count,
day=day_count,
week=week_count,
month=month_count,
year=year_count,
)
)
try:
return Result.ok(await ApiDataSource.get_all_chat_count(bot_id), "拿到信息啦!")
except Exception as e:
logger.error(f"{router.prefix}/get_all_chat_count 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(
@ -167,32 +85,11 @@ async def _(bot_id: str | None = None) -> Result[QueryCount]:
description="获取调用次数",
)
async def _(bot_id: str | None = None) -> Result[QueryCount]:
now = datetime.now()
query = Statistics
if bot_id:
query = query.filter(bot_id=bot_id)
all_count = await query.annotate().count()
day_count = await query.filter(
create_time__gte=now - timedelta(hours=now.hour, minutes=now.minute)
).count()
week_count = await query.filter(
create_time__gte=now - timedelta(days=7, hours=now.hour, minutes=now.minute)
).count()
month_count = await query.filter(
create_time__gte=now - timedelta(days=30, hours=now.hour, minutes=now.minute)
).count()
year_count = await query.filter(
create_time__gte=now - timedelta(days=365, hours=now.hour, minutes=now.minute)
).count()
return Result.ok(
QueryCount(
num=all_count,
day=day_count,
week=week_count,
month=month_count,
year=year_count,
)
)
try:
return Result.ok(await ApiDataSource.get_all_call_count(bot_id), "拿到信息啦!")
except Exception as e:
logger.error(f"{router.prefix}/get_all_call_count 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(
@ -203,19 +100,18 @@ async def _(bot_id: str | None = None) -> Result[QueryCount]:
description="好友/群组数量",
)
async def _(bot_id: str) -> Result[dict[str, int]]:
if bots := nonebot.get_bots():
if bot_id not in bots:
return Result.warning_("指定Bot未连接...")
bot = bots[bot_id]
platform = PlatformUtils.get_platform(bot)
if platform == "qq":
data = {
"friend_count": len(await bot.get_friend_list()),
"group_count": len(await bot.get_group_list()),
}
return Result.ok(data)
return Result.warning_("暂不支持该平台...")
return Result.warning_("无Bot连接...")
try:
bot = nonebot.get_bot(bot_id)
data = {
"friend_count": len(await PlatformUtils.get_friend_list(bot)),
"group_count": len(await PlatformUtils.get_group_list(bot)),
}
return Result.ok(data, "拿到信息啦!")
except ValueError:
return Result.warning_("指定Bot未连接...")
except Exception as e:
logger.error(f"{router.prefix}/get_fg_count 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(
@ -226,6 +122,7 @@ async def _(bot_id: str) -> Result[dict[str, int]]:
description="获取nb数据",
)
async def _() -> Result[NonebotData]:
global run_time
return Result.ok(NonebotData(config=driver.config, run_time=int(run_time)))
@ -248,6 +145,7 @@ async def _() -> Result[Config]:
description="获取nb运行时间",
)
async def _() -> Result[int]:
global run_time
return Result.ok(int(run_time))
@ -261,48 +159,13 @@ async def _() -> Result[int]:
async def _(
date_type: QueryDateType | None = None, bot_id: str | None = None
) -> Result[list[ActiveGroup]]:
query = ChatHistory
now = datetime.now()
if bot_id:
query = query.filter(bot_id=bot_id)
if date_type == QueryDateType.DAY:
query = query.filter(create_time__gte=now - timedelta(hours=now.hour))
if date_type == QueryDateType.WEEK:
query = query.filter(create_time__gte=now - timedelta(days=7))
if date_type == QueryDateType.MONTH:
query = query.filter(create_time__gte=now - timedelta(days=30))
if date_type == QueryDateType.YEAR:
query = query.filter(create_time__gte=now - timedelta(days=365))
data_list = (
await query.annotate(count=Count("id"))
.filter(group_id__not_isnull=True)
.group_by("group_id")
.order_by("-count")
.limit(5)
.values_list("group_id", "count")
)
id2name = {}
if data_list:
if info_list := await GroupConsole.filter(
group_id__in=[x[0] for x in data_list]
).all():
for group_info in info_list:
id2name[group_info.group_id] = group_info.group_name
active_group_list = [
ActiveGroup(
group_id=data[0],
name=id2name.get(data[0]) or data[0],
chat_num=data[1],
ava_img=GROUP_AVA_URL.format(data[0], data[0]),
try:
return Result.ok(
await ApiDataSource.get_active_group(date_type, bot_id), "拿到信息啦!"
)
for data in data_list
]
active_group_list = sorted(
active_group_list, key=lambda x: x.chat_num, reverse=True
)
if len(active_group_list) > 5:
active_group_list = active_group_list[:5]
return Result.ok(active_group_list)
except Exception as e:
logger.error(f"{router.prefix}/get_active_group 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(
@ -315,37 +178,13 @@ async def _(
async def _(
date_type: QueryDateType | None = None, bot_id: str | None = None
) -> Result[list[HotPlugin]]:
query = Statistics
now = datetime.now()
if bot_id:
query = query.filter(bot_id=bot_id)
if date_type == QueryDateType.DAY:
query = query.filter(create_time__gte=now - timedelta(hours=now.hour))
if date_type == QueryDateType.WEEK:
query = query.filter(create_time__gte=now - timedelta(days=7))
if date_type == QueryDateType.MONTH:
query = query.filter(create_time__gte=now - timedelta(days=30))
if date_type == QueryDateType.YEAR:
query = query.filter(create_time__gte=now - timedelta(days=365))
data_list = (
await query.annotate(count=Count("id"))
.group_by("plugin_name")
.order_by("-count")
.limit(5)
.values_list("plugin_name", "count")
)
hot_plugin_list = []
module_list = [x[0] for x in data_list]
plugins = await PluginInfo.filter(module__in=module_list).all()
module2name = {p.module: p.name for p in plugins}
for data in data_list:
module = data[0]
name = module2name.get(module) or module
hot_plugin_list.append(HotPlugin(module=module, name=name, count=data[1]))
hot_plugin_list = sorted(hot_plugin_list, key=lambda x: x.count, reverse=True)
if len(hot_plugin_list) > 5:
hot_plugin_list = hot_plugin_list[:5]
return Result.ok(hot_plugin_list)
try:
return Result.ok(
await ApiDataSource.get_hot_plugin(date_type, bot_id), "拿到信息啦!"
)
except Exception as e:
logger.error(f"{router.prefix}/get_hot_plugin 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.post(
@ -368,37 +207,16 @@ async def _(param: BotStatusParam):
dependencies=[authentication()],
response_model=Result[BotBlockModule],
response_class=JSONResponse,
description="修改bot全局开关",
description="获取bot层面的禁用模块",
)
async def _(bot_id: str) -> Result[BotBlockModule]:
try:
bot_data = await BotConsole.get_or_none(bot_id=bot_id)
if not bot_data:
return Result.fail("Bot数据不存在...")
block_tasks = []
block_plugins = []
all_plugins = await PluginInfo.filter(
load_status=True, plugin_type=PluginType.NORMAL
).values("module", "name")
all_task = await TaskInfo.annotate().values("module", "name")
if bot_data.block_tasks:
tasks = CommonUtils.convert_module_format(bot_data.block_tasks)
block_tasks = [t["module"] for t in all_task if t["module"] in tasks]
if bot_data.block_plugins:
plugins = CommonUtils.convert_module_format(bot_data.block_plugins)
block_plugins = [t["module"] for t in all_plugins if t["module"] in plugins]
return Result.ok(
BotBlockModule(
bot_id=bot_id,
block_tasks=block_tasks,
block_plugins=block_plugins,
all_plugins=all_plugins,
all_tasks=all_task,
)
await ApiDataSource.get_bot_block_module(bot_id), "拿到信息啦!"
)
except Exception as e:
logger.error("获取Bot数据失败", "webui", e=e)
return Result.fail(f"获取Bot数据失败 {type(e)}:{e}...")
logger.error(f"{router.prefix}/get_bot_block_module 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.post(
@ -409,13 +227,17 @@ async def _(bot_id: str) -> Result[BotBlockModule]:
description="修改bot全局开关",
)
async def _(param: BotManageUpdateParam):
bot_data = await BotConsole.get_or_none(bot_id=param.bot_id)
if not bot_data:
return Result.fail("Bot数据不存在...")
bot_data.block_plugins = CommonUtils.convert_module_format(param.block_plugins)
bot_data.block_tasks = CommonUtils.convert_module_format(param.block_tasks)
await bot_data.save(update_fields=["block_plugins", "block_tasks"])
return Result.ok()
try:
bot_data = await BotConsole.get_or_none(bot_id=param.bot_id)
if not bot_data:
return Result.fail("Bot数据不存在...")
bot_data.block_plugins = CommonUtils.convert_module_format(param.block_plugins)
bot_data.block_tasks = CommonUtils.convert_module_format(param.block_tasks)
await bot_data.save(update_fields=["block_plugins", "block_tasks"])
return Result.ok()
except Exception as e:
logger.error(f"{router.prefix}/update_bot_manage 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@ws_router.websocket("/system_status")

View File

@ -1,8 +1,33 @@
from datetime import datetime, timedelta
from pathlib import Path
import time
import nonebot
from nonebot.adapters.onebot.v11 import Bot
from nonebot.adapters import Bot
from nonebot.drivers import Driver
from tortoise.functions import Count
from zhenxun.models.bot_connect_log import BotConnectLog
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.chat_history import ChatHistory
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.statistics import Statistics
from zhenxun.models.task_info import TaskInfo
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.enum import PluginType
from zhenxun.utils.platform import PlatformUtils
from ....config import AVA_URL, GROUP_AVA_URL, QueryDateType
from .model import (
ActiveGroup,
BaseInfo,
BotBlockModule,
HotPlugin,
QueryCount,
TemplateBaseInfo,
)
driver: Driver = nonebot.get_driver()
@ -33,3 +58,313 @@ async def _(bot: Bot):
@driver.on_bot_disconnect
async def _(bot: Bot):
bot_live.remove(bot.self_id)
class ApiDataSource:
@classmethod
async def __build_bot_info(cls, bot: Bot) -> TemplateBaseInfo:
"""构建bot信息
参数:
bot: bot实例
返回:
TemplateBaseInfo: bot信息
"""
login_info = None
try:
login_info = await bot.get_login_info()
except Exception as e:
logger.warning("调用接口get_login_info失败", "WebUi", e=e)
return TemplateBaseInfo(
bot=bot,
self_id=bot.self_id,
nickname=login_info["nickname"] if login_info else bot.self_id,
ava_url=AVA_URL.format(bot.self_id),
)
@classmethod
def __get_bot_version(cls) -> str:
"""获取bot版本
返回:
str | None: 版本
"""
version_file = Path() / "__version__"
if version_file.exists():
if text := version_file.open().read():
return text.replace("__version__: ", "").strip()
return "unknown"
@classmethod
async def __init_bot_base_data(cls, select_bot: TemplateBaseInfo):
"""初始化bot的基础数据
参数:
select_bot: bot
"""
now = datetime.now()
# 今日累计接收消息
select_bot.received_messages = await ChatHistory.filter(
bot_id=select_bot.self_id,
create_time__gte=now - timedelta(hours=now.hour),
).count()
# 群聊数量
select_bot.group_count = len(await PlatformUtils.get_group_list(select_bot.bot))
# 好友数量
select_bot.friend_count = len(
await PlatformUtils.get_friend_list(select_bot.bot)
)
select_bot.status = await BotConsole.get_bot_status(select_bot.self_id)
# 连接时间
select_bot.connect_time = bot_live.get(select_bot.self_id) or 0
if select_bot.connect_time:
connect_date = datetime.fromtimestamp(select_bot.connect_time)
select_bot.connect_date = connect_date.strftime("%Y-%m-%d %H:%M:%S")
select_bot.version = cls.__get_bot_version()
day_call = await Statistics.filter(
create_time__gte=now - timedelta(hours=now.hour)
).count()
select_bot.day_call = day_call
select_bot.connect_count = await BotConnectLog.filter(
bot_id=select_bot.self_id
).count()
@classmethod
async def get_base_info(cls, bot_id: str | None) -> list[BaseInfo] | None:
"""获取bot信息
参数:
bot_id: bot id
返回:
list[BaseInfo] | None: bot列表
"""
bots = nonebot.get_bots()
if not bots:
return None
select_bot: BaseInfo
bot_list = [await cls.__build_bot_info(bot) for _, bot in bots.items()]
# 获取指定qq号的bot信息若无指定 则获取第一个
if _bl := [b for b in bot_list if b.self_id == bot_id]:
select_bot = _bl[0]
else:
select_bot = bot_list[0]
await cls.__init_bot_base_data(select_bot)
for bot in bot_list:
bot.bot = None # type: ignore
select_bot.is_select = True
return [BaseInfo(**e.dict()) for e in bot_list]
@classmethod
async def get_all_chat_count(cls, bot_id: str | None) -> QueryCount:
"""获取年/月/周/日聊天次数
参数:
bot_id: bot id
返回:
QueryCount: 数据内容
"""
now = datetime.now()
query = ChatHistory
if bot_id:
query = query.filter(bot_id=bot_id)
all_count = await query.annotate().count()
day_count = await query.filter(
create_time__gte=now - timedelta(hours=now.hour, minutes=now.minute)
).count()
week_count = await query.filter(
create_time__gte=now - timedelta(days=7, hours=now.hour, minutes=now.minute)
).count()
month_count = await query.filter(
create_time__gte=now
- timedelta(days=30, hours=now.hour, minutes=now.minute)
).count()
year_count = await query.filter(
create_time__gte=now
- timedelta(days=365, hours=now.hour, minutes=now.minute)
).count()
return QueryCount(
num=all_count,
day=day_count,
week=week_count,
month=month_count,
year=year_count,
)
@classmethod
async def get_all_call_count(cls, bot_id: str | None) -> QueryCount:
"""获取年/月/周/日调用次数
参数:
bot_id: bot id
返回:
QueryCount: 数据内容
"""
now = datetime.now()
query = Statistics
if bot_id:
query = query.filter(bot_id=bot_id)
all_count = await query.annotate().count()
day_count = await query.filter(
create_time__gte=now - timedelta(hours=now.hour, minutes=now.minute)
).count()
week_count = await query.filter(
create_time__gte=now - timedelta(days=7, hours=now.hour, minutes=now.minute)
).count()
month_count = await query.filter(
create_time__gte=now
- timedelta(days=30, hours=now.hour, minutes=now.minute)
).count()
year_count = await query.filter(
create_time__gte=now
- timedelta(days=365, hours=now.hour, minutes=now.minute)
).count()
return QueryCount(
num=all_count,
day=day_count,
week=week_count,
month=month_count,
year=year_count,
)
@classmethod
def __get_query(
cls,
base_query: type[ChatHistory | Statistics],
date_type: QueryDateType | None = None,
bot_id: str | None = None,
):
"""构建日期查询条件
参数:
date_type: 日期类型.
bot_id: bot id.
"""
query = base_query
now = datetime.now()
if bot_id:
query = query.filter(bot_id=bot_id)
if date_type == QueryDateType.DAY:
query = query.filter(create_time__gte=now - timedelta(hours=now.hour))
if date_type == QueryDateType.WEEK:
query = query.filter(create_time__gte=now - timedelta(days=7))
if date_type == QueryDateType.MONTH:
query = query.filter(create_time__gte=now - timedelta(days=30))
if date_type == QueryDateType.YEAR:
query = query.filter(create_time__gte=now - timedelta(days=365))
return query
@classmethod
async def get_active_group(
cls, date_type: QueryDateType | None = None, bot_id: str | None = None
) -> list[ActiveGroup]:
"""获取活跃群组
参数:
date_type: 日期类型.
bot_id: bot id.
返回:
list[ActiveGroup]: 活跃群组列表
"""
query = cls.__get_query(ChatHistory, date_type, bot_id)
data_list = (
await query.annotate(count=Count("id"))
.filter(group_id__not_isnull=True)
.group_by("group_id")
.order_by("-count")
.limit(5)
.values_list("group_id", "count")
)
id2name = {}
if data_list:
if info_list := await GroupConsole.filter(
group_id__in=[x[0] for x in data_list]
).all():
for group_info in info_list:
id2name[group_info.group_id] = group_info.group_name
active_group_list = [
ActiveGroup(
group_id=data[0],
name=id2name.get(data[0]) or data[0],
chat_num=data[1],
ava_img=GROUP_AVA_URL.format(data[0], data[0]),
)
for data in data_list
]
active_group_list = sorted(
active_group_list, key=lambda x: x.chat_num, reverse=True
)
if len(active_group_list) > 5:
active_group_list = active_group_list[:5]
return active_group_list
@classmethod
async def get_hot_plugin(
cls, date_type: QueryDateType | None = None, bot_id: str | None = None
) -> list[HotPlugin]:
"""获取热门插件
参数:
date_type: 日期类型.
bot_id: bot id.
返回:
list[HotPlugin]: 热门插件列表
"""
query = cls.__get_query(Statistics, date_type, bot_id)
data_list = (
await query.annotate(count=Count("id"))
.group_by("plugin_name")
.order_by("-count")
.limit(5)
.values_list("plugin_name", "count")
)
hot_plugin_list = []
module_list = [x[0] for x in data_list]
plugins = await PluginInfo.filter(module__in=module_list).all()
module2name = {p.module: p.name for p in plugins}
for data in data_list:
module = data[0]
name = module2name.get(module) or module
hot_plugin_list.append(HotPlugin(module=module, name=name, count=data[1]))
hot_plugin_list = sorted(hot_plugin_list, key=lambda x: x.count, reverse=True)
if len(hot_plugin_list) > 5:
hot_plugin_list = hot_plugin_list[:5]
return hot_plugin_list
@classmethod
async def get_bot_block_module(cls, bot_id: str) -> BotBlockModule | None:
"""获取bot层面的禁用模块
参数:
bot_id: bot id
返回:
BotBlockModule | None: 数据内容
"""
bot_data = await BotConsole.get_or_none(bot_id=bot_id)
if not bot_data:
return None
block_tasks = []
block_plugins = []
all_plugins = await PluginInfo.filter(
load_status=True, plugin_type=PluginType.NORMAL
).values("module", "name")
all_task = await TaskInfo.annotate().values("module", "name")
if bot_data.block_tasks:
tasks = CommonUtils.convert_module_format(bot_data.block_tasks)
block_tasks = [t["module"] for t in all_task if t["module"] in tasks]
if bot_data.block_plugins:
plugins = CommonUtils.convert_module_format(bot_data.block_plugins)
block_plugins = [t["module"] for t in all_plugins if t["module"] in plugins]
return BotBlockModule(
bot_id=bot_id,
block_tasks=block_tasks,
block_plugins=block_plugins,
all_plugins=all_plugins,
all_tasks=all_task,
)

View File

@ -2,18 +2,10 @@ from fastapi import APIRouter
from fastapi.responses import JSONResponse
import nonebot
from nonebot.adapters.onebot.v11 import ActionFailed
from tortoise.functions import Count
from zhenxun.configs.config import BotConfig
from zhenxun.models.ban_console import BanConsole
from zhenxun.models.chat_history import ChatHistory
from zhenxun.models.fg_request import FgRequest
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.statistics import Statistics
from zhenxun.models.task_info import TaskInfo
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.enum import RequestHandleType, RequestType
from zhenxun.utils.exception import NotFoundError
from zhenxun.utils.platform import PlatformUtils
@ -21,20 +13,17 @@ from zhenxun.utils.platform import PlatformUtils
from ....base_model import Result
from ....config import AVA_URL, GROUP_AVA_URL
from ....utils import authentication
from .data_source import ApiDataSource
from .model import (
ClearRequest,
DeleteFriend,
Friend,
FriendRequestResult,
GroupDetail,
GroupRequestResult,
GroupResult,
HandleRequest,
LeaveGroup,
Plugin,
ReqResult,
SendMessage,
Task,
SendMessageParam,
UpdateGroup,
UserDetail,
)
@ -53,17 +42,19 @@ async def _(bot_id: str) -> Result:
"""
获取群信息
"""
if not (bots := nonebot.get_bots()):
return Result.warning_("无Bot连接...")
if bot_id not in bots:
return Result.warning_("指定Bot未连接...")
group_list_result = []
try:
group_list = await bots[bot_id].get_group_list()
bot = nonebot.get_bot(bot_id)
group_list, _ = await PlatformUtils.get_group_list(bot)
for g in group_list:
gid = g["group_id"]
g["ava_url"] = GROUP_AVA_URL.format(gid, gid)
group_list_result.append(GroupResult(**g))
ava_url = GROUP_AVA_URL.format(g.group_id, g.group_id)
group_list_result.append(
GroupResult(
group_id=g.group_id, group_name=g.group_name, ava_url=ava_url
)
)
except ValueError:
return Result.warning_("指定Bot未连接...")
except Exception as e:
logger.error("调用API错误", "/get_group_list", e=e)
return Result.fail(f"{type(e)}: {e}")
@ -79,29 +70,11 @@ async def _(bot_id: str) -> Result:
)
async def _(group: UpdateGroup) -> Result[str]:
try:
group_id = group.group_id
if db_group := await GroupConsole.get_group(group_id):
task_list = await TaskInfo.all().values_list("module", flat=True)
db_group.level = group.level
db_group.status = group.status
if group.close_plugins:
db_group.block_plugin = CommonUtils.convert_module_format(
group.close_plugins
)
else:
db_group.block_plugin = ""
if group.task:
if block_task := [t for t in task_list if t not in group.task]:
db_group.block_task = CommonUtils.convert_module_format(block_task) # type: ignore
else:
db_group.block_task = CommonUtils.convert_module_format(task_list) # type: ignore
await db_group.save(
update_fields=["level", "status", "block_plugin", "block_task"]
)
await ApiDataSource.update_group(group)
return Result.ok(info="已完成记录!")
except Exception as e:
logger.error("调用API错误", "/get_group", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(info="已完成记录!")
logger.error(f"{router.prefix}/update_group 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(
@ -115,24 +88,24 @@ async def _(bot_id: str) -> Result[list[Friend]]:
"""
获取群信息
"""
if bots := nonebot.get_bots():
if bot_id not in bots:
return Result.warning_("指定Bot未连接...")
try:
platform = PlatformUtils.get_platform(bots[bot_id])
if platform != "qq":
return Result.warning_("该平台暂不支持该功能...")
friend_list = await bots[bot_id].get_friend_list()
for f in friend_list:
f["ava_url"] = AVA_URL.format(f["user_id"])
return Result.ok(
[Friend(**f) for f in friend_list if str(f["user_id"]) != bot_id],
"拿到了新鲜出炉的数据!",
try:
bot = nonebot.get_bot(bot_id)
friend_list, _ = await PlatformUtils.get_friend_list(bot)
result_list = []
for f in friend_list:
ava_url = AVA_URL.format(f.user_id)
result_list.append(
Friend(user_id=f.user_id, nickname=f.nickname, ava_url=ava_url)
)
except Exception as e:
logger.error("调用API错误", "/get_group_list", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.warning_("无Bot连接...")
return Result.ok(
result_list,
"拿到了新鲜出炉的数据!",
)
except ValueError:
return Result.warning_("指定Bot未连接...")
except Exception as e:
logger.error("调用API错误", "/get_group_list", e=e)
return Result.fail(f"{type(e)}: {e}")
@router.get(
@ -143,17 +116,21 @@ async def _(bot_id: str) -> Result[list[Friend]]:
description="获取请求数量",
)
async def _() -> Result[dict[str, int]]:
f_count = await FgRequest.filter(
request_type=RequestType.FRIEND, handle_type__isnull=True
).count()
g_count = await FgRequest.filter(
request_type=RequestType.GROUP, handle_type__isnull=True
).count()
data = {
"friend_count": f_count,
"group_count": g_count,
}
return Result.ok(data, f"{BotConfig.self_nickname}带来了最新的数据!")
try:
f_count = await FgRequest.filter(
request_type=RequestType.FRIEND, handle_type__isnull=True
).count()
g_count = await FgRequest.filter(
request_type=RequestType.GROUP, handle_type__isnull=True
).count()
data = {
"friend_count": f_count,
"group_count": g_count,
}
return Result.ok(data, "拿到了新鲜出炉的数据!")
except Exception as e:
logger.error("调用API错误", "/get_request_count", e=e)
return Result.fail(f"{type(e)}: {e}")
@router.get(
@ -165,43 +142,10 @@ async def _() -> Result[dict[str, int]]:
)
async def _() -> Result[ReqResult]:
try:
req_result = ReqResult()
data_list = await FgRequest.filter(handle_type__isnull=True).all()
for req in data_list:
if req.request_type == RequestType.FRIEND:
req_result.friend.append(
FriendRequestResult(
oid=req.id,
bot_id=req.bot_id,
id=req.user_id,
flag=req.flag,
nickname=req.nickname,
comment=req.comment,
ava_url=AVA_URL.format(req.user_id),
type=str(req.request_type).lower(),
)
)
else:
req_result.group.append(
GroupRequestResult(
oid=req.id,
bot_id=req.bot_id,
id=req.user_id,
flag=req.flag,
nickname=req.nickname,
comment=req.comment,
ava_url=GROUP_AVA_URL.format(req.group_id, req.group_id),
type=str(req.request_type).lower(),
invite_group=req.group_id,
group_name=None,
)
)
req_result.friend.reverse()
req_result.group.reverse()
return Result.ok(await ApiDataSource.get_request_list(), "拿到信息啦!")
except Exception as e:
logger.error("调用API错误", "/get_request", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(req_result, f"{BotConfig.self_nickname}带来了最新的数据!")
logger.error(f"{router.prefix}/get_request_list 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.post(
@ -225,23 +169,21 @@ async def _(cr: ClearRequest) -> Result:
response_class=JSONResponse,
description="拒绝请求",
)
async def _(parma: HandleRequest) -> Result:
async def _(param: HandleRequest) -> Result:
try:
if bots := nonebot.get_bots():
bot_id = parma.bot_id
if bot_id not in nonebot.get_bots():
return Result.warning_("指定Bot未连接...")
try:
await FgRequest.refused(bots[bot_id], parma.id)
except ActionFailed:
await FgRequest.expire(parma.id)
return Result.warning_("请求失败,可能该请求已失效或请求数据错误...")
except NotFoundError:
return Result.warning_("未找到此Id请求...")
return Result.ok(info="成功处理了请求!")
return Result.warning_("无Bot连接...")
bot = nonebot.get_bot(param.bot_id)
try:
await FgRequest.refused(bot, param.id)
except ActionFailed:
await FgRequest.expire(param.id)
return Result.warning_("请求失败,可能该请求已失效或请求数据错误...")
except NotFoundError:
return Result.warning_("未找到此Id请求...")
return Result.ok(info="成功处理了请求!")
except ValueError:
return Result.warning_("指定Bot未连接...")
except Exception as e:
logger.error("调用API错误", "/refuse_request", e=e)
logger.error(f"{router.prefix}/refuse_request 调用错误", "WebUi", e=e)
return Result.fail(f"{type(e)}: {e}")
@ -252,8 +194,8 @@ async def _(parma: HandleRequest) -> Result:
response_class=JSONResponse,
description="忽略请求",
)
async def _(parma: HandleRequest) -> Result:
await FgRequest.ignore(parma.id)
async def _(param: HandleRequest) -> Result:
await FgRequest.ignore(param.id)
return Result.ok(info="成功处理了请求!")
@ -264,32 +206,30 @@ async def _(parma: HandleRequest) -> Result:
response_class=JSONResponse,
description="同意请求",
)
async def _(parma: HandleRequest) -> Result:
async def _(param: HandleRequest) -> Result:
try:
if bots := nonebot.get_bots():
bot_id = parma.bot_id
if bot_id not in nonebot.get_bots():
return Result.warning_("指定Bot未连接...")
if not (req := await FgRequest.get_or_none(id=parma.id)):
return Result.warning_("未找到此Id请求...")
if req.request_type == RequestType.GROUP:
if group := await GroupConsole.get_group(group_id=req.group_id):
group.group_flag = 1
await group.save(update_fields=["group_flag"])
else:
await GroupConsole.update_or_create(
group_id=req.group_id,
defaults={"group_flag": 1},
)
try:
await FgRequest.approve(bots[bot_id], parma.id)
return Result.ok(info="成功处理了请求!")
except ActionFailed:
await FgRequest.expire(parma.id)
return Result.warning_("请求失败,可能该请求已失效或请求数据错误...")
return Result.warning_("无Bot连接...")
bot = nonebot.get_bot(param.bot_id)
if not (req := await FgRequest.get_or_none(id=param.id)):
return Result.warning_("未找到此Id请求...")
if req.request_type == RequestType.GROUP:
if group := await GroupConsole.get_group(group_id=req.group_id):
group.group_flag = 1
await group.save(update_fields=["group_flag"])
else:
await GroupConsole.update_or_create(
group_id=req.group_id,
defaults={"group_flag": 1},
)
try:
await FgRequest.approve(bot, param.id)
return Result.ok(info="成功处理了请求!")
except ActionFailed:
await FgRequest.expire(param.id)
return Result.warning_("请求失败,可能该请求已失效或请求数据错误...")
except ValueError:
return Result.warning_("指定Bot未连接...")
except Exception as e:
logger.error("调用API错误", "/approve_request", e=e)
logger.error(f"{router.prefix}/approve_request 调用错误", "WebUi", e=e)
return Result.fail(f"{type(e)}: {e}")
@ -302,19 +242,19 @@ async def _(parma: HandleRequest) -> Result:
)
async def _(param: LeaveGroup) -> Result:
try:
if bots := nonebot.get_bots():
bot_id = param.bot_id
platform = PlatformUtils.get_platform(bots[bot_id])
if platform != "qq":
return Result.warning_("该平台不支持退群操作...")
group_list, _ = await PlatformUtils.get_group_list(bots[bot_id])
if param.group_id not in [g.group_id for g in group_list]:
return Result.warning_("Bot未在该群聊中...")
await bots[bot_id].set_group_leave(group_id=param.group_id)
return Result.ok(info="成功处理了请求!")
return Result.warning_("无Bot连接...")
bot = nonebot.get_bot(param.bot_id)
platform = PlatformUtils.get_platform(bot)
if platform != "qq":
return Result.warning_("该平台不支持退群操作...")
group_list, _ = await PlatformUtils.get_group_list(bot)
if param.group_id not in [g.group_id for g in group_list]:
return Result.warning_("Bot未在该群聊中...")
await bot.set_group_leave(group_id=param.group_id)
return Result.ok(info="成功处理了请求!")
except ValueError:
return Result.warning_("指定Bot未连接...")
except Exception as e:
logger.error("调用API错误", "/leave_group", e=e)
logger.error(f"{router.prefix}/leave_group 调用错误", "WebUi", e=e)
return Result.fail(f"{type(e)}: {e}")
@ -327,19 +267,19 @@ async def _(param: LeaveGroup) -> Result:
)
async def _(param: DeleteFriend) -> Result:
try:
if bots := nonebot.get_bots():
bot_id = param.bot_id
platform = PlatformUtils.get_platform(bots[bot_id])
if platform != "qq":
return Result.warning_("该平台不支持删除好友操作...")
friend_list = await bots[bot_id].get_friend_list()
if param.user_id not in [str(g["user_id"]) for g in friend_list]:
return Result.warning_("Bot未有其好友...")
await bots[bot_id].delete_friend(user_id=param.user_id)
return Result.ok(info="成功处理了请求!")
return Result.warning_("Bot未连接...")
bot = nonebot.get_bot(param.bot_id)
platform = PlatformUtils.get_platform(bot)
if platform != "qq":
return Result.warning_("该平台不支持删除好友操作...")
friend_list, _ = await PlatformUtils.get_friend_list(bot)
if param.user_id not in [f.user_id for f in friend_list]:
return Result.warning_("Bot未有其好友...")
await bot.delete_friend(user_id=param.user_id)
return Result.ok(info="成功处理了请求!")
except ValueError:
return Result.warning_("指定Bot未连接...")
except Exception as e:
logger.error("调用API错误", "/delete_friend", e=e)
logger.error(f"{router.prefix}/delete_friend 调用错误", "WebUi", e=e)
return Result.fail(f"{type(e)}: {e}")
@ -351,43 +291,18 @@ async def _(param: DeleteFriend) -> Result:
description="获取好友详情",
)
async def _(bot_id: str, user_id: str) -> Result[UserDetail]:
if bots := nonebot.get_bots():
if bot_id in bots:
if fd := [
x
for x in await bots[bot_id].get_friend_list()
if str(x["user_id"]) == user_id
]:
like_plugin_list = (
await Statistics.filter(user_id=user_id)
.annotate(count=Count("id"))
.group_by("plugin_name")
.order_by("-count")
.limit(5)
.values_list("plugin_name", "count")
)
like_plugin = {}
module_list = [x[0] for x in like_plugin_list]
plugins = await PluginInfo.filter(module__in=module_list).all()
module2name = {p.module: p.name for p in plugins}
for data in like_plugin_list:
name = module2name.get(data[0]) or data[0]
like_plugin[name] = data[1]
user = fd[0]
user_detail = UserDetail(
user_id=user_id,
ava_url=AVA_URL.format(user_id),
nickname=user["nickname"],
remark=user["remark"],
is_ban=await BanConsole.is_ban(user_id),
chat_count=await ChatHistory.filter(user_id=user_id).count(),
call_count=await Statistics.filter(user_id=user_id).count(),
like_plugin=like_plugin,
)
return Result.ok(user_detail)
else:
return Result.warning_("未添加指定好友...")
return Result.warning_("无Bot连接...")
try:
result = await ApiDataSource.get_friend_detail(bot_id, user_id)
return (
Result.ok(result, "拿到信息啦!")
if result
else Result.warning_("未找到该好友...")
)
except ValueError:
return Result.warning_("指定Bot未连接...")
except Exception as e:
logger.error(f"{router.prefix}/get_friend_detail 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(
@ -397,90 +312,12 @@ async def _(bot_id: str, user_id: str) -> Result[UserDetail]:
response_class=JSONResponse,
description="获取群组详情",
)
async def _(bot_id: str, group_id: str) -> Result[GroupDetail]:
if not (bots := nonebot.get_bots()):
return Result.warning_("无Bot连接...")
if bot_id not in bots:
return Result.warning_("未添加指定群组...")
group = await GroupConsole.get_or_none(group_id=group_id)
if not group:
return Result.warning_("指定群组未被收录...")
like_plugin_list = (
await Statistics.filter(group_id=group_id)
.annotate(count=Count("id"))
.group_by("plugin_name")
.order_by("-count")
.limit(5)
.values_list("plugin_name", "count")
)
like_plugin = {}
plugins = await PluginInfo.get_plugins()
module2name = {p.module: p.name for p in plugins}
for data in like_plugin_list:
name = module2name.get(data[0]) or data[0]
like_plugin[name] = data[1]
close_plugins: list[Plugin] = []
if group.block_plugin:
for module in group.block_plugin.replace("<", "").split(","):
if module:
plugin = Plugin(
module=module,
plugin_name=module,
is_super_block=False,
)
plugin.plugin_name = module2name.get(module) or module
close_plugins.append(plugin)
exists_modules = [p.module for p in close_plugins]
if group.superuser_block_plugin:
for module in group.superuser_block_plugin.replace("<", "").split(","):
if module and module not in exists_modules:
plugin = Plugin(
module=module,
plugin_name=module,
is_super_block=True,
)
plugin.plugin_name = module2name.get(module) or module
close_plugins.append(plugin)
all_task = await TaskInfo.annotate().values_list("module", "name")
task_module2name = {x[0]: x[1] for x in all_task}
task_list = []
if group.block_task or group.superuser_block_plugin:
sbp = group.superuser_block_plugin.replace("<", "").split(",")
split_task = group.block_task.replace("<", "").split(",")
for task in all_task:
task_list.append(
Task(
name=task[0],
zh_name=task_module2name.get(task[0]) or task[0],
status=task[0] not in split_task and task[0] not in sbp,
is_super_block=task[0] in sbp,
)
)
else:
for task in all_task:
task_list.append(
Task(
name=task[0],
zh_name=task_module2name.get(task[0]) or task[0],
status=True,
is_super_block=False,
)
)
group_detail = GroupDetail(
group_id=group_id,
ava_url=GROUP_AVA_URL.format(group_id, group_id),
name=group.group_name,
member_count=group.member_count,
max_member_count=group.max_member_count,
chat_count=await ChatHistory.filter(group_id=group_id).count(),
call_count=await Statistics.filter(group_id=group_id).count(),
like_plugin=like_plugin,
level=group.level,
status=group.status,
close_plugins=close_plugins,
task=task_list,
)
return Result.ok(group_detail)
async def _(group_id: str) -> Result[GroupDetail]:
try:
return Result.ok(await ApiDataSource.get_group_detail(group_id), "拿到信息啦!")
except Exception as e:
logger.error(f"{router.prefix}/get_group_detail 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.post(
@ -488,25 +325,17 @@ async def _(bot_id: str, group_id: str) -> Result[GroupDetail]:
dependencies=[authentication()],
response_model=Result,
response_class=JSONResponse,
description="获取群组详情",
description="发送消息",
)
async def _(param: SendMessage) -> Result:
if not (bots := nonebot.get_bots()):
return Result.warning_("无Bot连接...")
if param.bot_id in bots:
platform = PlatformUtils.get_platform(bots[param.bot_id])
if platform != "qq":
return Result.warning_("暂不支持该平台...")
try:
if param.user_id:
await bots[param.bot_id].send_private_msg(
user_id=str(param.user_id), message=param.message
)
else:
await bots[param.bot_id].send_group_msg(
group_id=str(param.group_id), message=param.message
)
except Exception as e:
return Result.fail(str(e))
async def _(param: SendMessageParam) -> Result:
try:
bot = nonebot.get_bot(param.bot_id)
await PlatformUtils.send_message(
bot, param.user_id, param.group_id, param.message
)
return Result.ok("发送成功!")
return Result.warning_("指定Bot未连接...")
except ValueError:
return Result.warning_("指定Bot未连接...")
except Exception as e:
logger.error(f"{router.prefix}/send_message 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")

View File

@ -3,7 +3,7 @@ import nonebot
from nonebot import on_message
from nonebot.adapters.onebot.v11 import MessageEvent
from nonebot_plugin_alconna import At, Hyper, Image, Text, UniMsg
from nonebot_plugin_session import EventSession
from nonebot_plugin_uninfo import Uninfo
from starlette.websockets import WebSocket, WebSocketDisconnect, WebSocketState
from zhenxun.models.group_member_info import GroupInfoUser
@ -28,7 +28,7 @@ matcher = on_message(block=False, priority=1, rule=lambda: bool(ws_conn))
@driver.on_shutdown
async def _():
if ws_conn:
if ws_conn and ws_conn.client_state == WebSocketState.CONNECTED:
await ws_conn.close()
@ -36,7 +36,7 @@ async def _():
async def _(websocket: WebSocket):
global ws_conn
await websocket.accept()
if not ws_conn:
if not ws_conn or ws_conn.client_state != WebSocketState.CONNECTED:
ws_conn = websocket
try:
while websocket.client_state == WebSocketState.CONNECTED:
@ -80,25 +80,24 @@ async def message_handle(
@matcher.handle()
async def _(
message: UniMsg, event: MessageEvent, session: EventSession, uname: str = UserName()
message: UniMsg, event: MessageEvent, session: Uninfo, uname: str = UserName()
):
global ws_conn, ID2NAME, ID_LIST
uid = session.id1
if ws_conn and ws_conn.client_state == WebSocketState.CONNECTED and uid:
if ws_conn and ws_conn.client_state == WebSocketState.CONNECTED:
msg_id = event.message_id
if msg_id in ID_LIST:
return
ID_LIST.append(msg_id)
if len(ID_LIST) > 50:
ID_LIST = ID_LIST[40:]
gid = session.id3 or session.id2
gid = session.group.id if session.group else None
messages = await message_handle(message, gid)
data = Message(
object_id=gid or uid,
user_id=uid,
object_id=gid or session.user.id,
user_id=session.user.id,
group_id=gid,
message=messages,
name=uname,
ava_url=AVA_URL.format(uid),
ava_url=AVA_URL.format(session.user.id),
)
await ws_conn.send_json(data.dict())

View File

@ -0,0 +1,274 @@
import nonebot
from tortoise.functions import Count
from zhenxun.models.ban_console import BanConsole
from zhenxun.models.chat_history import ChatHistory
from zhenxun.models.fg_request import FgRequest
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.statistics import Statistics
from zhenxun.models.task_info import TaskInfo
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.enum import RequestType
from zhenxun.utils.platform import PlatformUtils
from ....config import AVA_URL, GROUP_AVA_URL
from .model import (
FriendRequestResult,
GroupDetail,
GroupRequestResult,
Plugin,
ReqResult,
Task,
UpdateGroup,
UserDetail,
)
class ApiDataSource:
@classmethod
async def update_group(cls, group: UpdateGroup):
"""更新群组数据
参数:
group: UpdateGroup
"""
db_group = await GroupConsole.get_group(group.group_id) or GroupConsole(
group_id=group.group_id
)
task_list = await TaskInfo.all().values_list("module", flat=True)
db_group.level = group.level
db_group.status = group.status
if group.close_plugins:
db_group.block_plugin = CommonUtils.convert_module_format(
group.close_plugins
)
else:
db_group.block_plugin = ""
if group.task:
if block_task := [t for t in task_list if t not in group.task]:
db_group.block_task = CommonUtils.convert_module_format(block_task) # type: ignore
else:
db_group.block_task = CommonUtils.convert_module_format(task_list) # type: ignore
await db_group.save()
@classmethod
async def get_request_list(cls) -> ReqResult:
"""获取好友与群组请求列表
返回:
ReqResult: 数据内容
"""
req_result = ReqResult()
data_list = await FgRequest.filter(handle_type__isnull=True).all()
for req in data_list:
if req.request_type == RequestType.FRIEND:
req_result.friend.append(
FriendRequestResult(
oid=req.id,
bot_id=req.bot_id,
id=req.user_id,
flag=req.flag,
nickname=req.nickname,
comment=req.comment,
ava_url=AVA_URL.format(req.user_id),
type=str(req.request_type).lower(),
)
)
else:
req_result.group.append(
GroupRequestResult(
oid=req.id,
bot_id=req.bot_id,
id=req.user_id,
flag=req.flag,
nickname=req.nickname,
comment=req.comment,
ava_url=GROUP_AVA_URL.format(req.group_id, req.group_id),
type=str(req.request_type).lower(),
invite_group=req.group_id,
group_name=None,
)
)
req_result.friend.reverse()
req_result.group.reverse()
return req_result
@classmethod
async def get_friend_detail(cls, bot_id: str, user_id: str) -> UserDetail | None:
"""获取好友详情
参数:
bot_id: bot id
user_id: 用户id
返回:
UserDetail | None: 详情数据
"""
bot = nonebot.get_bot(bot_id)
friend_list, _ = await PlatformUtils.get_friend_list(bot)
fd = [x for x in friend_list if x == user_id]
if not fd:
return None
like_plugin_list = (
await Statistics.filter(user_id=user_id)
.annotate(count=Count("id"))
.group_by("plugin_name")
.order_by("-count")
.limit(5)
.values_list("plugin_name", "count")
)
like_plugin = {}
module_list = [x[0] for x in like_plugin_list]
plugins = await PluginInfo.filter(module__in=module_list).all()
module2name = {p.module: p.name for p in plugins}
for data in like_plugin_list:
name = module2name.get(data[0]) or data[0]
like_plugin[name] = data[1]
user = fd[0]
return UserDetail(
user_id=user_id,
ava_url=AVA_URL.format(user_id),
nickname=user.user_name,
remark="",
is_ban=await BanConsole.is_ban(user_id),
chat_count=await ChatHistory.filter(user_id=user_id).count(),
call_count=await Statistics.filter(user_id=user_id).count(),
like_plugin=like_plugin,
)
@classmethod
async def __get_group_detail_like_plugin(cls, group_id: str) -> dict[str, int]:
"""获取群组喜爱的插件
参数:
group_id: 群组id
返回:
dict[str, int]: 插件与调用次数
"""
like_plugin_list = (
await Statistics.filter(group_id=group_id)
.annotate(count=Count("id"))
.group_by("plugin_name")
.order_by("-count")
.limit(5)
.values_list("plugin_name", "count")
)
like_plugin = {}
plugins = await PluginInfo.get_plugins()
module2name = {p.module: p.name for p in plugins}
for data in like_plugin_list:
name = module2name.get(data[0]) or data[0]
like_plugin[name] = data[1]
return like_plugin
@classmethod
async def __get_group_detail_disable_plugin(
cls, group: GroupConsole
) -> list[Plugin]:
"""获取群组禁用插件
参数:
group: GroupConsole
返回:
list[Plugin]: 禁用插件数据列表
"""
disable_plugins: list[Plugin] = []
plugins = await PluginInfo.get_plugins()
module2name = {p.module: p.name for p in plugins}
if group.block_plugin:
for module in CommonUtils.convert_module_format(group.block_plugin):
if module:
plugin = Plugin(
module=module,
plugin_name=module,
is_super_block=False,
)
plugin.plugin_name = module2name.get(module) or module
disable_plugins.append(plugin)
exists_modules = [p.module for p in disable_plugins]
if group.superuser_block_plugin:
for module in CommonUtils.convert_module_format(
group.superuser_block_plugin
):
if module and module not in exists_modules:
plugin = Plugin(
module=module,
plugin_name=module,
is_super_block=True,
)
plugin.plugin_name = module2name.get(module) or module
disable_plugins.append(plugin)
return disable_plugins
@classmethod
async def __get_group_detail_task(cls, group: GroupConsole) -> list[Task]:
"""获取群组被动技能状态
参数:
group: GroupConsole
返回:
list[Task]: 群组被动列表
"""
all_task = await TaskInfo.annotate().values_list("module", "name")
task_module2name = {x[0]: x[1] for x in all_task}
task_list = []
if group.block_task or group.superuser_block_plugin:
sbp = CommonUtils.convert_module_format(group.superuser_block_task)
tasks = CommonUtils.convert_module_format(group.block_task)
task_list.extend(
Task(
name=task[0],
zh_name=task_module2name.get(task[0]) or task[0],
status=task[0] not in tasks and task[0] not in sbp,
is_super_block=task[0] in sbp,
)
for task in all_task
)
else:
task_list.extend(
Task(
name=task[0],
zh_name=task_module2name.get(task[0]) or task[0],
status=True,
is_super_block=False,
)
for task in all_task
)
return task_list
@classmethod
async def get_group_detail(cls, group_id: str) -> GroupDetail | None:
"""获取群组详情
参数:
group_id: 群组id
返回:
GroupDetail | None: 群组详情数据
"""
group = await GroupConsole.get_or_none(group_id=group_id)
if not group:
return None
like_plugin = await cls.__get_group_detail_like_plugin(group_id)
disable_plugins: list[Plugin] = await cls.__get_group_detail_disable_plugin(
group
)
task_list = await cls.__get_group_detail_task(group)
return GroupDetail(
group_id=group_id,
ava_url=GROUP_AVA_URL.format(group_id, group_id),
name=group.group_name,
member_count=group.member_count,
max_member_count=group.max_member_count,
chat_count=await ChatHistory.filter(group_id=group_id).count(),
call_count=await Statistics.filter(group_id=group_id).count(),
like_plugin=like_plugin,
level=group.level,
status=group.status,
close_plugins=disable_plugins,
task=task_list,
)

View File

@ -257,7 +257,7 @@ class Message(BaseModel):
"""用户头像"""
class SendMessage(BaseModel):
class SendMessageParam(BaseModel):
"""
发送消息
"""

View File

@ -11,6 +11,7 @@ from zhenxun.utils.enum import BlockType, PluginType
from ....base_model import Result
from ....utils import authentication
from .data_source import ApiDataSource
from .model import (
PluginConfig,
PluginCount,
@ -34,31 +35,12 @@ async def _(
plugin_type: list[PluginType] = Query(None), menu_type: str | None = None
) -> Result[list[PluginInfo]]:
try:
plugin_list: list[PluginInfo] = []
query = DbPluginInfo
if plugin_type:
query = query.filter(plugin_type__in=plugin_type, load_status=True)
if menu_type:
query = query.filter(menu_type=menu_type, load_status=True)
plugins = await query.all()
for plugin in plugins:
plugin_info = PluginInfo(
module=plugin.module,
plugin_name=plugin.name,
default_status=plugin.default_status,
limit_superuser=plugin.limit_superuser,
cost_gold=plugin.cost_gold,
menu_type=plugin.menu_type,
version=plugin.version or "0",
level=plugin.level,
status=plugin.status,
author=plugin.author,
)
plugin_list.append(plugin_info)
return Result.ok(
await ApiDataSource.get_plugin_list(plugin_type, menu_type), "拿到信息啦!"
)
except Exception as e:
logger.error("调用API错误", "/get_plugins", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(plugin_list, "拿到了新鲜出炉的数据!")
logger.error(f"{router.prefix}/get_plugin_list 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.get(
@ -69,21 +51,26 @@ async def _(
deprecated="获取插件数量", # type: ignore
)
async def _() -> Result[int]:
plugin_count = PluginCount()
plugin_count.normal = await DbPluginInfo.filter(
plugin_type=PluginType.NORMAL, load_status=True
).count()
plugin_count.admin = await DbPluginInfo.filter(
plugin_type__in=[PluginType.ADMIN, PluginType.SUPER_AND_ADMIN], load_status=True
).count()
plugin_count.superuser = await DbPluginInfo.filter(
plugin_type__in=[PluginType.SUPERUSER, PluginType.SUPER_AND_ADMIN],
load_status=True,
).count()
plugin_count.other = await DbPluginInfo.filter(
plugin_type__in=[PluginType.HIDDEN, PluginType.DEPENDANT], load_status=True
).count()
return Result.ok(plugin_count)
try:
plugin_count = PluginCount()
plugin_count.normal = await DbPluginInfo.filter(
plugin_type=PluginType.NORMAL, load_status=True
).count()
plugin_count.admin = await DbPluginInfo.filter(
plugin_type__in=[PluginType.ADMIN, PluginType.SUPER_AND_ADMIN],
load_status=True,
).count()
plugin_count.superuser = await DbPluginInfo.filter(
plugin_type__in=[PluginType.SUPERUSER, PluginType.SUPER_AND_ADMIN],
load_status=True,
).count()
plugin_count.other = await DbPluginInfo.filter(
plugin_type__in=[PluginType.HIDDEN, PluginType.DEPENDANT], load_status=True
).count()
return Result.ok(plugin_count, "拿到信息啦!")
except Exception as e:
logger.error(f"{router.prefix}/get_plugin_count 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@router.post(
@ -93,34 +80,15 @@ async def _() -> Result[int]:
response_class=JSONResponse,
description="更新插件参数",
)
async def _(plugin: UpdatePlugin) -> Result:
async def _(param: UpdatePlugin) -> Result:
try:
db_plugin = await DbPluginInfo.get_or_none(
module=plugin.module, load_status=True
)
if not db_plugin:
return Result.fail("插件不存在...")
db_plugin.default_status = plugin.default_status
db_plugin.limit_superuser = plugin.limit_superuser
db_plugin.cost_gold = plugin.cost_gold
db_plugin.level = plugin.level
db_plugin.menu_type = plugin.menu_type
db_plugin.block_type = plugin.block_type
db_plugin.status = plugin.block_type != BlockType.ALL
await db_plugin.save()
# 配置项
if plugin.configs and (configs := Config.get(plugin.module)):
for key in plugin.configs:
if c := configs.configs.get(key):
value = plugin.configs[key]
if c.type and value is not None:
value = cattrs.structure(value, c.type)
Config.set_config(plugin.module, key, value)
Config.save(save_simple_data=True)
await ApiDataSource.update_plugin(param)
return Result.ok(info="已经帮你写好啦!")
except ValueError:
return Result.fail("插件数据不存在...")
except Exception as e:
logger.error("调用API错误", "/update_plugins", e=e)
logger.error(f"{router.prefix}/update_plugin 调用错误", "WebUi", e=e)
return Result.fail(f"{type(e)}: {e}")
return Result.ok(info="已经帮你写好啦!")
@router.post(
@ -131,17 +99,21 @@ async def _(plugin: UpdatePlugin) -> Result:
description="开关插件",
)
async def _(param: PluginSwitch) -> Result:
db_plugin = await DbPluginInfo.get_or_none(module=param.module, load_status=True)
if not db_plugin:
return Result.fail("插件不存在...")
if not param.status:
db_plugin.block_type = BlockType.ALL
db_plugin.status = False
else:
db_plugin.block_type = None
db_plugin.status = True
await db_plugin.save()
return Result.ok(info="成功改变了开关状态!")
try:
db_plugin = await DbPluginInfo.get_plugin(module=param.module)
if not db_plugin:
return Result.fail("插件不存在...")
if not param.status:
db_plugin.block_type = BlockType.ALL
db_plugin.status = False
else:
db_plugin.block_type = None
db_plugin.status = True
await db_plugin.save()
return Result.ok(info="成功改变了开关状态!")
except Exception as e:
logger.error(f"{router.prefix}/change_switch 调用错误", "WebUi", e=e)
return Result.fail(f"{type(e)}: {e}")
@router.get(
@ -152,16 +124,20 @@ async def _(param: PluginSwitch) -> Result:
description="获取插件类型",
)
async def _() -> Result[list[str]]:
menu_type_list = []
result = (
await DbPluginInfo.filter(load_status=True)
.annotate()
.values_list("menu_type", flat=True)
)
for r in result:
if r not in menu_type_list and r:
menu_type_list.append(r)
return Result.ok(menu_type_list)
try:
menu_type_list = []
result = (
await DbPluginInfo.filter(load_status=True)
.annotate()
.values_list("menu_type", flat=True)
)
for r in result:
if r not in menu_type_list and r:
menu_type_list.append(r)
return Result.ok(menu_type_list)
except Exception as e:
logger.error(f"{router.prefix}/get_plugin_menu_type 调用错误", "WebUi", e=e)
return Result.fail(f"{type(e)}: {e}")
@router.get(
@ -172,46 +148,12 @@ async def _() -> Result[list[str]]:
description="获取插件详情",
)
async def _(module: str) -> Result[PluginDetail]:
db_plugin = await DbPluginInfo.get_or_none(module=module, load_status=True)
if not db_plugin:
return Result.fail("插件不存在...")
config_list = []
if config := Config.get(module):
for cfg in config.configs:
type_str = ""
type_inner = None
if r := re.search(r"<class '(.*)'>", str(config.configs[cfg].type)):
type_str = r[1]
elif r := re.search(r"typing\.(.*)\[(.*)\]", str(config.configs[cfg].type)):
type_str = r[1]
if type_str:
type_str = type_str.lower()
type_inner = r[2]
if type_inner:
type_inner = [x.strip() for x in type_inner.split(",")]
config_list.append(
PluginConfig(
module=module,
key=cfg,
value=config.configs[cfg].value,
help=config.configs[cfg].help,
default_value=config.configs[cfg].default_value,
type=type_str,
type_inner=type_inner, # type: ignore
)
)
plugin_info = PluginDetail(
module=module,
plugin_name=db_plugin.name,
default_status=db_plugin.default_status,
limit_superuser=db_plugin.limit_superuser,
cost_gold=db_plugin.cost_gold,
menu_type=db_plugin.menu_type,
version=db_plugin.version or "0",
level=db_plugin.level,
status=db_plugin.status,
author=db_plugin.author,
config_list=config_list,
block_type=db_plugin.block_type,
)
return Result.ok(plugin_info)
try:
return Result.ok(
await ApiDataSource.get_plugin_detail(module), "已经帮你写好啦!"
)
except ValueError:
return Result.fail("插件数据不存在...")
except Exception as e:
logger.error(f"{router.prefix}/get_plugin 调用错误", "WebUi", e=e)
return Result.fail(f"{type(e)}: {e}")

View File

@ -0,0 +1,153 @@
import re
import cattrs
from fastapi import Query
from zhenxun.configs.config import Config
from zhenxun.configs.utils import ConfigGroup
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.plugin_info import PluginInfo as DbPluginInfo
from zhenxun.utils.enum import BlockType, PluginType
from .model import PluginConfig, PluginDetail, UpdatePlugin
class ApiDataSource:
@classmethod
async def get_plugin_list(
cls, plugin_type: list[PluginType] = Query(None), menu_type: str | None = None
) -> list[PluginInfo]:
"""获取插件列表
参数:
plugin_type: 插件类型.
menu_type: 菜单类型.
返回:
list[PluginInfo]: 插件数据列表
"""
plugin_list: list[PluginInfo] = []
query = DbPluginInfo
if plugin_type:
query = query.filter(plugin_type__in=plugin_type, load_status=True)
if menu_type:
query = query.filter(menu_type=menu_type, load_status=True)
plugins = await query.all()
for plugin in plugins:
plugin_info = PluginInfo(
module=plugin.module,
plugin_name=plugin.name,
default_status=plugin.default_status,
limit_superuser=plugin.limit_superuser,
cost_gold=plugin.cost_gold,
menu_type=plugin.menu_type,
version=plugin.version or "0",
level=plugin.level,
status=plugin.status,
author=plugin.author,
)
plugin_list.append(plugin_info)
return plugin_list
@classmethod
async def update_plugin(cls, param: UpdatePlugin) -> DbPluginInfo:
"""更新插件数据
参数:
param: UpdatePlugin
返回:
DbPluginInfo | None: 插件数据
"""
db_plugin = await DbPluginInfo.get_plugin(module=param.module)
if not db_plugin:
raise ValueError("插件不存在")
db_plugin.default_status = param.default_status
db_plugin.limit_superuser = param.limit_superuser
db_plugin.cost_gold = param.cost_gold
db_plugin.level = param.level
db_plugin.menu_type = param.menu_type
db_plugin.block_type = param.block_type
db_plugin.status = param.block_type != BlockType.ALL
await db_plugin.save()
# 配置项
if param.configs and (configs := Config.get(param.module)):
for key in param.configs:
if c := configs.configs.get(key):
value = param.configs[key]
if c.type and value is not None:
value = cattrs.structure(value, c.type)
Config.set_config(param.module, key, value)
Config.save(save_simple_data=True)
return db_plugin
@classmethod
def __build_plugin_config(
cls, module: str, cfg: str, config: ConfigGroup
) -> PluginConfig:
"""获取插件配置项
参数:
module: 模块名
cfg: cfg
config: ConfigGroup
返回:
lPluginConfig: 配置数据
"""
type_str = ""
type_inner = None
if r := re.search(r"<class '(.*)'>", str(config.configs[cfg].type)):
type_str = r[1]
elif r := re.search(r"typing\.(.*)\[(.*)\]", str(config.configs[cfg].type)):
type_str = r[1]
if type_str:
type_str = type_str.lower()
type_inner = r[2]
if type_inner:
type_inner = [x.strip() for x in type_inner.split(",")]
return PluginConfig(
module=module,
key=cfg,
value=config.configs[cfg].value,
help=config.configs[cfg].help,
default_value=config.configs[cfg].default_value,
type=type_str,
type_inner=type_inner, # type: ignore
)
@classmethod
async def get_plugin_detail(cls, module: str) -> PluginDetail:
"""获取插件详情
参数:
module: 模块名
异常:
ValueError: 插件不存在
返回:
PluginDetail: 插件详情数据
"""
db_plugin = await DbPluginInfo.get_plugin(module=module)
if not db_plugin:
raise ValueError("插件不存在")
config_list = []
if config := Config.get(module):
config_list.extend(
cls.__build_plugin_config(module, cfg, config) for cfg in config.configs
)
return PluginDetail(
module=module,
plugin_name=db_plugin.name,
default_status=db_plugin.default_status,
limit_superuser=db_plugin.limit_superuser,
cost_gold=db_plugin.cost_gold,
menu_type=db_plugin.menu_type,
version=db_plugin.version or "0",
level=db_plugin.level,
status=db_plugin.status,
author=db_plugin.author,
config_list=config_list,
block_type=db_plugin.block_type,
)

View File

@ -0,0 +1,8 @@
from nonebot import require
require("nonebot_plugin_apscheduler")
require("nonebot_plugin_alconna")
require("nonebot_plugin_session")
require("nonebot_plugin_userinfo")
require("nonebot_plugin_htmlrender")
require("nonebot_plugin_uninfo")

View File

@ -268,7 +268,7 @@ class PlatformUtils:
返回:
Receipt | None: 是否发送成功
"""
if target := cls.get_target(bot, user_id, group_id):
if target := cls.get_target(user_id=user_id, group_id=group_id):
send_message = (
MessageUtils.build_message(message)
if isinstance(message, str)
@ -425,7 +425,7 @@ class PlatformUtils:
@classmethod
def get_target(
cls,
bot: Bot,
*,
user_id: str | None = None,
group_id: str | None = None,
channel_id: str | None = None,
@ -530,7 +530,9 @@ async def broadcast_group(
)
continue
target = PlatformUtils.get_target(
_bot, None, group.group_id, group.channel_id
user_id=None,
group_id=group.group_id,
channel_id=group.channel_id,
)
if target:
_used_group.append(key)