🐛 修复sqlite下的日统计查询和0权限功能调用 (#1943)

This commit is contained in:
HibiKier 2025-07-11 10:07:23 +08:00 committed by GitHub
parent 579558e59b
commit 2921aed248
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 34 additions and 18 deletions

View File

@ -1,5 +1,3 @@
from datetime import datetime, timedelta
from tortoise.functions import Count
from zhenxun.models.group_console import GroupConsole
@ -10,6 +8,7 @@ from zhenxun.utils.echart_utils import ChartUtils
from zhenxun.utils.echart_utils.models import Barh
from zhenxun.utils.enum import PluginType
from zhenxun.utils.image_utils import BuildImage
from zhenxun.utils.utils import TimeUtils
class StatisticsManage:
@ -68,8 +67,7 @@ class StatisticsManage:
if plugin_name:
query = query.filter(plugin_name=plugin_name)
if day:
time = datetime.now() - timedelta(days=day)
query = query.filter(create_time__gte=time)
query = query.filter(create_time__gte=TimeUtils.get_day_start())
data_list = (
await query.annotate(count=Count("id"))
.group_by("plugin_name")
@ -89,8 +87,7 @@ class StatisticsManage:
if group_id:
query = query.filter(group_id=group_id)
if day:
time = datetime.now() - timedelta(days=day)
query = query.filter(create_time__gte=time)
query = query.filter(create_time__gte=TimeUtils.get_day_start())
data_list = (
await query.annotate(count=Count("id"))
.group_by("plugin_name")
@ -106,8 +103,7 @@ class StatisticsManage:
async def get_group_statistics(cls, group_id: str, day: int | None, title: str):
query = Statistics.filter(group_id=group_id)
if day:
time = datetime.now() - timedelta(days=day)
query = query.filter(create_time__gte=time)
query = query.filter(create_time__gte=TimeUtils.get_day_start())
data_list = (
await query.annotate(count=Count("id"))
.group_by("plugin_name")

View File

@ -49,7 +49,8 @@ class ChatHistory(Model):
o = "-" if order == "DESC" else ""
query = cls.filter(group_id=gid) if gid else cls
if date_scope:
query = query.filter(create_time__range=date_scope)
filter_scope = (date_scope[0].isoformat(" "), date_scope[1].isoformat(" "))
query = query.filter(create_time__range=filter_scope)
return list(
await query.annotate(count=Count("user_id"))
.order_by(f"{o}count")

View File

@ -90,13 +90,14 @@ class LevelUser(Model):
返回:
bool: 是否大于level
"""
if level == 0:
return True
if group_id:
if user := await cls.get_or_none(user_id=user_id, group_id=group_id):
return user.user_level >= level
else:
if user_list := await cls.filter(user_id=user_id).all():
user = max(user_list, key=lambda x: x.user_level)
return user.user_level >= level
elif user_list := await cls.filter(user_id=user_id).all():
user = max(user_list, key=lambda x: x.user_level)
return user.user_level >= level
return False
@classmethod
@ -119,8 +120,7 @@ class LevelUser(Model):
return [
# 将user_id改为user_id
"ALTER TABLE level_users RENAME COLUMN user_qq TO user_id;",
"ALTER TABLE level_users "
"ALTER COLUMN user_id TYPE character varying(255);",
"ALTER TABLE level_users ALTER COLUMN user_id TYPE character varying(255);",
# 将user_id字段类型改为character varying(255)
"ALTER TABLE level_users "
"ALTER COLUMN group_id TYPE character varying(255);",

View File

@ -80,14 +80,14 @@ class PlatformUtils:
@classmethod
async def send_superuser(
cls,
bot: Bot,
bot: Bot | None,
message: UniMessage | str,
superuser_id: str | None = None,
) -> list[tuple[str, Receipt]]:
"""发送消息给超级用户
参数:
bot: Bot
bot: Bot没有传入时使用get_bot随机获取
message: 消息
superuser_id: 指定超级用户id.
@ -97,6 +97,8 @@ class PlatformUtils:
返回:
Receipt | None: Receipt
"""
if not bot:
bot = nonebot.get_bot()
superuser_ids = []
if superuser_id:
superuser_ids.append(superuser_id)

View File

@ -1,5 +1,5 @@
from collections import defaultdict
from datetime import datetime
from datetime import date, datetime
import os
from pathlib import Path
import time
@ -244,3 +244,20 @@ def is_number(text: str) -> bool:
return True
except ValueError:
return False
class TimeUtils:
@classmethod
def get_day_start(cls, target_date: date | datetime | None = None) -> datetime:
"""获取某天的0点时间
返回:
datetime: 今天某天的0点时间
"""
if not target_date:
target_date = datetime.now()
return (
target_date.replace(hour=0, minute=0, second=0, microsecond=0)
if isinstance(target_date, datetime)
else datetime.combine(target_date, datetime.min.time())
)