diff --git a/zhenxun/builtin_plugins/hooks/_auth_checker.py b/zhenxun/builtin_plugins/hooks/_auth_checker.py index 0f50a1b4..57f31ae8 100644 --- a/zhenxun/builtin_plugins/hooks/_auth_checker.py +++ b/zhenxun/builtin_plugins/hooks/_auth_checker.py @@ -290,8 +290,15 @@ class AuthChecker: plugin: PluginInfo bot_id: bot_id """ - if await BotConsole.is_block_plugin(plugin.module, bot_id): - raise IgnoredException("机器人权限检测 ignore") + if not await BotConsole.get_bot_status(bot_id): + logger.debug("Bot休眠中阻断权限检测...", "AuthChecker") + raise IgnoredException("BotConsole休眠权限检测 ignore") + if await BotConsole.is_block_plugin(bot_id, plugin.module): + logger.debug( + f"Bot插件 {plugin.name}({plugin.module}) 权限检查结果为关闭...", + "AuthChecker", + ) + raise IgnoredException("BotConsole插件权限检测 ignore") async def auth_limit(self, plugin: PluginInfo, session: EventSession): """插件限制 diff --git a/zhenxun/builtin_plugins/superuser/bot_manage/__init__.py b/zhenxun/builtin_plugins/superuser/bot_manage/__init__.py index 70f14b9e..49cf076c 100644 --- a/zhenxun/builtin_plugins/superuser/bot_manage/__init__.py +++ b/zhenxun/builtin_plugins/superuser/bot_manage/__init__.py @@ -4,6 +4,7 @@ import nonebot from nonebot.adapters import Bot from nonebot.plugin import PluginMetadata +from zhenxun.services.log import logger from zhenxun.utils.enum import PluginType from zhenxun.models.task_info import TaskInfo from zhenxun.utils.platform import PlatformUtils @@ -62,3 +63,4 @@ async def _(bot: Bot): bot_data.available_plugins = BotConsole._convert_module_format(available_plugins) bot_data.available_tasks = BotConsole._convert_module_format(available_tasks) await bot_data.save(update_fields=["available_plugins", "available_tasks"]) + logger.info("初始化Bot管理完成...") diff --git a/zhenxun/builtin_plugins/superuser/bot_manage/command.py b/zhenxun/builtin_plugins/superuser/bot_manage/command.py index 0183fe62..0d497676 100644 --- a/zhenxun/builtin_plugins/superuser/bot_manage/command.py +++ b/zhenxun/builtin_plugins/superuser/bot_manage/command.py @@ -1,5 +1,5 @@ from nonebot.permission import SUPERUSER -from arclet.alconna.action import store_true +from arclet.alconna.action import store_false from nonebot_plugin_alconna import on_alconna from arclet.alconna import Args, Option, Alconna, Subcommand @@ -10,77 +10,34 @@ bot_manage = on_alconna( "task", Option( "list", - action=store_true, - default=False, + action=store_false, help_text="查看 bot_id 下的所有可用被动", ), + Option("-b|--bot", Args["bot_id", str], help_text="指定 bot_id"), Subcommand( "enable", - Option( - "-a|--all", - action=store_true, - default=False, - help_text="可选开启全部", - ), Args["feature_name?", str], - Args["bot_id?", str], ), Subcommand( "disable", - Option( - "-a|--all", - action=store_true, - default=False, - help_text="可选关闭全部", - ), Args["feature_name?", str], - Args["bot_id?", str], ), ), Subcommand( "plugin", Option( "list", - action=store_true, - default=False, + action=store_false, help_text="查看 bot_id 下的所有可用插件", ), + Option("-b|--bot", Args["bot_id", str], help_text="指定 bot_id"), Subcommand( "enable", - Option( - "-a|--all", - action=store_true, - default=False, - help_text="可选开启全部", - ), Args["plugin_name?", str], - Args["bot_id?", str], ), Subcommand( "disable", - Option( - "-a|--all", - action=store_true, - default=False, - help_text="可选关闭全部", - ), Args["plugin_name?", str], - Args["bot_id?", str], - ), - ), - Subcommand( - "status", - Subcommand( - "tasks", - Args["bot_id", str], - ), - Subcommand( - "plugins", - Args["bot_id", str], - ), - Subcommand( - "bots", - Args["bot_id", str], ), ), Subcommand( @@ -110,3 +67,87 @@ bot_manage = on_alconna( priority=5, block=True, ) + +bot_manage.shortcut( + r"bot被动状态", + command="bot_manage", + arguments=["task", "list"], + prefix=True, +) + +bot_manage.shortcut( + r"bot开启被动\s*(?P.+)", + command="bot_manage", + arguments=["task", "enable", "{name}"], + prefix=True, +) + +bot_manage.shortcut( + r"bot关闭被动\s*(?P.+)", + command="bot_manage", + arguments=["task", "disable", "{name}"], + prefix=True, +) + +bot_manage.shortcut( + r"bot开启(全部|所有)被动", + command="bot_manage", + arguments=["task", "enable"], + prefix=True, +) + +bot_manage.shortcut( + r"bot关闭(全部|所有)被动", + command="bot_manage", + arguments=["task", "disable"], + prefix=True, +) + +bot_manage.shortcut( + r"bot插件列表", + command="bot_manage", + arguments=["plugin", "list"], + prefix=True, +) + +bot_manage.shortcut( + r"bot开启(全部|所有)插件", + command="bot_manage", + arguments=["plugin", "enable"], + prefix=True, +) + +bot_manage.shortcut( + r"bot关闭(全部|所有)插件", + command="bot_manage", + arguments=["plugin", "disable"], + prefix=True, +) + +bot_manage.shortcut( + r"bot开启\s*(?P.+)", + command="bot_manage", + arguments=["plugin", "enable", "{name}"], + prefix=True, +) + +bot_manage.shortcut( + r"bot关闭\s*(?P.+)", + command="bot_manage", + arguments=["plugin", "disable", "{name}"], + prefix=True, +) + +bot_manage.shortcut( + r"bot休眠\s*(?P.+)?", + command="bot_manage", + arguments=["bot_switch", "disable", "{bot_id}"], + prefix=True, +) + +bot_manage.shortcut( + r"bot醒来\s*(?P.+)?", + command="bot_manage", + arguments=["bot_switch", "enable", "{bot_id}"], + prefix=True, +) diff --git a/zhenxun/builtin_plugins/superuser/bot_manage/plugin.py b/zhenxun/builtin_plugins/superuser/bot_manage/plugin.py index 0e5a1d7b..9a4d1ef8 100644 --- a/zhenxun/builtin_plugins/superuser/bot_manage/plugin.py +++ b/zhenxun/builtin_plugins/superuser/bot_manage/plugin.py @@ -1,83 +1,182 @@ from nonebot_plugin_uninfo import Uninfo -from nonebot_plugin_alconna import Match, Query, AlconnaMatch, AlconnaQuery +from nonebot_plugin_alconna import Match, AlconnaMatch from zhenxun.services.log import logger +from zhenxun.utils.enum import PluginType from zhenxun.utils.message import MessageUtils from zhenxun.models.bot_console import BotConsole +from zhenxun.models.plugin_info import PluginInfo +from zhenxun.utils._build_image import BuildImage +from zhenxun.utils._image_template import RowStyle, ImageTemplate from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage -@bot_manage.assign("plugin") -async def bot_plugin( - session: Uninfo, - plugin_list: Query[bool] = AlconnaQuery("plugin.list.value", default=False), -): - if plugin_list: - logger.info("获取全部 bot 的所有可用插件", "bot_manage.plugin", session=session) - data = await BotConsole.get_plugins() - for bot in data: - await MessageUtils.build_message(f"{bot[0]} : {bot[1]}").finish() +def task_row_style(column: str, text: str) -> RowStyle: + """被动技能文本风格 + + 参数: + column: 表头 + text: 文本内容 + + 返回: + RowStyle: RowStyle + """ + style = RowStyle() + if column in {"全局状态"}: + style.font_color = "#67C23A" if text == "开启" else "#F56C6C" + return style + + +@bot_manage.assign("plugin.list") +async def bot_plugin(session: Uninfo, bot_id: Match[str] = AlconnaMatch("bot_id")): + logger.info("获取全部 bot 的所有可用插件", "bot_manage.plugin", session=session) + column_name = [ + "ID", + "模块", + "名称", + "全局状态", + "禁用类型", + "加载状态", + "菜单分类", + "作者", + "版本", + "金币花费", + ] + if bot_id.available: + data_dict = { + bot_id.result: await BotConsole.get_plugins( + bot_id=bot_id.result, status=False + ) + } + else: + data_dict = await BotConsole.get_plugins(status=False) + db_plugin_list = await PluginInfo.filter( + load_status=True, plugin_type__not=PluginType.HIDDEN + ).all() + img_list = [] + for __bot_id, tk in data_dict.items(): + column_data = [ + [ + plugin.id, + plugin.module, + plugin.name, + "开启" if plugin.module not in tk else "关闭", + plugin.block_type, + "SUCCESS" if plugin.load_status else "ERROR", + plugin.menu_type, + plugin.author, + plugin.version, + plugin.cost_gold, + ] + for plugin in db_plugin_list + ] + img = await ImageTemplate.table_page( + f"{__bot_id}插件列表", + None, + column_name, + column_data, + text_style=task_row_style, + ) + img_list.append(img) + result = await BuildImage.auto_paste(img_list, 3) + await MessageUtils.build_message(result).finish() @bot_manage.assign("plugin.enable") async def enable_plugin( session: Uninfo, - all_flag: Query[bool] = AlconnaQuery("plugin.enable.all.value", default=False), plugin_name: Match[str] = AlconnaMatch("plugin_name"), bot_id: Match[str] = AlconnaMatch("bot_id"), ): - if all_flag and plugin_name.available: - await logger.info( - f"启用全部 bot 的 {plugin_name.result} ", - "bot_manage.plugin.enable", - session=session, - ) - await BotConsole.enable_plugin(None, plugin_name.result) - await MessageUtils.build_message( - f"已启用全部 bot 的 {plugin_name.result} " - ).finish() - - if bot_id.available and plugin_name.available: + if plugin_name.available: + plugin: PluginInfo | None = await PluginInfo.get_plugin(name=plugin_name.result) + if not plugin: + await MessageUtils.build_message("未找到该插件...").finish() + if bot_id.available: + logger.info( + f"开启 {bot_id.result} 的插件 {plugin_name.result}", + "bot_manage.plugin.disable", + session=session, + ) + await BotConsole.enable_plugin(bot_id.result, plugin.module) + await MessageUtils.build_message( + f"已开启 {bot_id.result} 的插件 {plugin_name.result}" + ).finish() + else: + logger.info( + f"开启全部 bot 的插件: {plugin_name.result}", + "bot_manage.plugin.disable", + session=session, + ) + await BotConsole.enable_plugin(None, plugin.module) + await MessageUtils.build_message( + f"已禁用全部 bot 的插件: {plugin_name.result}" + ).finish() + elif bot_id.available: logger.info( - f"启用 {bot_id.result} 的 {plugin_name.result}", - "bot_manage.plugin.enable", + f"开启 {bot_id.result} 全部插件", + "bot_manage.plugin.disable", session=session, ) - await BotConsole.enable_plugin(bot_id.result, plugin_name.result) - await MessageUtils.build_message( - f"已启用 {bot_id.result} 的 {plugin_name.result}" - ).finish() - - await MessageUtils.build_message("缺失参数").finish() + await BotConsole.enable_all(bot_id.result, "plugins") + await MessageUtils.build_message(f"已开启 {bot_id.result} 全部插件").finish() + else: + bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True) + for __bot_id in bot_id_list: + await BotConsole.enable_all(__bot_id, "plugins") # type: ignore + logger.info( + "开启全部 bot 全部插件", + "bot_manage.plugin.disable", + session=session, + ) + await MessageUtils.build_message("开启全部 bot 全部插件").finish() @bot_manage.assign("plugin.disable") async def disable_plugin( session: Uninfo, - all_flag: Query[bool] = AlconnaQuery("plugin.disable.all.value", default=False), plugin_name: Match[str] = AlconnaMatch("plugin_name"), bot_id: Match[str] = AlconnaMatch("bot_id"), ): - if all_flag and plugin_name.available: - await logger.info( - f"禁用全部 bot 的 {plugin_name.result} ", - "bot_manage.plugin.disable", - session=session, - ) - await BotConsole.disable_plugin(None, plugin_name.result) - await MessageUtils.build_message( - f"已禁用全部 bot 的 {plugin_name.result} " - ).finish() - - if bot_id.available and plugin_name.available: + if plugin_name.available: + plugin = await PluginInfo.get_plugin(name=plugin_name.result) + if not plugin: + await MessageUtils.build_message("未找到该插件...").finish() + if bot_id.available: + logger.info( + f"禁用 {bot_id.result} 的插件 {plugin_name.result}", + "bot_manage.plugin.disable", + session=session, + ) + await BotConsole.disable_plugin(bot_id.result, plugin.module) + await MessageUtils.build_message( + f"已禁用 {bot_id.result} 的插件 {plugin_name.result}" + ).finish() + else: + logger.info( + f"禁用全部 bot 的插件: {plugin_name.result}", + "bot_manage.plugin.disable", + session=session, + ) + await BotConsole.disable_plugin(None, plugin.module) + await MessageUtils.build_message( + f"已禁用全部 bot 的插件: {plugin_name.result}" + ).finish() + elif bot_id.available: logger.info( - f"禁用 {bot_id.result} 的 {plugin_name.result}", + f"禁用 {bot_id.result} 全部插件", "bot_manage.plugin.disable", session=session, ) - await BotConsole.disable_plugin(bot_id.result, plugin_name.result) - await MessageUtils.build_message( - f"已禁用 {bot_id.result} 的 {plugin_name.result}" - ).finish() - - await MessageUtils.build_message("缺失参数").finish() + await BotConsole.disable_all(bot_id.result, "plugins") + await MessageUtils.build_message(f"已禁用 {bot_id.result} 全部插件").finish() + else: + bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True) + for __bot_id in bot_id_list: + await BotConsole.disable_all(__bot_id, "plugins") # type: ignore + logger.info( + "禁用全部 bot 全部插件", + "bot_manage.plugin.disable", + session=session, + ) + await MessageUtils.build_message("禁用全部 bot 全部插件").finish() diff --git a/zhenxun/builtin_plugins/superuser/bot_manage/status.py b/zhenxun/builtin_plugins/superuser/bot_manage/status.py deleted file mode 100644 index f50b1f3e..00000000 --- a/zhenxun/builtin_plugins/superuser/bot_manage/status.py +++ /dev/null @@ -1,37 +0,0 @@ -import asyncio - -from nonebot_plugin_alconna import Match, AlconnaMatch - -from zhenxun.utils.message import MessageUtils -from zhenxun.models.bot_console import BotConsole -from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage - - -@bot_manage.assign("status.tasks") -async def handle_tasks_status(bot_id: Match[str] = AlconnaMatch("bot_id")): - if not bot_id.available: - await MessageUtils.build_message("bot_id 不能为空").finish() - - result = await asyncio.gather( - BotConsole.get_tasks(bot_id.result), - BotConsole.get_tasks(bot_id.result, False), - ) - - await MessageUtils.build_message( - f"可用被动: {result[0]}\n禁用被动: {result[1]}" - ).finish() - - -@bot_manage.assign("status.plugins") -async def handle_plugins_status(bot_id: Match[str] = AlconnaMatch("bot_id")): - if not bot_id.available: - await MessageUtils.build_message("bot_id 不能为空").finish() - - result = await asyncio.gather( - BotConsole.get_plugins(bot_id.result), - BotConsole.get_plugins(bot_id.result, False), - ) - - await MessageUtils.build_message( - f"可用插件: {result[0]}\n禁用插件: {result[1]}" - ).finish() diff --git a/zhenxun/builtin_plugins/superuser/bot_manage/task.py b/zhenxun/builtin_plugins/superuser/bot_manage/task.py index 624bf180..ea35c3e7 100644 --- a/zhenxun/builtin_plugins/superuser/bot_manage/task.py +++ b/zhenxun/builtin_plugins/superuser/bot_manage/task.py @@ -1,75 +1,164 @@ from nonebot_plugin_uninfo import Uninfo -from nonebot_plugin_alconna import Match, Query, AlconnaMatch, AlconnaQuery +from nonebot_plugin_alconna import Match, AlconnaMatch from zhenxun.services.log import logger +from zhenxun.models.task_info import TaskInfo from zhenxun.utils.message import MessageUtils from zhenxun.models.bot_console import BotConsole +from zhenxun.utils._build_image import BuildImage +from zhenxun.utils._image_template import RowStyle +from zhenxun.utils.image_utils import ImageTemplate from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage -@bot_manage.assign("task") -async def bot_task( - session: Uninfo, - task_list: Query[bool] = AlconnaQuery("task.list.value", default=False), -): - if task_list: - logger.info("获取全部 bot 的所有可用被动", "bot_manage.task", session=session) - data = await BotConsole.get_tasks() - for bot in data: - await MessageUtils.build_message(f"{bot[0]} : {bot[1]}").finish() +def task_row_style(column: str, text: str) -> RowStyle: + """被动技能文本风格 + + 参数: + column: 表头 + text: 文本内容 + + 返回: + RowStyle: RowStyle + """ + style = RowStyle() + if column in {"全局状态"}: + style.font_color = "#67C23A" if text == "开启" else "#F56C6C" + return style + + +@bot_manage.assign("task.list") +async def bot_task(session: Uninfo, bot_id: Match[str] = AlconnaMatch("bot_id")): + logger.info("获取全部 bot 的所有可用被动", "bot_manage.task", session=session) + if bot_id.available: + data_dict = { + bot_id.result: await BotConsole.get_tasks( + bot_id=bot_id.result, status=False + ) + } + else: + data_dict = await BotConsole.get_tasks(status=False) + db_task_list = await TaskInfo.all() + column_name = ["ID", "模块", "名称", "全局状态", "运行时间"] + img_list = [] + for __bot_id, tk in data_dict.items(): + column_data = [ + [ + task.id, + task.module, + task.name, + "开启" if task.module not in tk else "关闭", + task.run_time or "-", + ] + for task in db_task_list + ] + img = await ImageTemplate.table_page( + f"{__bot_id}被动技能状态", + None, + column_name, + column_data, + text_style=task_row_style, + ) + img_list.append(img) + result = await BuildImage.auto_paste(img_list, 3) + await MessageUtils.build_message(result).finish() @bot_manage.assign("task.enable") async def enable_task( session: Uninfo, - all_flag: Query[bool] = AlconnaQuery("task.enable.all.value", default=False), - task_name: Match[str] = AlconnaMatch("plugin_name"), + task_name: Match[str] = AlconnaMatch("feature_name"), bot_id: Match[str] = AlconnaMatch("bot_id"), ): - if all_flag and task_name.available: - await logger.info( - "启用全部 bot 的所有可用被动", "bot_manage.task.enable", session=session - ) - await BotConsole.enable_task(None, task_name.result) - await MessageUtils.build_message("已启用全部 bot 的所有可用被动").finish() - - if bot_id.available and task_name.available: - await logger.info( - f"启用 {bot_id.result} 的 {task_name.result}", - "bot_manage.task.enable", + if task_name.available: + task: TaskInfo | None = await TaskInfo.get_or_none(name=task_name.result) + if not task: + await MessageUtils.build_message("未找到被动...").finish() + if bot_id.available: + logger.info( + f"开启 {bot_id.result} 被动的 {task_name.available}", + "bot_manage.task.disable", + session=session, + ) + await BotConsole.enable_task(bot_id.result, task.module) + await MessageUtils.build_message( + f"已开启 {bot_id.result} 被动的 {task_name.available}" + ).finish() + else: + logger.info( + f"开启全部 bot 的被动: {task_name.available}", + "bot_manage.task.disable", + session=session, + ) + await BotConsole.enable_task(None, task.module) + await MessageUtils.build_message( + f"已禁用全部 bot 的被动: {task_name.available}" + ).finish() + elif bot_id.available: + logger.info( + f"开启 {bot_id.result} 全部被动", + "bot_manage.task.disable", session=session, ) - await BotConsole.enable_task(bot_id.result, task_name.result) - await MessageUtils.build_message( - f"已启用 {bot_id.result} 的 {task_name.result}" - ).finish() - - await MessageUtils.build_message("缺失参数").finish() + await BotConsole.enable_all(bot_id.result, "tasks") + await MessageUtils.build_message(f"已开启 {bot_id.result} 全部被动").finish() + else: + bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True) + for __bot_id in bot_id_list: + await BotConsole.enable_all(__bot_id, "tasks") # type: ignore + logger.info( + "开启全部 bot 全部被动", + "bot_manage.task.disable", + session=session, + ) + await MessageUtils.build_message("开启全部 bot 全部被动").finish() @bot_manage.assign("task.disable") async def disable_task( session: Uninfo, - all_flag: Query[bool] = AlconnaQuery("task.disable.all.value", default=False), - task_name: Match[str] = AlconnaMatch("plugin_name"), + task_name: Match[str] = AlconnaMatch("feature_name"), bot_id: Match[str] = AlconnaMatch("bot_id"), ): - if all_flag and task_name.available: - await logger.info( - "禁用全部 bot 的所有可用被动", "bot_manage.task.disable", session=session - ) - await BotConsole.disable_task(None, task_name.result) - await MessageUtils.build_message("已禁用全部 bot 的所有可用被动").finish() - - if bot_id.available and task_name.available: + if task_name.available: + task: TaskInfo | None = await TaskInfo.get_or_none(name=task_name.result) + if not task: + await MessageUtils.build_message("未找到被动...").finish() + if bot_id.available: + logger.info( + f"禁用 {bot_id.result} 被动的 {task_name.available}", + "bot_manage.task.disable", + session=session, + ) + await BotConsole.disable_task(bot_id.result, task.module) + await MessageUtils.build_message( + f"已禁用 {bot_id.result} 被动的 {task_name.available}" + ).finish() + else: + logger.info( + f"禁用全部 bot 的被动: {task_name.available}", + "bot_manage.task.disable", + session=session, + ) + await BotConsole.disable_task(None, task.module) + await MessageUtils.build_message( + f"已禁用全部 bot 的被动: {task_name.available}" + ).finish() + elif bot_id.available: logger.info( - f"禁用 {bot_id.result} 的 {task_name.result}", + f"禁用 {bot_id.result} 全部被动", "bot_manage.task.disable", session=session, ) - await BotConsole.disable_task(bot_id.result, task_name.result) - await MessageUtils.build_message( - f"已禁用 {bot_id.result} 的 {task_name.result}" - ).finish() - - await MessageUtils.build_message("缺失参数").finish() + await BotConsole.disable_all(bot_id.result, "tasks") + await MessageUtils.build_message(f"已禁用 {bot_id.result} 全部被动").finish() + else: + bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True) + for __bot_id in bot_id_list: + await BotConsole.disable_all(__bot_id, "tasks") # type: ignore + logger.info( + "禁用全部 bot 全部被动", + "bot_manage.task.disable", + session=session, + ) + await MessageUtils.build_message("禁用全部 bot 全部被动").finish() diff --git a/zhenxun/models/bot_console.py b/zhenxun/models/bot_console.py index e94c2523..45b69ca0 100644 --- a/zhenxun/models/bot_console.py +++ b/zhenxun/models/bot_console.py @@ -63,19 +63,19 @@ class BotConsole(Model): @overload @classmethod - async def get_tasks(cls) -> list[tuple[str, str]]: ... + async def get_tasks(cls) -> list[tuple[str, list[str]]]: ... @overload @classmethod - async def get_tasks(cls, bot_id: str) -> str: ... + async def get_tasks(cls, bot_id: str) -> list[str]: ... @overload @classmethod - async def get_tasks(cls, *, status: bool) -> list[tuple[str, str]]: ... + async def get_tasks(cls, *, status: bool) -> dict[str, list[str]]: ... @overload @classmethod - async def get_tasks(cls, bot_id: str, status: bool = True) -> str: ... + async def get_tasks(cls, bot_id: str, status: bool = True) -> list[str]: ... @classmethod async def get_tasks(cls, bot_id: str | None = None, status: bool | None = True): @@ -90,29 +90,32 @@ class BotConsole(Model): list[tuple[str, str]] | str: 被动技能 """ if not bot_id: - task_field = "available_tasks" if status else "block_tasks" - return await cls.all().values_list("bot_id", task_field) - + task_field: Literal["available_tasks", "block_tasks"] = ( + "available_tasks" if status else "block_tasks" + ) + data_list = await cls.all().values_list("bot_id", task_field) + return {k: cls._convert_module_format(v) for k, v in data_list} result = await cls.get_or_none(bot_id=bot_id) if result: - return result.available_tasks if status else result.block_tasks - return "" + tasks = result.available_tasks if status else result.block_tasks + return cls._convert_module_format(tasks) + return [] @overload @classmethod - async def get_plugins(cls) -> list[tuple[str, str]]: ... + async def get_plugins(cls) -> dict[str, list[str]]: ... @overload @classmethod - async def get_plugins(cls, bot_id: str) -> str: ... + async def get_plugins(cls, bot_id: str) -> list[str]: ... @overload @classmethod - async def get_plugins(cls, *, status: bool) -> list[tuple[str, str]]: ... + async def get_plugins(cls, *, status: bool) -> dict[str, list[str]]: ... @overload @classmethod - async def get_plugins(cls, bot_id: str, status: bool = True) -> str: ... + async def get_plugins(cls, bot_id: str, status: bool = True) -> list[str]: ... @classmethod async def get_plugins(cls, bot_id: str | None = None, status: bool = True): @@ -128,12 +131,14 @@ class BotConsole(Model): """ if not bot_id: plugin_field = "available_plugins" if status else "block_plugins" - return await cls.all().values_list("bot_id", plugin_field) + data_list = await cls.all().values_list("bot_id", plugin_field) + return {k: cls._convert_module_format(v) for k, v in data_list} result = await cls.get_or_none(bot_id=bot_id) if result: - return result.available_plugins if status else result.block_plugins - return "" + plugins = result.available_plugins if status else result.block_plugins + return cls._convert_module_format(plugins) + return [] @classmethod async def set_bot_status(cls, status: bool, bot_id: str | None = None) -> None: @@ -336,12 +341,27 @@ class BotConsole(Model): """ bot_data, _ = await cls.get_or_create(bot_id=bot_id) if feat == "plugins": + available_plugins = cls._convert_module_format(bot_data.available_plugins) + block_plugins = cls._convert_module_format(bot_data.block_plugins) + bot_data.block_plugins = cls._convert_module_format( + available_plugins + block_plugins + ) bot_data.available_plugins = "" - # todo)) 使用初始化方法重新写入bot_data.block_plugins以保证插件列表完整 elif feat == "tasks": + available_tasks = cls._convert_module_format(bot_data.available_tasks) + block_tasks = cls._convert_module_format(bot_data.block_tasks) + bot_data.block_tasks = cls._convert_module_format( + available_tasks + block_tasks + ) bot_data.available_tasks = "" - # todo)) 使用初始化方法重新写入bot_data.block_tasks以保证插件列表完整 - await bot_data.save() + await bot_data.save( + update_fields=[ + "available_tasks", + "block_tasks", + "available_plugins", + "block_plugins", + ] + ) @classmethod async def enable_all( @@ -358,12 +378,27 @@ class BotConsole(Model): """ bot_data, _ = await cls.get_or_create(bot_id=bot_id) if feat == "plugins": + available_plugins = cls._convert_module_format(bot_data.available_plugins) + block_plugins = cls._convert_module_format(bot_data.block_plugins) + bot_data.available_plugins = cls._convert_module_format( + available_plugins + block_plugins + ) bot_data.block_plugins = "" - # todo)) 使用初始化方法重新写入bot_data.available_plugins以保证插件列表完整 elif feat == "tasks": + available_tasks = cls._convert_module_format(bot_data.available_tasks) + block_tasks = cls._convert_module_format(bot_data.block_tasks) + bot_data.available_tasks = cls._convert_module_format( + available_tasks + block_tasks + ) bot_data.block_tasks = "" - # todo)) 使用初始化方法重新写入bot_data.available_tasks以保证插件列表完整 - await bot_data.save() + await bot_data.save( + update_fields=[ + "available_tasks", + "block_tasks", + "available_plugins", + "block_plugins", + ] + ) @classmethod async def is_block_plugin(cls, bot_id: str, plugin_name: str) -> bool: @@ -372,7 +407,7 @@ class BotConsole(Model): 参数: bot_id (str): bot_id - plugin_name (str): 插件名称 + plugin_name (str): 插件某款 返回: bool: 是否被禁用 diff --git a/zhenxun/utils/common_utils.py b/zhenxun/utils/common_utils.py index 4c36930d..2fa3eaa6 100644 --- a/zhenxun/utils/common_utils.py +++ b/zhenxun/utils/common_utils.py @@ -5,6 +5,7 @@ from zhenxun.services.log import logger from zhenxun.configs.config import BotConfig from zhenxun.models.task_info import TaskInfo from zhenxun.models.ban_console import BanConsole +from zhenxun.models.bot_console import BotConsole from zhenxun.models.group_console import GroupConsole @@ -39,6 +40,13 @@ class CommonUtils: """被动全局状态""" if not task.status: return True + if not await BotConsole.get_bot_status(session.self_id): + """bot是否休眠""" + return True + block_tasks = await BotConsole.get_tasks(session.self_id, False) + if module in block_tasks: + """bot是否禁用被动""" + return True if group_id: if await GroupConsole.is_block_task(group_id, module): """群组是否禁用被动"""