From 5aee439bb27b6c3399e58b03b18d620bd49e00fc Mon Sep 17 00:00:00 2001
From: HibiKier <775757368@qq.com>
Date: Tue, 17 Jun 2025 17:43:10 +0800
Subject: [PATCH] =?UTF-8?q?:sparkles:=20Config=E9=85=8D=E7=BD=AE=E7=B1=BB?=
=?UTF-8?q?=E6=94=AF=E6=8C=81BaseModel=E5=AD=98=E5=82=A8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
zhenxun/builtin_plugins/init/init_config.py | 8 +-
zhenxun/configs/utils/__init__.py | 490 +++++++-------------
zhenxun/configs/utils/models.py | 270 +++++++++++
3 files changed, 454 insertions(+), 314 deletions(-)
create mode 100644 zhenxun/configs/utils/models.py
diff --git a/zhenxun/builtin_plugins/init/init_config.py b/zhenxun/builtin_plugins/init/init_config.py
index eef63635..b25568d8 100644
--- a/zhenxun/builtin_plugins/init/init_config.py
+++ b/zhenxun/builtin_plugins/init/init_config.py
@@ -58,7 +58,7 @@ def _generate_simple_config(exists_module: list[str]):
生成简易配置
异常:
- AttributeError: _description_
+ AttributeError: AttributeError
"""
# 读取用户配置
_data = {}
@@ -74,7 +74,11 @@ def _generate_simple_config(exists_module: list[str]):
if _data.get(module) and k in _data[module].keys():
Config.set_config(module, k, _data[module][k])
if f"{module}:{k}".lower() in exists_module:
- _tmp_data[module][k] = Config.get_config(module, k)
+ if k == "TEST9":
+ print()
+ _tmp_data[module][k] = Config.get_config(
+ module, k, build_model=False
+ )
except AttributeError as e:
raise AttributeError(f"{e}\n可能为config.yaml配置文件填写不规范") from e
if not _tmp_data[module]:
diff --git a/zhenxun/configs/utils/__init__.py b/zhenxun/configs/utils/__init__.py
index 03bc7331..f2bf5b3f 100644
--- a/zhenxun/configs/utils/__init__.py
+++ b/zhenxun/configs/utils/__init__.py
@@ -1,89 +1,82 @@
from collections.abc import Callable
import copy
-from datetime import datetime
from pathlib import Path
-from typing import Any, Literal
+from typing import Any, TypeVar, get_args, get_origin
import cattrs
from nonebot.compat import model_dump
-from pydantic import BaseModel, Field
+from pydantic import VERSION, BaseModel, Field, TypeAdapter
from ruamel.yaml import YAML
from ruamel.yaml.scanner import ScannerError
from zhenxun.configs.path_config import DATA_PATH
from zhenxun.services.log import logger
-from zhenxun.utils.enum import BlockType, LimitWatchType, PluginLimitType, PluginType
+
+from .models import (
+ AICallableParam,
+ AICallableProperties,
+ AICallableTag,
+ BaseBlock,
+ Command,
+ ConfigModel,
+ Example,
+ PluginCdBlock,
+ PluginCountBlock,
+ PluginExtraData,
+ PluginSetting,
+ RegisterConfig,
+ Task,
+)
_yaml = YAML(pure=True)
_yaml.indent = 2
_yaml.allow_unicode = True
+T = TypeVar("T")
-class Example(BaseModel):
+
+class NoSuchConfig(Exception):
+ pass
+
+
+def _dump_pydantic_obj(obj: Any) -> Any:
"""
- 示例
+ 递归地将一个对象内部的 Pydantic BaseModel 实例转换为字典。
+ 支持单个实例、实例列表、实例字典等情况。
"""
-
- exec: str
- """执行命令"""
- description: str = ""
- """命令描述"""
+ if isinstance(obj, BaseModel):
+ return model_dump(obj)
+ if isinstance(obj, list):
+ return [_dump_pydantic_obj(item) for item in obj]
+ if isinstance(obj, dict):
+ return {key: _dump_pydantic_obj(value) for key, value in obj.items()}
+ return obj
-class Command(BaseModel):
+def _is_pydantic_type(t: Any) -> bool:
"""
- 具体参数说明
+ 递归检查一个类型注解是否与 Pydantic BaseModel 相关。
"""
-
- command: str
- """命令名称"""
- params: list[str] = Field(default_factory=list)
- """参数"""
- description: str = ""
- """描述"""
- examples: list[Example] = Field(default_factory=list)
- """示例列表"""
+ if t is None:
+ return False
+ origin = get_origin(t)
+ if origin:
+ return any(_is_pydantic_type(arg) for arg in get_args(t))
+ return isinstance(t, type) and issubclass(t, BaseModel)
-class RegisterConfig(BaseModel):
+def parse_as(type_: type[T], obj: Any) -> T:
"""
- 注册配置项
+ 一个兼容 Pydantic V1 的 parse_obj_as 和V2的TypeAdapter.validate_python 的辅助函数。
"""
+ if VERSION.startswith("1"):
+ from pydantic import parse_obj_as
- key: str
- """配置项键"""
- value: Any
- """配置项值"""
- module: str | None = None
- """模块名"""
- help: str | None
- """配置注解"""
- default_value: Any | None = None
- """默认值"""
- type: Any = None
- """参数类型"""
- arg_parser: Callable | None = None
- """参数解析"""
+ return parse_obj_as(type_, obj)
+ else:
+ from pydantic import TypeAdapter
-
-class ConfigModel(BaseModel):
- """
- 配置项
- """
-
- value: Any
- """配置项值"""
- help: str | None
- """配置注解"""
- default_value: Any | None = None
- """默认值"""
- type: Any = None
- """参数类型"""
- arg_parser: Callable | None = None
- """参数解析"""
-
- def to_dict(self, **kwargs):
- return model_dump(self, **kwargs)
+ return TypeAdapter(type_).validate_python(obj)
class ConfigGroup(BaseModel):
@@ -98,202 +91,41 @@ class ConfigGroup(BaseModel):
configs: dict[str, ConfigModel] = Field(default_factory=dict)
"""配置项列表"""
- def get(self, c: str, default: Any = None) -> Any:
- cfg = self.configs.get(c.upper())
- if cfg is not None:
- if cfg.value is not None:
- return cfg.value
- if cfg.default_value is not None:
- return cfg.default_value
- return default
+ def get(self, c: str, default: T = None, *, build_model: bool = True) -> T | Any:
+ """
+ 获取配置项的值。如果指定了类型,会自动构建实例。
+ """
+ key = c.upper()
+ cfg = self.configs.get(key)
+
+ if cfg is None:
+ return default
+
+ value_to_process = cfg.value if cfg.value is not None else cfg.default_value
+
+ if value_to_process is None:
+ return default
+
+ if cfg.type:
+ if _is_pydantic_type(cfg.type):
+ if build_model:
+ try:
+ return parse_as(cfg.type, value_to_process)
+ except Exception as e:
+ logger.warning(
+ f"Pydantic 模型解析失败 (key: {c.upper()}). ", e=e
+ )
+ try:
+ return cattrs.structure(value_to_process, cfg.type)
+ except Exception as e:
+ logger.warning(f"Cattrs 结构化失败 (key: {key}),返回原始值。", e=e)
+
+ return value_to_process
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)
-class BaseBlock(BaseModel):
- """
- 插件阻断基本类(插件阻断限制)
- """
-
- status: bool = True
- """限制状态"""
- check_type: BlockType = BlockType.ALL
- """检查类型"""
- watch_type: LimitWatchType = LimitWatchType.USER
- """监听对象"""
- result: str | None = None
- """阻断时回复内容"""
- _type: PluginLimitType = PluginLimitType.BLOCK
- """类型"""
-
- def to_dict(self, **kwargs):
- return model_dump(self, **kwargs)
-
-
-class PluginCdBlock(BaseBlock):
- """
- 插件cd限制
- """
-
- cd: int = 5
- """cd"""
- _type: PluginLimitType = PluginLimitType.CD
- """类型"""
-
-
-class PluginCountBlock(BaseBlock):
- """
- 插件次数限制
- """
-
- max_count: int
- """最大调用次数"""
- _type: PluginLimitType = PluginLimitType.COUNT
- """类型"""
-
-
-class PluginSetting(BaseModel):
- """
- 插件基本配置
- """
-
- level: int = 5
- """群权限等级"""
- default_status: bool = True
- """进群默认开关状态"""
- limit_superuser: bool = False
- """是否限制超级用户"""
- cost_gold: int = 0
- """调用插件花费金币"""
- impression: float = 0.0
- """调用插件好感度限制"""
-
-
-class AICallableProperties(BaseModel):
- type: str
- """参数类型"""
- description: str
- """参数描述"""
- enums: list[str] | None = None
- """参数枚举"""
-
-
-class AICallableParam(BaseModel):
- type: str
- """类型"""
- properties: dict[str, AICallableProperties]
- """参数列表"""
- required: list[str]
- """必要参数"""
-
-
-class AICallableTag(BaseModel):
- name: str
- """工具名称"""
- parameters: AICallableParam | None = None
- """工具参数"""
- description: str
- """工具描述"""
- func: Callable | None = None
- """工具函数"""
-
- def to_dict(self):
- result = model_dump(self)
- del result["func"]
- return result
-
-
-class SchedulerModel(BaseModel):
- trigger: Literal["date", "interval", "cron"]
- """trigger"""
- day: int | None = None
- """天数"""
- hour: int | None = None
- """小时"""
- minute: int | None = None
- """分钟"""
- second: int | None = None
- """秒"""
- run_date: datetime | None = None
- """运行日期"""
- id: str | None = None
- """id"""
- max_instances: int | None = None
- """最大运行实例"""
- args: list | None = None
- """参数"""
- kwargs: dict | None = None
- """参数"""
-
-
-class Task(BaseBlock):
- module: str
- """被动技能模块名"""
- name: str
- """被动技能名称"""
- status: bool = True
- """全局开关状态"""
- create_status: bool = False
- """初次加载默认开关状态"""
- default_status: bool = True
- """进群时默认状态"""
- scheduler: SchedulerModel | None = None
- """定时任务配置"""
- run_func: Callable | None = None
- """运行函数"""
- check: Callable | None = None
- """检查函数"""
- check_args: list = Field(default_factory=list)
- """检查函数参数"""
-
-
-class PluginExtraData(BaseModel):
- """
- 插件扩展信息
- """
-
- author: str | None = None
- """作者"""
- version: str | None = None
- """版本"""
- plugin_type: PluginType = PluginType.NORMAL
- """插件类型"""
- menu_type: str = "功能"
- """菜单类型"""
- admin_level: int | None = None
- """管理员插件所需权限等级"""
- configs: list[RegisterConfig] | None = None
- """插件配置"""
- setting: PluginSetting | None = None
- """插件基本配置"""
- limits: list[BaseBlock | PluginCdBlock | PluginCountBlock] | None = None
- """插件限制"""
- commands: list[Command] = Field(default_factory=list)
- """命令列表,用于说明帮助"""
- ignore_prompt: bool = False
- """是否忽略阻断提示"""
- tasks: list[Task] | None = None
- """技能被动"""
- superuser_help: str | None = None
- """超级用户帮助"""
- aliases: set[str] = Field(default_factory=set)
- """额外名称"""
- sql_list: list[str] | None = None
- """常用sql"""
- is_show: bool = True
- """是否显示在菜单中"""
- smart_tools: list[AICallableTag] | None = None
- """智能模式函数工具集"""
-
- def to_dict(self, **kwargs):
- return model_dump(self, **kwargs)
-
-
-class NoSuchConfig(Exception):
- pass
-
-
class ConfigsManager:
"""
插件配置 与 资源 管理器
@@ -366,23 +198,32 @@ class ConfigsManager:
if not module or not key:
raise ValueError("add_plugin_config: module和key不能为为空")
+ if isinstance(value, BaseModel):
+ value = model_dump(value)
+ if isinstance(default_value, BaseModel):
+ default_value = model_dump(default_value)
+
+ processed_value = _dump_pydantic_obj(value)
+ processed_default_value = _dump_pydantic_obj(default_value)
+
self.add_module.append(f"{module}:{key}".lower())
if module in self._data and (config := self._data[module].configs.get(key)):
config.help = help
config.arg_parser = arg_parser
config.type = type
if _override:
- config.value = value
- config.default_value = default_value
+ config.value = processed_value
+ config.default_value = processed_default_value
else:
key = key.upper()
if not self._data.get(module):
self._data[module] = ConfigGroup(module=module)
self._data[module].configs[key] = ConfigModel(
- value=value,
+ value=processed_value,
help=help,
- default_value=default_value,
+ default_value=processed_default_value,
type=type,
+ arg_parser=arg_parser,
)
def set_config(
@@ -402,6 +243,8 @@ class ConfigsManager:
"""
key = key.upper()
if module in self._data:
+ if module not in self._simple_data:
+ self._simple_data[module] = {}
if self._data[module].configs.get(key):
self._data[module].configs[key].value = value
else:
@@ -410,63 +253,66 @@ class ConfigsManager:
if auto_save:
self.save(save_simple_data=True)
- def get_config(self, module: str, key: str, default: Any = None) -> Any:
- """获取指定配置值
-
- 参数:
- module: 模块名
- key: 配置键
- default: 没有key值内容的默认返回值.
-
- 异常:
- NoSuchConfig: 未查询到配置
-
- 返回:
- Any: 配置值
+ def get_config(
+ self,
+ module: str,
+ key: str,
+ default: T = None,
+ *,
+ build_model: bool = True,
+ ) -> T | Any:
+ """
+ 获取指定配置值,自动构建Pydantic模型或其它类型实例。
+ - 兼容Pydantic V1/V2。
+ - 支持 list[BaseModel] 等泛型容器。
+ - 优先使用Pydantic原生方式解析,失败后回退到cattrs。
"""
- logger.debug(
- f"尝试获取配置MODULE: [{module}] | KEY: [{key}]"
- )
key = key.upper()
- value = None
- if module in self._data.keys():
- config = self._data[module].configs.get(key) or self._data[
- module
- ].configs.get(key)
- if not config:
- raise NoSuchConfig(
- f"未查询到配置项 MODULE: [ {module} ] | KEY: [ {key} ]"
- )
+ config_group = self._data.get(module)
+ if not config_group:
+ return default
+
+ config = config_group.configs.get(key)
+ if not config:
+ return default
+
+ value_to_process = (
+ config.value if config.value is not None else config.default_value
+ )
+ if value_to_process is None:
+ return default
+
+ # 1. 最高优先级:自定义的参数解析器
+ if config.arg_parser:
try:
- if config.arg_parser:
- value = config.arg_parser(value or config.default_value)
- elif config.value is not None:
- # try:
- value = (
- cattrs.structure(config.value, config.type)
- if config.type
- else config.value
- )
- elif config.default_value is not None:
- value = (
- cattrs.structure(config.default_value, config.type)
- if config.type
- else config.default_value
- )
+ return config.arg_parser(value_to_process)
except Exception as e:
logger.warning(
- f"配置项类型转换 MODULE: [{module}]"
- " | KEY: [{key}]",
- e=e,
+ f"arg_parser 执行失败 (key: {key}),将尝试其他方法。", e=e
)
- value = config.value or config.default_value
- if value is None:
- value = default
- logger.debug(
- f"获取配置 MODULE: [{module}] | "
- f" KEY: [{key}] -> [{value}]"
- )
- return value
+
+ if config.type:
+ if _is_pydantic_type(config.type):
+ if build_model:
+ try:
+ return parse_as(config.type, value_to_process)
+ except Exception as e:
+ logger.warning(
+ f"pydantic类型转换失败 MODULE: [{module}] | "
+ f"KEY: [{key}].",
+ e=e,
+ )
+ else:
+ try:
+ return cattrs.structure(value_to_process, config.type)
+ except Exception as e:
+ logger.warning(
+ f"cattrs类型转换失败 MODULE: [{module}] | "
+ f"KEY: [{key}].",
+ e=e,
+ )
+
+ return value_to_process
def get(self, key: str) -> ConfigGroup:
"""获取插件配置数据
@@ -490,16 +336,16 @@ class ConfigsManager:
with open(self._simple_file, "w", encoding="utf8") as f:
_yaml.dump(self._simple_data, f)
path = path or self.file
- data = {}
- for module in self._data:
- data[module] = {}
- for config in self._data[module].configs:
- value = self._data[module].configs[config].dict()
- del value["type"]
- del value["arg_parser"]
- data[module][config] = value
+ save_data = {}
+ for module, config_group in self._data.items():
+ save_data[module] = {}
+ for config_key, config_model in config_group.configs.items():
+ save_data[module][config_key] = model_dump(
+ config_model, exclude={"type", "arg_parser"}
+ )
+
with open(path, "w", encoding="utf8") as f:
- _yaml.dump(data, f)
+ _yaml.dump(save_data, f)
def reload(self):
"""重新加载配置文件"""
@@ -558,3 +404,23 @@ class ConfigsManager:
def __getitem__(self, key):
return self._data[key]
+
+
+__all__ = [
+ "AICallableParam",
+ "AICallableProperties",
+ "AICallableTag",
+ "BaseBlock",
+ "Command",
+ "ConfigGroup",
+ "ConfigModel",
+ "ConfigsManager",
+ "Example",
+ "NoSuchConfig",
+ "PluginCdBlock",
+ "PluginCountBlock",
+ "PluginExtraData",
+ "PluginSetting",
+ "RegisterConfig",
+ "Task",
+]
diff --git a/zhenxun/configs/utils/models.py b/zhenxun/configs/utils/models.py
new file mode 100644
index 00000000..d3c0db7f
--- /dev/null
+++ b/zhenxun/configs/utils/models.py
@@ -0,0 +1,270 @@
+from collections.abc import Callable
+from datetime import datetime
+from typing import Any, Literal
+
+from nonebot.compat import model_dump
+from pydantic import BaseModel, Field
+
+from zhenxun.utils.enum import BlockType, LimitWatchType, PluginLimitType, PluginType
+
+__all__ = [
+ "AICallableParam",
+ "AICallableProperties",
+ "AICallableTag",
+ "BaseBlock",
+ "Command",
+ "ConfigModel",
+ "Example",
+ "PluginCdBlock",
+ "PluginCountBlock",
+ "PluginExtraData",
+ "PluginSetting",
+ "RegisterConfig",
+ "Task",
+]
+
+
+class Example(BaseModel):
+ """
+ 示例
+ """
+
+ exec: str
+ """执行命令"""
+ description: str = ""
+ """命令描述"""
+
+
+class Command(BaseModel):
+ """
+ 具体参数说明
+ """
+
+ command: str
+ """命令名称"""
+ params: list[str] = Field(default_factory=list)
+ """参数"""
+ description: str = ""
+ """描述"""
+ examples: list[Example] = Field(default_factory=list)
+ """示例列表"""
+
+
+class RegisterConfig(BaseModel):
+ """
+ 注册配置项
+ """
+
+ key: str
+ """配置项键"""
+ value: Any
+ """配置项值"""
+ module: str | None = None
+ """模块名"""
+ help: str | None
+ """配置注解"""
+ default_value: Any | None = None
+ """默认值"""
+ type: Any = None
+ """参数类型"""
+ arg_parser: Callable | None = None
+ """参数解析"""
+
+
+class ConfigModel(BaseModel):
+ """
+ 配置项
+ """
+
+ value: Any
+ """配置项值"""
+ help: str | None
+ """配置注解"""
+ default_value: Any | None = None
+ """默认值"""
+ type: Any = None
+ """参数类型"""
+ arg_parser: Callable | None = None
+ """参数解析"""
+
+ def to_dict(self, **kwargs):
+ return model_dump(self, **kwargs)
+
+
+class BaseBlock(BaseModel):
+ """
+ 插件阻断基本类(插件阻断限制)
+ """
+
+ status: bool = True
+ """限制状态"""
+ check_type: BlockType = BlockType.ALL
+ """检查类型"""
+ watch_type: LimitWatchType = LimitWatchType.USER
+ """监听对象"""
+ result: str | None = None
+ """阻断时回复内容"""
+ _type: PluginLimitType = PluginLimitType.BLOCK
+ """类型"""
+
+ def to_dict(self, **kwargs):
+ return model_dump(self, **kwargs)
+
+
+class PluginCdBlock(BaseBlock):
+ """
+ 插件cd限制
+ """
+
+ cd: int = 5
+ """cd"""
+ _type: PluginLimitType = PluginLimitType.CD
+ """类型"""
+
+
+class PluginCountBlock(BaseBlock):
+ """
+ 插件次数限制
+ """
+
+ max_count: int
+ """最大调用次数"""
+ _type: PluginLimitType = PluginLimitType.COUNT
+ """类型"""
+
+
+class PluginSetting(BaseModel):
+ """
+ 插件基本配置
+ """
+
+ level: int = 5
+ """群权限等级"""
+ default_status: bool = True
+ """进群默认开关状态"""
+ limit_superuser: bool = False
+ """是否限制超级用户"""
+ cost_gold: int = 0
+ """调用插件花费金币"""
+ impression: float = 0.0
+ """调用插件好感度限制"""
+
+
+class AICallableProperties(BaseModel):
+ type: str
+ """参数类型"""
+ description: str
+ """参数描述"""
+ enums: list[str] | None = None
+ """参数枚举"""
+
+
+class AICallableParam(BaseModel):
+ type: str
+ """类型"""
+ properties: dict[str, AICallableProperties]
+ """参数列表"""
+ required: list[str]
+ """必要参数"""
+
+
+class AICallableTag(BaseModel):
+ name: str
+ """工具名称"""
+ parameters: AICallableParam | None = None
+ """工具参数"""
+ description: str
+ """工具描述"""
+ func: Callable | None = None
+ """工具函数"""
+
+ def to_dict(self):
+ result = model_dump(self)
+ del result["func"]
+ return result
+
+
+class SchedulerModel(BaseModel):
+ trigger: Literal["date", "interval", "cron"]
+ """trigger"""
+ day: int | None = None
+ """天数"""
+ hour: int | None = None
+ """小时"""
+ minute: int | None = None
+ """分钟"""
+ second: int | None = None
+ """秒"""
+ run_date: datetime | None = None
+ """运行日期"""
+ id: str | None = None
+ """id"""
+ max_instances: int | None = None
+ """最大运行实例"""
+ args: list | None = None
+ """参数"""
+ kwargs: dict | None = None
+ """参数"""
+
+
+class Task(BaseBlock):
+ module: str
+ """被动技能模块名"""
+ name: str
+ """被动技能名称"""
+ status: bool = True
+ """全局开关状态"""
+ create_status: bool = False
+ """初次加载默认开关状态"""
+ default_status: bool = True
+ """进群时默认状态"""
+ scheduler: SchedulerModel | None = None
+ """定时任务配置"""
+ run_func: Callable | None = None
+ """运行函数"""
+ check: Callable | None = None
+ """检查函数"""
+ check_args: list = Field(default_factory=list)
+ """检查函数参数"""
+
+
+class PluginExtraData(BaseModel):
+ """
+ 插件扩展信息
+ """
+
+ author: str | None = None
+ """作者"""
+ version: str | None = None
+ """版本"""
+ plugin_type: PluginType = PluginType.NORMAL
+ """插件类型"""
+ menu_type: str = "功能"
+ """菜单类型"""
+ admin_level: int | None = None
+ """管理员插件所需权限等级"""
+ configs: list[RegisterConfig] | None = None
+ """插件配置"""
+ setting: PluginSetting | None = None
+ """插件基本配置"""
+ limits: list[BaseBlock | PluginCdBlock | PluginCountBlock] | None = None
+ """插件限制"""
+ commands: list[Command] = Field(default_factory=list)
+ """命令列表,用于说明帮助"""
+ ignore_prompt: bool = False
+ """是否忽略阻断提示"""
+ tasks: list[Task] | None = None
+ """技能被动"""
+ superuser_help: str | None = None
+ """超级用户帮助"""
+ aliases: set[str] = Field(default_factory=set)
+ """额外名称"""
+ sql_list: list[str] | None = None
+ """常用sql"""
+ is_show: bool = True
+ """是否显示在菜单中"""
+ smart_tools: list[AICallableTag] | None = None
+ """智能模式函数工具集"""
+
+ def to_dict(self, **kwargs):
+ return model_dump(self, **kwargs)