🔧 优化数据访问和数据库上下文逻辑,移除不必要的全局变量和日志信息,调整日志级别为调试,提升代码可读性和性能。

This commit is contained in:
HibiKier 2025-08-26 16:28:53 +08:00
parent 90a0ef58b9
commit a89d87a0b4
3 changed files with 2 additions and 42 deletions

View File

@ -181,7 +181,7 @@ class DataAccess(Generic[T]):
# 如果成功构建缓存键,尝试从缓存获取
if cache_key is not None:
data = await self.cache.get(cache_key)
logger.info(
logger.debug(
f"{self.model_cls.__name__} key: {cache_key}"
f" 从缓存获取到的数据 {type(data)}: {data}"
)
@ -197,13 +197,6 @@ class DataAccess(Generic[T]):
f"{self.model_cls.__name__} 从缓存获取"
f"到空结果: {cache_key}, 允许数据不存在返回None"
)
if self.model_cls.__name__ == "BanConsole":
uid = kwargs.get("user_id")
if uid:
logger.info(
f"查询ban: {cache_key}:{uid}数据不存在返回NULL结果",
"DB_TEST__BAN",
)
return None
elif data:
# 缓存命中

View File

@ -16,9 +16,6 @@ from zhenxun.utils.enum import DbLockType
from .config import LOG_COMMAND, db_model
from .utils import with_db_timeout
total = {}
index = 0
class Model(TortoiseModel):
"""
@ -224,19 +221,7 @@ class Model(TortoiseModel):
返回:
Self | None: 查询结果如果不存在返回None
"""
global index, total
try:
if cls.__name__ == "BanConsole":
uid = kwargs.get("user_id")
if uid:
if uid not in total:
total[uid] = CacheRoot.cache_dict(f"DB_TEST_UID_{uid}", 10, int)
total[uid][str(index)] = "1"
index += 1
logger.info(
f"BanConsole 10秒内查询次数 uid: {uid}"
f" 查询次数: {len(total[uid])}"
)
# 先尝试使用 get_or_none 获取单个记录
try:
return await with_db_timeout(

View File

@ -1,7 +1,6 @@
import asyncio
import time
from zhenxun.services.cache import CacheRoot
from zhenxun.services.log import logger
from .config import (
@ -10,13 +9,6 @@ from .config import (
SLOW_QUERY_THRESHOLD,
)
cache = CacheRoot.cache_dict("DB_TEST", 10)
index = 0
total = {}
async def with_db_timeout(
coro,
@ -25,20 +17,10 @@ async def with_db_timeout(
source: str | None = None,
):
"""带超时控制的数据库操作"""
global index, total, index2
start_time = time.time()
try:
if operation not in total:
total[operation] = CacheRoot.cache_dict(f"DB_TEST_{operation}", 10)
total[operation][str(index)] = "1"
index += 1
logger.info(f"当前数据库查询来源: {source}")
logger.info(
f"10秒内数据库查询次数 [{operation}]: {len(total[operation])}", "DB_TEST"
)
logger.debug(f"开始执行数据库操作: {operation} 来源: {source}")
result = await asyncio.wait_for(coro, timeout=timeout)
cache[str(index)] = "1"
index += 1
elapsed = time.time() - start_time
if elapsed > SLOW_QUERY_THRESHOLD and operation:
logger.warning(f"慢查询: {operation} 耗时 {elapsed:.3f}s", LOG_COMMAND)