From 5590445679edfb80674395cd6583f091c96c40bb Mon Sep 17 00:00:00 2001 From: HibiKier <45528451+HibiKier@users.noreply.github.com> Date: Tue, 10 Dec 2024 20:16:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0Bot=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=20(#1758)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: BalconyJH <73932916+BalconyJH@users.noreply.github.com> --- .gitignore | 5 +- .pre-commit-config.yaml | 2 +- .../builtin_plugins/hooks/_auth_checker.py | 11 +- .../superuser/bot_manage/__init__.py | 53 ++- .../superuser/bot_manage/bot_switch.py | 51 +++ .../superuser/bot_manage/command.py | 153 +++++++ .../superuser/bot_manage/full_function.py | 51 +++ .../superuser/bot_manage/plugin.py | 182 ++++++++ .../superuser/bot_manage/task.py | 164 +++++++ zhenxun/models/bot_console.py | 421 ++++++++++++++++-- zhenxun/utils/common_utils.py | 8 + 11 files changed, 1068 insertions(+), 33 deletions(-) create mode 100644 zhenxun/builtin_plugins/superuser/bot_manage/bot_switch.py create mode 100644 zhenxun/builtin_plugins/superuser/bot_manage/command.py create mode 100644 zhenxun/builtin_plugins/superuser/bot_manage/full_function.py create mode 100644 zhenxun/builtin_plugins/superuser/bot_manage/plugin.py create mode 100644 zhenxun/builtin_plugins/superuser/bot_manage/task.py diff --git a/.gitignore b/.gitignore index db7233b2..f8e91025 100644 --- a/.gitignore +++ b/.gitignore @@ -113,6 +113,7 @@ venv/ ENV/ env.bak/ venv.bak/ +.env.dev # Spyder project settings .spyderproject @@ -151,9 +152,7 @@ backup/ extensive_plugin/ test/ bot.py -data/ -.env -.env.dev +.idea/ /resources/text/ # /resources/image/ /resources/temp/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7cdafdb1..1d73297b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,4 +13,4 @@ repos: args: [--fix] stages: [pre-commit] - id: ruff-format - stages: [pre-commit] \ No newline at end of file + stages: [pre-commit] diff --git a/zhenxun/builtin_plugins/hooks/_auth_checker.py b/zhenxun/builtin_plugins/hooks/_auth_checker.py index 79fa43e5..7e2e179c 100644 --- a/zhenxun/builtin_plugins/hooks/_auth_checker.py +++ b/zhenxun/builtin_plugins/hooks/_auth_checker.py @@ -307,8 +307,15 @@ class AuthChecker: plugin: PluginInfo bot_id: bot_id """ - if await BotConsole.is_block_plugin(plugin.module, bot_id): - raise IgnoredException("机器人权限检测 ignore") + if not await BotConsole.get_bot_status(bot_id): + logger.debug("Bot休眠中阻断权限检测...", "AuthChecker") + raise IgnoredException("BotConsole休眠权限检测 ignore") + if await BotConsole.is_block_plugin(bot_id, plugin.module): + logger.debug( + f"Bot插件 {plugin.name}({plugin.module}) 权限检查结果为关闭...", + "AuthChecker", + ) + raise IgnoredException("BotConsole插件权限检测 ignore") async def auth_limit(self, plugin: PluginInfo, session: EventSession): """插件限制 diff --git a/zhenxun/builtin_plugins/superuser/bot_manage/__init__.py b/zhenxun/builtin_plugins/superuser/bot_manage/__init__.py index a794c413..736efa40 100644 --- a/zhenxun/builtin_plugins/superuser/bot_manage/__init__.py +++ b/zhenxun/builtin_plugins/superuser/bot_manage/__init__.py @@ -1,17 +1,66 @@ +from pathlib import Path + +import nonebot +from nonebot.adapters import Bot from nonebot.plugin import PluginMetadata from zhenxun.configs.utils import PluginExtraData +from zhenxun.models.bot_console import BotConsole +from zhenxun.models.plugin_info import PluginInfo +from zhenxun.models.task_info import TaskInfo +from zhenxun.services.log import logger from zhenxun.utils.enum import PluginType +from zhenxun.utils.platform import PlatformUtils + +driver = nonebot.get_driver() + +_sub_plugins = set() +_sub_plugins |= nonebot.load_plugins(str(Path(__file__).parent.resolve())) __plugin_meta__ = PluginMetadata( name="Bot管理", description="指定bot对象的功能/被动开关和状态", usage=""" - 清理临时数据 """.strip(), extra=PluginExtraData( author="", version="0.1", - plugin_type=PluginType.SUPERUSER, + plugin_type=PluginType.PARENT, ).dict(), ) + + +@driver.on_bot_connect +async def _(bot: Bot): + """初始化Bot管理 + + 参数: + bot: Bot + """ + plugin_list = await PluginInfo.get_plugins( + plugin_type__in=[PluginType.NORMAL, PluginType.DEPENDANT, PluginType.ADMIN] + ) + available_tasks: list[str] = await TaskInfo.filter(status=True).values_list( + "module", flat=True + ) # type: ignore + available_plugins = [p.module for p in plugin_list] + # for _, bot in nonebot.get_bots().items(): + platform = PlatformUtils.get_platform(bot) + bot_data, is_create = await BotConsole.get_or_create( + bot_id=bot.self_id, platform=platform + ) + if not is_create: + block_plugins = await bot_data.get_plugins(bot.self_id, False) + block_plugins = BotConsole._convert_module_format(block_plugins) + for module in available_plugins.copy(): + if module in block_plugins: + available_plugins.remove(module) + block_tasks = await bot_data.get_tasks(bot.self_id, False) + block_tasks = BotConsole._convert_module_format(block_tasks) + for module in available_tasks.copy(): + if module in block_plugins: + available_tasks.remove(module) + bot_data.available_plugins = BotConsole._convert_module_format(available_plugins) + bot_data.available_tasks = BotConsole._convert_module_format(available_tasks) + await bot_data.save(update_fields=["available_plugins", "available_tasks"]) + logger.info("初始化Bot管理完成...") diff --git a/zhenxun/builtin_plugins/superuser/bot_manage/bot_switch.py b/zhenxun/builtin_plugins/superuser/bot_manage/bot_switch.py new file mode 100644 index 00000000..5618fcf7 --- /dev/null +++ b/zhenxun/builtin_plugins/superuser/bot_manage/bot_switch.py @@ -0,0 +1,51 @@ +from nonebot_plugin_alconna import AlconnaMatch, Match +from nonebot_plugin_uninfo import Uninfo + +from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage +from zhenxun.models.bot_console import BotConsole +from zhenxun.services.log import logger +from zhenxun.utils.message import MessageUtils + + +@bot_manage.assign("bot_switch.enable") +async def enable_bot_switch( + session: Uninfo, + bot_id: Match[str] = AlconnaMatch("bot_id"), +): + if not bot_id.available: + await MessageUtils.build_message("bot_id 不能为空").finish() + + else: + logger.info( + f"开启 {bot_id.result} ", + "bot_manage.bot_switch.enable", + session=session, + ) + try: + await BotConsole.set_bot_status(True, bot_id.result) + except ValueError: + await MessageUtils.build_message(f"bot_id {bot_id.result} 不存在").finish() + + await MessageUtils.build_message(f"已开启 {bot_id.result} ").finish() + + +@bot_manage.assign("bot_switch.disable") +async def diasble_bot_switch( + session: Uninfo, + bot_id: Match[str] = AlconnaMatch("bot_id"), +): + if not bot_id.available: + await MessageUtils.build_message("bot_id 不能为空").finish() + + else: + logger.info( + f"禁用 {bot_id.result} ", + "bot_manage.bot_switch.disable", + session=session, + ) + try: + await BotConsole.set_bot_status(False, bot_id.result) + except ValueError: + await MessageUtils.build_message(f"bot_id {bot_id.result} 不存在").finish() + + await MessageUtils.build_message(f"已禁用 {bot_id.result} ").finish() diff --git a/zhenxun/builtin_plugins/superuser/bot_manage/command.py b/zhenxun/builtin_plugins/superuser/bot_manage/command.py new file mode 100644 index 00000000..3df9f6bd --- /dev/null +++ b/zhenxun/builtin_plugins/superuser/bot_manage/command.py @@ -0,0 +1,153 @@ +from arclet.alconna import Alconna, Args, Option, Subcommand +from arclet.alconna.action import store_false +from nonebot.permission import SUPERUSER +from nonebot_plugin_alconna import on_alconna + +bot_manage = on_alconna( + Alconna( + "bot_manage", + Subcommand( + "task", + Option( + "list", + action=store_false, + help_text="查看 bot_id 下的所有可用被动", + ), + Option("-b|--bot", Args["bot_id", str], help_text="指定 bot_id"), + Subcommand( + "enable", + Args["feature_name?", str], + ), + Subcommand( + "disable", + Args["feature_name?", str], + ), + ), + Subcommand( + "plugin", + Option( + "list", + action=store_false, + help_text="查看 bot_id 下的所有可用插件", + ), + Option("-b|--bot", Args["bot_id", str], help_text="指定 bot_id"), + Subcommand( + "enable", + Args["plugin_name?", str], + ), + Subcommand( + "disable", + Args["plugin_name?", str], + ), + ), + Subcommand( + "full_function", + Subcommand( + "enable", + Args["bot_id?", str], + ), + Subcommand( + "disable", + Args["bot_id?", str], + ), + ), + Subcommand( + "bot_switch", + Subcommand( + "enable", + Args["bot_id?", str], + ), + Subcommand( + "disable", + Args["bot_id?", str], + ), + ), + ), + permission=SUPERUSER, + priority=5, + block=True, +) + +bot_manage.shortcut( + r"bot被动状态", + command="bot_manage", + arguments=["task", "list"], + prefix=True, +) + +bot_manage.shortcut( + r"bot开启被动\s*(?P.+)", + command="bot_manage", + arguments=["task", "enable", "{name}"], + prefix=True, +) + +bot_manage.shortcut( + r"bot关闭被动\s*(?P.+)", + command="bot_manage", + arguments=["task", "disable", "{name}"], + prefix=True, +) + +bot_manage.shortcut( + r"bot开启(全部|所有)被动", + command="bot_manage", + arguments=["task", "enable"], + prefix=True, +) + +bot_manage.shortcut( + r"bot关闭(全部|所有)被动", + command="bot_manage", + arguments=["task", "disable"], + prefix=True, +) + +bot_manage.shortcut( + r"bot插件列表", + command="bot_manage", + arguments=["plugin", "list"], + prefix=True, +) + +bot_manage.shortcut( + r"bot开启(全部|所有)插件", + command="bot_manage", + arguments=["plugin", "enable"], + prefix=True, +) + +bot_manage.shortcut( + r"bot关闭(全部|所有)插件", + command="bot_manage", + arguments=["plugin", "disable"], + prefix=True, +) + +bot_manage.shortcut( + r"bot开启\s*(?P.+)", + command="bot_manage", + arguments=["plugin", "enable", "{name}"], + prefix=True, +) + +bot_manage.shortcut( + r"bot关闭\s*(?P.+)", + command="bot_manage", + arguments=["plugin", "disable", "{name}"], + prefix=True, +) + +bot_manage.shortcut( + r"bot休眠\s*(?P.+)?", + command="bot_manage", + arguments=["bot_switch", "disable", "{bot_id}"], + prefix=True, +) + +bot_manage.shortcut( + r"bot醒来\s*(?P.+)?", + command="bot_manage", + arguments=["bot_switch", "enable", "{bot_id}"], + prefix=True, +) diff --git a/zhenxun/builtin_plugins/superuser/bot_manage/full_function.py b/zhenxun/builtin_plugins/superuser/bot_manage/full_function.py new file mode 100644 index 00000000..010381f5 --- /dev/null +++ b/zhenxun/builtin_plugins/superuser/bot_manage/full_function.py @@ -0,0 +1,51 @@ +from nonebot_plugin_alconna import AlconnaMatch, Match +from nonebot_plugin_uninfo import Uninfo + +from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage +from zhenxun.models.bot_console import BotConsole +from zhenxun.services.log import logger +from zhenxun.utils.message import MessageUtils + + +@bot_manage.assign("full_function.enable") +async def enable_full_function( + session: Uninfo, + bot_id: Match[str] = AlconnaMatch("bot_id"), +): + if not bot_id.available: + await MessageUtils.build_message("bot_id 不能为空").finish() + + else: + logger.info( + f"开启 {bot_id.result} 的所有可用插件及被动", + "bot_manage.full_function.enable", + session=session, + ) + await BotConsole.enable_all(bot_id.result, "tasks") + await BotConsole.enable_all(bot_id.result, "plugins") + + await MessageUtils.build_message( + f"已开启 {bot_id.result} 的所有插件及被动" + ).finish() + + +@bot_manage.assign("full_function.disable") +async def diasble_full_function( + session: Uninfo, + bot_id: Match[str] = AlconnaMatch("bot_id"), +): + if not bot_id.available: + await MessageUtils.build_message("bot_id 不能为空").finish() + + else: + logger.info( + f"禁用 {bot_id.result} 的所有可用插件及被动", + "bot_manage.full_function.disable", + session=session, + ) + await BotConsole.disable_all(bot_id.result, "tasks") + await BotConsole.disable_all(bot_id.result, "plugins") + + await MessageUtils.build_message( + f"已禁用 {bot_id.result} 的所有插件及被动" + ).finish() diff --git a/zhenxun/builtin_plugins/superuser/bot_manage/plugin.py b/zhenxun/builtin_plugins/superuser/bot_manage/plugin.py new file mode 100644 index 00000000..df6d7f35 --- /dev/null +++ b/zhenxun/builtin_plugins/superuser/bot_manage/plugin.py @@ -0,0 +1,182 @@ +from nonebot_plugin_alconna import AlconnaMatch, Match +from nonebot_plugin_uninfo import Uninfo + +from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage +from zhenxun.models.bot_console import BotConsole +from zhenxun.models.plugin_info import PluginInfo +from zhenxun.services.log import logger +from zhenxun.utils._build_image import BuildImage +from zhenxun.utils._image_template import ImageTemplate, RowStyle +from zhenxun.utils.enum import PluginType +from zhenxun.utils.message import MessageUtils + + +def task_row_style(column: str, text: str) -> RowStyle: + """被动技能文本风格 + + 参数: + column: 表头 + text: 文本内容 + + 返回: + RowStyle: RowStyle + """ + style = RowStyle() + if column in {"全局状态"}: + style.font_color = "#67C23A" if text == "开启" else "#F56C6C" + return style + + +@bot_manage.assign("plugin.list") +async def bot_plugin(session: Uninfo, bot_id: Match[str] = AlconnaMatch("bot_id")): + logger.info("获取全部 bot 的所有可用插件", "bot_manage.plugin", session=session) + column_name = [ + "ID", + "模块", + "名称", + "全局状态", + "禁用类型", + "加载状态", + "菜单分类", + "作者", + "版本", + "金币花费", + ] + if bot_id.available: + data_dict = { + bot_id.result: await BotConsole.get_plugins( + bot_id=bot_id.result, status=False + ) + } + else: + data_dict = await BotConsole.get_plugins(status=False) + db_plugin_list = await PluginInfo.filter( + load_status=True, plugin_type__not=PluginType.HIDDEN + ).all() + img_list = [] + for __bot_id, tk in data_dict.items(): + column_data = [ + [ + plugin.id, + plugin.module, + plugin.name, + "开启" if plugin.module not in tk else "关闭", + plugin.block_type, + "SUCCESS" if plugin.load_status else "ERROR", + plugin.menu_type, + plugin.author, + plugin.version, + plugin.cost_gold, + ] + for plugin in db_plugin_list + ] + img = await ImageTemplate.table_page( + f"{__bot_id}插件列表", + None, + column_name, + column_data, + text_style=task_row_style, + ) + img_list.append(img) + result = await BuildImage.auto_paste(img_list, 3) + await MessageUtils.build_message(result).finish() + + +@bot_manage.assign("plugin.enable") +async def enable_plugin( + session: Uninfo, + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + bot_id: Match[str] = AlconnaMatch("bot_id"), +): + if plugin_name.available: + plugin: PluginInfo | None = await PluginInfo.get_plugin(name=plugin_name.result) + if not plugin: + await MessageUtils.build_message("未找到该插件...").finish() + if bot_id.available: + logger.info( + f"开启 {bot_id.result} 的插件 {plugin_name.result}", + "bot_manage.plugin.disable", + session=session, + ) + await BotConsole.enable_plugin(bot_id.result, plugin.module) + await MessageUtils.build_message( + f"已开启 {bot_id.result} 的插件 {plugin_name.result}" + ).finish() + else: + logger.info( + f"开启全部 bot 的插件: {plugin_name.result}", + "bot_manage.plugin.disable", + session=session, + ) + await BotConsole.enable_plugin(None, plugin.module) + await MessageUtils.build_message( + f"已禁用全部 bot 的插件: {plugin_name.result}" + ).finish() + elif bot_id.available: + logger.info( + f"开启 {bot_id.result} 全部插件", + "bot_manage.plugin.disable", + session=session, + ) + await BotConsole.enable_all(bot_id.result, "plugins") + await MessageUtils.build_message(f"已开启 {bot_id.result} 全部插件").finish() + else: + bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True) + for __bot_id in bot_id_list: + await BotConsole.enable_all(__bot_id, "plugins") # type: ignore + logger.info( + "开启全部 bot 全部插件", + "bot_manage.plugin.disable", + session=session, + ) + await MessageUtils.build_message("开启全部 bot 全部插件").finish() + + +@bot_manage.assign("plugin.disable") +async def disable_plugin( + session: Uninfo, + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + bot_id: Match[str] = AlconnaMatch("bot_id"), +): + if plugin_name.available: + plugin = await PluginInfo.get_plugin(name=plugin_name.result) + if not plugin: + await MessageUtils.build_message("未找到该插件...").finish() + if bot_id.available: + logger.info( + f"禁用 {bot_id.result} 的插件 {plugin_name.result}", + "bot_manage.plugin.disable", + session=session, + ) + await BotConsole.disable_plugin(bot_id.result, plugin.module) + await MessageUtils.build_message( + f"已禁用 {bot_id.result} 的插件 {plugin_name.result}" + ).finish() + else: + logger.info( + f"禁用全部 bot 的插件: {plugin_name.result}", + "bot_manage.plugin.disable", + session=session, + ) + await BotConsole.disable_plugin(None, plugin.module) + await MessageUtils.build_message( + f"已禁用全部 bot 的插件: {plugin_name.result}" + ).finish() + elif bot_id.available: + logger.info( + f"禁用 {bot_id.result} 全部插件", + "bot_manage.plugin.disable", + session=session, + ) + await BotConsole.disable_all(bot_id.result, "plugins") + await MessageUtils.build_message(f"已禁用 {bot_id.result} 全部插件").finish() + else: + bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True) + for __bot_id in bot_id_list: + await BotConsole.disable_all(__bot_id, "plugins") # type: ignore + logger.info( + "禁用全部 bot 全部插件", + "bot_manage.plugin.disable", + session=session, + ) + await MessageUtils.build_message("禁用全部 bot 全部插件").finish() diff --git a/zhenxun/builtin_plugins/superuser/bot_manage/task.py b/zhenxun/builtin_plugins/superuser/bot_manage/task.py new file mode 100644 index 00000000..005ab188 --- /dev/null +++ b/zhenxun/builtin_plugins/superuser/bot_manage/task.py @@ -0,0 +1,164 @@ +from nonebot_plugin_alconna import AlconnaMatch, Match +from nonebot_plugin_uninfo import Uninfo + +from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage +from zhenxun.models.bot_console import BotConsole +from zhenxun.models.task_info import TaskInfo +from zhenxun.services.log import logger +from zhenxun.utils._build_image import BuildImage +from zhenxun.utils._image_template import RowStyle +from zhenxun.utils.image_utils import ImageTemplate +from zhenxun.utils.message import MessageUtils + + +def task_row_style(column: str, text: str) -> RowStyle: + """被动技能文本风格 + + 参数: + column: 表头 + text: 文本内容 + + 返回: + RowStyle: RowStyle + """ + style = RowStyle() + if column in {"全局状态"}: + style.font_color = "#67C23A" if text == "开启" else "#F56C6C" + return style + + +@bot_manage.assign("task.list") +async def bot_task(session: Uninfo, bot_id: Match[str] = AlconnaMatch("bot_id")): + logger.info("获取全部 bot 的所有可用被动", "bot_manage.task", session=session) + if bot_id.available: + data_dict = { + bot_id.result: await BotConsole.get_tasks( + bot_id=bot_id.result, status=False + ) + } + else: + data_dict = await BotConsole.get_tasks(status=False) + db_task_list = await TaskInfo.all() + column_name = ["ID", "模块", "名称", "全局状态", "运行时间"] + img_list = [] + for __bot_id, tk in data_dict.items(): + column_data = [ + [ + task.id, + task.module, + task.name, + "开启" if task.module not in tk else "关闭", + task.run_time or "-", + ] + for task in db_task_list + ] + img = await ImageTemplate.table_page( + f"{__bot_id}被动技能状态", + None, + column_name, + column_data, + text_style=task_row_style, + ) + img_list.append(img) + result = await BuildImage.auto_paste(img_list, 3) + await MessageUtils.build_message(result).finish() + + +@bot_manage.assign("task.enable") +async def enable_task( + session: Uninfo, + task_name: Match[str] = AlconnaMatch("feature_name"), + bot_id: Match[str] = AlconnaMatch("bot_id"), +): + if task_name.available: + task: TaskInfo | None = await TaskInfo.get_or_none(name=task_name.result) + if not task: + await MessageUtils.build_message("未找到被动...").finish() + if bot_id.available: + logger.info( + f"开启 {bot_id.result} 被动的 {task_name.available}", + "bot_manage.task.disable", + session=session, + ) + await BotConsole.enable_task(bot_id.result, task.module) + await MessageUtils.build_message( + f"已开启 {bot_id.result} 被动的 {task_name.available}" + ).finish() + else: + logger.info( + f"开启全部 bot 的被动: {task_name.available}", + "bot_manage.task.disable", + session=session, + ) + await BotConsole.enable_task(None, task.module) + await MessageUtils.build_message( + f"已禁用全部 bot 的被动: {task_name.available}" + ).finish() + elif bot_id.available: + logger.info( + f"开启 {bot_id.result} 全部被动", + "bot_manage.task.disable", + session=session, + ) + await BotConsole.enable_all(bot_id.result, "tasks") + await MessageUtils.build_message(f"已开启 {bot_id.result} 全部被动").finish() + else: + bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True) + for __bot_id in bot_id_list: + await BotConsole.enable_all(__bot_id, "tasks") # type: ignore + logger.info( + "开启全部 bot 全部被动", + "bot_manage.task.disable", + session=session, + ) + await MessageUtils.build_message("开启全部 bot 全部被动").finish() + + +@bot_manage.assign("task.disable") +async def disable_task( + session: Uninfo, + task_name: Match[str] = AlconnaMatch("feature_name"), + bot_id: Match[str] = AlconnaMatch("bot_id"), +): + if task_name.available: + task: TaskInfo | None = await TaskInfo.get_or_none(name=task_name.result) + if not task: + await MessageUtils.build_message("未找到被动...").finish() + if bot_id.available: + logger.info( + f"禁用 {bot_id.result} 被动的 {task_name.available}", + "bot_manage.task.disable", + session=session, + ) + await BotConsole.disable_task(bot_id.result, task.module) + await MessageUtils.build_message( + f"已禁用 {bot_id.result} 被动的 {task_name.available}" + ).finish() + else: + logger.info( + f"禁用全部 bot 的被动: {task_name.available}", + "bot_manage.task.disable", + session=session, + ) + await BotConsole.disable_task(None, task.module) + await MessageUtils.build_message( + f"已禁用全部 bot 的被动: {task_name.available}" + ).finish() + elif bot_id.available: + logger.info( + f"禁用 {bot_id.result} 全部被动", + "bot_manage.task.disable", + session=session, + ) + await BotConsole.disable_all(bot_id.result, "tasks") + await MessageUtils.build_message(f"已禁用 {bot_id.result} 全部被动").finish() + else: + bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True) + for __bot_id in bot_id_list: + await BotConsole.disable_all(__bot_id, "tasks") # type: ignore + logger.info( + "禁用全部 bot 全部被动", + "bot_manage.task.disable", + session=session, + ) + await MessageUtils.build_message("禁用全部 bot 全部被动").finish() diff --git a/zhenxun/models/bot_console.py b/zhenxun/models/bot_console.py index 86fa32cc..45b69ca0 100644 --- a/zhenxun/models/bot_console.py +++ b/zhenxun/models/bot_console.py @@ -1,3 +1,5 @@ +from typing import Literal, overload + from tortoise import fields from zhenxun.services.db_context import Model @@ -10,51 +12,420 @@ class BotConsole(Model): """bot_id""" status = fields.BooleanField(default=True, description="Bot状态") """Bot状态""" - block_plugin = fields.TextField(default="", description="禁用插件") - """禁用插件""" - block_task = fields.TextField(default="", description="禁用被动技能") - """禁用被动技能""" create_time = fields.DatetimeField(auto_now_add=True, description="创建时间") """创建时间""" platform = fields.CharField(255, null=True, description="平台") """平台""" + block_plugins = fields.TextField(default="", description="禁用插件") + """禁用插件""" + block_tasks = fields.TextField(default="", description="禁用被动技能") + """禁用被动技能""" + available_plugins = fields.TextField(default="", description="可用插件") + # todo)) 计划任务 or on_startup 写入可用插件 + """可用插件""" + available_tasks = fields.TextField(default="", description="可用被动技能") + # todo)) 计划任务 or on_startup 写入可用被动技能 + """可用被动技能""" class Meta: # type: ignore table = "bot_console" table_description = "Bot数据表" + @staticmethod + def format(name: str) -> str: + return f"<{name}," + + @overload @classmethod - async def get_bot_status(cls, bot_id: str) -> bool: + async def get_bot_status(cls) -> list[tuple[str, bool]]: ... + + @overload + @classmethod + async def get_bot_status(cls, bot_id: str) -> bool: ... + + @classmethod + async def get_bot_status( + cls, bot_id: str | None = None + ) -> list[tuple[str, bool]] | bool: + """ + 获取bot状态 + + 参数: + bot_id (str, optional): bot_id. Defaults to None. + + 返回: + list[tuple[str, bool]] | bool: bot状态 + """ + if not bot_id: + return await cls.all().values_list("bot_id", "status") result = await cls.get_or_none(bot_id=bot_id) return result.status if result else False + @overload @classmethod - async def set_block_plugin(cls, bot_id: str, module: str): - bot_data, _ = await cls.get_or_create(bot_id=bot_id) - if f"<{module}," not in bot_data.block_plugin: - bot_data.block_plugin += f"<{module}," - await bot_data.save(update_fields=["block_plugin"]) + async def get_tasks(cls) -> list[tuple[str, list[str]]]: ... + + @overload + @classmethod + async def get_tasks(cls, bot_id: str) -> list[str]: ... + + @overload + @classmethod + async def get_tasks(cls, *, status: bool) -> dict[str, list[str]]: ... + + @overload + @classmethod + async def get_tasks(cls, bot_id: str, status: bool = True) -> list[str]: ... @classmethod - async def set_unblock_plugin(cls, bot_id: str, module: str): - bot_data, _ = await cls.get_or_create(bot_id=bot_id) - if f"<{module}," in bot_data.block_plugin: - bot_data.block_plugin = bot_data.block_plugin.replace(f"<{module},", "") - await bot_data.save(update_fields=["block_plugin"]) + async def get_tasks(cls, bot_id: str | None = None, status: bool | None = True): + """ + 获取bot被动技能 + + 参数: + bot_id (str | None, optional): bot_id. Defaults to None. + status (bool | None, optional): 被动状态. Defaults to True. + + 返回: + list[tuple[str, str]] | str: 被动技能 + """ + if not bot_id: + task_field: Literal["available_tasks", "block_tasks"] = ( + "available_tasks" if status else "block_tasks" + ) + data_list = await cls.all().values_list("bot_id", task_field) + return {k: cls._convert_module_format(v) for k, v in data_list} + result = await cls.get_or_none(bot_id=bot_id) + if result: + tasks = result.available_tasks if status else result.block_tasks + return cls._convert_module_format(tasks) + return [] + + @overload + @classmethod + async def get_plugins(cls) -> dict[str, list[str]]: ... + + @overload + @classmethod + async def get_plugins(cls, bot_id: str) -> list[str]: ... + + @overload + @classmethod + async def get_plugins(cls, *, status: bool) -> dict[str, list[str]]: ... + + @overload + @classmethod + async def get_plugins(cls, bot_id: str, status: bool = True) -> list[str]: ... @classmethod - async def set_block_task(cls, bot_id: str, task: str): - bot_data, _ = await cls.get_or_create(bot_id=bot_id) - if f"<{task}," not in bot_data.block_task: - bot_data.block_plugin += f"<{task}," - await bot_data.save(update_fields=["block_task"]) + async def get_plugins(cls, bot_id: str | None = None, status: bool = True): + """ + 获取bot插件 + + 参数: + bot_id (str | None, optional): bot_id. Defaults to None. + status (bool, optional): 插件状态. Defaults to True. + + 返回: + list[tuple[str, str]] | str: 插件 + """ + if not bot_id: + plugin_field = "available_plugins" if status else "block_plugins" + data_list = await cls.all().values_list("bot_id", plugin_field) + return {k: cls._convert_module_format(v) for k, v in data_list} + + result = await cls.get_or_none(bot_id=bot_id) + if result: + plugins = result.available_plugins if status else result.block_plugins + return cls._convert_module_format(plugins) + return [] @classmethod - async def is_block_plugin(cls, bot_id: str, task: str) -> bool: - bot_data, _ = await cls.get_or_create(bot_id=bot_id) - return f"<{task}," in bot_data.block_plugin + async def set_bot_status(cls, status: bool, bot_id: str | None = None) -> None: + """ + 设置bot状态 + + 参数: + status (bool): 状态 + bot_id (str, optional): bot_id. Defaults to None. + + Raises: + ValueError: 未找到 bot_id + """ + if bot_id: + affected_rows = await cls.filter(bot_id=bot_id).update(status=status) + if not affected_rows: + raise ValueError(f"未找到 bot_id: {bot_id}") + else: + await cls.all().update(status=status) + + @overload + @classmethod + def _convert_module_format(cls, data: str) -> list[str]: ... + + @overload + @classmethod + def _convert_module_format(cls, data: list[str]) -> str: ... @classmethod - async def is_block_task(cls, bot_id: str, task: str) -> bool: + def _convert_module_format(cls, data: str | list[str]) -> str | list[str]: + """ + 在 ` None: + """ + 在 from_field 和 to_field 之间移动指定的 data + + 参数: + bot_id (str): 目标 bot 的 ID + from_field (str): 源字段名称 + to_field (str): 目标字段名称 + data (str): 要插入的内容 + + Raises: + ValueError: 如果 data 不在 from_field 和 to_field 中 + """ bot_data, _ = await cls.get_or_create(bot_id=bot_id) - return f"<{task}," in bot_data.block_task + formatted_data = cls.format(data) + + from_list: str = getattr(bot_data, from_field) + to_list: str = getattr(bot_data, to_field) + + if formatted_data not in (from_list + to_list): + raise ValueError(f"{data} 不在源字段和目标字段中") + + if formatted_data in from_list: + from_list = from_list.replace(formatted_data, "", 1) + if formatted_data not in to_list: + to_list += formatted_data + + setattr(bot_data, from_field, from_list) + setattr(bot_data, to_field, to_list) + + await bot_data.save(update_fields=[from_field, to_field]) + + @classmethod + async def disable_plugin(cls, bot_id: str | None, plugin_name: str) -> None: + """ + 禁用插件 + + 参数: + bot_id (str | None): bot_id + plugin_name (str): 插件名称 + """ + if bot_id: + await cls._toggle_field( + bot_id, + "available_plugins", + "block_plugins", + plugin_name, + ) + else: + bot_list = await cls.all() + for bot in bot_list: + await cls._toggle_field( + bot.bot_id, + "available_plugins", + "block_plugins", + plugin_name, + ) + + @classmethod + async def enable_plugin(cls, bot_id: str | None, plugin_name: str) -> None: + """ + 启用插件 + + 参数: + bot_id (str | None): bot_id + plugin_name (str): 插件名称 + """ + if bot_id: + await cls._toggle_field( + bot_id, + "block_plugins", + "available_plugins", + plugin_name, + ) + else: + bot_list = await cls.all() + for bot in bot_list: + await cls._toggle_field( + bot.bot_id, + "block_plugins", + "available_plugins", + plugin_name, + ) + + @classmethod + async def disable_task(cls, bot_id: str | None, task_name: str) -> None: + """ + 禁用被动技能 + + 参数: + bot_id (str | None): bot_id + task_name (str): 被动技能名称 + """ + if bot_id: + await cls._toggle_field( + bot_id, + "available_tasks", + "block_tasks", + task_name, + ) + else: + bot_list = await cls.all() + for bot in bot_list: + await cls._toggle_field( + bot.bot_id, + "available_tasks", + "block_tasks", + task_name, + ) + + @classmethod + async def enable_task(cls, bot_id: str | None, task_name: str) -> None: + """ + 启用被动技能 + + 参数: + bot_id (str | None): bot_id + task_name (str): 被动技能名称 + """ + if bot_id: + await cls._toggle_field( + bot_id, + "block_tasks", + "available_tasks", + task_name, + ) + else: + bot_list = await cls.all() + for bot in bot_list: + await cls._toggle_field( + bot.bot_id, + "block_tasks", + "available_tasks", + task_name, + ) + + @classmethod + async def disable_all( + cls, + bot_id: str, + feat: Literal["plugins", "tasks"], + ) -> None: + """ + 禁用全部插件或被动技能 + + 参数: + bot_id (str): bot_id + feat (Literal["plugins", "tasks"]): 插件或被动技能 + """ + bot_data, _ = await cls.get_or_create(bot_id=bot_id) + if feat == "plugins": + available_plugins = cls._convert_module_format(bot_data.available_plugins) + block_plugins = cls._convert_module_format(bot_data.block_plugins) + bot_data.block_plugins = cls._convert_module_format( + available_plugins + block_plugins + ) + bot_data.available_plugins = "" + elif feat == "tasks": + available_tasks = cls._convert_module_format(bot_data.available_tasks) + block_tasks = cls._convert_module_format(bot_data.block_tasks) + bot_data.block_tasks = cls._convert_module_format( + available_tasks + block_tasks + ) + bot_data.available_tasks = "" + await bot_data.save( + update_fields=[ + "available_tasks", + "block_tasks", + "available_plugins", + "block_plugins", + ] + ) + + @classmethod + async def enable_all( + cls, + bot_id: str, + feat: Literal["plugins", "tasks"], + ) -> None: + """ + 启用全部插件或被动技能 + + 参数: + bot_id (str): bot_id + feat (Literal["plugins", "tasks"]): 插件或被动技能 + """ + bot_data, _ = await cls.get_or_create(bot_id=bot_id) + if feat == "plugins": + available_plugins = cls._convert_module_format(bot_data.available_plugins) + block_plugins = cls._convert_module_format(bot_data.block_plugins) + bot_data.available_plugins = cls._convert_module_format( + available_plugins + block_plugins + ) + bot_data.block_plugins = "" + elif feat == "tasks": + available_tasks = cls._convert_module_format(bot_data.available_tasks) + block_tasks = cls._convert_module_format(bot_data.block_tasks) + bot_data.available_tasks = cls._convert_module_format( + available_tasks + block_tasks + ) + bot_data.block_tasks = "" + await bot_data.save( + update_fields=[ + "available_tasks", + "block_tasks", + "available_plugins", + "block_plugins", + ] + ) + + @classmethod + async def is_block_plugin(cls, bot_id: str, plugin_name: str) -> bool: + """ + 检查插件是否被禁用 + + 参数: + bot_id (str): bot_id + plugin_name (str): 插件某款 + + 返回: + bool: 是否被禁用 + """ + bot_data, _ = await cls.get_or_create(bot_id=bot_id) + return cls.format(plugin_name) in bot_data.block_plugins + + @classmethod + async def is_block_task(cls, bot_id: str, task_name: str) -> bool: + """ + 检查被动技能是否被禁用 + + 参数: + bot_id (str): bot_id + task_name (str): 被动技能名称 + + 返回: + bool: 是否被禁用 + """ + bot_data, _ = await cls.get_or_create(bot_id=bot_id) + return cls.format(task_name) in bot_data.block_tasks diff --git a/zhenxun/utils/common_utils.py b/zhenxun/utils/common_utils.py index b69069ba..76445649 100644 --- a/zhenxun/utils/common_utils.py +++ b/zhenxun/utils/common_utils.py @@ -3,6 +3,7 @@ from nonebot_plugin_uninfo import Session, SupportScope, Uninfo, get_interface from zhenxun.configs.config import BotConfig from zhenxun.models.ban_console import BanConsole +from zhenxun.models.bot_console import BotConsole from zhenxun.models.group_console import GroupConsole from zhenxun.models.task_info import TaskInfo from zhenxun.services.log import logger @@ -39,6 +40,13 @@ class CommonUtils: """被动全局状态""" if not task.status: return True + if not await BotConsole.get_bot_status(session.self_id): + """bot是否休眠""" + return True + block_tasks = await BotConsole.get_tasks(session.self_id, False) + if module in block_tasks: + """bot是否禁用被动""" + return True if group_id: if await GroupConsole.is_block_task(group_id, module): """群组是否禁用被动"""