Config配置类支持BaseModel存储

This commit is contained in:
HibiKier 2025-06-17 17:43:10 +08:00
parent d47ecb215e
commit 5aee439bb2
3 changed files with 454 additions and 314 deletions

View File

@ -58,7 +58,7 @@ def _generate_simple_config(exists_module: list[str]):
生成简易配置 生成简易配置
异常: 异常:
AttributeError: _description_ AttributeError: AttributeError
""" """
# 读取用户配置 # 读取用户配置
_data = {} _data = {}
@ -74,7 +74,11 @@ def _generate_simple_config(exists_module: list[str]):
if _data.get(module) and k in _data[module].keys(): if _data.get(module) and k in _data[module].keys():
Config.set_config(module, k, _data[module][k]) Config.set_config(module, k, _data[module][k])
if f"{module}:{k}".lower() in exists_module: if f"{module}:{k}".lower() in exists_module:
_tmp_data[module][k] = Config.get_config(module, k) if k == "TEST9":
print()
_tmp_data[module][k] = Config.get_config(
module, k, build_model=False
)
except AttributeError as e: except AttributeError as e:
raise AttributeError(f"{e}\n可能为config.yaml配置文件填写不规范") from e raise AttributeError(f"{e}\n可能为config.yaml配置文件填写不规范") from e
if not _tmp_data[module]: if not _tmp_data[module]:

View File

