添加Bot管理插件 (#1758)

Co-authored-by: BalconyJH <73932916+BalconyJH@users.noreply.github.com>
This commit is contained in:
HibiKier 2024-12-10 20:16:14 +08:00 committed by GitHub
parent bc5a9c4fcc
commit 5590445679
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1068 additions and 33 deletions

5
.gitignore vendored
View File

@ -113,6 +113,7 @@ venv/
ENV/
env.bak/
venv.bak/
.env.dev
# Spyder project settings
.spyderproject
@ -151,9 +152,7 @@ backup/
extensive_plugin/
test/
bot.py
data/
.env
.env.dev
.idea/
/resources/text/
# /resources/image/
/resources/temp/

View File

@ -307,8 +307,15 @@ class AuthChecker:
plugin: PluginInfo
bot_id: bot_id
"""
if await BotConsole.is_block_plugin(plugin.module, bot_id):
raise IgnoredException("机器人权限检测 ignore")
if not await BotConsole.get_bot_status(bot_id):
logger.debug("Bot休眠中阻断权限检测...", "AuthChecker")
raise IgnoredException("BotConsole休眠权限检测 ignore")
if await BotConsole.is_block_plugin(bot_id, plugin.module):
logger.debug(
f"Bot插件 {plugin.name}({plugin.module}) 权限检查结果为关闭...",
"AuthChecker",
)
raise IgnoredException("BotConsole插件权限检测 ignore")
async def auth_limit(self, plugin: PluginInfo, session: EventSession):
"""插件限制

View File

@ -1,17 +1,66 @@
from pathlib import Path
import nonebot
from nonebot.adapters import Bot
from nonebot.plugin import PluginMetadata
from zhenxun.configs.utils import PluginExtraData
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.task_info import TaskInfo
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.platform import PlatformUtils
driver = nonebot.get_driver()
_sub_plugins = set()
_sub_plugins |= nonebot.load_plugins(str(Path(__file__).parent.resolve()))
__plugin_meta__ = PluginMetadata(
name="Bot管理",
description="指定bot对象的功能/被动开关和状态",
usage="""
清理临时数据
""".strip(),
extra=PluginExtraData(
author="",
version="0.1",
plugin_type=PluginType.SUPERUSER,
plugin_type=PluginType.PARENT,
).dict(),
)
@driver.on_bot_connect
async def _(bot: Bot):
"""初始化Bot管理
参数:
bot: Bot
"""
plugin_list = await PluginInfo.get_plugins(
plugin_type__in=[PluginType.NORMAL, PluginType.DEPENDANT, PluginType.ADMIN]
)
available_tasks: list[str] = await TaskInfo.filter(status=True).values_list(
"module", flat=True
) # type: ignore
available_plugins = [p.module for p in plugin_list]
# for _, bot in nonebot.get_bots().items():
platform = PlatformUtils.get_platform(bot)
bot_data, is_create = await BotConsole.get_or_create(
bot_id=bot.self_id, platform=platform
)
if not is_create:
block_plugins = await bot_data.get_plugins(bot.self_id, False)
block_plugins = BotConsole._convert_module_format(block_plugins)
for module in available_plugins.copy():
if module in block_plugins:
available_plugins.remove(module)
block_tasks = await bot_data.get_tasks(bot.self_id, False)
block_tasks = BotConsole._convert_module_format(block_tasks)
for module in available_tasks.copy():
if module in block_plugins:
available_tasks.remove(module)
bot_data.available_plugins = BotConsole._convert_module_format(available_plugins)
bot_data.available_tasks = BotConsole._convert_module_format(available_tasks)
await bot_data.save(update_fields=["available_plugins", "available_tasks"])
logger.info("初始化Bot管理完成...")

View File

@ -0,0 +1,51 @@
from nonebot_plugin_alconna import AlconnaMatch, Match
from nonebot_plugin_uninfo import Uninfo
from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage
from zhenxun.models.bot_console import BotConsole
from zhenxun.services.log import logger
from zhenxun.utils.message import MessageUtils
@bot_manage.assign("bot_switch.enable")
async def enable_bot_switch(
session: Uninfo,
bot_id: Match[str] = AlconnaMatch("bot_id"),
):
if not bot_id.available:
await MessageUtils.build_message("bot_id 不能为空").finish()
else:
logger.info(
f"开启 {bot_id.result} ",
"bot_manage.bot_switch.enable",
session=session,
)
try:
await BotConsole.set_bot_status(True, bot_id.result)
except ValueError:
await MessageUtils.build_message(f"bot_id {bot_id.result} 不存在").finish()
await MessageUtils.build_message(f"已开启 {bot_id.result} ").finish()
@bot_manage.assign("bot_switch.disable")
async def diasble_bot_switch(
session: Uninfo,
bot_id: Match[str] = AlconnaMatch("bot_id"),
):
if not bot_id.available:
await MessageUtils.build_message("bot_id 不能为空").finish()
else:
logger.info(
f"禁用 {bot_id.result} ",
"bot_manage.bot_switch.disable",
session=session,
)
try:
await BotConsole.set_bot_status(False, bot_id.result)
except ValueError:
await MessageUtils.build_message(f"bot_id {bot_id.result} 不存在").finish()
await MessageUtils.build_message(f"已禁用 {bot_id.result} ").finish()

View File

@ -0,0 +1,153 @@
from arclet.alconna import Alconna, Args, Option, Subcommand
from arclet.alconna.action import store_false
from nonebot.permission import SUPERUSER
from nonebot_plugin_alconna import on_alconna
bot_manage = on_alconna(
Alconna(
"bot_manage",
Subcommand(
"task",
Option(
"list",
action=store_false,
help_text="查看 bot_id 下的所有可用被动",
),
Option("-b|--bot", Args["bot_id", str], help_text="指定 bot_id"),
Subcommand(
"enable",
Args["feature_name?", str],
),
Subcommand(
"disable",
Args["feature_name?", str],
),
),
Subcommand(
"plugin",
Option(
"list",
action=store_false,
help_text="查看 bot_id 下的所有可用插件",
),
Option("-b|--bot", Args["bot_id", str], help_text="指定 bot_id"),
Subcommand(
"enable",
Args["plugin_name?", str],
),
Subcommand(
"disable",
Args["plugin_name?", str],
),
),
Subcommand(
"full_function",
Subcommand(
"enable",
Args["bot_id?", str],
),
Subcommand(
"disable",
Args["bot_id?", str],
),
),
Subcommand(
"bot_switch",
Subcommand(
"enable",
Args["bot_id?", str],
),
Subcommand(
"disable",
Args["bot_id?", str],
),
),
),
permission=SUPERUSER,
priority=5,
block=True,
)
bot_manage.shortcut(
r"bot被动状态",
command="bot_manage",
arguments=["task", "list"],
prefix=True,
)
bot_manage.shortcut(
r"bot开启被动\s*(?P<name>.+)",
command="bot_manage",
arguments=["task", "enable", "{name}"],
prefix=True,
)
bot_manage.shortcut(
r"bot关闭被动\s*(?P<name>.+)",
command="bot_manage",
arguments=["task", "disable", "{name}"],
prefix=True,
)
bot_manage.shortcut(
r"bot开启(全部|所有)被动",
command="bot_manage",
arguments=["task", "enable"],
prefix=True,
)
bot_manage.shortcut(
r"bot关闭(全部|所有)被动",
command="bot_manage",
arguments=["task", "disable"],
prefix=True,
)
bot_manage.shortcut(
r"bot插件列表",
command="bot_manage",
arguments=["plugin", "list"],
prefix=True,
)
bot_manage.shortcut(
r"bot开启(全部|所有)插件",
command="bot_manage",
arguments=["plugin", "enable"],
prefix=True,
)
bot_manage.shortcut(
r"bot关闭(全部|所有)插件",
command="bot_manage",
arguments=["plugin", "disable"],
prefix=True,
)
bot_manage.shortcut(
r"bot开启\s*(?P<name>.+)",
command="bot_manage",
arguments=["plugin", "enable", "{name}"],
prefix=True,
)
bot_manage.shortcut(
r"bot关闭\s*(?P<name>.+)",
command="bot_manage",
arguments=["plugin", "disable", "{name}"],
prefix=True,
)
bot_manage.shortcut(
r"bot休眠\s*(?P<bot_id>.+)?",
command="bot_manage",
arguments=["bot_switch", "disable", "{bot_id}"],
prefix=True,
)
bot_manage.shortcut(
r"bot醒来\s*(?P<bot_id>.+)?",
command="bot_manage",
arguments=["bot_switch", "enable", "{bot_id}"],
prefix=True,
)

View File

@ -0,0 +1,51 @@
from nonebot_plugin_alconna import AlconnaMatch, Match
from nonebot_plugin_uninfo import Uninfo
from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage
from zhenxun.models.bot_console import BotConsole
from zhenxun.services.log import logger
from zhenxun.utils.message import MessageUtils
@bot_manage.assign("full_function.enable")
async def enable_full_function(
session: Uninfo,
bot_id: Match[str] = AlconnaMatch("bot_id"),
):
if not bot_id.available:
await MessageUtils.build_message("bot_id 不能为空").finish()
else:
logger.info(
f"开启 {bot_id.result} 的所有可用插件及被动",
"bot_manage.full_function.enable",
session=session,
)
await BotConsole.enable_all(bot_id.result, "tasks")
await BotConsole.enable_all(bot_id.result, "plugins")
await MessageUtils.build_message(
f"已开启 {bot_id.result} 的所有插件及被动"
).finish()
@bot_manage.assign("full_function.disable")
async def diasble_full_function(
session: Uninfo,
bot_id: Match[str] = AlconnaMatch("bot_id"),
):
if not bot_id.available:
await MessageUtils.build_message("bot_id 不能为空").finish()
else:
logger.info(
f"禁用 {bot_id.result} 的所有可用插件及被动",
"bot_manage.full_function.disable",
session=session,
)
await BotConsole.disable_all(bot_id.result, "tasks")
await BotConsole.disable_all(bot_id.result, "plugins")
await MessageUtils.build_message(
f"已禁用 {bot_id.result} 的所有插件及被动"
).finish()

View File

@ -0,0 +1,182 @@
from nonebot_plugin_alconna import AlconnaMatch, Match
from nonebot_plugin_uninfo import Uninfo
from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.services.log import logger
from zhenxun.utils._build_image import BuildImage
from zhenxun.utils._image_template import ImageTemplate, RowStyle
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
def task_row_style(column: str, text: str) -> RowStyle:
"""被动技能文本风格
参数:
column: 表头
text: 文本内容
返回:
RowStyle: RowStyle
"""
style = RowStyle()
if column in {"全局状态"}:
style.font_color = "#67C23A" if text == "开启" else "#F56C6C"
return style
@bot_manage.assign("plugin.list")
async def bot_plugin(session: Uninfo, bot_id: Match[str] = AlconnaMatch("bot_id")):
logger.info("获取全部 bot 的所有可用插件", "bot_manage.plugin", session=session)
column_name = [
"ID",
"模块",
"名称",
"全局状态",
"禁用类型",
"加载状态",
"菜单分类",
"作者",
"版本",
"金币花费",
]
if bot_id.available:
data_dict = {
bot_id.result: await BotConsole.get_plugins(
bot_id=bot_id.result, status=False
)
}
else:
data_dict = await BotConsole.get_plugins(status=False)
db_plugin_list = await PluginInfo.filter(
load_status=True, plugin_type__not=PluginType.HIDDEN
).all()
img_list = []
for __bot_id, tk in data_dict.items():
column_data = [
[
plugin.id,
plugin.module,
plugin.name,
"开启" if plugin.module not in tk else "关闭",
plugin.block_type,
"SUCCESS" if plugin.load_status else "ERROR",
plugin.menu_type,
plugin.author,
plugin.version,
plugin.cost_gold,
]
for plugin in db_plugin_list
]
img = await ImageTemplate.table_page(
f"{__bot_id}插件列表",
None,
column_name,
column_data,
text_style=task_row_style,
)
img_list.append(img)
result = await BuildImage.auto_paste(img_list, 3)
await MessageUtils.build_message(result).finish()
@bot_manage.assign("plugin.enable")
async def enable_plugin(
session: Uninfo,
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
bot_id: Match[str] = AlconnaMatch("bot_id"),
):
if plugin_name.available:
plugin: PluginInfo | None = await PluginInfo.get_plugin(name=plugin_name.result)
if not plugin:
await MessageUtils.build_message("未找到该插件...").finish()
if bot_id.available:
logger.info(
f"开启 {bot_id.result} 的插件 {plugin_name.result}",
"bot_manage.plugin.disable",
session=session,
)
await BotConsole.enable_plugin(bot_id.result, plugin.module)
await MessageUtils.build_message(
f"已开启 {bot_id.result} 的插件 {plugin_name.result}"
).finish()
else:
logger.info(
f"开启全部 bot 的插件: {plugin_name.result}",
"bot_manage.plugin.disable",
session=session,
)
await BotConsole.enable_plugin(None, plugin.module)
await MessageUtils.build_message(
f"已禁用全部 bot 的插件: {plugin_name.result}"
).finish()
elif bot_id.available:
logger.info(
f"开启 {bot_id.result} 全部插件",
"bot_manage.plugin.disable",
session=session,
)
await BotConsole.enable_all(bot_id.result, "plugins")
await MessageUtils.build_message(f"已开启 {bot_id.result} 全部插件").finish()
else:
bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True)
for __bot_id in bot_id_list:
await BotConsole.enable_all(__bot_id, "plugins") # type: ignore
logger.info(
"开启全部 bot 全部插件",
"bot_manage.plugin.disable",
session=session,
)
await MessageUtils.build_message("开启全部 bot 全部插件").finish()
@bot_manage.assign("plugin.disable")
async def disable_plugin(
session: Uninfo,
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
bot_id: Match[str] = AlconnaMatch("bot_id"),
):
if plugin_name.available:
plugin = await PluginInfo.get_plugin(name=plugin_name.result)
if not plugin:
await MessageUtils.build_message("未找到该插件...").finish()
if bot_id.available:
logger.info(
f"禁用 {bot_id.result} 的插件 {plugin_name.result}",
"bot_manage.plugin.disable",
session=session,
)
await BotConsole.disable_plugin(bot_id.result, plugin.module)
await MessageUtils.build_message(
f"已禁用 {bot_id.result} 的插件 {plugin_name.result}"
).finish()
else:
logger.info(
f"禁用全部 bot 的插件: {plugin_name.result}",
"bot_manage.plugin.disable",
session=session,
)
await BotConsole.disable_plugin(None, plugin.module)
await MessageUtils.build_message(
f"已禁用全部 bot 的插件: {plugin_name.result}"
).finish()
elif bot_id.available:
logger.info(
f"禁用 {bot_id.result} 全部插件",
"bot_manage.plugin.disable",
session=session,
)
await BotConsole.disable_all(bot_id.result, "plugins")
await MessageUtils.build_message(f"已禁用 {bot_id.result} 全部插件").finish()
else:
bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True)
for __bot_id in bot_id_list:
await BotConsole.disable_all(__bot_id, "plugins") # type: ignore
logger.info(
"禁用全部 bot 全部插件",
"bot_manage.plugin.disable",
session=session,
)
await MessageUtils.build_message("禁用全部 bot 全部插件").finish()

