添加bot画像

*  新增自我介绍功能及自动发送图片支持

- 在 bot_profile.py 中实现自我介绍指令及重载功能
- 在 group_handle 中添加自动发送自我介绍图片的逻辑
- 在 fg_request 中实现添加好友时自动发送自我介绍图片
- 新增 bot_profile_manager.py 管理 BOT 自我介绍及图片生成
- 更新 models.py 以支持插件自我介绍和注意事项字段

* 🎨 调整管理帮助宽度

*  更新数据访问层,优化获取数据的方法并引入缓存机制

*  更新用户数据访问逻辑,优化获取用户信息的方法,使用新的函数替代原有实现

*  在 BotProfileManager 中添加自我介绍文件不存在的日志记录,优化文件读取逻辑

*  更新 BOT 自我介绍帮助信息,增加文件不存在时自动创建功能
This commit is contained in:
HibiKier 2025-07-16 02:51:06 +08:00 committed by GitHub
parent b993450a23
commit 205f4ff1fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 372 additions and 87 deletions

View File

@ -25,6 +25,11 @@ __plugin_meta__ = PluginMetadata(
version="0.1",
plugin_type=PluginType.ADMIN,
admin_level=1,
introduction="""这是 群主/群管理 的帮助列表,里面记录了群组内开关功能的
方法帮助以及群管特权方法建议首次时在群组中发送 '管理员帮助' 查看""",
precautions=[
"只有群主/群管理 才能使用哦群主拥有6级权限管理员拥有5级权限"
],
configs=[
RegisterConfig(
key="type",

View File

@ -16,7 +16,8 @@ async def get_task() -> dict[str, str] | None:
"name": "被动技能",
"description": "控制群组中的被动技能状态",
"usage": "通过 开启/关闭群被动 来控制群被动 <br>"
+ " 示例:开启/关闭群被动早晚安 <br> ---------- <br> "
+ " 示例:开启/关闭群被动早晚安 <br> 示例:开启/关闭全部群被动"
+ " <br> ---------- <br> "
+ "<br>".join([task.name for task in task_list]),
}
return None
@ -47,7 +48,7 @@ async def build_html_help():
}
},
pages={
"viewport": {"width": 1024, "height": 1024},
"viewport": {"width": 824, "height": 10},
"base_url": f"file://{TEMPLATE_PATH}",
},
wait=2,

View File

