zhenxun_bot/zhenxun/services/data_access.py

654 lines
23 KiB
Python
Raw Normal View History

from typing import Any, ClassVar, Generic, TypeVar, cast
2025-07-10 17:10:07 +08:00
from zhenxun.services.cache import Cache, CacheRoot, cache_config
from zhenxun.services.cache.config import COMPOSITE_KEY_SEPARATOR, CacheMode
from zhenxun.services.db_context import Model, with_db_timeout
from zhenxun.services.log import logger
T = TypeVar("T", bound=Model)
class DataAccess(Generic[T]):
"""数据访问层,根据配置决定是否使用缓存
使用示例:
```python
from zhenxun.services import DataAccess
from zhenxun.models.plugin_info import PluginInfo
# 创建数据访问对象
plugin_dao = DataAccess(PluginInfo)
# 获取单个数据
plugin = await plugin_dao.get(module="example_module")
# 获取所有数据
all_plugins = await plugin_dao.all()
# 筛选数据
enabled_plugins = await plugin_dao.filter(status=True)
# 创建数据
new_plugin = await plugin_dao.create(
module="new_module",
name="新插件",
status=True
)
```
"""
# 添加缓存统计信息
_cache_stats: ClassVar[dict] = {}
# 空结果标记
_NULL_RESULT = "__NULL_RESULT_PLACEHOLDER__"
# 默认空结果缓存时间(秒)- 设置为5分钟避免频繁查询数据库
_NULL_RESULT_TTL = 300
@classmethod
def set_null_result_ttl(cls, seconds: int) -> None:
"""设置空结果缓存时间
参数:
seconds: 缓存时间
"""
if seconds < 0:
raise ValueError("缓存时间不能为负数")
cls._NULL_RESULT_TTL = seconds
logger.info(f"已设置DataAccess空结果缓存时间为 {seconds}")
@classmethod
def get_null_result_ttl(cls) -> int:
"""获取空结果缓存时间
返回:
int: 缓存时间
"""
return cls._NULL_RESULT_TTL
2025-07-10 17:10:07 +08:00
def __init__(
self, model_cls: type[T], key_field: str = "id", cache_type: str | None = None
):
"""初始化数据访问对象
参数:
model_cls: 模型类
2025-07-10 17:10:07 +08:00
key_field: 主键字段
"""
self.model_cls = model_cls
2025-07-10 17:10:07 +08:00
self.key_field = getattr(model_cls, "cache_key_field", key_field)
self.cache_type = getattr(model_cls, "cache_type", cache_type)
if not self.cache_type:
raise ValueError("缓存类型不能为空")
self.cache = Cache(self.cache_type)
# 初始化缓存统计
if self.cache_type not in self._cache_stats:
self._cache_stats[self.cache_type] = {
"hits": 0, # 缓存命中次数
"misses": 0, # 缓存未命中次数
"null_hits": 0, # 空结果缓存命中次数
"sets": 0, # 缓存设置次数
"null_sets": 0, # 空结果缓存设置次数
"deletes": 0, # 缓存删除次数
}
@classmethod
def get_cache_stats(cls):
"""获取缓存统计信息"""
result = []
for cache_type, stats in cls._cache_stats.items():
hits = stats["hits"]
null_hits = stats.get("null_hits", 0)
misses = stats["misses"]
total = hits + null_hits + misses
hit_rate = ((hits + null_hits) / total * 100) if total > 0 else 0
result.append(
{
"cache_type": cache_type,
"hits": hits,
"null_hits": null_hits,
"misses": misses,
"sets": stats["sets"],
"null_sets": stats.get("null_sets", 0),
"deletes": stats["deletes"],
"hit_rate": f"{hit_rate:.2f}%",
}
)
return result
@classmethod
def reset_cache_stats(cls):
"""重置缓存统计信息"""
for stats in cls._cache_stats.values():
stats["hits"] = 0
stats["null_hits"] = 0
stats["misses"] = 0
stats["sets"] = 0
stats["null_sets"] = 0
stats["deletes"] = 0
2025-07-10 17:10:07 +08:00
def _build_cache_key_from_kwargs(self, **kwargs) -> str | None:
"""从关键字参数构建缓存键
参数:
**kwargs: 关键字参数
返回:
str | None: 缓存键如果无法构建则返回None
"""
if isinstance(self.key_field, tuple):
# 多字段主键
key_parts = []
key_parts.extend(str(kwargs.get(field, "")) for field in self.key_field)
return COMPOSITE_KEY_SEPARATOR.join(key_parts) if key_parts else None
2025-07-10 17:10:07 +08:00
elif self.key_field in kwargs:
# 单字段主键
return str(kwargs[self.key_field])
return None
async def safe_get_or_none(self, *args, **kwargs) -> T | None:
"""安全的获取单条数据
参数:
*args: 查询参数
**kwargs: 查询参数
返回:
Optional[T]: 查询结果如果不存在返回None
"""
# 如果没有缓存类型,直接从数据库获取
if not self.cache_type or cache_config.cache_mode == CacheMode.NONE:
logger.debug(f"{self.model_cls.__name__} 直接从数据库获取数据: {kwargs}")
return await with_db_timeout(
self.model_cls.safe_get_or_none(*args, **kwargs),
operation=f"{self.model_cls.__name__}.safe_get_or_none",
)
2025-07-10 17:10:07 +08:00
# 尝试从缓存获取
cache_key = None
2025-07-10 17:10:07 +08:00
try:
# 尝试构建缓存键
cache_key = self._build_cache_key_from_kwargs(**kwargs)
# 如果成功构建缓存键,尝试从缓存获取
if cache_key is not None:
data = await self.cache.get(cache_key)
logger.debug(
f"{self.model_cls.__name__} self.cache.get(cache_key)"
f" 从缓存获取到的数据 {type(data)}: {data}"
)
if data == self._NULL_RESULT:
# 空结果缓存命中
self._cache_stats[self.cache_type]["null_hits"] += 1
logger.debug(
f"{self.model_cls.__name__} 从缓存获取到空结果: {cache_key}"
)
return None
elif data:
# 缓存命中
self._cache_stats[self.cache_type]["hits"] += 1
logger.debug(
f"{self.model_cls.__name__} 从缓存获取数据成功: {cache_key}"
)
2025-07-10 17:10:07 +08:00
return cast(T, data)
else:
# 缓存未命中
self._cache_stats[self.cache_type]["misses"] += 1
logger.debug(f"{self.model_cls.__name__} 缓存未命中: {cache_key}")
2025-07-10 17:10:07 +08:00
except Exception as e:
logger.error(f"{self.model_cls.__name__} 从缓存获取数据失败: {kwargs}", e=e)
2025-07-10 17:10:07 +08:00
# 如果缓存中没有,从数据库获取
logger.debug(f"{self.model_cls.__name__} 从数据库获取数据: {kwargs}")
2025-07-10 17:10:07 +08:00
data = await self.model_cls.safe_get_or_none(*args, **kwargs)
# 如果获取到数据,存入缓存
if data:
try:
# 生成缓存键
cache_key = self._build_cache_key_for_item(data)
if cache_key is not None:
# 存入缓存
await self.cache.set(cache_key, data)
self._cache_stats[self.cache_type]["sets"] += 1
logger.debug(
f"{self.model_cls.__name__} 数据已存入缓存: {cache_key}"
)
2025-07-10 17:10:07 +08:00
except Exception as e:
logger.error(
f"{self.model_cls.__name__} 存入缓存失败,参数: {kwargs}", e=e
)
elif cache_key is not None:
# 如果没有获取到数据,缓存空结果
try:
# 存入空结果缓存,使用较短的过期时间
await self.cache.set(
cache_key, self._NULL_RESULT, expire=self._NULL_RESULT_TTL
)
self._cache_stats[self.cache_type]["null_sets"] += 1
logger.debug(
f"{self.model_cls.__name__} 空结果已存入缓存: {cache_key},"
f" TTL={self._NULL_RESULT_TTL}"
)
except Exception as e:
logger.error(
f"{self.model_cls.__name__} 存入空结果缓存失败,参数: {kwargs}", e=e
)
2025-07-10 17:10:07 +08:00
return data
async def get_or_none(self, *args, **kwargs) -> T | None:
"""获取单条数据
参数:
*args: 查询参数
**kwargs: 查询参数
返回:
Optional[T]: 查询结果如果不存在返回None
"""
2025-07-10 17:10:07 +08:00
# 如果没有缓存类型,直接从数据库获取
if not self.cache_type or cache_config.cache_mode == CacheMode.NONE:
logger.debug(f"{self.model_cls.__name__} 直接从数据库获取数据: {kwargs}")
return await with_db_timeout(
self.model_cls.get_or_none(*args, **kwargs),
operation=f"{self.model_cls.__name__}.get_or_none",
)
2025-07-10 17:10:07 +08:00
# 尝试从缓存获取
cache_key = None
try:
2025-07-10 17:10:07 +08:00
# 尝试构建缓存键
cache_key = self._build_cache_key_from_kwargs(**kwargs)
# 如果成功构建缓存键,尝试从缓存获取
if cache_key is not None:
data = await self.cache.get(cache_key)
if data == self._NULL_RESULT:
# 空结果缓存命中
self._cache_stats[self.cache_type]["null_hits"] += 1
logger.debug(
f"{self.model_cls.__name__} 从缓存获取到空结果: {cache_key}"
)
return None
elif data:
# 缓存命中
self._cache_stats[self.cache_type]["hits"] += 1
logger.debug(
f"{self.model_cls.__name__} 从缓存获取数据成功: {cache_key}"
)
2025-07-10 17:10:07 +08:00
return cast(T, data)
else:
# 缓存未命中
self._cache_stats[self.cache_type]["misses"] += 1
logger.debug(f"{self.model_cls.__name__} 缓存未命中: {cache_key}")
except Exception as e:
logger.error(f"{self.model_cls.__name__} 从缓存获取数据失败: {kwargs}", e=e)
# 如果缓存中没有,从数据库获取
logger.debug(f"{self.model_cls.__name__} 从数据库获取数据: {kwargs}")
2025-07-10 17:10:07 +08:00
data = await self.model_cls.get_or_none(*args, **kwargs)
# 如果获取到数据,存入缓存
if data:
try:
cache_key = self._build_cache_key_for_item(data)
# 生成缓存键
if cache_key is not None:
# 存入缓存
await self.cache.set(cache_key, data)
self._cache_stats[self.cache_type]["sets"] += 1
logger.debug(
f"{self.model_cls.__name__} 数据已存入缓存: {cache_key}"
)
2025-07-10 17:10:07 +08:00
except Exception as e:
logger.error(
f"{self.model_cls.__name__} 存入缓存失败,参数: {kwargs}", e=e
)
elif cache_key is not None:
# 如果没有获取到数据,缓存空结果
try:
# 存入空结果缓存,使用较短的过期时间
await self.cache.set(
cache_key, self._NULL_RESULT, expire=self._NULL_RESULT_TTL
)
self._cache_stats[self.cache_type]["null_sets"] += 1
logger.debug(
f"{self.model_cls.__name__} 空结果已存入缓存: {cache_key},"
f" TTL={self._NULL_RESULT_TTL}"
)
except Exception as e:
logger.error(
f"{self.model_cls.__name__} 存入空结果缓存失败,参数: {kwargs}", e=e
)
2025-07-10 17:10:07 +08:00
return data
async def clear_cache(self, **kwargs) -> bool:
"""只清除缓存,不影响数据库数据
参数:
**kwargs: 查询参数必须包含主键字段
返回:
bool: 是否成功清除缓存
"""
# 如果没有缓存类型直接返回True
if not self.cache_type or cache_config.cache_mode == CacheMode.NONE:
return True
try:
# 构建缓存键
cache_key = self._build_cache_key_from_kwargs(**kwargs)
if cache_key is None:
if isinstance(self.key_field, tuple):
# 如果是复合键,检查缺少哪些字段
missing_fields = [
field for field in self.key_field if field not in kwargs
]
logger.error(
f"清除{self.model_cls.__name__}缓存失败: "
f"缺少主键字段 {', '.join(missing_fields)}"
)
else:
logger.error(
f"清除{self.model_cls.__name__}缓存失败: "
f"缺少主键字段 {self.key_field}"
)
return False
# 删除缓存
await self.cache.delete(cache_key)
self._cache_stats[self.cache_type]["deletes"] += 1
2025-07-10 17:10:07 +08:00
logger.debug(f"已清除{self.model_cls.__name__}缓存: {cache_key}")
return True
except Exception as e:
logger.error(f"清除{self.model_cls.__name__}缓存失败", e=e)
return False
def _build_composite_key(self, data: T) -> str | None:
"""构建复合缓存键
参数:
data: 数据对象
返回:
str | None: 构建的缓存键如果无法构建则返回None
"""
# 如果是元组,表示多个字段组成键
if isinstance(self.key_field, tuple):
# 构建键参数列表
key_parts = []
for field in self.key_field:
value = getattr(data, field, "")
key_parts.append(value if value is not None else "")
# 如果没有有效参数返回None
return COMPOSITE_KEY_SEPARATOR.join(key_parts) if key_parts else None
2025-07-10 17:10:07 +08:00
elif hasattr(data, self.key_field):
value = getattr(data, self.key_field, None)
return str(value) if value is not None else None
return None
def _build_cache_key_for_item(self, item: T) -> str | None:
"""为数据项构建缓存键
参数:
item: 数据项
返回:
str | None: 缓存键如果无法生成则返回None
"""
# 如果没有缓存类型返回None
if not self.cache_type:
return None
# 获取缓存类型的配置信息
cache_model = CacheRoot.get_model(self.cache_type)
if not cache_model.key_format:
# 常规处理,使用主键作为缓存键
return self._build_composite_key(item)
# 构建键参数字典
key_parts = []
# 从格式字符串中提取所需的字段名
import re
2025-07-10 17:10:07 +08:00
field_names = re.findall(r"{([^}]+)}", cache_model.key_format)
2025-07-10 17:10:07 +08:00
# 收集所有字段值
for field in field_names:
value = getattr(item, field, "")
key_parts.append(value if value is not None else "")
2025-07-10 17:10:07 +08:00
return COMPOSITE_KEY_SEPARATOR.join(key_parts)
2025-07-10 17:10:07 +08:00
async def _cache_items(self, data_list: list[T]) -> None:
"""将数据列表存入缓存
参数:
data_list: 数据列表
"""
if (
not data_list
or not self.cache_type
or cache_config.cache_mode == CacheMode.NONE
):
return
try:
# 遍历数据列表,将每条数据存入缓存
cached_count = 0
2025-07-10 17:10:07 +08:00
for item in data_list:
cache_key = self._build_cache_key_for_item(item)
if cache_key is not None:
await self.cache.set(cache_key, item)
cached_count += 1
self._cache_stats[self.cache_type]["sets"] += 1
2025-07-10 17:10:07 +08:00
logger.debug(
f"{self.model_cls.__name__} 批量缓存: {cached_count}/{len(data_list)}"
)
2025-07-10 17:10:07 +08:00
except Exception as e:
logger.error(f"{self.model_cls.__name__} 批量缓存失败", e=e)
async def filter(self, *args, **kwargs) -> list[T]:
"""筛选数据
参数:
*args: 查询参数
**kwargs: 查询参数
返回:
List[T]: 查询结果列表
"""
2025-07-10 17:10:07 +08:00
# 从数据库获取数据
logger.debug(f"{self.model_cls.__name__} filter: 从数据库查询, 参数: {kwargs}")
2025-07-10 17:10:07 +08:00
data_list = await self.model_cls.filter(*args, **kwargs)
logger.debug(
f"{self.model_cls.__name__} filter: 查询结果数量: {len(data_list)}"
)
2025-07-10 17:10:07 +08:00
# 将数据存入缓存
await self._cache_items(data_list)
2025-07-10 17:10:07 +08:00
return data_list
async def all(self) -> list[T]:
"""获取所有数据
返回:
List[T]: 所有数据列表
"""
2025-07-10 17:10:07 +08:00
# 直接从数据库获取
logger.debug(f"{self.model_cls.__name__} all: 从数据库查询所有数据")
2025-07-10 17:10:07 +08:00
data_list = await self.model_cls.all()
logger.debug(f"{self.model_cls.__name__} all: 查询结果数量: {len(data_list)}")
2025-07-10 17:10:07 +08:00
# 将数据存入缓存
await self._cache_items(data_list)
2025-07-10 17:10:07 +08:00
return data_list
async def count(self, *args, **kwargs) -> int:
"""获取数据数量
参数:
*args: 查询参数
**kwargs: 查询参数
返回:
int: 数据数量
"""
# 直接从数据库获取数量
return await self.model_cls.filter(*args, **kwargs).count()
async def exists(self, *args, **kwargs) -> bool:
"""判断数据是否存在
参数:
*args: 查询参数
**kwargs: 查询参数
返回:
bool: 是否存在
"""
# 直接从数据库判断是否存在
return await self.model_cls.filter(*args, **kwargs).exists()
async def create(self, **kwargs) -> T:
"""创建数据
参数:
**kwargs: 创建参数
返回:
T: 创建的数据
"""
2025-07-10 17:10:07 +08:00
# 创建数据
logger.debug(f"{self.model_cls.__name__} create: 创建数据, 参数: {kwargs}")
2025-07-10 17:10:07 +08:00
data = await self.model_cls.create(**kwargs)
# 如果有缓存类型,将数据存入缓存
if self.cache_type and cache_config.cache_mode != CacheMode.NONE:
try:
# 生成缓存键
cache_key = self._build_cache_key_for_item(data)
if cache_key is not None:
# 存入缓存
await self.cache.set(cache_key, data)
self._cache_stats[self.cache_type]["sets"] += 1
2025-07-10 17:10:07 +08:00
logger.debug(
f"{self.model_cls.__name__} create: "
f"新创建的数据已存入缓存: {cache_key}"
2025-07-10 17:10:07 +08:00
)
except Exception as e:
logger.error(
f"{self.model_cls.__name__} create: 存入缓存失败,参数: {kwargs}",
e=e,
)
2025-07-10 17:10:07 +08:00
return data
async def update_or_create(
self, defaults: dict[str, Any] | None = None, **kwargs
) -> tuple[T, bool]:
"""更新或创建数据
参数:
defaults: 默认值
**kwargs: 查询参数
返回:
tuple[T, bool]: (数据, 是否创建)
"""
2025-07-10 17:10:07 +08:00
# 更新或创建数据
data, created = await self.model_cls.update_or_create(
defaults=defaults, **kwargs
)
# 如果有缓存类型,将数据存入缓存
if self.cache_type and cache_config.cache_mode != CacheMode.NONE:
try:
# 生成缓存键
cache_key = self._build_cache_key_for_item(data)
if cache_key is not None:
# 存入缓存
await self.cache.set(cache_key, data)
self._cache_stats[self.cache_type]["sets"] += 1
2025-07-10 17:10:07 +08:00
logger.debug(f"更新或创建的数据已存入缓存: {cache_key}")
except Exception as e:
logger.error(f"存入缓存失败,参数: {kwargs}", e=e)
return data, created
async def delete(self, *args, **kwargs) -> int:
"""删除数据
参数:
*args: 查询参数
**kwargs: 查询参数
返回:
int: 删除的数据数量
"""
logger.debug(f"{self.model_cls.__name__} delete: 删除数据, 参数: {kwargs}")
2025-07-10 17:10:07 +08:00
# 如果有缓存类型且有key_field参数先尝试删除缓存
if self.cache_type and cache_config.cache_mode != CacheMode.NONE:
try:
# 尝试构建缓存键
cache_key = self._build_cache_key_from_kwargs(**kwargs)
if cache_key is not None:
# 如果成功构建缓存键,直接删除缓存
await self.cache.delete(cache_key)
self._cache_stats[self.cache_type]["deletes"] += 1
logger.debug(
f"{self.model_cls.__name__} delete: 已删除缓存: {cache_key}"
)
2025-07-10 17:10:07 +08:00
else:
# 否则需要先查询出要删除的数据,然后删除对应的缓存
items = await self.model_cls.filter(*args, **kwargs)
logger.debug(
f"{self.model_cls.__name__} delete:"
f" 查询到 {len(items)} 条要删除的数据"
)
2025-07-10 17:10:07 +08:00
for item in items:
item_cache_key = self._build_cache_key_for_item(item)
if item_cache_key is not None:
await self.cache.delete(item_cache_key)
self._cache_stats[self.cache_type]["deletes"] += 1
2025-07-10 17:10:07 +08:00
if items:
logger.debug(
f"{self.model_cls.__name__} delete:"
f" 已删除 {len(items)} 条数据的缓存"
)
2025-07-10 17:10:07 +08:00
except Exception as e:
logger.error(f"{self.model_cls.__name__} delete: 删除缓存失败", e=e)
2025-07-10 17:10:07 +08:00
# 删除数据
result = await self.model_cls.filter(*args, **kwargs).delete()
logger.debug(
f"{self.model_cls.__name__} delete: 已从数据库删除 {result} 条数据"
)
return result
2025-07-10 17:10:07 +08:00
def _generate_cache_key(self, data: T) -> str:
"""根据数据对象生成缓存键
参数:
2025-07-10 17:10:07 +08:00
data: 数据对象
返回:
str: 缓存键
"""
2025-07-10 17:10:07 +08:00
# 使用新方法构建复合键
if composite_key := self._build_composite_key(data):
return composite_key
# 如果无法生成复合键,生成一个唯一键
return f"object_{id(data)}"