🔧 优化Bot管理插件,增加权限检查和状态控制

This commit is contained in:
HibiKier 2024-12-10 17:56:31 +08:00
parent ee272ec07b
commit 5d3927d120
8 changed files with 453 additions and 209 deletions

View File

@ -290,8 +290,15 @@ class AuthChecker:
plugin: PluginInfo plugin: PluginInfo
bot_id: bot_id bot_id: bot_id
""" """
if await BotConsole.is_block_plugin(plugin.module, bot_id): if not await BotConsole.get_bot_status(bot_id):
raise IgnoredException("机器人权限检测 ignore") logger.debug("Bot休眠中阻断权限检测...", "AuthChecker")
raise IgnoredException("BotConsole休眠权限检测 ignore")
if await BotConsole.is_block_plugin(bot_id, plugin.module):
logger.debug(
f"Bot插件 {plugin.name}({plugin.module}) 权限检查结果为关闭...",
"AuthChecker",
)
raise IgnoredException("BotConsole插件权限检测 ignore")
async def auth_limit(self, plugin: PluginInfo, session: EventSession): async def auth_limit(self, plugin: PluginInfo, session: EventSession):
"""插件限制 """插件限制

View File

@ -4,6 +4,7 @@ import nonebot
from nonebot.adapters import Bot from nonebot.adapters import Bot
from nonebot.plugin import PluginMetadata from nonebot.plugin import PluginMetadata
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType from zhenxun.utils.enum import PluginType
from zhenxun.models.task_info import TaskInfo from zhenxun.models.task_info import TaskInfo
from zhenxun.utils.platform import PlatformUtils from zhenxun.utils.platform import PlatformUtils
@ -62,3 +63,4 @@ async def _(bot: Bot):
bot_data.available_plugins = BotConsole._convert_module_format(available_plugins) bot_data.available_plugins = BotConsole._convert_module_format(available_plugins)
bot_data.available_tasks = BotConsole._convert_module_format(available_tasks) bot_data.available_tasks = BotConsole._convert_module_format(available_tasks)
await bot_data.save(update_fields=["available_plugins", "available_tasks"]) await bot_data.save(update_fields=["available_plugins", "available_tasks"])
logger.info("初始化Bot管理完成...")

View File

