🩹 精确webui调用统计

This commit is contained in:
HibiKier 2025-02-01 23:56:11 +08:00
parent e992a85ab7
commit 95e8040609
5 changed files with 72 additions and 33 deletions

View File

@ -16,6 +16,7 @@ from zhenxun.models.sign_log import SignLog
from zhenxun.models.sign_user import SignUser
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.image_utils import BuildImage
from zhenxun.utils.platform import PlatformUtils
from .config import (
SIGN_BACKGROUND_PATH,
@ -27,9 +28,9 @@ from .config import (
lik2relation,
)
assert (
len(level2attitude) == len(lik2level) == len(lik2relation)
), "好感度态度、等级、关系长度不匹配!"
assert len(level2attitude) == len(lik2level) == len(lik2relation), (
"好感度态度、等级、关系长度不匹配!"
)
AVA_URL = "http://q1.qlogo.cn/g?b=qq&nk={}&s=160"
@ -430,7 +431,9 @@ async def _generate_html_card(
)
now = datetime.now()
data = {
"ava_url": session.user.avatar,
"ava_url": PlatformUtils.get_user_avatar_url(
user.user_id, PlatformUtils.get_platform(session), session.self_id
),
"name": nickname,
"uid": uid,
"sign_count": f"{user.sign_count}",

View File

@ -248,13 +248,31 @@ class ApiDataSource:
if bot_id:
query = query.filter(bot_id=bot_id)
if date_type == QueryDateType.DAY:
query = query.filter(create_time__gte=now - timedelta(hours=now.hour))
query = query.filter(
create_time__gte=now
- timedelta(hours=now.hour, minutes=now.minute, seconds=now.second)
)
if date_type == QueryDateType.WEEK:
query = query.filter(create_time__gte=now - timedelta(days=7))
query = query.filter(
create_time__gte=now
- timedelta(
days=7, hours=now.hour, minutes=now.minute, seconds=now.second
)
)
if date_type == QueryDateType.MONTH:
query = query.filter(create_time__gte=now - timedelta(days=30))
query = query.filter(
create_time__gte=now
- timedelta(
days=30, hours=now.hour, minutes=now.minute, seconds=now.second
)
)
if date_type == QueryDateType.YEAR:
query = query.filter(create_time__gte=now - timedelta(days=365))
query = query.filter(
create_time__gte=now
- timedelta(
days=365, hours=now.hour, minutes=now.minute, seconds=now.second
)
)
return query
@classmethod

View File

@ -5,6 +5,8 @@ from pydantic import BaseModel
from .utils import ConfigsManager
__all__ = ["BotConfig", "Config"]
class BotSetting(BaseModel):
self_nickname: str = ""

View File

@ -17,6 +17,7 @@ from nonebot_plugin_alconna import (
Voice,
)
from pydantic import BaseModel
import ujson as json
from zhenxun.configs.config import BotConfig
from zhenxun.services.log import logger
@ -141,6 +142,21 @@ class MessageUtils:
)
return UniMessage(Reference(nodes=node_list))
@classmethod
def markdown(cls, content: dict) -> Message:
"""markdown格式消息
参数:
content: 消息内容
返回:
Message: 构造完成的消息
"""
content_data = base64.b64encode(json.dumps(content).encode("utf-8")).decode(
"utf-8"
)
return Message(f"[CQ:markdown,data=base64://{content_data}]")
@classmethod
def custom_forward_msg(
cls,

View File

@ -55,6 +55,8 @@ class PlatformUtils:
"""
if isinstance(session, Bot):
return bool(BotConfig.get_qbot_uid(session.self_id))
if BotConfig.get_qbot_uid(session.self_id):
return True
return session.scope == SupportScope.qq_api
@classmethod
@ -354,32 +356,30 @@ class PlatformUtils:
返回:
tuple[list[GroupConsole], str]: 群组列表, 平台
"""
if interface := get_interface(bot):
platform = cls.get_platform(bot)
result_list = []
scenes = await interface.get_scenes(SceneType.GROUP)
for scene in scenes:
group_id = scene.id
result_list.append(
GroupConsole(
group_id=scene.id,
group_name=scene.name,
)
if not (interface := get_interface(bot)):
return [], ""
platform = cls.get_platform(bot)
result_list = []
scenes = await interface.get_scenes(SceneType.GROUP)
for scene in scenes:
group_id = scene.id
result_list.append(
GroupConsole(
group_id=scene.id,
group_name=scene.name,
)
if not only_group and platform != "qq":
if channel_list := await interface.get_scenes(
parent_scene_id=group_id
):
for channel in channel_list:
result_list.append(
GroupConsole(
group_id=scene.id,
group_name=channel.name,
channel_id=channel.id,
)
)
return result_list, platform
return [], ""
)
if not only_group and platform != "qq":
if channel_list := await interface.get_scenes(parent_scene_id=group_id):
result_list.extend(
GroupConsole(
group_id=scene.id,
group_name=channel.name,
channel_id=channel.id,
)
for channel in channel_list
)
return result_list, platform
@classmethod
async def update_friend(cls, bot: Bot) -> int: