from collections import defaultdict
from collections.abc import Callable
import copy
from datetime import datetime
from pathlib import Path
from typing import Any, Literal
import cattrs
from nonebot.compat import model_dump
from pydantic import BaseModel
from ruamel.yaml import YAML
from ruamel.yaml.scanner import ScannerError
from zhenxun.configs.path_config import DATA_PATH
from zhenxun.services.log import logger
from zhenxun.utils.enum import BlockType, LimitWatchType, PluginLimitType, PluginType
_yaml = YAML(pure=True)
_yaml.indent = 2
_yaml.allow_unicode = True
class Example(BaseModel):
"""
示例
"""
exec: str
"""执行命令"""
description: str = ""
"""命令描述"""
class Command(BaseModel):
"""
具体参数说明
"""
command: str
"""命令"""
params: list[str] = []
"""参数"""
description: str = ""
"""描述"""
examples: list[Example] = []
"""示例列表"""
class RegisterConfig(BaseModel):
"""
注册配置项
"""
key: str
"""配置项键"""
value: Any
"""配置项值"""
module: str | None = None
"""模块名"""
help: str | None
"""配置注解"""
default_value: Any | None = None
"""默认值"""
type: Any = None
"""参数类型"""
arg_parser: Callable | None = None
"""参数解析"""
class ConfigModel(BaseModel):
"""
配置项
"""
value: Any
"""配置项值"""
help: str | None
"""配置注解"""
default_value: Any | None = None
"""默认值"""
type: Any = None
"""参数类型"""
arg_parser: Callable | None = None
"""参数解析"""
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)
class ConfigGroup(BaseModel):
"""
配置组
"""
module: str
"""模块名"""
name: str | None = None
"""插件名"""
configs: dict[str, ConfigModel] = defaultdict()
"""配置项列表"""
def get(self, c: str, default: Any = None) -> Any:
cfg = self.configs.get(c.upper())
if cfg is not None:
if cfg.value is not None:
return cfg.value
if cfg.default_value is not None:
return cfg.default_value
return default
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)
class BaseBlock(BaseModel):
"""
插件阻断基本类(插件阻断限制)
"""
status: bool = True
"""限制状态"""
check_type: BlockType = BlockType.ALL
"""检查类型"""
watch_type: LimitWatchType = LimitWatchType.USER
"""监听对象"""
result: str | None = None
"""阻断时回复内容"""
_type: PluginLimitType = PluginLimitType.BLOCK
"""类型"""
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)
class PluginCdBlock(BaseBlock):
"""
插件cd限制
"""
cd: int = 5
"""cd"""
_type: PluginLimitType = PluginLimitType.CD
"""类型"""
class PluginCountBlock(BaseBlock):
"""
插件次数限制
"""
max_count: int
"""最大调用次数"""
_type: PluginLimitType = PluginLimitType.COUNT
"""类型"""
class PluginSetting(BaseModel):
"""
插件基本配置
"""
level: int = 5
"""群权限等级"""
default_status: bool = True
"""进群默认开关状态"""
limit_superuser: bool = False
"""是否限制超级用户"""
cost_gold: int = 0
"""调用插件花费金币"""
class SchedulerModel(BaseModel):
trigger: Literal["date", "interval", "cron"]
"""trigger"""
day: int | None = None
"""天数"""
hour: int | None = None
"""小时"""
minute: int | None = None
"""分钟"""
second: int | None = None
"""秒"""
run_date: datetime | None = None
"""运行日期"""
id: str | None = None
"""id"""
max_instances: int | None = None
"""最大运行实例"""
args: list | None = None
"""参数"""
kwargs: dict | None = None
"""参数"""
class Task(BaseBlock):
module: str
"""被动技能模块名"""
name: str
"""被动技能名称"""
status: bool = True
"""全局开关状态"""
create_status: bool = False
"""初次加载默认开关状态"""
default_status: bool = True
"""进群时默认状态"""
scheduler: SchedulerModel | None = None
"""定时任务配置"""
run_func: Callable | None = None
"""运行函数"""
check: Callable | None = None
"""检查函数"""
check_args: list = []
"""检查函数参数"""
class PluginExtraData(BaseModel):
"""
插件扩展信息
"""
author: str | None = None
"""作者"""
version: str | None = None
"""版本"""
plugin_type: PluginType = PluginType.NORMAL
"""插件类型"""
menu_type: str = "功能"
"""菜单类型"""
admin_level: int | None = None
"""管理员插件所需权限等级"""
configs: list[RegisterConfig] | None = None
"""插件配置"""
setting: PluginSetting | None = None
"""插件基本配置"""
limits: list[BaseBlock | PluginCdBlock | PluginCountBlock] | None = None
"""插件限制"""
commands: list[Command] = []
"""命令列表,用于说明帮助"""
tasks: list[Task] | None = None
"""技能被动"""
superuser_help: str | None = None
"""超级用户帮助"""
aliases: set[str] = set()
"""额外名称"""
sql_list: list[str] | None = None
"""常用sql"""
is_show: bool = True
"""是否显示在菜单中"""
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)
class NoSuchConfig(Exception):
pass
class ConfigsManager:
"""
插件配置 与 资源 管理器
"""
def __init__(self, file: Path):
self._data: dict[str, ConfigGroup] = {}
self._simple_data: dict = {}
self._simple_file = DATA_PATH / "config.yaml"
_yaml = YAML()
if file:
file.parent.mkdir(exist_ok=True, parents=True)
self.file = file
self.load_data()
if self._simple_file.exists():
try:
with self._simple_file.open(encoding="utf8") as f:
self._simple_data = _yaml.load(f)
except ScannerError as e:
raise ScannerError(
f"{e}\n**********************************************\n"
f"****** 可能为config.yaml配置文件填写不规范 ******\n"
f"**********************************************"
) from e
def set_name(self, module: str, name: str):
"""设置插件配置中文名出
参数:
module: 模块名
name: 中文名称
异常:
ValueError: module不能为为空
"""
if not module:
raise ValueError("set_name: module不能为为空")
if data := self._data.get(module):
data.name = name
def add_plugin_config(
self,
module: str,
key: str,
value: Any,
*,
help: str | None = None,
default_value: Any = None,
type: type | None = None,
arg_parser: Callable | None = None,
_override: bool = False,
):
"""为插件添加一个配置,不会被覆盖,只有第一个生效
参数:
module: 模块
key: 键
value: 值
help: 配置注解.
default_value: 默认值.
type: 值类型.
arg_parser: 值解析器,一般与webui配合使用.
_override: 强制覆盖值.
异常:
ValueError: module和key不能为为空
ValueError: 填写错误
"""
if not module or not key:
raise ValueError("add_plugin_config: module和key不能为为空")
if module in self._data and (config := self._data[module].configs.get(key)):
config.help = help
config.arg_parser = arg_parser
config.type = type
if _override:
config.value = value
config.default_value = default_value
else:
key = key.upper()
if not self._data.get(module):
self._data[module] = ConfigGroup(module=module)
self._data[module].configs[key] = ConfigModel(
value=value,
help=help,
default_value=default_value,
type=type,
)
def set_config(
self,
module: str,
key: str,
value: Any,
auto_save: bool = False,
):
"""设置配置值
参数:
module: 模块名
key: 配置名称
value: 值
auto_save: 自动保存.
"""
key = key.upper()
if module in self._data:
if self._data[module].configs.get(key):
self._data[module].configs[key].value = value
else:
self.add_plugin_config(module, key, value)
self._simple_data[module][key] = value
if auto_save:
self.save(save_simple_data=True)
def get_config(self, module: str, key: str, default: Any = None) -> Any:
"""获取指定配置值
参数:
module: 模块名
key: 配置键
default: 没有key值内容的默认返回值.
异常:
NoSuchConfig: 未查询到配置
返回:
Any: 配置值
"""
logger.debug(
f"尝试获取配置MODULE: [{module}] | KEY: [{key}]"
)
key = key.upper()
value = None
if module in self._data.keys():
config = self._data[module].configs.get(key) or self._data[
module
].configs.get(key)
if not config:
raise NoSuchConfig(
f"未查询到配置项 MODULE: [ {module} ] | KEY: [ {key} ]"
)
try:
if config.arg_parser:
value = config.arg_parser(value or config.default_value)
elif config.value is not None:
# try:
value = (
cattrs.structure(config.value, config.type)
if config.type
else config.value
)
elif config.default_value is not None:
value = (
cattrs.structure(config.default_value, config.type)
if config.type
else config.default_value
)
except Exception as e:
logger.warning(
f"配置项类型转换 MODULE: [{module}]"
" | KEY: [{key}]",
e=e,
)
value = config.value or config.default_value
if value is None:
value = default
logger.debug(
f"获取配置 MODULE: [{module}] | "
f" KEY: [{key}] -> [{value}]"
)
return value
def get(self, key: str) -> ConfigGroup:
"""获取插件配置数据
参数:
key: 键,一般为模块名
返回:
ConfigGroup: ConfigGroup
"""
return self._data.get(key) or ConfigGroup(module="")
def save(self, path: str | Path | None = None, save_simple_data: bool = False):
"""保存数据
参数:
path: 路径.
save_simple_data: 同时保存至config.yaml.
"""
if save_simple_data:
with open(self._simple_file, "w", encoding="utf8") as f:
_yaml.dump(self._simple_data, f)
path = path or self.file
data = {}
for module in self._data:
data[module] = {}
for config in self._data[module].configs:
value = self._data[module].configs[config].dict()
del value["type"]
del value["arg_parser"]
data[module][config] = value
with open(path, "w", encoding="utf8") as f:
_yaml.dump(data, f)
def reload(self):
"""重新加载配置文件"""
if self._simple_file.exists():
with open(self._simple_file, encoding="utf8") as f:
self._simple_data = _yaml.load(f)
for key in self._simple_data.keys():
for k in self._simple_data[key].keys():
self._data[key].configs[k].value = self._simple_data[key][k]
self.save()
def load_data(self):
"""加载数据
异常:
ValueError: 配置文件为空!
"""
if not self.file.exists():
return
with open(self.file, encoding="utf8") as f:
temp_data = _yaml.load(f)
if not temp_data:
self.file.unlink()
raise ValueError(
"配置文件为空!\n"
"***********************************************************\n"
"****** 配置文件 plugins2config.yaml 为空,已删除,请重启 ******\n"
"***********************************************************"
)
count = 0
for module in temp_data:
config_group = ConfigGroup(module=module)
for config in temp_data[module]:
config_group.configs[config] = ConfigModel(**temp_data[module][config])
count += 1
self._data[module] = config_group
logger.info(
f"加载配置完成,共加载 {len(temp_data)} 个配置组及对应"
f" {count} 个配置项"
)
def get_data(self) -> dict[str, ConfigGroup]:
return copy.deepcopy(self._data)
def is_empty(self) -> bool:
return not bool(self._data)
def keys(self):
return self._data.keys()
def __str__(self):
return str(self._data)
def __setitem__(self, key, value):
self._data[key] = value
def __getitem__(self, key):
return self._data[key]