View File

@ -0,0 +1,164 @@
from nonebot_plugin_alconna import AlconnaMatch, Match
from nonebot_plugin_uninfo import Uninfo
from zhenxun.builtin_plugins.superuser.bot_manage.command import bot_manage
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.task_info import TaskInfo
from zhenxun.services.log import logger
from zhenxun.utils._build_image import BuildImage
from zhenxun.utils._image_template import RowStyle
from zhenxun.utils.image_utils import ImageTemplate
from zhenxun.utils.message import MessageUtils
def task_row_style(column: str, text: str) -> RowStyle:
"""被动技能文本风格
参数:
column: 表头
text: 文本内容
返回:
RowStyle: RowStyle
"""
style = RowStyle()
if column in {"全局状态"}:
style.font_color = "#67C23A" if text == "开启" else "#F56C6C"
return style
@bot_manage.assign("task.list")
async def bot_task(session: Uninfo, bot_id: Match[str] = AlconnaMatch("bot_id")):
logger.info("获取全部 bot 的所有可用被动", "bot_manage.task", session=session)
if bot_id.available:
data_dict = {
bot_id.result: await BotConsole.get_tasks(
bot_id=bot_id.result, status=False
)
}
else:
data_dict = await BotConsole.get_tasks(status=False)
db_task_list = await TaskInfo.all()
column_name = ["ID", "模块", "名称", "全局状态", "运行时间"]
img_list = []
for __bot_id, tk in data_dict.items():
column_data = [
[
task.id,
task.module,
task.name,
"开启" if task.module not in tk else "关闭",
task.run_time or "-",
]
for task in db_task_list
]
img = await ImageTemplate.table_page(
f"{__bot_id}被动技能状态",
None,
column_name,
column_data,
text_style=task_row_style,
)
img_list.append(img)
result = await BuildImage.auto_paste(img_list, 3)
await MessageUtils.build_message(result).finish()
@bot_manage.assign("task.enable")
async def enable_task(
session: Uninfo,
task_name: Match[str] = AlconnaMatch("feature_name"),
bot_id: Match[str] = AlconnaMatch("bot_id"),
):
if task_name.available:
task: TaskInfo | None = await TaskInfo.get_or_none(name=task_name.result)
if not task:
await MessageUtils.build_message("未找到被动...").finish()
if bot_id.available:
logger.info(
f"开启 {bot_id.result} 被动的 {task_name.available}",
"bot_manage.task.disable",
session=session,
)
await BotConsole.enable_task(bot_id.result, task.module)
await MessageUtils.build_message(
f"已开启 {bot_id.result} 被动的 {task_name.available}"
).finish()
else:
logger.info(
f"开启全部 bot 的被动: {task_name.available}",
"bot_manage.task.disable",
session=session,
)
await BotConsole.enable_task(None, task.module)
await MessageUtils.build_message(
f"已禁用全部 bot 的被动: {task_name.available}"
).finish()
elif bot_id.available:
logger.info(
f"开启 {bot_id.result} 全部被动",
"bot_manage.task.disable",
session=session,
)
await BotConsole.enable_all(bot_id.result, "tasks")
await MessageUtils.build_message(f"已开启 {bot_id.result} 全部被动").finish()
else:
bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True)
for __bot_id in bot_id_list:
await BotConsole.enable_all(__bot_id, "tasks") # type: ignore
logger.info(
"开启全部 bot 全部被动",
"bot_manage.task.disable",
session=session,
)
await MessageUtils.build_message("开启全部 bot 全部被动").finish()
@bot_manage.assign("task.disable")
async def disable_task(
session: Uninfo,
task_name: Match[str] = AlconnaMatch("feature_name"),
bot_id: Match[str] = AlconnaMatch("bot_id"),
):
if task_name.available:
task: TaskInfo | None = await TaskInfo.get_or_none(name=task_name.result)
if not task:
await MessageUtils.build_message("未找到被动...").finish()
if bot_id.available:
logger.info(
f"禁用 {bot_id.result} 被动的 {task_name.available}",
"bot_manage.task.disable",
session=session,
)
await BotConsole.disable_task(bot_id.result, task.module)
await MessageUtils.build_message(
f"已禁用 {bot_id.result} 被动的 {task_name.available}"
).finish()
else:
logger.info(
f"禁用全部 bot 的被动: {task_name.available}",
"bot_manage.task.disable",
session=session,
)
await BotConsole.disable_task(None, task.module)
await MessageUtils.build_message(
f"已禁用全部 bot 的被动: {task_name.available}"
).finish()
elif bot_id.available:
logger.info(
f"禁用 {bot_id.result} 全部被动",
"bot_manage.task.disable",
session=session,
)
await BotConsole.disable_all(bot_id.result, "tasks")
await MessageUtils.build_message(f"已禁用 {bot_id.result} 全部被动").finish()
else:
bot_id_list = await BotConsole.annotate().values_list("bot_id", flat=True)
for __bot_id in bot_id_list:
await BotConsole.disable_all(__bot_id, "tasks") # type: ignore
logger.info(
"禁用全部 bot 全部被动",
"bot_manage.task.disable",
session=session,
)
await MessageUtils.build_message("禁用全部 bot 全部被动").finish()