@ -1,5 +1,5 @@
from nonebot.permission import SUPERUSER from nonebot.permission import SUPERUSER
from arclet.alconna.action import store_true from arclet.alconna.action import store_false
from nonebot_plugin_alconna import on_alconna from nonebot_plugin_alconna import on_alconna
from arclet.alconna import Args, Option, Alconna, Subcommand from arclet.alconna import Args, Option, Alconna, Subcommand
@ -10,77 +10,34 @@ bot_manage = on_alconna(
"task", "task",
Option( Option(
"list", "list",
action=store_true, action=store_false,
default=False,
help_text="查看 bot_id 下的所有可用被动", help_text="查看 bot_id 下的所有可用被动",
), ),
Option("-b|--bot", Args["bot_id", str], help_text="指定 bot_id"),
Subcommand( Subcommand(
"enable", "enable",
Option(
"-a|--all",
action=store_true,
default=False,
help_text="可选开启全部",
),
Args["feature_name?", str], Args["feature_name?", str],
Args["bot_id?", str],
), ),
Subcommand( Subcommand(
"disable", "disable",
Option(
"-a|--all",
action=store_true,
default=False,
help_text="可选关闭全部",
),
Args["feature_name?", str], Args["feature_name?", str],
Args["bot_id?", str],
), ),
), ),
Subcommand( Subcommand(
"plugin", "plugin",
Option( Option(
"list", "list",
action=store_true, action=store_false,
default=False,
help_text="查看 bot_id 下的所有可用插件", help_text="查看 bot_id 下的所有可用插件",
), ),
Option("-b|--bot", Args["bot_id", str], help_text="指定 bot_id"),
Subcommand( Subcommand(
"enable", "enable",
Option(
"-a|--all",
action=store_true,
default=False,
help_text="可选开启全部",
),
Args["plugin_name?", str], Args["plugin_name?", str],
Args["bot_id?", str],
), ),
Subcommand( Subcommand(
"disable", "disable",
Option(
"-a|--all",
action=store_true,
default=False,
help_text="可选关闭全部",
),
Args["plugin_name?", str], Args["plugin_name?", str],
Args["bot_id?", str],
),
),
Subcommand(
"status",
Subcommand(
"tasks",
Args["bot_id", str],
),
Subcommand(
"plugins",
Args["bot_id", str],
),
Subcommand(
"bots",
Args["bot_id", str],
), ),
), ),
Subcommand( Subcommand(
@ -110,3 +67,87 @@ bot_manage = on_alconna(
priority=5, priority=5,
block=True, block=True,
) )
bot_manage.shortcut(
r"bot被动状态",
command="bot_manage",
arguments=["task", "list"],
prefix=True,
)
bot_manage.shortcut(
r"bot开启被动\s*(?P<name>.+)",
command="bot_manage",
arguments=["task", "enable", "{name}"],
prefix=True,
)
bot_manage.shortcut(
r"bot关闭被动\s*(?P<name>.+)",
command="bot_manage",
arguments=["task", "disable", "{name}"],
prefix=True,
)
bot_manage.shortcut(
r"bot开启(全部|所有)被动",
command="bot_manage",
arguments=["task", "enable"],
prefix=True,
)
bot_manage.shortcut(
r"bot关闭(全部|所有)被动",
command="bot_manage",
arguments=["task", "disable"],
prefix=True,
)
bot_manage.shortcut(
r"bot插件列表",
command="bot_manage",
arguments=["plugin", "list"],
prefix=True,
)
bot_manage.shortcut(
r"bot开启(全部|所有)插件",
command="bot_manage",
arguments=["plugin", "enable"],
prefix=True,
)
bot_manage.shortcut(
r"bot关闭(全部|所有)插件",
command="bot_manage",
arguments=["plugin", "disable"],
prefix=True,
)
bot_manage.shortcut(
r"bot开启\s*(?P<name>.+)",
command="bot_manage",
arguments=["plugin", "enable", "{name}"],
prefix=True,
)
bot_manage.shortcut(
r"bot关闭\s*(?P<name>.+)",
command="bot_manage",
arguments=["plugin", "disable", "{name}"],
prefix=True,
)
bot_manage.shortcut(
r"bot休眠\s*(?P<bot_id>.+)?",
command="bot_manage",
arguments=["bot_switch", "disable", "{bot_id}"],
prefix=True,
)
bot_manage.shortcut(
r"bot醒来\s*(?P<bot_id>.+)?",
command="bot_manage",
arguments=["bot_switch", "enable", "{bot_id}"],
prefix=True,
)

View File

@ -1,83 +1,182 @@
from nonebot_plugin_uninfo import Uninfo from nonebot_plugin_uninfo import Uninfo
from nonebot_plugin_alconna import Match, Query, AlconnaMatch, AlconnaQuery from nonebot_plugin_alconna import Match, AlconnaMatch
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils
from zhenxun.models.bot_console import BotConsole from zhenxun.models.bot_console import BotConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.utils._build_image import BuildImage
from zhenxun.utils._image_template import RowStyle, ImageTemplate
from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage
@bot_manage.assign("plugin") def task_row_style(column: str, text: str) -> RowStyle:
async def bot_plugin( """被动技能文本风格
session: Uninfo,
plugin_list: Query[bool] = AlconnaQuery("plugin.list.value", default=False), 参数:
): column: 表头
if plugin_list: text: 文本内容
logger.info("获取全部 bot 的所有可用插件", "bot_manage.plugin", session=session)
data = await BotConsole.get_plugins() 返回:
for bot in data: RowStyle: RowStyle
await MessageUtils.build_message(f"{bot[0]} : {bot[1]}").finish() """
style = RowStyle()
if column in {"全局状态"}:
style.font_color = "#67C23A" if text == "开启" else "#F56C6C"
return style
@bot_manage.assign("plugin.list")
async def bot_plugin(session: Uninfo, bot_id: Match[str] = AlconnaMatch("bot_id")):
logger.info("获取全部 bot 的所有可用插件", "bot_manage.plugin", session=session)
column_name = [
"ID",
"模块",
"名称",
"全局状态",
"禁用类型",
"加载状态",
"菜单分类",
"作者",
"版本",
"金币花费",
]
if bot_id.available:
data_dict = {
bot_id.result: await BotConsole.get_plugins(
bot_id=bot_id.result, status=False
)
}
else:
data_dict = await BotConsole.get_plugins(status=False)
db_plugin_list = await PluginInfo.filter(
load_status=True, plugin_type__not=PluginType.HIDDEN
).all()
img_list = []
for __bot_id, tk in data_dict.items():
column_data = [
[
plugin.id,
plugin.module,
plugin.name,
"开启" if plugin.module not in tk else "关闭",
plugin.block_type,
"SUCCESS" if plugin.load_status else "ERROR",
plugin.menu_type,
plugin.author,
plugin.version,
plugin.cost_gold,
]
for plugin in db_plugin_list
]
img = await ImageTemplate.table_page(
f"{__bot_id}插件列表",
None,
column_name,
column_data,
text_style=task_row_style,
)
img_list.append(img)
result = await BuildImage.auto_paste(img_list, 3)
await MessageUtils.build_message(result).finish()
@bot_manage.assign("plugin.enable") @bot_manage.assign("plugin.enable")
async def enable_plugin( async def enable_plugin(
session: Uninfo, session: Uninfo,
all_flag: Query[bool] = AlconnaQuery("plugin.enable.all.value", default=False),
plugin_name: Match[str] = AlconnaMatch("plugin_name"), plugin_name: Match[str] = AlconnaMatch("plugin_name"),
bot_id: Match[str] = AlconnaMatch("bot_id"), bot_id: Match[str] = AlconnaMatch("bot_id"),
): ):
if all_flag and plugin_name.available: if plugin_name.available:
await logger.info( plugin: PluginInfo | None = await PluginInfo.get_plugin(name=plugin_name.result)
f"启用全部 bot 的 {plugin_name.result} ", if not plugin:
"bot_manage.plugin.enable", await MessageUtils.build_message("未找到该插件...").finish()
session=session, if bot_id.available:
) logger.info(
await BotConsole.enable_plugin(None, plugin_name.result) f"开启 {bot_id.result} 的插件 {plugin_name.result}",
await MessageUtils.build_message( "bot_manage.plugin.disable",
f"已启用全部 bot 的 {plugin_name.result} " session=session,
).finish() )
await BotConsole.enable_plugin(bot_id.result, plugin.module)
if bot_id.available and plugin_name.available: await MessageUtils.build_message(
f"已开启 {bot_id.result} 的插件 {plugin_name.result}"
).finish()
else:
logger.info(
f"开启全部 bot 的插件: {plugin_name.result}",
"bot_manage.plugin.disable",
session=session,
)
await BotConsole.enable_plugin(None, plugin.module)
await MessageUtils.build_message(
f"已禁用全部 bot 的插件: {plugin_name.result}"
).finish()
elif bot_id.available:
logger.info( logger.info(
f"启用 {bot_id.result}{plugin_name.result}", f"{bot_id.result} 全部插件",
"bot_manage.plugin.enable", "bot_manage.plugin.disable",
session=session, session=session,
) )
await BotConsole.enable_plugin(bot_id.result, plugin_name.result) await BotConsole.enable_all(bot_id.result, "plugins")
await MessageUtils.build_message( await MessageUtils.build_message(f"已开启 {bot_id.result} 全部插件").finish()
f"已启用 {bot_id.result}{plugin_name.result}" else:
).finish() bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True)
for __bot_id in bot_id_list:
await MessageUtils.build_message("缺失参数").finish() await BotConsole.enable_all(__bot_id, "plugins") # type: ignore
logger.info(
"开启全部 bot 全部插件",
"bot_manage.plugin.disable",
session=session,
)
await MessageUtils.build_message("开启全部 bot 全部插件").finish()
@bot_manage.assign("plugin.disable") @bot_manage.assign("plugin.disable")
async def disable_plugin( async def disable_plugin(
session: Uninfo, session: Uninfo,
all_flag: Query[bool] = AlconnaQuery("plugin.disable.all.value", default=False),
plugin_name: Match[str] = AlconnaMatch("plugin_name"), plugin_name: Match[str] = AlconnaMatch("plugin_name"),
bot_id: Match[str] = AlconnaMatch("bot_id"), bot_id: Match[str] = AlconnaMatch("bot_id"),
): ):
if all_flag and plugin_name.available: if plugin_name.available:
await logger.info( plugin = await PluginInfo.get_plugin(name=plugin_name.result)
f"禁用全部 bot 的 {plugin_name.result} ", if not plugin:
"bot_manage.plugin.disable", await MessageUtils.build_message("未找到该插件...").finish()
session=session, if bot_id.available:
) logger.info(
await BotConsole.disable_plugin(None, plugin_name.result) f"禁用 {bot_id.result} 的插件 {plugin_name.result}",
await MessageUtils.build_message( "bot_manage.plugin.disable",
f"已禁用全部 bot 的 {plugin_name.result} " session=session,
).finish() )
await BotConsole.disable_plugin(bot_id.result, plugin.module)
if bot_id.available and plugin_name.available: await MessageUtils.build_message(
f"已禁用 {bot_id.result} 的插件 {plugin_name.result}"
).finish()
else:
logger.info(
f"禁用全部 bot 的插件: {plugin_name.result}",
"bot_manage.plugin.disable",
session=session,
)
await BotConsole.disable_plugin(None, plugin.module)
await MessageUtils.build_message(
f"已禁用全部 bot 的插件: {plugin_name.result}"
).finish()
elif bot_id.available:
logger.info( logger.info(
f"禁用 {bot_id.result}{plugin_name.result}", f"禁用 {bot_id.result} 全部插件",
"bot_manage.plugin.disable", "bot_manage.plugin.disable",
session=session, session=session,
) )
await BotConsole.disable_plugin(bot_id.result, plugin_name.result) await BotConsole.disable_all(bot_id.result, "plugins")
await MessageUtils.build_message( await MessageUtils.build_message(f"已禁用 {bot_id.result} 全部插件").finish()
f"已禁用 {bot_id.result}{plugin_name.result}" else:
).finish() bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True)
for __bot_id in bot_id_list:
await MessageUtils.build_message("缺失参数").finish() await BotConsole.disable_all(__bot_id, "plugins") # type: ignore
logger.info(
"禁用全部 bot 全部插件",
"bot_manage.plugin.disable",
session=session,
)
await MessageUtils.build_message("禁用全部 bot 全部插件").finish()

