♻️ refactor(pydantic): 提取 Pydantic 兼容函数到独立模块

This commit is contained in:
webjoin111 2025-08-02 13:19:53 +08:00
parent 59d72c3b3d
commit e4f087e661
3 changed files with 97 additions and 56 deletions

View File

@ -1,16 +1,21 @@
from collections.abc import Callable
import copy
from pathlib import Path
from typing import Any, TypeVar, get_args, get_origin
from typing import Any, TypeVar
import cattrs
from nonebot.compat import model_dump
from pydantic import VERSION, BaseModel, Field
from pydantic import BaseModel, Field
from ruamel.yaml import YAML
from ruamel.yaml.scanner import ScannerError
from zhenxun.configs.path_config import DATA_PATH
from zhenxun.services.log import logger
from zhenxun.utils.pydantic_compat import (
_dump_pydantic_obj,
_is_pydantic_type,
model_dump,
parse_as,
)
from .models import (
AICallableParam,
@ -39,46 +44,6 @@ class NoSuchConfig(Exception):
pass
def _dump_pydantic_obj(obj: Any) -> Any:
"""
递归地将一个对象内部的 Pydantic BaseModel 实例转换为字典
支持单个实例实例列表实例字典等情况
"""
if isinstance(obj, BaseModel):
return model_dump(obj)
if isinstance(obj, list):
return [_dump_pydantic_obj(item) for item in obj]
if isinstance(obj, dict):
return {key: _dump_pydantic_obj(value) for key, value in obj.items()}
return obj
def _is_pydantic_type(t: Any) -> bool:
"""
递归检查一个类型注解是否与 Pydantic BaseModel 相关
"""
if t is None:
return False
origin = get_origin(t)
if origin:
return any(_is_pydantic_type(arg) for arg in get_args(t))
return isinstance(t, type) and issubclass(t, BaseModel)
def parse_as(type_: type[T], obj: Any) -> T:
"""
一个兼容 Pydantic V1 parse_obj_as 和V2的TypeAdapter.validate_python 的辅助函数
"""
if VERSION.startswith("1"):
from pydantic import parse_obj_as
return parse_obj_as(type_, obj)
else:
from pydantic import TypeAdapter # type: ignore
return TypeAdapter(type_).validate_python(obj)
class ConfigGroup(BaseModel):
"""
配置组
@ -194,16 +159,11 @@ class ConfigsManager:
"""
result = dict(original_data)
# 遍历新数据的键
for key, value in new_data.items():
# 如果键不在原数据中,添加它
if key not in original_data:
result[key] = value
# 如果两边都是字典,递归处理
elif isinstance(value, dict) and isinstance(original_data[key], dict):
result[key] = self._merge_dicts(value, original_data[key])
# 如果键已存在,保留原值,不覆盖
# (不做任何操作,保持原值)
return result
@ -217,15 +177,11 @@ class ConfigsManager:
返回:
标准化后的值
"""
# 处理BaseModel
processed_value = _dump_pydantic_obj(value)
# 如果处理后的值是字典,且原始值也存在
if isinstance(processed_value, dict) and original_value is not None:
# 处理原始值
processed_original = _dump_pydantic_obj(original_value)
# 如果原始值也是字典,合并它们
if isinstance(processed_original, dict):
return self._merge_dicts(processed_value, processed_original)
@ -263,12 +219,10 @@ class ConfigsManager:
if not module or not key:
raise ValueError("add_plugin_config: module和key不能为为空")
# 获取现有配置值(如果存在)
existing_value = None
if module in self._data and (config := self._data[module].configs.get(key)):
existing_value = config.value
# 标准化值和默认值
processed_value = self._normalize_config_data(value, existing_value)
processed_default_value = self._normalize_config_data(default_value)
@ -348,7 +302,6 @@ class ConfigsManager:
if value_to_process is None:
return default
# 1. 最高优先级:自定义的参数解析器
if config.arg_parser:
try:
return config.arg_parser(value_to_process)

View File

@ -2,10 +2,10 @@ from collections.abc import Callable
from datetime import datetime
from typing import Any, Literal
from nonebot.compat import model_dump
from pydantic import BaseModel, Field
from zhenxun.utils.enum import BlockType, LimitWatchType, PluginLimitType, PluginType
from zhenxun.utils.pydantic_compat import model_dump
__all__ = [
"AICallableParam",

View File

@ -0,0 +1,88 @@
"""
Pydantic V1 & V2 兼容层模块
Pydantic V1 V2 版本提供统一的便捷函数与类
包括 model_dump, model_copy, model_json_schema, parse_as
"""
from typing import Any, TypeVar, get_args, get_origin
from nonebot.compat import PYDANTIC_V2, model_dump
from pydantic import VERSION, BaseModel
T = TypeVar("T", bound=BaseModel)
V = TypeVar("V")
__all__ = [
"PYDANTIC_V2",
"_dump_pydantic_obj",
"_is_pydantic_type",
"model_copy",
"model_dump",
"model_json_schema",
"parse_as",
]
def model_copy(
model: T, *, update: dict[str, Any] | None = None, deep: bool = False
) -> T:
"""
Pydantic `model.copy()` (v1) `model.model_copy()` (v2) 的兼容函数
"""
if PYDANTIC_V2:
return model.model_copy(update=update, deep=deep)
else:
update_dict = update or {}
return model.copy(update=update_dict, deep=deep)
def model_json_schema(model_class: type[BaseModel], **kwargs: Any) -> dict[str, Any]:
"""
Pydantic `Model.schema()` (v1) `Model.model_json_schema()` (v2) 的兼容函数
"""
if PYDANTIC_V2:
return model_class.model_json_schema(**kwargs)
else:
return model_class.schema(by_alias=kwargs.get("by_alias", True))
def _is_pydantic_type(t: Any) -> bool:
"""
递归检查一个类型注解是否与 Pydantic BaseModel 相关
"""
if t is None:
return False
origin = get_origin(t)
if origin:
return any(_is_pydantic_type(arg) for arg in get_args(t))
return isinstance(t, type) and issubclass(t, BaseModel)
def _dump_pydantic_obj(obj: Any) -> Any:
"""
递归地将一个对象内部的 Pydantic BaseModel 实例转换为字典
支持单个实例实例列表实例字典等情况
"""
if isinstance(obj, BaseModel):
return model_dump(obj)
if isinstance(obj, list):
return [_dump_pydantic_obj(item) for item in obj]
if isinstance(obj, dict):
return {key: _dump_pydantic_obj(value) for key, value in obj.items()}
return obj
def parse_as(type_: type[V], obj: Any) -> V:
"""
一个兼容 Pydantic V1 parse_obj_as 和V2的TypeAdapter.validate_python 的辅助函数
"""
if VERSION.startswith("1"):
from pydantic import parse_obj_as
return parse_obj_as(type_, obj)
else:
from pydantic import TypeAdapter # type: ignore
return TypeAdapter(type_).validate_python(obj)