diff --git a/zhenxun/models/ban_console.py b/zhenxun/models/ban_console.py index b1c6a4d9..4fec9608 100644 --- a/zhenxun/models/ban_console.py +++ b/zhenxun/models/ban_console.py @@ -4,6 +4,7 @@ from typing_extensions import Self from tortoise import fields +from zhenxun.services.data_access import DataAccess from zhenxun.services.db_context import Model from zhenxun.services.log import logger from zhenxun.utils.enum import CacheType, DbLockType @@ -57,14 +58,15 @@ class BanConsole(Model): """ if not user_id and not group_id: raise UserAndGroupIsNone() + dao = DataAccess(cls) if user_id: return ( - await cls.safe_get_or_none(user_id=user_id, group_id=group_id) + await dao.safe_get_or_none(user_id=user_id, group_id=group_id) if group_id - else await cls.safe_get_or_none(user_id=user_id, group_id__isnull=True) + else await dao.safe_get_or_none(user_id=user_id, group_id__isnull=True) ) else: - return await cls.safe_get_or_none(user_id="", group_id=group_id) + return await dao.safe_get_or_none(user_id="", group_id=group_id) @classmethod async def check_ban_level( diff --git a/zhenxun/models/group_console.py b/zhenxun/models/group_console.py index ad36dccd..e73c4cde 100644 --- a/zhenxun/models/group_console.py +++ b/zhenxun/models/group_console.py @@ -7,6 +7,7 @@ from tortoise.backends.base.client import BaseDBAsyncClient from zhenxun.models.plugin_info import PluginInfo from zhenxun.models.task_info import TaskInfo from zhenxun.services.cache import CacheRoot +from zhenxun.services.data_access import DataAccess from zhenxun.services.db_context import Model from zhenxun.utils.enum import CacheType, DbLockType, PluginType @@ -254,13 +255,14 @@ class GroupConsole(Model): 返回: Self: GroupConsole """ + dao = DataAccess(cls) if channel_id: - return await cls.safe_get_or_none( + return await dao.safe_get_or_none( group_id=group_id, channel_id=channel_id, clean_duplicates=clean_duplicates, ) - return await cls.safe_get_or_none( + return await dao.safe_get_or_none( group_id=group_id, channel_id__isnull=True, clean_duplicates=clean_duplicates, diff --git a/zhenxun/services/cache/cache_containers.py b/zhenxun/services/cache/cache_containers.py index 098d03f9..4302a747 100644 --- a/zhenxun/services/cache/cache_containers.py +++ b/zhenxun/services/cache/cache_containers.py @@ -37,7 +37,7 @@ class CacheDict(Generic[T]): return 0 return data.expire_time - def __getitem__(self, key: str) -> T | None: + def __getitem__(self, key: str) -> T: """获取字典项 参数: @@ -47,8 +47,10 @@ class CacheDict(Generic[T]): T: 字典值 """ if value := self._data.get(key): - return value.value if self.expire_time(key) else None - return None + if self.expire_time(key): + raise KeyError(f"键 {key} 已过期") + return value.value + raise KeyError(f"键 {key} 不存在") def __setitem__(self, key: str, value: T) -> None: """设置字典项 diff --git a/zhenxun/services/data_access.py b/zhenxun/services/data_access.py index a4ebd28b..e67e3eef 100644 --- a/zhenxun/services/data_access.py +++ b/zhenxun/services/data_access.py @@ -7,6 +7,8 @@ from zhenxun.services.log import logger T = TypeVar("T", bound=Model) +cache = CacheRoot.cache_dict("DB_TEST_BAN", 10, int) + class DataAccess(Generic[T]): """数据访问层,根据配置决定是否使用缓存 @@ -167,6 +169,7 @@ class DataAccess(Generic[T]): return await with_db_timeout( db_query_func(*args, **kwargs), operation=f"{self.model_cls.__name__}.{db_query_func.__name__}", + source="DataAccess", ) # 尝试从缓存获取 @@ -178,10 +181,11 @@ class DataAccess(Generic[T]): # 如果成功构建缓存键,尝试从缓存获取 if cache_key is not None: data = await self.cache.get(cache_key) - logger.debug( - f"{self.model_cls.__name__} self.cache.get(cache_key)" + logger.info( + f"{self.model_cls.__name__} key: {cache_key}" f" 从缓存获取到的数据 {type(data)}: {data}" ) + if data == self._NULL_RESULT: # 空结果缓存命中 self._cache_stats[self.cache_type]["null_hits"] += 1 @@ -193,6 +197,13 @@ class DataAccess(Generic[T]): f"{self.model_cls.__name__} 从缓存获取" f"到空结果: {cache_key}, 允许数据不存在,返回None" ) + if self.model_cls.__name__ == "BanConsole": + uid = kwargs.get("user_id") + if uid: + logger.info( + f"查询ban: {cache_key}:{uid}数据不存在返回NULL结果", + "DB_TEST__BAN", + ) return None elif data: # 缓存命中 diff --git a/zhenxun/services/db_context/base_model.py b/zhenxun/services/db_context/base_model.py index 3e0e23ef..10707cfa 100644 --- a/zhenxun/services/db_context/base_model.py +++ b/zhenxun/services/db_context/base_model.py @@ -16,6 +16,9 @@ from zhenxun.utils.enum import DbLockType from .config import LOG_COMMAND, db_model from .utils import with_db_timeout +total = {} +index = 0 + class Model(TortoiseModel): """ @@ -221,12 +224,25 @@ class Model(TortoiseModel): 返回: Self | None: 查询结果,如果不存在返回None """ + global index, total try: + if cls.__name__ == "BanConsole": + uid = kwargs.get("user_id") + if uid: + if uid not in total: + total[uid] = CacheRoot.cache_dict(f"DB_TEST_UID_{uid}", 10, int) + total[uid][str(index)] = "1" + index += 1 + logger.info( + f"BanConsole 10秒内查询次数 uid: {uid}" + f" 查询次数: {len(total[uid])}" + ) # 先尝试使用 get_or_none 获取单个记录 try: return await with_db_timeout( cls.get_or_none(*args, using_db=using_db, **kwargs), operation=f"{cls.__name__}.get_or_none", + source="DataBaseModel", ) except MultipleObjectsReturned: # 如果出现多个记录的情况,进行特殊处理 @@ -239,6 +255,7 @@ class Model(TortoiseModel): records = await with_db_timeout( cls.filter(*args, **kwargs).all(), operation=f"{cls.__name__}.filter.all", + source="DataBaseModel", ) if not records: @@ -255,6 +272,7 @@ class Model(TortoiseModel): await with_db_timeout( record.delete(), operation=f"{cls.__name__}.delete_duplicate", + source="DataBaseModel", ) logger.info( f"{cls.__name__} 删除重复记录:" @@ -269,11 +287,13 @@ class Model(TortoiseModel): return await with_db_timeout( cls.filter(*args, **kwargs).order_by("-id").first(), operation=f"{cls.__name__}.filter.order_by.first", + source="DataBaseModel", ) # 如果没有 id 字段,则返回第一个记录 return await with_db_timeout( cls.filter(*args, **kwargs).first(), operation=f"{cls.__name__}.filter.first", + source="DataBaseModel", ) except asyncio.TimeoutError: logger.error( diff --git a/zhenxun/services/db_context/utils.py b/zhenxun/services/db_context/utils.py index 47db548f..3691fa31 100644 --- a/zhenxun/services/db_context/utils.py +++ b/zhenxun/services/db_context/utils.py @@ -1,6 +1,7 @@ import asyncio import time +from zhenxun.services.cache import CacheRoot from zhenxun.services.log import logger from .config import ( @@ -9,14 +10,35 @@ from .config import ( SLOW_QUERY_THRESHOLD, ) +cache = CacheRoot.cache_dict("DB_TEST", 10) + +index = 0 + + +total = {} + async def with_db_timeout( - coro, timeout: float = DB_TIMEOUT_SECONDS, operation: str | None = None + coro, + timeout: float = DB_TIMEOUT_SECONDS, + operation: str | None = None, + source: str | None = None, ): """带超时控制的数据库操作""" + global index, total, index2 start_time = time.time() try: + if operation not in total: + total[operation] = CacheRoot.cache_dict(f"DB_TEST_{operation}", 10) + total[operation][str(index)] = "1" + index += 1 + logger.info(f"当前数据库查询来源: {source}") + logger.info( + f"10秒内数据库查询次数 [{operation}]: {len(total[operation])}", "DB_TEST" + ) result = await asyncio.wait_for(coro, timeout=timeout) + cache[str(index)] = "1" + index += 1 elapsed = time.time() - start_time if elapsed > SLOW_QUERY_THRESHOLD and operation: logger.warning(f"慢查询: {operation} 耗时 {elapsed:.3f}s", LOG_COMMAND)