View File

@ -1,37 +0,0 @@
import asyncio
from nonebot_plugin_alconna import Match, AlconnaMatch
from zhenxun.utils.message import MessageUtils
from zhenxun.models.bot_console import BotConsole
from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage
@bot_manage.assign("status.tasks")
async def handle_tasks_status(bot_id: Match[str] = AlconnaMatch("bot_id")):
if not bot_id.available:
await MessageUtils.build_message("bot_id 不能为空").finish()
result = await asyncio.gather(
BotConsole.get_tasks(bot_id.result),
BotConsole.get_tasks(bot_id.result, False),
)
await MessageUtils.build_message(
f"可用被动: {result[0]}\n禁用被动: {result[1]}"
).finish()
@bot_manage.assign("status.plugins")
async def handle_plugins_status(bot_id: Match[str] = AlconnaMatch("bot_id")):
if not bot_id.available:
await MessageUtils.build_message("bot_id 不能为空").finish()
result = await asyncio.gather(
BotConsole.get_plugins(bot_id.result),
BotConsole.get_plugins(bot_id.result, False),
)
await MessageUtils.build_message(
f"可用插件: {result[0]}\n禁用插件: {result[1]}"
).finish()

View File

@ -1,75 +1,164 @@
from nonebot_plugin_uninfo import Uninfo from nonebot_plugin_uninfo import Uninfo
from nonebot_plugin_alconna import Match, Query, AlconnaMatch, AlconnaQuery from nonebot_plugin_alconna import Match, AlconnaMatch
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.models.task_info import TaskInfo
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils
from zhenxun.models.bot_console import BotConsole from zhenxun.models.bot_console import BotConsole
from zhenxun.utils._build_image import BuildImage
from zhenxun.utils._image_template import RowStyle
from zhenxun.utils.image_utils import ImageTemplate
from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage
@bot_manage.assign("task") def task_row_style(column: str, text: str) -> RowStyle:
async def bot_task( """被动技能文本风格
session: Uninfo,
task_list: Query[bool] = AlconnaQuery("task.list.value", default=False), 参数:
): column: 表头
if task_list: text: 文本内容
logger.info("获取全部 bot 的所有可用被动", "bot_manage.task", session=session)
data = await BotConsole.get_tasks() 返回:
for bot in data: RowStyle: RowStyle
await MessageUtils.build_message(f"{bot[0]} : {bot[1]}").finish() """
style = RowStyle()
if column in {"全局状态"}:
style.font_color = "#67C23A" if text == "开启" else "#F56C6C"
return style
@bot_manage.assign("task.list")
async def bot_task(session: Uninfo, bot_id: Match[str] = AlconnaMatch("bot_id")):
logger.info("获取全部 bot 的所有可用被动", "bot_manage.task", session=session)
if bot_id.available:
data_dict = {
bot_id.result: await BotConsole.get_tasks(
bot_id=bot_id.result, status=False
)
}
else:
data_dict = await BotConsole.get_tasks(status=False)
db_task_list = await TaskInfo.all()
column_name = ["ID", "模块", "名称", "全局状态", "运行时间"]
img_list = []
for __bot_id, tk in data_dict.items():
column_data = [
[
task.id,
task.module,
task.name,
"开启" if task.module not in tk else "关闭",
task.run_time or "-",
]
for task in db_task_list
]
img = await ImageTemplate.table_page(
f"{__bot_id}被动技能状态",
None,
column_name,
column_data,
text_style=task_row_style,
)
img_list.append(img)
result = await BuildImage.auto_paste(img_list, 3)
await MessageUtils.build_message(result).finish()
@bot_manage.assign("task.enable") @bot_manage.assign("task.enable")
async def enable_task( async def enable_task(
session: Uninfo, session: Uninfo,
all_flag: Query[bool] = AlconnaQuery("task.enable.all.value", default=False), task_name: Match[str] = AlconnaMatch("feature_name"),
task_name: Match[str] = AlconnaMatch("plugin_name"),
bot_id: Match[str] = AlconnaMatch("bot_id"), bot_id: Match[str] = AlconnaMatch("bot_id"),
): ):
if all_flag and task_name.available: if task_name.available:
await logger.info( task: TaskInfo | None = await TaskInfo.get_or_none(name=task_name.result)
"启用全部 bot 的所有可用被动", "bot_manage.task.enable", session=session if not task:
) await MessageUtils.build_message("未找到被动...").finish()
await BotConsole.enable_task(None, task_name.result) if bot_id.available:
await MessageUtils.build_message("已启用全部 bot 的所有可用被动").finish() logger.info(
f"开启 {bot_id.result} 被动的 {task_name.available}",
if bot_id.available and task_name.available: "bot_manage.task.disable",
await logger.info( session=session,
f"启用 {bot_id.result}{task_name.result}", )
"bot_manage.task.enable", await BotConsole.enable_task(bot_id.result, task.module)
await MessageUtils.build_message(
f"已开启 {bot_id.result} 被动的 {task_name.available}"
).finish()
else:
logger.info(
f"开启全部 bot 的被动: {task_name.available}",
"bot_manage.task.disable",
session=session,
)
await BotConsole.enable_task(None, task.module)
await MessageUtils.build_message(
f"已禁用全部 bot 的被动: {task_name.available}"
).finish()
elif bot_id.available:
logger.info(
f"开启 {bot_id.result} 全部被动",
"bot_manage.task.disable",
session=session, session=session,
) )
await BotConsole.enable_task(bot_id.result, task_name.result) await BotConsole.enable_all(bot_id.result, "tasks")
await MessageUtils.build_message( await MessageUtils.build_message(f"已开启 {bot_id.result} 全部被动").finish()
f"已启用 {bot_id.result}{task_name.result}" else:
).finish() bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True)
for __bot_id in bot_id_list:
await MessageUtils.build_message("缺失参数").finish() await BotConsole.enable_all(__bot_id, "tasks") # type: ignore
logger.info(
"开启全部 bot 全部被动",
"bot_manage.task.disable",
session=session,
)
await MessageUtils.build_message("开启全部 bot 全部被动").finish()
@bot_manage.assign("task.disable") @bot_manage.assign("task.disable")
async def disable_task( async def disable_task(
session: Uninfo, session: Uninfo,
all_flag: Query[bool] = AlconnaQuery("task.disable.all.value", default=False), task_name: Match[str] = AlconnaMatch("feature_name"),
task_name: Match[str] = AlconnaMatch("plugin_name"),
bot_id: Match[str] = AlconnaMatch("bot_id"), bot_id: Match[str] = AlconnaMatch("bot_id"),
): ):
if all_flag and task_name.available: if task_name.available:
await logger.info( task: TaskInfo | None = await TaskInfo.get_or_none(name=task_name.result)
"禁用全部 bot 的所有可用被动", "bot_manage.task.disable", session=session if not task:
) await MessageUtils.build_message("未找到被动...").finish()
await BotConsole.disable_task(None, task_name.result) if bot_id.available:
await MessageUtils.build_message("已禁用全部 bot 的所有可用被动").finish() logger.info(
f"禁用 {bot_id.result} 被动的 {task_name.available}",
if bot_id.available and task_name.available: "bot_manage.task.disable",
session=session,
)
await BotConsole.disable_task(bot_id.result, task.module)
await MessageUtils.build_message(
f"已禁用 {bot_id.result} 被动的 {task_name.available}"
).finish()
else:
logger.info(
f"禁用全部 bot 的被动: {task_name.available}",
"bot_manage.task.disable",
session=session,
)
await BotConsole.disable_task(None, task.module)
await MessageUtils.build_message(
f"已禁用全部 bot 的被动: {task_name.available}"
).finish()
elif bot_id.available:
logger.info( logger.info(
f"禁用 {bot_id.result}{task_name.result}", f"禁用 {bot_id.result} 全部被动",
"bot_manage.task.disable", "bot_manage.task.disable",
session=session, session=session,
) )
await BotConsole.disable_task(bot_id.result, task_name.result) await BotConsole.disable_all(bot_id.result, "tasks")
await MessageUtils.build_message( await MessageUtils.build_message(f"已禁用 {bot_id.result} 全部被动").finish()
f"已禁用 {bot_id.result}{task_name.result}" else:
).finish() bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True)
for __bot_id in bot_id_list:
await MessageUtils.build_message("缺失参数").finish() await BotConsole.disable_all(__bot_id, "tasks") # type: ignore
logger.info(
"禁用全部 bot 全部被动",
"bot_manage.task.disable",
session=session,
)
await MessageUtils.build_message("禁用全部 bot 全部被动").finish()

