zhenxun_bot/zhenxun/models/bot_console.py
HibiKier 8649aaaa54
引入缓存机制 (#1889)
* 添加全局cache

*  构建缓存,hook使用缓存

*  新增数据库Model方法监控

*  数据库添加semaphore锁

* 🩹 优化webapi返回数据

*  添加增量缓存与缓存过期

* 🎨 优化检测代码结构

*  优化hook权限检测性能

* 🐛 添加新异常判断跳过权限检测

*  添加插件limit缓存

* 🎨 代码格式优化

* 🐛  修复代码导入

* 🐛 修复刷新时检查

* 👽 Rename exception for missing database URL in initialization

*  Update default database URL to SQLite in configuration

* 🔧 Update tortoise-orm and aiocache dependencies restrictions; add optional redis and asyncpg support

* 🐛 修复ban检测

* 🐛 修复所有插件关闭时缓存更新

* 🐛 尝试迁移至aiocache

* 🐛 完善aiocache缓存

*  代码性能优化

* 🐛 移除获取封禁缓存时的日志记录

* 🐛 修复缓存类型声明,优化封禁用户处理逻辑

* 🐛 优化LevelUser权限更新逻辑及数据库迁移

*  cache支持redis连接

* 🚨 auto fix by pre-commit hooks

*  :增强获取群组的安全性和准确性。同时,优化了缓存管理中的相关逻辑,确保缓存操作的一致性。

*  feat(auth_limit): 将插件初始化逻辑的启动装饰器更改为优先级管理器

* 🔧 修复日志记录级别

* 🔧 更新数据库连接字符串

* 🔧 更新数据库连接字符串为内存数据库,并优化权限检查逻辑

*  feat(cache): 增加缓存功能配置项,并新增数据访问层以支持缓存逻辑

* ♻️ 重构cache

*  feat(cache): 增强缓存管理,新增缓存字典和缓存列表功能,支持过期时间管理

* 🔧 修复Notebook类中的viewport高度设置,将其从1000调整为10

*  更新插件管理逻辑,替换缓存服务为CacheRoot并优化缓存失效处理

*  更新RegisterConfig类中的type字段

*  修复清理重复记录逻辑,确保检查记录的id属性有效性

*  超级无敌大优化,解决延迟与卡死问题

*  更新封禁功能,增加封禁时长参数和描述,优化插件信息返回结构

*  更新zhenxun_help.py中的viewport高度,将其从453调整为10,以优化页面显示效果

*  优化插件分类逻辑,增加插件ID排序,并更新插件信息返回结构

---------

Co-authored-by: BalconyJH <balconyjh@gmail.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2025-07-14 22:35:29 +08:00

445 lines
14 KiB
Python

from typing import Literal, overload
from tortoise import fields
from zhenxun.services.db_context import Model
from zhenxun.utils.enum import CacheType
class BotConsole(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
bot_id = fields.CharField(255, unique=True, description="bot_id")
"""bot_id"""
status = fields.BooleanField(default=True, description="Bot状态")
"""Bot状态"""
create_time = fields.DatetimeField(auto_now_add=True, description="创建时间")
"""创建时间"""
platform = fields.CharField(255, null=True, description="平台")
"""平台"""
block_plugins = fields.TextField(default="", description="禁用插件")
"""禁用插件"""
block_tasks = fields.TextField(default="", description="禁用被动技能")
"""禁用被动技能"""
available_plugins = fields.TextField(default="", description="可用插件")
"""可用插件"""
available_tasks = fields.TextField(default="", description="可用被动技能")
"""可用被动技能"""
class Meta: # pyright: ignore [reportIncompatibleVariableOverride]
table = "bot_console"
table_description = "Bot数据表"
cache_type = CacheType.BOT
"""缓存类型"""
cache_key_field = "bot_id"
"""缓存键字段"""
@staticmethod
def format(name: str) -> str:
return f"<{name},"
@overload
@classmethod
async def get_bot_status(cls) -> list[tuple[str, bool]]: ...
@overload
@classmethod
async def get_bot_status(cls, bot_id: str) -> bool: ...
@classmethod
async def get_bot_status(
cls, bot_id: str | None = None
) -> list[tuple[str, bool]] | bool:
"""
获取bot状态
参数:
bot_id (str, optional): bot_id. Defaults to None.
返回:
list[tuple[str, bool]] | bool: bot状态
"""
if not bot_id:
return await cls.all().values_list("bot_id", "status")
result = await cls.get_or_none(bot_id=bot_id)
return result.status if result else False
@overload
@classmethod
async def get_tasks(cls) -> list[tuple[str, list[str]]]: ...
@overload
@classmethod
async def get_tasks(cls, bot_id: str) -> list[str]: ...
@overload
@classmethod
async def get_tasks(cls, *, status: bool) -> dict[str, list[str]]: ...
@overload
@classmethod
async def get_tasks(cls, bot_id: str, status: bool = True) -> list[str]: ...
@classmethod
async def get_tasks(cls, bot_id: str | None = None, status: bool | None = True):
"""
获取bot被动技能
参数:
bot_id (str | None, optional): bot_id. Defaults to None.
status (bool | None, optional): 被动状态. Defaults to True.
返回:
list[tuple[str, str]] | str: 被动技能
"""
if not bot_id:
task_field: Literal["available_tasks", "block_tasks"] = (
"available_tasks" if status else "block_tasks"
)
data_list = await cls.all().values_list("bot_id", task_field)
return {k: cls.convert_module_format(v) for k, v in data_list}
result = await cls.get_or_none(bot_id=bot_id)
if result:
tasks = result.available_tasks if status else result.block_tasks
return cls.convert_module_format(tasks)
return []
@overload
@classmethod
async def get_plugins(cls) -> dict[str, list[str]]: ...
@overload
@classmethod
async def get_plugins(cls, bot_id: str) -> list[str]: ...
@overload
@classmethod
async def get_plugins(cls, *, status: bool) -> dict[str, list[str]]: ...
@overload
@classmethod
async def get_plugins(cls, bot_id: str, status: bool = True) -> list[str]: ...
@classmethod
async def get_plugins(cls, bot_id: str | None = None, status: bool = True):
"""
获取bot插件
参数:
bot_id (str | None, optional): bot_id. Defaults to None.
status (bool, optional): 插件状态. Defaults to True.
返回:
list[tuple[str, str]] | str: 插件
"""
if not bot_id:
plugin_field = "available_plugins" if status else "block_plugins"
data_list = await cls.all().values_list("bot_id", plugin_field)
return {k: cls.convert_module_format(v) for k, v in data_list}
result = await cls.get_or_none(bot_id=bot_id)
if result:
plugins = result.available_plugins if status else result.block_plugins
return cls.convert_module_format(plugins)
return []
@classmethod
async def set_bot_status(cls, status: bool, bot_id: str | None = None) -> None:
"""
设置bot状态
参数:
status (bool): 状态
bot_id (str, optional): bot_id. Defaults to None.
Raises:
ValueError: 未找到 bot_id
"""
if bot_id:
affected_rows = await cls.filter(bot_id=bot_id).update(status=status)
if not affected_rows:
raise ValueError(f"未找到 bot_id: {bot_id}")
else:
await cls.all().update(status=status)
@overload
@classmethod
def convert_module_format(cls, data: str) -> list[str]: ...
@overload
@classmethod
def convert_module_format(cls, data: list[str]) -> str: ...
@classmethod
def convert_module_format(cls, data: str | list[str]) -> str | list[str]:
"""
在 `<aaa,<bbb,<ccc,` 和 `["aaa", "bbb", "ccc"]` 之间进行相互转换。
参数:
data (str | list[str]): 输入数据,可能是格式化字符串或字符串列表。
返回:
str | list[str]: 根据输入类型返回转换后的数据。
"""
if isinstance(data, str):
return [item.strip(",") for item in data.split("<") if item]
elif isinstance(data, list):
return "".join(cls.format(item) for item in data)
@classmethod
async def _toggle_field(
cls,
bot_id: str,
from_field: str,
to_field: str,
data: str,
) -> None:
"""
在 from_field 和 to_field 之间移动指定的 data
参数:
bot_id (str): 目标 bot 的 ID
from_field (str): 源字段名称
to_field (str): 目标字段名称
data (str): 要插入的内容
Raises:
ValueError: 如果 data 不在 from_field 和 to_field 中
"""
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
formatted_data = cls.format(data)
from_list: str = getattr(bot_data, from_field)
to_list: str = getattr(bot_data, to_field)
if formatted_data not in (from_list + to_list):
raise ValueError(f"{data} 不在源字段和目标字段中")
if formatted_data in from_list:
from_list = from_list.replace(formatted_data, "", 1)
if formatted_data not in to_list:
to_list += formatted_data
setattr(bot_data, from_field, from_list)
setattr(bot_data, to_field, to_list)
await bot_data.save(update_fields=[from_field, to_field])
@classmethod
async def disable_plugin(cls, bot_id: str | None, plugin_name: str) -> None:
"""
禁用插件
参数:
bot_id (str | None): bot_id
plugin_name (str): 插件名称
"""
if bot_id:
await cls._toggle_field(
bot_id,
"available_plugins",
"block_plugins",
plugin_name,
)
else:
bot_list = await cls.all()
for bot in bot_list:
await cls._toggle_field(
bot.bot_id,
"available_plugins",
"block_plugins",
plugin_name,
)
@classmethod
async def enable_plugin(cls, bot_id: str | None, plugin_name: str) -> None:
"""
启用插件
参数:
bot_id (str | None): bot_id
plugin_name (str): 插件名称
"""
if bot_id:
await cls._toggle_field(
bot_id,
"block_plugins",
"available_plugins",
plugin_name,
)
else:
bot_list = await cls.all()
for bot in bot_list:
await cls._toggle_field(
bot.bot_id,
"block_plugins",
"available_plugins",
plugin_name,
)
@classmethod
async def disable_task(cls, bot_id: str | None, task_name: str) -> None:
"""
禁用被动技能
参数:
bot_id (str | None): bot_id
task_name (str): 被动技能名称
"""
if bot_id:
await cls._toggle_field(
bot_id,
"available_tasks",
"block_tasks",
task_name,
)
else:
bot_list = await cls.all()
for bot in bot_list:
await cls._toggle_field(
bot.bot_id,
"available_tasks",
"block_tasks",
task_name,
)
@classmethod
async def enable_task(cls, bot_id: str | None, task_name: str) -> None:
"""
启用被动技能
参数:
bot_id (str | None): bot_id
task_name (str): 被动技能名称
"""
if bot_id:
await cls._toggle_field(
bot_id,
"block_tasks",
"available_tasks",
task_name,
)
else:
bot_list = await cls.all()
for bot in bot_list:
await cls._toggle_field(
bot.bot_id,
"block_tasks",
"available_tasks",
task_name,
)
@classmethod
async def disable_all(
cls,
bot_id: str,
feat: Literal["plugins", "tasks"],
) -> None:
"""
禁用全部插件或被动技能
参数:
bot_id (str): bot_id
feat (Literal["plugins", "tasks"]): 插件或被动技能
"""
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
if feat == "plugins":
available_plugins = cls.convert_module_format(bot_data.available_plugins)
block_plugins = cls.convert_module_format(bot_data.block_plugins)
bot_data.block_plugins = cls.convert_module_format(
available_plugins + block_plugins
)
bot_data.available_plugins = ""
elif feat == "tasks":
available_tasks = cls.convert_module_format(bot_data.available_tasks)
block_tasks = cls.convert_module_format(bot_data.block_tasks)
bot_data.block_tasks = cls.convert_module_format(
available_tasks + block_tasks
)
bot_data.available_tasks = ""
await bot_data.save(
update_fields=[
"available_tasks",
"block_tasks",
"available_plugins",
"block_plugins",
]
)
@classmethod
async def enable_all(
cls,
bot_id: str,
feat: Literal["plugins", "tasks"],
) -> None:
"""
启用全部插件或被动技能
参数:
bot_id (str): bot_id
feat (Literal["plugins", "tasks"]): 插件或被动技能
"""
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
if feat == "plugins":
available_plugins = cls.convert_module_format(bot_data.available_plugins)
block_plugins = cls.convert_module_format(bot_data.block_plugins)
bot_data.available_plugins = cls.convert_module_format(
available_plugins + block_plugins
)
bot_data.block_plugins = ""
elif feat == "tasks":
available_tasks = cls.convert_module_format(bot_data.available_tasks)
block_tasks = cls.convert_module_format(bot_data.block_tasks)
bot_data.available_tasks = cls.convert_module_format(
available_tasks + block_tasks
)
bot_data.block_tasks = ""
await bot_data.save(
update_fields=[
"available_tasks",
"block_tasks",
"available_plugins",
"block_plugins",
]
)
@classmethod
async def is_block_plugin(cls, bot_id: str, plugin_name: str) -> bool:
"""
检查插件是否被禁用
参数:
bot_id (str): bot_id
plugin_name (str): 插件某款
返回:
bool: 是否被禁用
"""
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
return cls.format(plugin_name) in bot_data.block_plugins
@classmethod
async def is_block_task(cls, bot_id: str, task_name: str) -> bool:
"""
检查被动技能是否被禁用
参数:
bot_id (str): bot_id
task_name (str): 被动技能名称
返回:
bool: 是否被禁用
"""
bot_data, _ = await cls.get_or_create(bot_id=bot_id)
return cls.format(task_name) in bot_data.block_tasks
@classmethod
async def _run_script(cls):
return [
"ALTER TABLE bot_console RENAME COLUMN block_plugin TO block_plugins;",
"ALTER TABLE bot_console RENAME COLUMN block_task TO block_tasks;",
"ALTER TABLE bot_console ADD available_plugins text default '';",
"ALTER TABLE bot_console ADD available_tasks text default '';",
]