mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-14 21:52:56 +08:00
🐛 修复数据库超时问题
This commit is contained in:
parent
a63f26c3b6
commit
8d8cbf20c3
@ -4,6 +4,7 @@ from typing_extensions import Self
|
||||
|
||||
from tortoise import fields
|
||||
|
||||
from zhenxun.services.data_access import DataAccess
|
||||
from zhenxun.services.db_context import Model
|
||||
from zhenxun.services.log import logger
|
||||
from zhenxun.utils.enum import CacheType, DbLockType
|
||||
@ -57,14 +58,15 @@ class BanConsole(Model):
|
||||
"""
|
||||
if not user_id and not group_id:
|
||||
raise UserAndGroupIsNone()
|
||||
dao = DataAccess(cls)
|
||||
if user_id:
|
||||
return (
|
||||
await cls.safe_get_or_none(user_id=user_id, group_id=group_id)
|
||||
await dao.safe_get_or_none(user_id=user_id, group_id=group_id)
|
||||
if group_id
|
||||
else await cls.safe_get_or_none(user_id=user_id, group_id__isnull=True)
|
||||
else await dao.safe_get_or_none(user_id=user_id, group_id__isnull=True)
|
||||
)
|
||||
else:
|
||||
return await cls.safe_get_or_none(user_id="", group_id=group_id)
|
||||
return await dao.safe_get_or_none(user_id="", group_id=group_id)
|
||||
|
||||
@classmethod
|
||||
async def check_ban_level(
|
||||
|
||||
@ -7,6 +7,7 @@ from tortoise.backends.base.client import BaseDBAsyncClient
|
||||
from zhenxun.models.plugin_info import PluginInfo
|
||||
from zhenxun.models.task_info import TaskInfo
|
||||
from zhenxun.services.cache import CacheRoot
|
||||
from zhenxun.services.data_access import DataAccess
|
||||
from zhenxun.services.db_context import Model
|
||||
from zhenxun.utils.enum import CacheType, DbLockType, PluginType
|
||||
|
||||
@ -254,13 +255,14 @@ class GroupConsole(Model):
|
||||
返回:
|
||||
Self: GroupConsole
|
||||
"""
|
||||
dao = DataAccess(cls)
|
||||
if channel_id:
|
||||
return await cls.safe_get_or_none(
|
||||
return await dao.safe_get_or_none(
|
||||
group_id=group_id,
|
||||
channel_id=channel_id,
|
||||
clean_duplicates=clean_duplicates,
|
||||
)
|
||||
return await cls.safe_get_or_none(
|
||||
return await dao.safe_get_or_none(
|
||||
group_id=group_id,
|
||||
channel_id__isnull=True,
|
||||
clean_duplicates=clean_duplicates,
|
||||
|
||||
8
zhenxun/services/cache/cache_containers.py
vendored
8
zhenxun/services/cache/cache_containers.py
vendored
@ -37,7 +37,7 @@ class CacheDict(Generic[T]):
|
||||
return 0
|
||||
return data.expire_time
|
||||
|
||||
def __getitem__(self, key: str) -> T | None:
|
||||
def __getitem__(self, key: str) -> T:
|
||||
"""获取字典项
|
||||
|
||||
参数:
|
||||
@ -47,8 +47,10 @@ class CacheDict(Generic[T]):
|
||||
T: 字典值
|
||||
"""
|
||||
if value := self._data.get(key):
|
||||
return value.value if self.expire_time(key) else None
|
||||
return None
|
||||
if self.expire_time(key):
|
||||
raise KeyError(f"键 {key} 已过期")
|
||||
return value.value
|
||||
raise KeyError(f"键 {key} 不存在")
|
||||
|
||||
def __setitem__(self, key: str, value: T) -> None:
|
||||
"""设置字典项
|
||||
|
||||
@ -7,6 +7,8 @@ from zhenxun.services.log import logger
|
||||
|
||||
T = TypeVar("T", bound=Model)
|
||||
|
||||
cache = CacheRoot.cache_dict("DB_TEST_BAN", 10, int)
|
||||
|
||||
|
||||
class DataAccess(Generic[T]):
|
||||
"""数据访问层,根据配置决定是否使用缓存
|
||||
@ -167,6 +169,7 @@ class DataAccess(Generic[T]):
|
||||
return await with_db_timeout(
|
||||
db_query_func(*args, **kwargs),
|
||||
operation=f"{self.model_cls.__name__}.{db_query_func.__name__}",
|
||||
source="DataAccess",
|
||||
)
|
||||
|
||||
# 尝试从缓存获取
|
||||
@ -178,10 +181,11 @@ class DataAccess(Generic[T]):
|
||||
# 如果成功构建缓存键,尝试从缓存获取
|
||||
if cache_key is not None:
|
||||
data = await self.cache.get(cache_key)
|
||||
logger.debug(
|
||||
f"{self.model_cls.__name__} self.cache.get(cache_key)"
|
||||
logger.info(
|
||||
f"{self.model_cls.__name__} key: {cache_key}"
|
||||
f" 从缓存获取到的数据 {type(data)}: {data}"
|
||||
)
|
||||
|
||||
if data == self._NULL_RESULT:
|
||||
# 空结果缓存命中
|
||||
self._cache_stats[self.cache_type]["null_hits"] += 1
|
||||
@ -193,6 +197,13 @@ class DataAccess(Generic[T]):
|
||||
f"{self.model_cls.__name__} 从缓存获取"
|
||||
f"到空结果: {cache_key}, 允许数据不存在,返回None"
|
||||
)
|
||||
if self.model_cls.__name__ == "BanConsole":
|
||||
uid = kwargs.get("user_id")
|
||||
if uid:
|
||||
logger.info(
|
||||
f"查询ban: {cache_key}:{uid}数据不存在返回NULL结果",
|
||||
"DB_TEST__BAN",
|
||||
)
|
||||
return None
|
||||
elif data:
|
||||
# 缓存命中
|
||||
|
||||
@ -16,6 +16,9 @@ from zhenxun.utils.enum import DbLockType
|
||||
from .config import LOG_COMMAND, db_model
|
||||
from .utils import with_db_timeout
|
||||
|
||||
total = {}
|
||||
index = 0
|
||||
|
||||
|
||||
class Model(TortoiseModel):
|
||||
"""
|
||||
@ -221,12 +224,25 @@ class Model(TortoiseModel):
|
||||
返回:
|
||||
Self | None: 查询结果,如果不存在返回None
|
||||
"""
|
||||
global index, total
|
||||
try:
|
||||
if cls.__name__ == "BanConsole":
|
||||
uid = kwargs.get("user_id")
|
||||
if uid:
|
||||
if uid not in total:
|
||||
total[uid] = CacheRoot.cache_dict(f"DB_TEST_UID_{uid}", 10, int)
|
||||
total[uid][str(index)] = "1"
|
||||
index += 1
|
||||
logger.info(
|
||||
f"BanConsole 10秒内查询次数 uid: {uid}"
|
||||
f" 查询次数: {len(total[uid])}"
|
||||
)
|
||||
# 先尝试使用 get_or_none 获取单个记录
|
||||
try:
|
||||
return await with_db_timeout(
|
||||
cls.get_or_none(*args, using_db=using_db, **kwargs),
|
||||
operation=f"{cls.__name__}.get_or_none",
|
||||
source="DataBaseModel",
|
||||
)
|
||||
except MultipleObjectsReturned:
|
||||
# 如果出现多个记录的情况,进行特殊处理
|
||||
@ -239,6 +255,7 @@ class Model(TortoiseModel):
|
||||
records = await with_db_timeout(
|
||||
cls.filter(*args, **kwargs).all(),
|
||||
operation=f"{cls.__name__}.filter.all",
|
||||
source="DataBaseModel",
|
||||
)
|
||||
|
||||
if not records:
|
||||
@ -255,6 +272,7 @@ class Model(TortoiseModel):
|
||||
await with_db_timeout(
|
||||
record.delete(),
|
||||
operation=f"{cls.__name__}.delete_duplicate",
|
||||
source="DataBaseModel",
|
||||
)
|
||||
logger.info(
|
||||
f"{cls.__name__} 删除重复记录:"
|
||||
@ -269,11 +287,13 @@ class Model(TortoiseModel):
|
||||
return await with_db_timeout(
|
||||
cls.filter(*args, **kwargs).order_by("-id").first(),
|
||||
operation=f"{cls.__name__}.filter.order_by.first",
|
||||
source="DataBaseModel",
|
||||
)
|
||||
# 如果没有 id 字段,则返回第一个记录
|
||||
return await with_db_timeout(
|
||||
cls.filter(*args, **kwargs).first(),
|
||||
operation=f"{cls.__name__}.filter.first",
|
||||
source="DataBaseModel",
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from zhenxun.services.cache import CacheRoot
|
||||
from zhenxun.services.log import logger
|
||||
|
||||
from .config import (
|
||||
@ -9,14 +10,35 @@ from .config import (
|
||||
SLOW_QUERY_THRESHOLD,
|
||||
)
|
||||
|
||||
cache = CacheRoot.cache_dict("DB_TEST", 10)
|
||||
|
||||
index = 0
|
||||
|
||||
|
||||
total = {}
|
||||
|
||||
|
||||
async def with_db_timeout(
|
||||
coro, timeout: float = DB_TIMEOUT_SECONDS, operation: str | None = None
|
||||
coro,
|
||||
timeout: float = DB_TIMEOUT_SECONDS,
|
||||
operation: str | None = None,
|
||||
source: str | None = None,
|
||||
):
|
||||
"""带超时控制的数据库操作"""
|
||||
global index, total, index2
|
||||
start_time = time.time()
|
||||
try:
|
||||
if operation not in total:
|
||||
total[operation] = CacheRoot.cache_dict(f"DB_TEST_{operation}", 10)
|
||||
total[operation][str(index)] = "1"
|
||||
index += 1
|
||||
logger.info(f"当前数据库查询来源: {source}")
|
||||
logger.info(
|
||||
f"10秒内数据库查询次数 [{operation}]: {len(total[operation])}", "DB_TEST"
|
||||
)
|
||||
result = await asyncio.wait_for(coro, timeout=timeout)
|
||||
cache[str(index)] = "1"
|
||||
index += 1
|
||||
elapsed = time.time() - start_time
|
||||
if elapsed > SLOW_QUERY_THRESHOLD and operation:
|
||||
logger.warning(f"慢查询: {operation} 耗时 {elapsed:.3f}s", LOG_COMMAND)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user