@ -0,0 +1,58 @@
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata
from nonebot.rule import to_me
from nonebot_plugin_alconna import Alconna, Arparma, on_alconna
from nonebot_plugin_uninfo import Uninfo
from zhenxun.configs.config import BotConfig
from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger
from zhenxun.utils.manager.bot_profile_manager import BotProfileManager
from zhenxun.utils.message import MessageUtils
__plugin_meta__ = PluginMetadata(
name="自我介绍",
description=f"这是{BotConfig.self_nickname}的深情告白",
usage="""
指令
自我介绍
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
menu_type="其他",
superuser_help="""
在data/bot_profile/bot_id/profile.txt 中编辑BOT自我介绍
在data/bot_profile/bot_id/bot_id.png 中编辑BOT头像
指令
重载自我介绍
""".strip(),
).to_dict(),
)
_matcher = on_alconna(Alconna("自我介绍"), priority=5, block=True, rule=to_me())
_reload_matcher = on_alconna(
Alconna("重载自我介绍"), priority=1, block=True, permission=SUPERUSER
)
@_matcher.handle()
async def _(session: Uninfo, arparma: Arparma):
file_path = await BotProfileManager.build_bot_profile_image(session.self_id)
if not file_path:
await MessageUtils.build_message(
f"{BotConfig.self_nickname}当前没有自我简介哦"
).finish(reply_to=True)
await MessageUtils.build_message(file_path).send()
logger.info("BOT自我介绍", arparma.header_result, session=session)
@_reload_matcher.handle()
async def _(session: Uninfo, arparma: Arparma):
BotProfileManager.clear_profile_image(session.self_id)
await MessageUtils.build_message(f"重载{BotConfig.self_nickname}自我介绍成功").send(
reply_to=True
)
logger.info("重载BOT自我介绍", arparma.header_result, session=session)

View File

@ -131,7 +131,9 @@ async def get_plugin_and_user(
# 并行查询插件和用户数据
plugin_task = plugin_dao.safe_get_or_none(module=module)
user_task = user_dao.safe_get_or_none(user_id=user_id)
user_task = user_dao.get_by_func_or_none(
UserConsole.get_user, False, user_id=user_id
)
try:
plugin, user = await with_timeout(
@ -155,7 +157,9 @@ async def get_plugin_and_user(
)
user = None
try:
user = await user_dao.safe_get_or_none(user_id=user_id)
user = await user_dao.get_by_func_or_none(
UserConsole.get_user, False, user_id=user_id
)
except IntegrityError as e:
raise PermissionExemption("重复创建用户,已跳过该次权限检查...") from e
if not user:

View File

@ -10,7 +10,7 @@ from nonebot_plugin_uninfo import Uninfo
import ujson as json
from zhenxun.builtin_plugins.platform.qq.exception import ForceAddGroupError
from zhenxun.configs.config import Config
from zhenxun.configs.config import BotConfig, Config
from zhenxun.configs.path_config import DATA_PATH, IMAGE_PATH
from zhenxun.models.fg_request import FgRequest
from zhenxun.models.group_console import GroupConsole
@ -20,6 +20,7 @@ from zhenxun.models.plugin_info import PluginInfo
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.enum import RequestHandleType
from zhenxun.utils.manager.bot_profile_manager import BotProfileManager
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.platform import PlatformUtils
from zhenxun.utils.utils import FreqLimiter
@ -153,6 +154,17 @@ class GroupManager:
await cls.__handle_add_group(bot, group_id, group)
"""刷新群管理员权限"""
await cls.__refresh_level(bot, group_id)
if BotProfileManager.is_auto_send_profile():
file_path = await BotProfileManager.build_bot_profile_image(bot.self_id)
if file_path:
await MessageUtils.build_message(
[
f"嗨,大家好,我是{BotConfig.self_nickname} "
"希望我们可以友好相处(眨眼眨眼)!",
file_path,
]
).send()
logger.info("加入群组自动发送BOT自我介绍图片", session=group_id)
@classmethod
def get_path(cls, session: Uninfo) -> Path | None:

View File

@ -344,6 +344,16 @@ class ShopManage:
if goods_name.isdigit():
try:
user = await UserConsole.get_user(user_id=session.user.id)
goods_list = await GoodsInfo.filter(uuid__in=user.props.keys()).all()
goods_by_uuid = {item.uuid: item for item in goods_list}
props_str = str(user.props)
user.props = {
uuid: count
for uuid, count in user.props.items()
if count > 0 and goods_by_uuid.get(uuid)
}
if props_str != str(user.props):
await user.save(update_fields=["props"])
uuid = list(user.props.keys())[int(goods_name)]
goods_info = await GoodsInfo.get_or_none(uuid=uuid)
except IndexError:
@ -501,11 +511,14 @@ class ShopManage:
goods_list = await GoodsInfo.filter(uuid__in=user.props.keys()).all()
goods_by_uuid = {item.uuid: item for item in goods_list}
props_str = str(user.props)
user.props = {
uuid: count
for uuid, count in user.props.items()
if count > 0 and goods_by_uuid.get(uuid)
}
if props_str != str(user.props):
await user.save(update_fields=["props"])
table_rows = []
for i, prop_uuid in enumerate(user.props):

View File

@ -163,15 +163,20 @@ async def _(
req = await FgRequest.ignore(handle_id)
except NotFoundError:
await MessageUtils.build_message("未发现此id的请求...").finish(reply_to=True)
except Exception:
await MessageUtils.build_message("其他错误, 可能flag已失效...").finish(
except Exception as e:
logger.error(f"处理请求失败 ID: {handle_id}", session=session, e=e)
await MessageUtils.build_message(f"其他错误, 可能flag已失效...: {e}").finish(
reply_to=True
)
logger.info(
f"处理请求 Id: {req.id if req else ''}", arparma.header_result, session=session
)
await MessageUtils.build_message("成功处理请求!").send(reply_to=True)
if req and handle_type == RequestHandleType.APPROVE:
if (
req
and req.request_type == RequestType.GROUP
and handle_type == RequestHandleType.APPROVE
):
await bot.send_private_msg(
user_id=req.user_id,
message=f"管理员已同意此次群组邀请,请不要让{BotConfig.self_nickname}受委屈哦(狠狠监控)"

View File

@ -51,7 +51,7 @@ async def build_html_help():
}
},
pages={
"viewport": {"width": 1024, "height": 1024},
"viewport": {"width": 824, "height": 10},
"base_url": f"file://{TEMPLATE_PATH}",
},
wait=2,

View File

@ -263,6 +263,10 @@ class PluginExtraData(BaseModel):
"""是否显示在菜单中"""
smart_tools: list[AICallableTag] | None = None
"""智能模式函数工具集"""
introduction: str | None = None
"""BOT自我介绍时插件的自我介绍"""
precautions: list[str] | None = None
"""BOT自我介绍时插件的注意事项"""
def to_dict(self, **kwargs):
return model_dump(self, **kwargs)

View File

@ -6,9 +6,13 @@ from tortoise import fields
from zhenxun.configs.config import BotConfig
from zhenxun.models.group_console import GroupConsole
from zhenxun.services.db_context import Model
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import SqlUtils
from zhenxun.utils.enum import RequestHandleType, RequestType
from zhenxun.utils.exception import NotFoundError
from zhenxun.utils.manager.bot_profile_manager import BotProfileManager
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.platform import PlatformUtils
class FgRequest(Model):
@ -123,6 +127,26 @@ class FgRequest(Model):
await bot.set_friend_add_request(
flag=req.flag, approve=handle_type == RequestHandleType.APPROVE
)
if BotProfileManager.is_auto_send_profile():
file_path = await BotProfileManager.build_bot_profile_image(
bot.self_id
)
if file_path:
await PlatformUtils.send_message(
bot,
req.user_id,
None,
MessageUtils.build_message(
[
f"你好,我是{BotConfig.self_nickname} "
"初次见面,希望我们可以好好相处!",
file_path,
]
),
)
logger.info(
"添加好友自动发送BOT自我介绍图片", session=req.user_id
)
else:
await GroupConsole.update_or_create(
group_id=req.group_id, defaults={"group_flag": 1}

View File

@ -147,10 +147,14 @@ class DataAccess(Generic[T]):
return str(kwargs[self.key_field])
return None
async def safe_get_or_none(self, *args, **kwargs) -> T | None:
"""安全的获取单条数据
async def _get_with_cache(
self, db_query_func, allow_not_exist: bool = True, *args, **kwargs
) -> T | None:
"""带缓存的通用获取方法
参数:
db_query_func: 数据库查询函数
allow_not_exist: 是否允许数据不存在
*args: 查询参数
**kwargs: 查询参数
@ -161,8 +165,8 @@ class DataAccess(Generic[T]):
if not self.cache_type or cache_config.cache_mode == CacheMode.NONE:
logger.debug(f"{self.model_cls.__name__} 直接从数据库获取数据: {kwargs}")
return await with_db_timeout(
self.model_cls.safe_get_or_none(*args, **kwargs),
operation=f"{self.model_cls.__name__}.safe_get_or_none",
db_query_func(*args, **kwargs),
operation=f"{self.model_cls.__name__}.{db_query_func.__name__}",
)
# 尝试从缓存获取
@ -184,7 +188,12 @@ class DataAccess(Generic[T]):
logger.debug(
f"{self.model_cls.__name__} 从缓存获取到空结果: {cache_key}"
)
return None
if allow_not_exist:
logger.debug(
f"{self.model_cls.__name__} 从缓存获取"
f"到空结果: {cache_key}, 允许数据不存在返回None"
)
return None
elif data:
# 缓存命中
self._cache_stats[self.cache_type]["hits"] += 1
@ -201,7 +210,7 @@ class DataAccess(Generic[T]):
# 如果缓存中没有,从数据库获取
logger.debug(f"{self.model_cls.__name__} 从数据库获取数据: {kwargs}")
data = await self.model_cls.safe_get_or_none(*args, **kwargs)
data = await db_query_func(*args, **kwargs)
# 如果获取到数据,存入缓存
if data:
@ -238,92 +247,52 @@ class DataAccess(Generic[T]):
return data
async def get_or_none(self, *args, **kwargs) -> T | None:
async def get_or_none(
self, allow_not_exist: bool = True, *args, **kwargs
) -> T | None:
"""获取单条数据
参数:
allow_not_exist: 是否允许数据不存在
*args: 查询参数
**kwargs: 查询参数
返回:
Optional[T]: 查询结果如果不存在返回None
"""
# 如果没有缓存类型,直接从数据库获取
if not self.cache_type or cache_config.cache_mode == CacheMode.NONE:
logger.debug(f"{self.model_cls.__name__} 直接从数据库获取数据: {kwargs}")
return await with_db_timeout(
self.model_cls.get_or_none(*args, **kwargs),
operation=f"{self.model_cls.__name__}.get_or_none",
)
return await self._get_with_cache(
self.model_cls.get_or_none, allow_not_exist, *args, **kwargs
)
# 尝试从缓存获取
cache_key = None
try:
# 尝试构建缓存键
cache_key = self._build_cache_key_from_kwargs(**kwargs)
async def safe_get_or_none(
self, allow_not_exist: bool = True, *args, **kwargs
) -> T | None:
"""安全的获取单条数据
# 如果成功构建缓存键,尝试从缓存获取
if cache_key is not None:
data = await self.cache.get(cache_key)
if data == self._NULL_RESULT:
# 空结果缓存命中
self._cache_stats[self.cache_type]["null_hits"] += 1
logger.debug(
f"{self.model_cls.__name__} 从缓存获取到空结果: {cache_key}"
)
return None
elif data:
# 缓存命中
self._cache_stats[self.cache_type]["hits"] += 1
logger.debug(
f"{self.model_cls.__name__} 从缓存获取数据成功: {cache_key}"
)
return cast(T, data)
else:
# 缓存未命中
self._cache_stats[self.cache_type]["misses"] += 1
logger.debug(f"{self.model_cls.__name__} 缓存未命中: {cache_key}")
except Exception as e:
logger.error(f"{self.model_cls.__name__} 从缓存获取数据失败: {kwargs}", e=e)
参数:
allow_not_exist: 是否允许数据不存在
*args: 查询参数
**kwargs: 查询参数
# 如果缓存中没有,从数据库获取
logger.debug(f"{self.model_cls.__name__} 从数据库获取数据: {kwargs}")
data = await self.model_cls.get_or_none(*args, **kwargs)
返回:
Optional[T]: 查询结果如果不存在返回None
"""
return await self._get_with_cache(
self.model_cls.safe_get_or_none, allow_not_exist, *args, **kwargs
)
# 如果获取到数据,存入缓存
if data:
try:
cache_key = self._build_cache_key_for_item(data)
# 生成缓存键
if cache_key is not None:
# 存入缓存
await self.cache.set(cache_key, data)
self._cache_stats[self.cache_type]["sets"] += 1
logger.debug(
f"{self.model_cls.__name__} 数据已存入缓存: {cache_key}"
)
except Exception as e:
logger.error(
f"{self.model_cls.__name__} 存入缓存失败,参数: {kwargs}", e=e
)
elif cache_key is not None:
# 如果没有获取到数据,缓存空结果
try:
# 存入空结果缓存,使用较短的过期时间
await self.cache.set(
cache_key, self._NULL_RESULT, expire=self._NULL_RESULT_TTL
)
self._cache_stats[self.cache_type]["null_sets"] += 1
logger.debug(
f"{self.model_cls.__name__} 空结果已存入缓存: {cache_key},"
f" TTL={self._NULL_RESULT_TTL}"
)
except Exception as e:
logger.error(
f"{self.model_cls.__name__} 存入空结果缓存失败,参数: {kwargs}", e=e
)
async def get_by_func_or_none(
self, func, allow_not_exist: bool = True, *args, **kwargs
) -> T | None:
"""根据函数获取数据
return data
参数:
func: 函数
allow_not_exist: 是否允许数据不存在
*args: 查询参数
**kwargs: 查询参数
"""
return await self._get_with_cache(func, allow_not_exist, *args, **kwargs)
async def clear_cache(self, **kwargs) -> bool:
"""只清除缓存,不影响数据库数据

View File

@ -0,0 +1,190 @@
import asyncio
import os
from pathlib import Path
from typing import ClassVar
import aiofiles
import nonebot
from nonebot.compat import model_dump
from nonebot_plugin_htmlrender import template_to_pic
from pydantic import BaseModel
from zhenxun.configs.config import BotConfig, Config
from zhenxun.configs.path_config import DATA_PATH, TEMPLATE_PATH
from zhenxun.configs.utils.models import PluginExtraData
from zhenxun.models.statistics import Statistics
from zhenxun.models.user_console import UserConsole
from zhenxun.services.log import logger
from zhenxun.utils._build_image import BuildImage
from zhenxun.utils.platform import PlatformUtils
DIR_PATH = DATA_PATH / "bot_profile"
PROFILE_PATH = DIR_PATH / "profile"
PROFILE_PATH.mkdir(parents=True, exist_ok=True)
PROFILE_IMAGE_PATH = DIR_PATH / "image"
PROFILE_IMAGE_PATH.mkdir(parents=True, exist_ok=True)
Config.add_plugin_config(
"bot_profile",
"AUTO_SEND_PROFILE",
True,
help="在添加好友/群组时是否自动发送BOT自我介绍图片",
default_value=True,
type=bool,
)
class Profile(BaseModel):
bot_id: str
"""BOT ID"""
introduction: str
"""BOT自我介绍"""
avatar: Path | None
"""BOT头像"""
name: str
"""BOT名称"""
class PluginProfile(BaseModel):
name: str
"""插件名称"""
introduction: str
"""插件自我介绍"""
precautions: list[str] | None = None
"""BOT自我介绍时插件的注意事项"""
class BotProfileManager:
"""BOT自我介绍管理器"""
_bot_data: ClassVar[dict[str, Profile]] = {}
_plugin_data: ClassVar[dict[str, PluginProfile]] = {}
@classmethod
def clear_profile_image(cls, bot_id: str | None = None):
"""清除BOT自我介绍图片"""
if bot_id:
file_path = PROFILE_IMAGE_PATH / f"{bot_id}.png"
if file_path.exists():
file_path.unlink()
else:
for f in os.listdir(PROFILE_IMAGE_PATH):
_f = PROFILE_IMAGE_PATH / f
if _f.is_file():
_f.unlink()
@classmethod
async def _read_profile(cls, bot_id: str):
"""读取BOT自我介绍
参数:
bot_id: BOT ID
异常:
FileNotFoundError: 文件不存在
"""
bot_file_path = PROFILE_PATH / f"{bot_id}"
bot_file_path.mkdir(parents=True, exist_ok=True)
bot_profile_file = bot_file_path / "profile.txt"
if not bot_profile_file.exists():
logger.debug(f"BOT自我介绍文件不存在: {bot_profile_file}, 跳过读取")
bot_file_path.touch()
return
async with aiofiles.open(bot_profile_file, encoding="utf-8") as f:
introduction = await f.read()
avatar = bot_file_path / f"{bot_id}.png"
if not avatar.exists():
avatar = None
bot = await PlatformUtils.get_user(nonebot.get_bot(bot_id), bot_id)
name = bot.name if bot else "未知"
cls._bot_data[bot_id] = Profile(
bot_id=bot_id, introduction=introduction, avatar=avatar, name=name
)
@classmethod
async def get_bot_profile(cls, bot_id: str) -> Profile | None:
if bot_id not in cls._bot_data:
await cls._read_profile(bot_id)
return cls._bot_data.get(bot_id)
@classmethod
def load_plugin_profile(cls):
"""加载插件自我介绍"""
for plugin in nonebot.get_loaded_plugins():
if plugin.module_name in cls._plugin_data:
continue
metadata = plugin.metadata
if not metadata:
continue
extra = metadata.extra
if not extra:
continue
extra_data = PluginExtraData(**extra)
if extra_data.introduction or extra_data.precautions:
cls._plugin_data[plugin.name] = PluginProfile(
name=metadata.name,
introduction=extra_data.introduction or "",
precautions=extra_data.precautions or [],
)
@classmethod
def get_plugin_profile(cls) -> list[dict]:
"""获取插件自我介绍"""
if not cls._plugin_data:
cls.load_plugin_profile()
return [model_dump(e) for e in cls._plugin_data.values()]
@classmethod
def is_auto_send_profile(cls) -> bool:
"""是否自动发送BOT自我介绍图片"""
return Config.get_config("bot_profile", "AUTO_SEND_PROFILE")
@classmethod
async def build_bot_profile_image(
cls, bot_id: str, tags: list[dict[str, str]] | None = None
) -> Path | None:
"""构建BOT自我介绍图片"""
file_path = PROFILE_IMAGE_PATH / f"{bot_id}.png"
if file_path.exists():
return file_path
profile, service_count, call_count = await asyncio.gather(
cls.get_bot_profile(bot_id),
UserConsole.get_new_uid(),
Statistics.filter(bot_id=bot_id).count(),
)
if not profile:
return None
if not tags:
tags = [
{"text": f"服务人数: {service_count}", "color": "#5e92e0"},
{"text": f"调用次数: {call_count}", "color": "#31e074"},
]
image_bytes = await template_to_pic(
template_path=str((TEMPLATE_PATH / "bot_profile").absolute()),
template_name="main.html",
templates={
"avatar": str(profile.avatar.absolute()) if profile.avatar else None,
"bot_name": profile.name,
"bot_description": profile.introduction,
"service_count": service_count,
"call_count": call_count,
"plugin_list": cls.get_plugin_profile(),
"tags": tags,
"title": f"{BotConfig.self_nickname}简介",
},
pages={
"viewport": {"width": 1077, "height": 1000},
"base_url": f"file://{TEMPLATE_PATH}",
},
wait=2,
)
image = BuildImage.open(image_bytes)
await image.save(file_path)
return file_path
BotProfileManager.clear_profile_image()