@ -1,89 +1,82 @@
from collections.abc import Callable from collections.abc import Callable
import copy import copy
from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, Literal from typing import Any, TypeVar, get_args, get_origin
import cattrs import cattrs
from nonebot.compat import model_dump from nonebot.compat import model_dump
from pydantic import BaseModel, Field from pydantic import VERSION, BaseModel, Field, TypeAdapter
from ruamel.yaml import YAML from ruamel.yaml import YAML
from ruamel.yaml.scanner import ScannerError from ruamel.yaml.scanner import ScannerError
from zhenxun.configs.path_config import DATA_PATH from zhenxun.configs.path_config import DATA_PATH
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils.enum import BlockType, LimitWatchType, PluginLimitType, PluginType
from .models import (
AICallableParam,
AICallableProperties,
AICallableTag,
BaseBlock,
Command,
ConfigModel,
Example,
PluginCdBlock,
PluginCountBlock,
PluginExtraData,
PluginSetting,
RegisterConfig,
Task,
)
_yaml = YAML(pure=True) _yaml = YAML(pure=True)
_yaml.indent = 2 _yaml.indent = 2
_yaml.allow_unicode = True _yaml.allow_unicode = True
T = TypeVar("T")
class Example(BaseModel):
class NoSuchConfig(Exception):
pass
def _dump_pydantic_obj(obj: Any) -> Any:
""" """
示例 递归地将一个对象内部的 Pydantic BaseModel 实例转换为字典
支持单个实例实例列表实例字典等情况
""" """
if isinstance(obj, BaseModel):
exec: str return model_dump(obj)
"""执行命令""" if isinstance(obj, list):
description: str = "" return [_dump_pydantic_obj(item) for item in obj]
"""命令描述""" if isinstance(obj, dict):
return {key: _dump_pydantic_obj(value) for key, value in obj.items()}
return obj
class Command(BaseModel): def _is_pydantic_type(t: Any) -> bool:
""" """
具体参数说明 递归检查一个类型注解是否与 Pydantic BaseModel 相关
""" """
if t is None:
command: str return False
"""命令名称""" origin = get_origin(t)
params: list[str] = Field(default_factory=list) if origin:
"""参数""" return any(_is_pydantic_type(arg) for arg in get_args(t))
description: str = "" return isinstance(t, type) and issubclass(t, BaseModel)
"""描述"""
examples: list[Example] = Field(default_factory=list)
"""示例列表"""
class RegisterConfig(BaseModel): def parse_as(type_: type[T], obj: Any) -> T:
""" """
注册配置项 一个兼容 Pydantic V1 parse_obj_as 和V2的TypeAdapter.validate_python 的辅助函数
""" """
if VERSION.startswith("1"):
from pydantic import parse_obj_as
key: str return parse_obj_as(type_, obj)
"""配置项键""" else:
value: Any from pydantic import TypeAdapter
"""配置项值"""
module: str | None = None
"""模块名"""
help: str | None
"""配置注解"""
default_value: Any | None = None
"""默认值"""
type: Any = None
"""参数类型"""
arg_parser: Callable | None = None
"""参数解析"""
return TypeAdapter(type_).validate_python(obj)
class ConfigModel(BaseModel):
"""
配置项
"""
value: Any
"""配置项值"""
help: str | None
"""配置注解"""
default_value: Any | None = None
"""默认值"""
type: Any = None
"""参数类型"""
arg_parser: Callable | None = None
"""参数解析"""
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)
class ConfigGroup(BaseModel): class ConfigGroup(BaseModel):
@ -98,202 +91,41 @@ class ConfigGroup(BaseModel):
configs: dict[str, ConfigModel] = Field(default_factory=dict) configs: dict[str, ConfigModel] = Field(default_factory=dict)
"""配置项列表""" """配置项列表"""
def get(self, c: str, default: Any = None) -> Any: def get(self, c: str, default: T = None, *, build_model: bool = True) -> T | Any:
cfg = self.configs.get(c.upper()) """
if cfg is not None: 获取配置项的值如果指定了类型会自动构建实例
if cfg.value is not None: """
return cfg.value key = c.upper()
if cfg.default_value is not None: cfg = self.configs.get(key)
return cfg.default_value
return default if cfg is None:
return default
value_to_process = cfg.value if cfg.value is not None else cfg.default_value
if value_to_process is None:
return default
if cfg.type:
if _is_pydantic_type(cfg.type):
if build_model:
try:
return parse_as(cfg.type, value_to_process)
except Exception as e:
logger.warning(
f"Pydantic 模型解析失败 (key: {c.upper()}). ", e=e
)
try:
return cattrs.structure(value_to_process, cfg.type)
except Exception as e:
logger.warning(f"Cattrs 结构化失败 (key: {key}),返回原始值。", e=e)
return value_to_process
def to_dict(self, **kwargs): def to_dict(self, **kwargs):
return model_dump(self, **kwargs) return model_dump(self, **kwargs)
class BaseBlock(BaseModel):
"""
插件阻断基本类插件阻断限制
"""
status: bool = True
"""限制状态"""
check_type: BlockType = BlockType.ALL
"""检查类型"""
watch_type: LimitWatchType = LimitWatchType.USER
"""监听对象"""
result: str | None = None
"""阻断时回复内容"""
_type: PluginLimitType = PluginLimitType.BLOCK
"""类型"""
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)
class PluginCdBlock(BaseBlock):
"""
插件cd限制
"""
cd: int = 5
"""cd"""
_type: PluginLimitType = PluginLimitType.CD
"""类型"""
class PluginCountBlock(BaseBlock):
"""
插件次数限制
"""
max_count: int
"""最大调用次数"""
_type: PluginLimitType = PluginLimitType.COUNT
"""类型"""
class PluginSetting(BaseModel):
"""
插件基本配置
"""
level: int = 5
"""群权限等级"""
default_status: bool = True
"""进群默认开关状态"""
limit_superuser: bool = False
"""是否限制超级用户"""
cost_gold: int = 0
"""调用插件花费金币"""
impression: float = 0.0
"""调用插件好感度限制"""
class AICallableProperties(BaseModel):
type: str
"""参数类型"""
description: str
"""参数描述"""
enums: list[str] | None = None
"""参数枚举"""
class AICallableParam(BaseModel):
type: str
"""类型"""
properties: dict[str, AICallableProperties]
"""参数列表"""
required: list[str]
"""必要参数"""
class AICallableTag(BaseModel):
name: str
"""工具名称"""
parameters: AICallableParam | None = None
"""工具参数"""
description: str
"""工具描述"""
func: Callable | None = None
"""工具函数"""
def to_dict(self):
result = model_dump(self)
del result["func"]
return result
class SchedulerModel(BaseModel):
trigger: Literal["date", "interval", "cron"]
"""trigger"""
day: int | None = None
"""天数"""
hour: int | None = None
"""小时"""
minute: int | None = None
"""分钟"""
second: int | None = None
""""""
run_date: datetime | None = None
"""运行日期"""
id: str | None = None
"""id"""
max_instances: int | None = None
"""最大运行实例"""
args: list | None = None
"""参数"""
kwargs: dict | None = None
"""参数"""
class Task(BaseBlock):
module: str
"""被动技能模块名"""
name: str
"""被动技能名称"""
status: bool = True
"""全局开关状态"""
create_status: bool = False
"""初次加载默认开关状态"""
default_status: bool = True
"""进群时默认状态"""
scheduler: SchedulerModel | None = None
"""定时任务配置"""
run_func: Callable | None = None
"""运行函数"""
check: Callable | None = None
"""检查函数"""
check_args: list = Field(default_factory=list)
"""检查函数参数"""
class PluginExtraData(BaseModel):
"""
插件扩展信息
"""
author: str | None = None
"""作者"""
version: str | None = None
"""版本"""
plugin_type: PluginType = PluginType.NORMAL
"""插件类型"""
menu_type: str = "功能"
"""菜单类型"""
admin_level: int | None = None
"""管理员插件所需权限等级"""
configs: list[RegisterConfig] | None = None
"""插件配置"""
setting: PluginSetting | None = None
"""插件基本配置"""
limits: list[BaseBlock | PluginCdBlock | PluginCountBlock] | None = None
"""插件限制"""
commands: list[Command] = Field(default_factory=list)
"""命令列表,用于说明帮助"""
ignore_prompt: bool = False
"""是否忽略阻断提示"""
tasks: list[Task] | None = None
"""技能被动"""
superuser_help: str | None = None
"""超级用户帮助"""
aliases: set[str] = Field(default_factory=set)
"""额外名称"""
sql_list: list[str] | None = None
"""常用sql"""
is_show: bool = True
"""是否显示在菜单中"""
smart_tools: list[AICallableTag] | None = None
"""智能模式函数工具集"""
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)
class NoSuchConfig(Exception):
pass
class ConfigsManager: class ConfigsManager:
""" """
插件配置 资源 管理器 插件配置 资源 管理器
@ -366,23 +198,32 @@ class ConfigsManager:
if not module or not key: if not module or not key:
raise ValueError("add_plugin_config: module和key不能为为空") raise ValueError("add_plugin_config: module和key不能为为空")
if isinstance(value, BaseModel):
value = model_dump(value)
if isinstance(default_value, BaseModel):
default_value = model_dump(default_value)
processed_value = _dump_pydantic_obj(value)
processed_default_value = _dump_pydantic_obj(default_value)
self.add_module.append(f"{module}:{key}".lower()) self.add_module.append(f"{module}:{key}".lower())
if module in self._data and (config := self._data[module].configs.get(key)): if module in self._data and (config := self._data[module].configs.get(key)):
config.help = help config.help = help
config.arg_parser = arg_parser config.arg_parser = arg_parser
config.type = type config.type = type
if _override: if _override:
config.value = value config.value = processed_value
config.default_value = default_value config.default_value = processed_default_value
else: else:
key = key.upper() key = key.upper()
if not self._data.get(module): if not self._data.get(module):
self._data[module] = ConfigGroup(module=module) self._data[module] = ConfigGroup(module=module)
self._data[module].configs[key] = ConfigModel( self._data[module].configs[key] = ConfigModel(
value=value, value=processed_value,
help=help, help=help,
default_value=default_value, default_value=processed_default_value,
type=type, type=type,
arg_parser=arg_parser,
) )
def set_config( def set_config(
@ -402,6 +243,8 @@ class ConfigsManager:
""" """
key = key.upper() key = key.upper()
if module in self._data: if module in self._data:
if module not in self._simple_data:
self._simple_data[module] = {}
if self._data[module].configs.get(key): if self._data[module].configs.get(key):
self._data[module].configs[key].value = value self._data[module].configs[key].value = value
else: else:
@ -410,63 +253,66 @@ class ConfigsManager:
if auto_save: if auto_save:
self.save(save_simple_data=True) self.save(save_simple_data=True)
def get_config(self, module: str, key: str, default: Any = None) -> Any: def get_config(
"""获取指定配置值 self,
module: str,
参数: key: str,
module: 模块名 default: T = None,
key: 配置键 *,
default: 没有key值内容的默认返回值. build_model: bool = True,
) -> T | Any:
异常: """
NoSuchConfig: 未查询到配置 获取指定配置值自动构建Pydantic模型或其它类型实例
- 兼容Pydantic V1/V2
返回: - 支持 list[BaseModel] 等泛型容器
Any: 配置值 - 优先使用Pydantic原生方式解析失败后回退到cattrs
""" """
logger.debug(
f"尝试获取配置MODULE: [<u><y>{module}</y></u>] | KEY: [<u><y>{key}</y></u>]"
)
key = key.upper() key = key.upper()
value = None config_group = self._data.get(module)
if module in self._data.keys(): if not config_group:
config = self._data[module].configs.get(key) or self._data[ return default
module
].configs.get(key) config = config_group.configs.get(key)
if not config: if not config:
raise NoSuchConfig( return default
f"未查询到配置项 MODULE: [ {module} ] | KEY: [ {key} ]"
) value_to_process = (
config.value if config.value is not None else config.default_value
)
if value_to_process is None:
return default
# 1. 最高优先级:自定义的参数解析器
if config.arg_parser:
try: try:
if config.arg_parser: return config.arg_parser(value_to_process)
value = config.arg_parser(value or config.default_value)
elif config.value is not None:
# try:
value = (
cattrs.structure(config.value, config.type)
if config.type
else config.value
)
elif config.default_value is not None:
value = (
cattrs.structure(config.default_value, config.type)
if config.type
else config.default_value
)
except Exception as e: except Exception as e:
logger.warning( logger.warning(
f"配置项类型转换 MODULE: [<u><y>{module}</y></u>]" f"arg_parser 执行失败 (key: {key}),将尝试其他方法。", e=e
" | KEY: [<u><y>{key}</y></u>]",
e=e,
) )
value = config.value or config.default_value
if value is None: if config.type:
value = default if _is_pydantic_type(config.type):
logger.debug( if build_model:
f"获取配置 MODULE: [<u><y>{module}</y></u>] | " try:
f" KEY: [<u><y>{key}</y></u>] -> [<u><c>{value}</c></u>]" return parse_as(config.type, value_to_process)
) except Exception as e:
return value logger.warning(
f"pydantic类型转换失败 MODULE: [<u><y>{module}</y></u>] | "
f"KEY: [<u><y>{key}</y></u>].",
e=e,
)
else:
try:
return cattrs.structure(value_to_process, config.type)
except Exception as e:
logger.warning(
f"cattrs类型转换失败 MODULE: [<u><y>{module}</y></u>] | "
f"KEY: [<u><y>{key}</y></u>].",
e=e,
)
return value_to_process
def get(self, key: str) -> ConfigGroup: def get(self, key: str) -> ConfigGroup:
"""获取插件配置数据 """获取插件配置数据
@ -490,16 +336,16 @@ class ConfigsManager:
with open(self._simple_file, "w", encoding="utf8") as f: with open(self._simple_file, "w", encoding="utf8") as f:
_yaml.dump(self._simple_data, f) _yaml.dump(self._simple_data, f)
path = path or self.file path = path or self.file
data = {} save_data = {}
for module in self._data: for module, config_group in self._data.items():
data[module] = {} save_data[module] = {}
for config in self._data[module].configs: for config_key, config_model in config_group.configs.items():
value = self._data[module].configs[config].dict() save_data[module][config_key] = model_dump(
del value["type"] config_model, exclude={"type", "arg_parser"}
del value["arg_parser"] )
data[module][config] = value
with open(path, "w", encoding="utf8") as f: with open(path, "w", encoding="utf8") as f:
_yaml.dump(data, f) _yaml.dump(save_data, f)
def reload(self): def reload(self):
"""重新加载配置文件""" """重新加载配置文件"""
@ -558,3 +404,23 @@ class ConfigsManager:
def __getitem__(self, key): def __getitem__(self, key):
return self._data[key] return self._data[key]
__all__ = [
"AICallableParam",
"AICallableProperties",
"AICallableTag",
"BaseBlock",
"Command",
"ConfigGroup",
"ConfigModel",
"ConfigsManager",
"Example",
"NoSuchConfig",
"PluginCdBlock",
"PluginCountBlock",
"PluginExtraData",
"PluginSetting",
"RegisterConfig",
"Task",
]

View File

@ -0,0 +1,270 @@
from collections.abc import Callable
from datetime import datetime
from typing import Any, Literal
from nonebot.compat import model_dump
from pydantic import BaseModel, Field
from zhenxun.utils.enum import BlockType, LimitWatchType, PluginLimitType, PluginType
__all__ = [
"AICallableParam",
"AICallableProperties",
"AICallableTag",
"BaseBlock",
"Command",
"ConfigModel",
"Example",
"PluginCdBlock",
"PluginCountBlock",
"PluginExtraData",
"PluginSetting",
"RegisterConfig",
"Task",
]
class Example(BaseModel):
"""
示例
"""
exec: str
"""执行命令"""
description: str = ""
"""命令描述"""
class Command(BaseModel):
"""
具体参数说明
"""
command: str
"""命令名称"""
params: list[str] = Field(default_factory=list)
"""参数"""
description: str = ""
"""描述"""
examples: list[Example] = Field(default_factory=list)
"""示例列表"""
class RegisterConfig(BaseModel):
"""
注册配置项
"""
key: str
"""配置项键"""
value: Any
"""配置项值"""
module: str | None = None
"""模块名"""
help: str | None
"""配置注解"""
default_value: Any | None = None
"""默认值"""
type: Any = None
"""参数类型"""
arg_parser: Callable | None = None
"""参数解析"""
class ConfigModel(BaseModel):
"""
配置项
"""
value: Any
"""配置项值"""
help: str | None
"""配置注解"""
default_value: Any | None = None
"""默认值"""
type: Any = None
"""参数类型"""
arg_parser: Callable | None = None
"""参数解析"""
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)
class BaseBlock(BaseModel):
"""
插件阻断基本类插件阻断限制
"""
status: bool = True
"""限制状态"""
check_type: BlockType = BlockType.ALL
"""检查类型"""
watch_type: LimitWatchType = LimitWatchType.USER
"""监听对象"""
result: str | None = None
"""阻断时回复内容"""
_type: PluginLimitType = PluginLimitType.BLOCK
"""类型"""
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)
class PluginCdBlock(BaseBlock):
"""
插件cd限制
"""
cd: int = 5
"""cd"""
_type: PluginLimitType = PluginLimitType.CD
"""类型"""
class PluginCountBlock(BaseBlock):
"""
插件次数限制
"""
max_count: int
"""最大调用次数"""
_type: PluginLimitType = PluginLimitType.COUNT
"""类型"""
class PluginSetting(BaseModel):
"""
插件基本配置
"""
level: int = 5
"""群权限等级"""
default_status: bool = True
"""进群默认开关状态"""
limit_superuser: bool = False
"""是否限制超级用户"""
cost_gold: int = 0
"""调用插件花费金币"""
impression: float = 0.0
"""调用插件好感度限制"""
class AICallableProperties(BaseModel):
type: str
"""参数类型"""
description: str
"""参数描述"""
enums: list[str] | None = None
"""参数枚举"""
class AICallableParam(BaseModel):
type: str
"""类型"""
properties: dict[str, AICallableProperties]
"""参数列表"""
required: list[str]
"""必要参数"""
class AICallableTag(BaseModel):
name: str
"""工具名称"""
parameters: AICallableParam | None = None
"""工具参数"""
description: str
"""工具描述"""
func: Callable | None = None
"""工具函数"""
def to_dict(self):
result = model_dump(self)
del result["func"]
return result
class SchedulerModel(BaseModel):
trigger: Literal["date", "interval", "cron"]
"""trigger"""
day: int | None = None
"""天数"""
hour: int | None = None
"""小时"""
minute: int | None = None
"""分钟"""
second: int | None = None
""""""
run_date: datetime | None = None
"""运行日期"""
id: str | None = None
"""id"""
max_instances: int | None = None
"""最大运行实例"""
args: list | None = None
"""参数"""
kwargs: dict | None = None
"""参数"""
class Task(BaseBlock):
module: str
"""被动技能模块名"""
name: str
"""被动技能名称"""
status: bool = True
"""全局开关状态"""
create_status: bool = False
"""初次加载默认开关状态"""
default_status: bool = True
"""进群时默认状态"""
scheduler: SchedulerModel | None = None
"""定时任务配置"""
run_func: Callable | None = None
"""运行函数"""
check: Callable | None = None
"""检查函数"""
check_args: list = Field(default_factory=list)
"""检查函数参数"""
class PluginExtraData(BaseModel):
"""
插件扩展信息
"""
author: str | None = None
"""作者"""
version: str | None = None
"""版本"""
plugin_type: PluginType = PluginType.NORMAL
"""插件类型"""
menu_type: str = "功能"
"""菜单类型"""
admin_level: int | None = None
"""管理员插件所需权限等级"""
configs: list[RegisterConfig] | None = None
"""插件配置"""
setting: PluginSetting | None = None
"""插件基本配置"""
limits: list[BaseBlock | PluginCdBlock | PluginCountBlock] | None = None
"""插件限制"""
commands: list[Command] = Field(default_factory=list)
"""命令列表,用于说明帮助"""
ignore_prompt: bool = False
"""是否忽略阻断提示"""
tasks: list[Task] | None = None
"""技能被动"""
superuser_help: str | None = None
"""超级用户帮助"""
aliases: set[str] = Field(default_factory=set)
"""额外名称"""
sql_list: list[str] | None = None
"""常用sql"""
is_show: bool = True
"""是否显示在菜单中"""
smart_tools: list[AICallableTag] | None = None
"""智能模式函数工具集"""
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)