View File

@ -1,3 +1,5 @@
from typing import Literal, overload
from tortoise import fields
from zhenxun.services.db_context import Model
@ -10,51 +12,420 @@ class BotConsole(Model):
"""bot_id"""
status = fields.BooleanField(default=True, description="Bot状态")
"""Bot状态"""
block_plugin = fields.TextField(default="", description="禁用插件")
"""禁用插件"""
block_task = fields.TextField(default="", description="禁用被动技能")
"""禁用被动技能"""
create_time = fields.DatetimeField(auto_now_add=True, description="创建时间")
"""创建时间"""
platform = fields.CharField(255, null=True, description="平台")
"""平台"""
block_plugins = fields.TextField(default="", description="禁用插件")
"""禁用插件"""
block_tasks = fields.TextField(default="", description="禁用被动技能")
"""禁用被动技能"""
available_plugins = fields.TextField(default="", description="可用插件")
# todo)) 计划任务 or on_startup 写入可用插件
"""可用插件"""
available_tasks = fields.TextField(default="", description="可用被动技能")
# todo)) 计划任务 or on_startup 写入可用被动技能
"""可用被动技能"""
class Meta: # type: ignore
table = "bot_console"
table_description = "Bot数据表"
@staticmethod
def format(name: str) -> str:
return f"<{name},"
@overload
@classmethod
async def get_bot_status(cls, bot_id: str) -> bool:
async def get_bot_status(cls) -> list[tuple[str, bool]]: ...
@overload
@classmethod
async def get_bot_status(cls, bot_id: str) -> bool: ...
@classmethod
async def get_bot_status(
cls, bot_id: str | None = None
) -> list[tuple[str, bool]] | bool:
"""
获取bot状态
参数:
bot_id (str, optional): bot_id. Defaults to None.
返回:
list[tuple[str, bool]] | bool: bot状态
"""
if not bot_id:
return await cls.all().values_list("bot_id", "status")
result = await cls.get_or_none(bot_id=bot_id)
return result.status if result else False
@overload
@classmethod
async def set_block_plugin(cls, bot_id: str, module: str):
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
if f"<{module}," not in bot_data.block_plugin:
bot_data.block_plugin += f"<{module},"
await bot_data.save(update_fields=["block_plugin"])
async def get_tasks(cls) -> list[tuple[str, list[str]]]: ...
@overload
@classmethod
async def get_tasks(cls, bot_id: str) -> list[str]: ...
@overload
@classmethod
async def get_tasks(cls, *, status: bool) -> dict[str, list[str]]: ...
@overload
@classmethod
async def get_tasks(cls, bot_id: str, status: bool = True) -> list[str]: ...
@classmethod
async def set_unblock_plugin(cls, bot_id: str, module: str):
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
if f"<{module}," in bot_data.block_plugin:
bot_data.block_plugin = bot_data.block_plugin.replace(f"<{module},", "")
await bot_data.save(update_fields=["block_plugin"])
async def get_tasks(cls, bot_id: str | None = None, status: bool | None = True):
"""
获取bot被动技能
参数:
bot_id (str | None, optional): bot_id. Defaults to None.
status (bool | None, optional): 被动状态. Defaults to True.
返回:
list[tuple[str, str]] | str: 被动技能
"""
if not bot_id:
task_field: Literal["available_tasks", "block_tasks"] = (
"available_tasks" if status else "block_tasks"
)
data_list = await cls.all().values_list("bot_id", task_field)
return {k: cls._convert_module_format(v) for k, v in data_list}
result = await cls.get_or_none(bot_id=bot_id)
if result:
tasks = result.available_tasks if status else result.block_tasks
return cls._convert_module_format(tasks)
return []
@overload
@classmethod
async def get_plugins(cls) -> dict[str, list[str]]: ...
@overload
@classmethod
async def get_plugins(cls, bot_id: str) -> list[str]: ...
@overload
@classmethod
async def get_plugins(cls, *, status: bool) -> dict[str, list[str]]: ...
@overload
@classmethod
async def get_plugins(cls, bot_id: str, status: bool = True) -> list[str]: ...
@classmethod
async def set_block_task(cls, bot_id: str, task: str):
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
if f"<{task}," not in bot_data.block_task:
bot_data.block_plugin += f"<{task},"
await bot_data.save(update_fields=["block_task"])
async def get_plugins(cls, bot_id: str | None = None, status: bool = True):
"""
获取bot插件
参数:
bot_id (str | None, optional): bot_id. Defaults to None.
status (bool, optional): 插件状态. Defaults to True.
返回:
list[tuple[str, str]] | str: 插件
"""
if not bot_id:
plugin_field = "available_plugins" if status else "block_plugins"
data_list = await cls.all().values_list("bot_id", plugin_field)
return {k: cls._convert_module_format(v) for k, v in data_list}
result = await cls.get_or_none(bot_id=bot_id)
if result:
plugins = result.available_plugins if status else result.block_plugins
return cls._convert_module_format(plugins)
return []
@classmethod
async def is_block_plugin(cls, bot_id: str, task: str) -> bool:
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
return f"<{task}," in bot_data.block_plugin
async def set_bot_status(cls, status: bool, bot_id: str | None = None) -> None:
"""
设置bot状态
参数:
status (bool): 状态
bot_id (str, optional): bot_id. Defaults to None.
Raises:
ValueError: 未找到 bot_id
"""
if bot_id:
affected_rows = await cls.filter(bot_id=bot_id).update(status=status)
if not affected_rows:
raise ValueError(f"未找到 bot_id: {bot_id}")
else:
await cls.all().update(status=status)
@overload
@classmethod
def _convert_module_format(cls, data: str) -> list[str]: ...
@overload
@classmethod
def _convert_module_format(cls, data: list[str]) -> str: ...
@classmethod
async def is_block_task(cls, bot_id: str, task: str) -> bool:
def _convert_module_format(cls, data: str | list[str]) -> str | list[str]:
"""
`<aaa,<bbb,<ccc,` `["aaa", "bbb", "ccc"]` 之间进行相互转换
参数:
data (str | list[str]): 输入数据可能是格式化字符串或字符串列表
返回:
str | list[str]: 根据输入类型返回转换后的数据
"""
if isinstance(data, str):
return [item.strip(",") for item in data.split("<") if item]
elif isinstance(data, list):
return "".join(cls.format(item) for item in data)
@classmethod
async def _toggle_field(
cls,
bot_id: str,
from_field: str,
to_field: str,
data: str,
) -> None:
"""
from_field to_field 之间移动指定的 data
参数:
bot_id (str): 目标 bot ID
from_field (str): 源字段名称
to_field (str): 目标字段名称
data (str): 要插入的内容
Raises:
ValueError: 如果 data 不在 from_field to_field
"""
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
return f"<{task}," in bot_data.block_task
formatted_data = cls.format(data)
from_list: str = getattr(bot_data, from_field)
to_list: str = getattr(bot_data, to_field)
if formatted_data not in (from_list + to_list):
raise ValueError(f"{data} 不在源字段和目标字段中")
if formatted_data in from_list:
from_list = from_list.replace(formatted_data, "", 1)
if formatted_data not in to_list:
to_list += formatted_data
setattr(bot_data, from_field, from_list)
setattr(bot_data, to_field, to_list)
await bot_data.save(update_fields=[from_field, to_field])
@classmethod
async def disable_plugin(cls, bot_id: str | None, plugin_name: str) -> None:
"""
禁用插件
参数:
bot_id (str | None): bot_id
plugin_name (str): 插件名称
"""
if bot_id:
await cls._toggle_field(
bot_id,
"available_plugins",
"block_plugins",
plugin_name,
)
else:
bot_list = await cls.all()
for bot in bot_list:
await cls._toggle_field(
bot.bot_id,
"available_plugins",
"block_plugins",
plugin_name,
)
@classmethod
async def enable_plugin(cls, bot_id: str | None, plugin_name: str) -> None:
"""
启用插件
参数:
bot_id (str | None): bot_id
plugin_name (str): 插件名称
"""
if bot_id:
await cls._toggle_field(
bot_id,
"block_plugins",
"available_plugins",
plugin_name,
)
else:
bot_list = await cls.all()
for bot in bot_list:
await cls._toggle_field(
bot.bot_id,
"block_plugins",
"available_plugins",
plugin_name,
)
@classmethod
async def disable_task(cls, bot_id: str | None, task_name: str) -> None:
"""
禁用被动技能
参数:
bot_id (str | None): bot_id
task_name (str): 被动技能名称
"""
if bot_id:
await cls._toggle_field(
bot_id,
"available_tasks",
"block_tasks",
task_name,
)
else:
bot_list = await cls.all()
for bot in bot_list:
await cls._toggle_field(
bot.bot_id,
"available_tasks",
"block_tasks",
task_name,
)
@classmethod
async def enable_task(cls, bot_id: str | None, task_name: str) -> None:
"""
启用被动技能
参数:
bot_id (str | None): bot_id
task_name (str): 被动技能名称
"""
if bot_id:
await cls._toggle_field(
bot_id,
"block_tasks",
"available_tasks",
task_name,
)
else:
bot_list = await cls.all()
for bot in bot_list:
await cls._toggle_field(
bot.bot_id,
"block_tasks",
"available_tasks",
task_name,
)
@classmethod
async def disable_all(
cls,
bot_id: str,
feat: Literal["plugins", "tasks"],
) -> None:
"""
禁用全部插件或被动技能
参数:
bot_id (str): bot_id
feat (Literal["plugins", "tasks"]): 插件或被动技能
"""
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
if feat == "plugins":
available_plugins = cls._convert_module_format(bot_data.available_plugins)
block_plugins = cls._convert_module_format(bot_data.block_plugins)
bot_data.block_plugins = cls._convert_module_format(
available_plugins + block_plugins
)
bot_data.available_plugins = ""
elif feat == "tasks":
available_tasks = cls._convert_module_format(bot_data.available_tasks)
block_tasks = cls._convert_module_format(bot_data.block_tasks)
bot_data.block_tasks = cls._convert_module_format(
available_tasks + block_tasks
)
bot_data.available_tasks = ""
await bot_data.save(
update_fields=[
"available_tasks",
"block_tasks",
"available_plugins",
"block_plugins",
]
)
@classmethod
async def enable_all(
cls,
bot_id: str,
feat: Literal["plugins", "tasks"],
) -> None:
"""
启用全部插件或被动技能
参数:
bot_id (str): bot_id
feat (Literal["plugins", "tasks"]): 插件或被动技能
"""
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
if feat == "plugins":
available_plugins = cls._convert_module_format(bot_data.available_plugins)
block_plugins = cls._convert_module_format(bot_data.block_plugins)
bot_data.available_plugins = cls._convert_module_format(
available_plugins + block_plugins
)
bot_data.block_plugins = ""
elif feat == "tasks":
available_tasks = cls._convert_module_format(bot_data.available_tasks)
block_tasks = cls._convert_module_format(bot_data.block_tasks)
bot_data.available_tasks = cls._convert_module_format(
available_tasks + block_tasks
)
bot_data.block_tasks = ""
await bot_data.save(
update_fields=[
"available_tasks",
"block_tasks",
"available_plugins",
"block_plugins",
]
)
@classmethod
async def is_block_plugin(cls, bot_id: str, plugin_name: str) -> bool:
"""
检查插件是否被禁用
参数:
bot_id (str): bot_id
plugin_name (str): 插件某款
返回:
bool: 是否被禁用
"""
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
return cls.format(plugin_name) in bot_data.block_plugins
@classmethod
async def is_block_task(cls, bot_id: str, task_name: str) -> bool:
"""
检查被动技能是否被禁用
参数:
bot_id (str): bot_id
task_name (str): 被动技能名称
返回:
bool: 是否被禁用
"""
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
return cls.format(task_name) in bot_data.block_tasks

View File

@ -3,6 +3,7 @@ from nonebot_plugin_uninfo import Session, SupportScope, Uninfo, get_interface
from zhenxun.configs.config import BotConfig
from zhenxun.models.ban_console import BanConsole
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.task_info import TaskInfo
from zhenxun.services.log import logger
@ -39,6 +40,13 @@ class CommonUtils:
"""被动全局状态"""
if not task.status:
return True
if not await BotConsole.get_bot_status(session.self_id):
"""bot是否休眠"""
return True
block_tasks = await BotConsole.get_tasks(session.self_id, False)
if module in block_tasks:
"""bot是否禁用被动"""
return True
if group_id:
if await GroupConsole.is_block_task(group_id, module):
"""群组是否禁用被动"""