View File

@ -63,19 +63,19 @@ class BotConsole(Model):
@overload @overload
@classmethod @classmethod
async def get_tasks(cls) -> list[tuple[str, str]]: ... async def get_tasks(cls) -> list[tuple[str, list[str]]]: ...
@overload @overload
@classmethod @classmethod
async def get_tasks(cls, bot_id: str) -> str: ... async def get_tasks(cls, bot_id: str) -> list[str]: ...
@overload @overload
@classmethod @classmethod
async def get_tasks(cls, *, status: bool) -> list[tuple[str, str]]: ... async def get_tasks(cls, *, status: bool) -> dict[str, list[str]]: ...
@overload @overload
@classmethod @classmethod
async def get_tasks(cls, bot_id: str, status: bool = True) -> str: ... async def get_tasks(cls, bot_id: str, status: bool = True) -> list[str]: ...
@classmethod @classmethod
async def get_tasks(cls, bot_id: str | None = None, status: bool | None = True): async def get_tasks(cls, bot_id: str | None = None, status: bool | None = True):
@ -90,29 +90,32 @@ class BotConsole(Model):
list[tuple[str, str]] | str: 被动技能 list[tuple[str, str]] | str: 被动技能
""" """
if not bot_id: if not bot_id:
task_field = "available_tasks" if status else "block_tasks" task_field: Literal["available_tasks", "block_tasks"] = (
return await cls.all().values_list("bot_id", task_field) "available_tasks" if status else "block_tasks"
)
data_list = await cls.all().values_list("bot_id", task_field)
return {k: cls._convert_module_format(v) for k, v in data_list}
result = await cls.get_or_none(bot_id=bot_id) result = await cls.get_or_none(bot_id=bot_id)
if result: if result:
return result.available_tasks if status else result.block_tasks tasks = result.available_tasks if status else result.block_tasks
return "" return cls._convert_module_format(tasks)
return []
@overload @overload
@classmethod @classmethod
async def get_plugins(cls) -> list[tuple[str, str]]: ... async def get_plugins(cls) -> dict[str, list[str]]: ...
@overload @overload
@classmethod @classmethod
async def get_plugins(cls, bot_id: str) -> str: ... async def get_plugins(cls, bot_id: str) -> list[str]: ...
@overload @overload
@classmethod @classmethod
async def get_plugins(cls, *, status: bool) -> list[tuple[str, str]]: ... async def get_plugins(cls, *, status: bool) -> dict[str, list[str]]: ...
@overload @overload
@classmethod @classmethod
async def get_plugins(cls, bot_id: str, status: bool = True) -> str: ... async def get_plugins(cls, bot_id: str, status: bool = True) -> list[str]: ...
@classmethod @classmethod
async def get_plugins(cls, bot_id: str | None = None, status: bool = True): async def get_plugins(cls, bot_id: str | None = None, status: bool = True):
@ -128,12 +131,14 @@ class BotConsole(Model):
""" """
if not bot_id: if not bot_id:
plugin_field = "available_plugins" if status else "block_plugins" plugin_field = "available_plugins" if status else "block_plugins"
return await cls.all().values_list("bot_id", plugin_field) data_list = await cls.all().values_list("bot_id", plugin_field)
return {k: cls._convert_module_format(v) for k, v in data_list}
result = await cls.get_or_none(bot_id=bot_id) result = await cls.get_or_none(bot_id=bot_id)
if result: if result:
return result.available_plugins if status else result.block_plugins plugins = result.available_plugins if status else result.block_plugins
return "" return cls._convert_module_format(plugins)
return []
@classmethod @classmethod
async def set_bot_status(cls, status: bool, bot_id: str | None = None) -> None: async def set_bot_status(cls, status: bool, bot_id: str | None = None) -> None:
@ -336,12 +341,27 @@ class BotConsole(Model):
""" """
bot_data, _ = await cls.get_or_create(bot_id=bot_id) bot_data, _ = await cls.get_or_create(bot_id=bot_id)
if feat == "plugins": if feat == "plugins":
available_plugins = cls._convert_module_format(bot_data.available_plugins)
block_plugins = cls._convert_module_format(bot_data.block_plugins)
bot_data.block_plugins = cls._convert_module_format(
available_plugins + block_plugins
)
bot_data.available_plugins = "" bot_data.available_plugins = ""
# todo)) 使用初始化方法重新写入bot_data.block_plugins以保证插件列表完整
elif feat == "tasks": elif feat == "tasks":
available_tasks = cls._convert_module_format(bot_data.available_tasks)
block_tasks = cls._convert_module_format(bot_data.block_tasks)
bot_data.block_tasks = cls._convert_module_format(
available_tasks + block_tasks
)
bot_data.available_tasks = "" bot_data.available_tasks = ""
# todo)) 使用初始化方法重新写入bot_data.block_tasks以保证插件列表完整 await bot_data.save(
await bot_data.save() update_fields=[
"available_tasks",
"block_tasks",
"available_plugins",
"block_plugins",
]
)
@classmethod @classmethod
async def enable_all( async def enable_all(
@ -358,12 +378,27 @@ class BotConsole(Model):
""" """
bot_data, _ = await cls.get_or_create(bot_id=bot_id) bot_data, _ = await cls.get_or_create(bot_id=bot_id)
if feat == "plugins": if feat == "plugins":
available_plugins = cls._convert_module_format(bot_data.available_plugins)
block_plugins = cls._convert_module_format(bot_data.block_plugins)
bot_data.available_plugins = cls._convert_module_format(
available_plugins + block_plugins
)
bot_data.block_plugins = "" bot_data.block_plugins = ""
# todo)) 使用初始化方法重新写入bot_data.available_plugins以保证插件列表完整
elif feat == "tasks": elif feat == "tasks":
available_tasks = cls._convert_module_format(bot_data.available_tasks)
block_tasks = cls._convert_module_format(bot_data.block_tasks)
bot_data.available_tasks = cls._convert_module_format(
available_tasks + block_tasks
)
bot_data.block_tasks = "" bot_data.block_tasks = ""
# todo)) 使用初始化方法重新写入bot_data.available_tasks以保证插件列表完整 await bot_data.save(
await bot_data.save() update_fields=[
"available_tasks",
"block_tasks",
"available_plugins",
"block_plugins",
]
)
@classmethod @classmethod
async def is_block_plugin(cls, bot_id: str, plugin_name: str) -> bool: async def is_block_plugin(cls, bot_id: str, plugin_name: str) -> bool:
@ -372,7 +407,7 @@ class BotConsole(Model):
参数: 参数:
bot_id (str): bot_id bot_id (str): bot_id
plugin_name (str): 插件名称 plugin_name (str): 插件某款
返回: 返回:
bool: 是否被禁用 bool: 是否被禁用

View File

@ -5,6 +5,7 @@ from zhenxun.services.log import logger
from zhenxun.configs.config import BotConfig from zhenxun.configs.config import BotConfig
from zhenxun.models.task_info import TaskInfo from zhenxun.models.task_info import TaskInfo
from zhenxun.models.ban_console import BanConsole from zhenxun.models.ban_console import BanConsole
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.group_console import GroupConsole from zhenxun.models.group_console import GroupConsole
@ -39,6 +40,13 @@ class CommonUtils:
"""被动全局状态""" """被动全局状态"""
if not task.status: if not task.status:
return True return True
if not await BotConsole.get_bot_status(session.self_id):
"""bot是否休眠"""
return True
block_tasks = await BotConsole.get_tasks(session.self_id, False)
if module in block_tasks:
"""bot是否禁用被动"""
return True
if group_id: if group_id:
if await GroupConsole.is_block_task(group_id, module): if await GroupConsole.is_block_task(group_id, module):
"""群组是否禁用被动""" """群组是否禁用被动"""