2025-08-04 23:36:12 +08:00
|
|
|
|
"""
|
|
|
|
|
|
Pydantic V1 & V2 兼容层模块
|
|
|
|
|
|
|
|
|
|
|
|
为 Pydantic V1 与 V2 版本提供统一的便捷函数与类,
|
|
|
|
|
|
包括 model_dump, model_copy, model_json_schema, parse_as 等。
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
2025-10-09 08:50:40 +08:00
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
from enum import Enum
|
|
|
|
|
|
from pathlib import Path
|
2025-08-04 23:36:12 +08:00
|
|
|
|
from typing import Any, TypeVar, get_args, get_origin
|
|
|
|
|
|
|
2025-12-07 18:57:55 +08:00
|
|
|
|
from nonebot.compat import (
|
|
|
|
|
|
PYDANTIC_V2,
|
|
|
|
|
|
model_dump,
|
|
|
|
|
|
model_fields,
|
|
|
|
|
|
type_validate_json,
|
|
|
|
|
|
type_validate_python,
|
|
|
|
|
|
)
|
|
|
|
|
|
from pydantic import BaseModel
|
2025-10-09 08:50:40 +08:00
|
|
|
|
import ujson as json
|
2025-08-04 23:36:12 +08:00
|
|
|
|
|
|
|
|
|
|
# Type variable bound to BaseModel; lets helpers such as model_copy declare
# that they return the same model type they were given.
T = TypeVar("T", bound=BaseModel)
|
|
|
|
|
|
# Unconstrained generic type variable.
# NOTE(review): not referenced by the code visible in this section — confirm
# whether other parts of the module still rely on it.
V = TypeVar("V")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Public surface of the compatibility layer: names re-exported from
# nonebot.compat together with the helpers defined in this module.
__all__ = [
    "PYDANTIC_V2",
    "_dump_pydantic_obj",
    "_is_pydantic_type",
    "compat_computed_field",
    "dump_json_safely",
    "model_construct",
    "model_copy",
    "model_dump",
    "model_dump_json",
    "model_fields",
    "model_json_schema",
    "model_validate",
    "parse_as",
    "type_validate_json",
    "type_validate_python",
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def model_copy(
    model: T, *, update: dict[str, Any] | None = None, deep: bool = False
) -> T:
    """
    Copy a model via `model.copy()` (v1) or `model.model_copy()` (v2).

    Args:
        model: The model instance to copy.
        update: Optional field overrides applied to the copy.
        deep: Whether to request a deep copy.

    Returns:
        The copied model, of the same type as ``model``.
    """
    if not PYDANTIC_V2:
        # The v1 call is always given a dict: a missing/empty ``update``
        # is replaced by a fresh empty dict.
        return model.copy(update=update or {}, deep=deep)
    return model.model_copy(update=update, deep=deep)
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-12-01 14:52:36 +08:00
|
|
|
|
def model_construct(model_class: type[T], **kwargs: Any) -> T:
    """
    Build a model via `model_construct` (v2) or `construct` (v1).

    Args:
        model_class: The model class to instantiate.
        **kwargs: Keyword arguments forwarded unchanged to the constructor
            helper of the active Pydantic version.

    Returns:
        The instance produced by the selected constructor helper.
    """
    # Only the attribute belonging to the active Pydantic version is looked up.
    builder = model_class.model_construct if PYDANTIC_V2 else model_class.construct
    return builder(**kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-11-03 10:53:40 +08:00
|
|
|
|
def model_validate(model_class: type[T], obj: Any) -> T:
    """
    Version-agnostic model validation helper.

    Delegates to `type_validate_python` with ``model_class`` as the target
    type and ``obj`` as the input, and returns its result.
    """
    validated = type_validate_python(model_class, obj)
    return validated
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def model_dump_json(model: BaseModel, **kwargs: Any) -> str:
    """
    Serialize a model via `model.json()` (v1) or `model.model_dump_json()` (v2).

    Args:
        model: The model instance to serialize.
        **kwargs: Keyword arguments forwarded unchanged to the underlying
            serialization method.

    Returns:
        The value returned by the underlying serialization method.
    """
    if not PYDANTIC_V2:
        return model.json(**kwargs)
    return model.model_dump_json(**kwargs)
|
2025-11-03 10:53:40 +08:00
|
|
|
|
|
|
|
|
|
|
|
2025-08-28 09:20:15 +08:00
|
|
|
|
# Pick the decorator exposed as ``compat_computed_field``: the builtin
# ``property`` stands in on Pydantic v1, while v2 uses pydantic's own
# ``computed_field``.
if not PYDANTIC_V2:
    compat_computed_field = property
else:
    from pydantic import computed_field as compat_computed_field
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-08-04 23:36:12 +08:00
|
|
|
|
def model_json_schema(model_class: type[BaseModel], **kwargs: Any) -> dict[str, Any]:
    """
    Get a schema via `Model.schema()` (v1) or `Model.model_json_schema()` (v2).

    Args:
        model_class: The model class whose schema is requested.
        **kwargs: Forwarded in full on v2. On v1 only ``by_alias`` is read
            (defaulting to ``True``); any other keyword is ignored.

    Returns:
        The schema returned by the underlying Pydantic method.
    """
    if not PYDANTIC_V2:
        use_alias = kwargs.get("by_alias", True)
        return model_class.schema(by_alias=use_alias)
    return model_class.model_json_schema(**kwargs)
|
2025-08-04 23:36:12 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _is_pydantic_type(t: Any) -> bool:
|
|
|
|
|
|
"""
|
|
|
|
|
|
递归检查一个类型注解是否与 Pydantic BaseModel 相关。
|
|
|
|
|
|
"""
|
|
|
|
|
|
if t is None:
|
|
|
|
|
|
return False
|
|
|
|
|
|
origin = get_origin(t)
|
|
|
|
|
|
if origin:
|
|
|
|
|
|
return any(_is_pydantic_type(arg) for arg in get_args(t))
|
|
|
|
|
|
return isinstance(t, type) and issubclass(t, BaseModel)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _dump_pydantic_obj(obj: Any) -> Any:
    """
    Recursively convert Pydantic BaseModel instances inside an object to dicts.

    Handles a single instance, a list of values, and a dict of values
    (only the dict's values are converted, keys are kept as they are).
    Anything else is returned unchanged.
    """
    if isinstance(obj, BaseModel):
        dumped = model_dump(obj)
    elif isinstance(obj, list):
        dumped = [_dump_pydantic_obj(element) for element in obj]
    elif isinstance(obj, dict):
        dumped = {name: _dump_pydantic_obj(entry) for name, entry in obj.items()}
    else:
        dumped = obj
    return dumped
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-12-07 18:57:55 +08:00
|
|
|
|
# ``parse_as`` is an alias of ``type_validate_python`` (imported from
# nonebot.compat), exported under this name via ``__all__``.
parse_as = type_validate_python
|
2025-10-09 08:50:40 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dump_json_safely(obj: Any, **kwargs) -> str:
    """
    Serialize an object that may hold Pydantic-related types (e.g. Enum) to JSON.

    Values the encoder cannot handle natively are converted by a fallback:
    Enum -> its ``value``, datetime -> ``isoformat()``, Path -> POSIX-style
    string, set -> list, BaseModel -> ``model_dump`` result.

    Args:
        obj: The object to serialize.
        **kwargs: Extra keyword arguments forwarded to ``json.dumps``.

    Returns:
        The JSON string produced by ``json.dumps``.

    Raises:
        TypeError: From the fallback, for a value of any other type.
    """

    def _to_jsonable(item):
        # Checks run in a fixed order; the first matching type wins.
        if isinstance(item, Enum):
            converted = item.value
        elif isinstance(item, datetime):
            converted = item.isoformat()
        elif isinstance(item, Path):
            converted = str(item.as_posix())
        elif isinstance(item, set):
            converted = list(item)
        elif isinstance(item, BaseModel):
            converted = model_dump(item)
        else:
            raise TypeError(
                f"Object of type {item.__class__.__name__} is not JSON serializable"
            )
        return converted

    return json.dumps(obj, default=_to_jsonable, **kwargs)
|