feat(core): 更新群组信息、Markdown 样式与 Pydantic 兼容层

- 【group】添加更新所有群组信息指令,并同步群组控制台数据
- 【markdown】支持合并 Markdown 的 CSS 来源
- 【pydantic-compat】提供 model_validate 兼容函数
This commit is contained in:
webjoin111 2025-10-10 20:27:42 +08:00
parent 4b8013d2d6
commit ae07e5f329
4 changed files with 108 additions and 3 deletions

View File

@ -1,7 +1,11 @@
import asyncio
import random
import nonebot import nonebot
from nonebot import on_notice from nonebot import on_notice
from nonebot.adapters import Bot from nonebot.adapters import Bot
from nonebot.adapters.onebot.v11 import GroupIncreaseNoticeEvent from nonebot.adapters.onebot.v11 import GroupIncreaseNoticeEvent
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Arparma, on_alconna from nonebot_plugin_alconna import Alconna, Arparma, on_alconna
from nonebot_plugin_apscheduler import scheduler from nonebot_plugin_apscheduler import scheduler
@ -45,6 +49,71 @@ _matcher = on_alconna(
_notice = on_notice(priority=1, block=False, rule=notice_rule(GroupIncreaseNoticeEvent)) _notice = on_notice(priority=1, block=False, rule=notice_rule(GroupIncreaseNoticeEvent))
_update_all_matcher = on_alconna(
Alconna("更新所有群组信息"),
permission=SUPERUSER,
priority=1,
block=True,
)
async def _update_all_groups_task(bot: Bot, session: EventSession):
"""
在后台执行所有群组的更新任务并向超级用户发送最终报告
"""
success_count = 0
fail_count = 0
total_count = 0
bot_id = bot.self_id
logger.info(f"Bot {bot_id}: 开始执行所有群组信息更新任务...", "更新所有群组")
try:
group_list, _ = await PlatformUtils.get_group_list(bot)
total_count = len(group_list)
for i, group in enumerate(group_list):
try:
logger.debug(
f"Bot {bot_id}: 正在更新第 {i + 1}/{total_count} 个群组: "
f"{group.group_id}",
"更新所有群组",
)
await MemberUpdateManage.update_group_member(bot, group.group_id)
success_count += 1
except Exception as e:
fail_count += 1
logger.error(
f"Bot {bot_id}: 更新群组 {group.group_id} 信息失败",
"更新所有群组",
e=e,
)
await asyncio.sleep(random.uniform(1.5, 3.0))
except Exception as e:
logger.error(f"Bot {bot_id}: 获取群组列表失败,任务中断", "更新所有群组", e=e)
await PlatformUtils.send_superuser(
bot,
f"Bot {bot_id} 更新所有群组信息任务失败:无法获取群组列表。",
session.id1,
)
return
summary_message = (
f"🤖 Bot {bot_id} 所有群组信息更新任务完成!\n"
f"总计群组: {total_count}\n"
f"✅ 成功: {success_count}\n"
f"❌ 失败: {fail_count}"
)
logger.info(summary_message.replace("\n", " | "), "更新所有群组")
await PlatformUtils.send_superuser(bot, summary_message, session.id1)
@_update_all_matcher.handle()
async def _(bot: Bot, session: EventSession):
await MessageUtils.build_message(
"已开始在后台更新所有群组信息,过程可能需要几分钟到几十分钟,完成后将私聊通知您。"
).send(reply_to=True)
asyncio.create_task(_update_all_groups_task(bot, session)) # noqa: RUF006
@_matcher.handle() @_matcher.handle()
async def _(bot: Bot, session: EventSession, arparma: Arparma): async def _(bot: Bot, session: EventSession, arparma: Arparma):
if gid := session.id3 or session.id2: if gid := session.id3 or session.id2:

View File

@ -6,6 +6,7 @@ from nonebot.adapters import Bot
from nonebot_plugin_uninfo import Member, SceneType, get_interface from nonebot_plugin_uninfo import Member, SceneType, get_interface
from zhenxun.configs.config import Config from zhenxun.configs.config import Config
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.group_member_info import GroupInfoUser from zhenxun.models.group_member_info import GroupInfoUser
from zhenxun.models.level_user import LevelUser from zhenxun.models.level_user import LevelUser
from zhenxun.services.log import logger from zhenxun.services.log import logger
@ -94,6 +95,25 @@ class MemberUpdateManage:
) )
return "更新群组失败,群组不存在..." return "更新群组失败,群组不存在..."
members = await interface.get_members(SceneType.GROUP, group_list[0].id) members = await interface.get_members(SceneType.GROUP, group_list[0].id)
try:
group_console, _ = await GroupConsole.get_or_create(
group_id=group_id, defaults={"platform": platform}
)
group_console.member_count = len(members)
group_console.group_name = group_list[0].name or ""
await group_console.save(update_fields=["member_count", "group_name"])
logger.debug(
f"已更新群组 {group_id} 的成员总数为 {len(members)}",
"更新群组成员信息",
)
except Exception as e:
logger.error(
f"更新群组 {group_id} 的 GroupConsole 信息失败",
"更新群组成员信息",
e=e,
)
db_user = await GroupInfoUser.filter(group_id=group_id).all() db_user = await GroupInfoUser.filter(group_id=group_id).all()
db_user_uid = [u.user_id for u in db_user] db_user_uid = [u.user_id for u in db_user]
data_list = ([], [], []) data_list = ([], [], [])

View File

@ -192,11 +192,15 @@ class MarkdownData(ContainerComponent):
yield from find_components_recursive(self.elements) yield from find_components_recursive(self.elements)
async def get_extra_css(self, context: Any) -> str: async def get_extra_css(self, context: Any) -> str:
css_parts = []
if self.component_css:
css_parts.append(self.component_css)
if self.css_path: if self.css_path:
css_file = Path(self.css_path) css_file = Path(self.css_path)
if css_file.is_file(): if css_file.is_file():
async with aiofiles.open(css_file, encoding="utf-8") as f: async with aiofiles.open(css_file, encoding="utf-8") as f:
return await f.read() css_parts.append(await f.read())
else: else:
logger.warning(f"Markdown自定义CSS文件不存在: {self.css_path}") logger.warning(f"Markdown自定义CSS文件不存在: {self.css_path}")
else: else:
@ -206,5 +210,6 @@ class MarkdownData(ContainerComponent):
) )
if css_path and css_path.exists(): if css_path and css_path.exists():
async with aiofiles.open(css_path, encoding="utf-8") as f: async with aiofiles.open(css_path, encoding="utf-8") as f:
return await f.read() css_parts.append(await f.read())
return ""
return "\n".join(css_parts)

View File

@ -27,6 +27,7 @@ __all__ = [
"model_copy", "model_copy",
"model_dump", "model_dump",
"model_json_schema", "model_json_schema",
"model_validate",
"parse_as", "parse_as",
] ]
@ -44,6 +45,16 @@ def model_copy(
return model.copy(update=update_dict, deep=deep) return model.copy(update=update_dict, deep=deep)
def model_validate(model_class: type[T], obj: Any) -> T:
"""
Pydantic `model_validate` (v2) `parse_obj` (v1) 的兼容函数
"""
if PYDANTIC_V2:
return model_class.model_validate(obj)
else:
return model_class.parse_obj(obj)
if PYDANTIC_V2: if PYDANTIC_V2:
from pydantic import computed_field as compat_computed_field from pydantic import computed_field as compat_computed_field
else: else: