diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index df117731..998571b0 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -45,12 +45,9 @@ jobs: include: - language: python build-mode: none - - language: javascript-typescript - build-mode: none # CodeQL supports the following values keywords for 'language': 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'swift' # Use `c-cpp` to analyze code written in C, C++ or both # Use 'java-kotlin' to analyze code written in Java, Kotlin or both - # Use 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both # To learn more about changing the languages that are analyzed or customizing the build mode for your analysis, # see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/customizing-your-advanced-setup-for-code-scanning. # If you are analyzing a compiled language, you can modify the 'build-mode' for that language to customize how diff --git a/zhenxun/builtin_plugins/llm_manager/__init__.py b/zhenxun/builtin_plugins/llm_manager/__init__.py index de0e0caf..d48893fc 100644 --- a/zhenxun/builtin_plugins/llm_manager/__init__.py +++ b/zhenxun/builtin_plugins/llm_manager/__init__.py @@ -1,3 +1,5 @@ +from collections import defaultdict + from nonebot.permission import SUPERUSER from nonebot.plugin import PluginMetadata from nonebot_plugin_alconna import ( @@ -58,7 +60,12 @@ __plugin_meta__ = PluginMetadata( llm_cmd = on_alconna( Alconna( "llm", - Subcommand("list", alias=["ls"], help_text="查看模型列表"), + Subcommand( + "list", + Option("--text", action=store_true, help_text="以纯文本格式输出模型列表"), + alias=["ls"], + help_text="查看模型列表", + ), Subcommand("info", Args["model_name", str], help_text="查看模型详情"), Subcommand("default", Args["model_name?", str], help_text="查看或设置默认模型"), Subcommand( @@ -80,13 +87,36 @@ llm_cmd = on_alconna( @llm_cmd.assign("list") -async def handle_list(arp: Arparma, show_all: Query[bool] = Query("all")): +async def handle_list( + arp: Arparma, + show_all: Query[bool] = Query("all"), + text_mode: Query[bool] = Query("list.text.value", False), +): """处理 'llm list' 命令""" logger.info("获取LLM模型列表", command="LLM Manage", session=arp.header_result) models = await DataSource.get_model_list(show_all=show_all.result) - image = await Presenters.format_model_list_as_image(models, show_all.result) - await llm_cmd.finish(MessageUtils.build_message(image)) + if text_mode.result: + if not models: + await llm_cmd.finish("当前没有配置任何LLM模型。") + + grouped_models = defaultdict(list) + for model in models: + grouped_models[model["provider_name"]].append(model) + + response_parts = ["可用的LLM模型列表:"] + for provider, model_list in grouped_models.items(): + response_parts.append(f"\n{provider}:") + for model in model_list: + response_parts.append( + f" {model['provider_name']}/{model['model_name']}" + ) + + response_text = "\n".join(response_parts) + await llm_cmd.finish(response_text) + else: + image = await Presenters.format_model_list_as_image(models, show_all.result) + await llm_cmd.finish(MessageUtils.build_message(image)) @llm_cmd.assign("info") @@ -114,7 +144,7 @@ async def handle_default(arp: Arparma, model_name: Match[str]): command="LLM Manage", session=arp.header_result, ) - success, message = await DataSource.set_default_model(model_name.result) + _success, message = await DataSource.set_default_model(model_name.result) await llm_cmd.finish(message) else: logger.info("查看默认模型", command="LLM Manage", session=arp.header_result) @@ -132,7 +162,7 @@ async def handle_test(arp: Arparma, model_name: Match[str]): ) await llm_cmd.send(f"正在测试模型 '{model_name.result}',请稍候...") - success, message = await DataSource.test_model_connectivity(model_name.result) + _success, message = await DataSource.test_model_connectivity(model_name.result) await llm_cmd.finish(message) @@ -167,5 +197,5 @@ async def handle_reset_key( ) logger.info(log_msg, command="LLM Manage", session=arp.header_result) - success, message = await DataSource.reset_key(provider_name.result, key_to_reset) + _success, message = await DataSource.reset_key(provider_name.result, key_to_reset) await llm_cmd.finish(message) diff --git a/zhenxun/builtin_plugins/platform/qq/group_handle/__init__.py b/zhenxun/builtin_plugins/platform/qq/group_handle/__init__.py index ff8a1158..4ddcc47e 100644 --- a/zhenxun/builtin_plugins/platform/qq/group_handle/__init__.py +++ b/zhenxun/builtin_plugins/platform/qq/group_handle/__init__.py @@ -17,6 +17,8 @@ from zhenxun.configs.utils import PluginExtraData, RegisterConfig, Task from zhenxun.models.event_log import EventLog from zhenxun.models.group_console import GroupConsole from zhenxun.services.cache import CacheRoot +from zhenxun.services.log import logger +from zhenxun.services.tags import tag_manager from zhenxun.utils.common_utils import CommonUtils from zhenxun.utils.enum import EventLogType, PluginType from zhenxun.utils.platform import PlatformUtils @@ -135,6 +137,11 @@ async def _( await EventLog.create( user_id=user_id, group_id=group_id, event_type=EventLogType.KICK_BOT ) + await tag_manager.remove_group_from_all_tags(group_id) + logger.info( + f"机器人被移出群聊,已自动从所有静态标签中移除群组 {group_id}", + "群组标签管理", + ) elif event.sub_type in ["leave", "kick"]: if event.sub_type == "leave": """主动退群""" diff --git a/zhenxun/builtin_plugins/scheduler/auto_update_group.py b/zhenxun/builtin_plugins/scheduler/auto_update_group.py index dda1f3e3..8f4b2578 100644 --- a/zhenxun/builtin_plugins/scheduler/auto_update_group.py +++ b/zhenxun/builtin_plugins/scheduler/auto_update_group.py @@ -2,6 +2,7 @@ import nonebot from nonebot_plugin_apscheduler import scheduler from zhenxun.services.log import logger +from zhenxun.services.tags import tag_manager from zhenxun.utils.platform import PlatformUtils @@ -37,3 +38,20 @@ async def _(): f"Bot: {bot.self_id} 自动更新好友信息错误", "自动更新好友", e=e ) logger.info("自动更新好友信息成功...") + + +# 自动清理静态标签中的无效群组 +@scheduler.scheduled_job( + "cron", + hour=23, + minute=30, +) +async def _prune_stale_tags(): + deleted_count = await tag_manager.prune_stale_group_links() + if deleted_count > 0: + logger.info( + f"定时任务:成功清理了 {deleted_count} 个无效的群组标签" f"关联。", + "群组标签管理", + ) + else: + logger.debug("定时任务:未发现无效的群组标签关联。", "群组标签管理") diff --git a/zhenxun/builtin_plugins/superuser/broadcast/__init__.py b/zhenxun/builtin_plugins/superuser/broadcast/__init__.py index 0b9c15ba..008e377b 100644 --- a/zhenxun/builtin_plugins/superuser/broadcast/__init__.py +++ b/zhenxun/builtin_plugins/superuser/broadcast/__init__.py @@ -28,7 +28,8 @@ from nonebot_plugin_alconna.uniseg.segment import ( ) from nonebot_plugin_session import EventSession -from zhenxun.configs.utils import PluginExtraData, Task +from zhenxun.configs.utils import PluginExtraData, RegisterConfig, Task +from zhenxun.services.log import logger from zhenxun.utils.enum import PluginType from zhenxun.utils.message import MessageUtils @@ -45,34 +46,52 @@ __plugin_meta__ = PluginMetadata( name="广播", description="昭告天下!", usage=""" - 广播 [消息内容] - - 直接发送消息到除当前群组外的所有群组 - - 支持文本、图片、@、表情、视频等多种消息类型 - - 示例:广播 你们好! - - 示例:广播 [图片] 新活动开始啦! + 向所有群组或指定标签的群组发送广播消息。 - 广播 + 引用消息 - - 将引用的消息作为广播内容发送 - - 支持引用普通消息或合并转发消息 - - 示例:(引用一条消息) 广播 +**基础用法** +- `广播 [消息内容]`:向所有群组发送广播。 +- `广播` (并引用一条消息):将引用的消息作为内容进行广播。 - 广播撤回 - - 撤回最近一次由您触发的广播消息 - - 仅能撤回短时间内的消息 - - 示例:广播撤回 +**高级定向广播** +- `广播 -t <标签名> [消息内容]`:向指定标签下的所有群组广播。 +- `广播到 <标签名> [消息内容]`:与 `-t` 等效的快捷方式。 - 特性: - - 在群组中使用广播时,不会将消息发送到当前群组 - - 在私聊中使用广播时,会发送到所有群组 +**标签可以是静态的,也可以是动态的,例如:** +- `广播到 核心群 通知:...` +- `广播到 成员数>500的群 通知:...` - 别名: - - bc (广播的简写) - - recall (广播撤回的别名) +**其他命令** +- `广播撤回` (别名: `recall`):撤回最近一次发送的广播。 + +特性: +- 在群组中使用广播时,不会将消息发送到当前群组 +- 在私聊中使用广播时,会发送到所有群组 + +别名: +- bc (广播的简写) +- recall (广播撤回的别名) """.strip(), extra=PluginExtraData( author="HibiKier", - version="1.2", + version="1.3", plugin_type=PluginType.SUPERUSER, + configs=[ + RegisterConfig( + module="_task", + key="DEFAULT_BROADCAST", + value=True, + help="被动 广播 进群默认开关状态", + default_value=True, + type=bool, + ), + RegisterConfig( + module="_task", + key="BROADCAST_CONCURRENCY_LIMIT", + value=10, + help="广播时的最大并发任务数,以避免API速率限制", + default_value=10, + ), + ], tasks=[Task(module="broadcast", name="广播")], ).to_dict(), ) @@ -103,6 +122,9 @@ _matcher = on_alconna( Alconna( "广播", Args["content?", AllParam], + alc.Option( + "-t|--tag", Args["tag_name_bc", str], help_text="向指定标签的群组广播" + ), ), aliases={"bc"}, priority=1, @@ -112,6 +134,8 @@ _matcher = on_alconna( use_origin=False, ) +_matcher.shortcut("广播到 {tag}", command="广播 -t {tag} {%*}") + _recall_matcher = on_alconna( Alconna("广播撤回"), aliases={"recall"}, @@ -128,23 +152,59 @@ async def handle_broadcast( event: Event, session: EventSession, arp: alc.Arparma, + tag_name_match: alc.Match[str] = alc.AlconnaMatch("tag_name_bc"), ): broadcast_content_msg = await _extract_broadcast_content(bot, event, arp, session) if not broadcast_content_msg: return - target_groups, enabled_groups = await get_broadcast_target_groups(bot, session) - if not target_groups or not enabled_groups: + tag_name_to_broadcast = None + force_send = False + + if tag_name_match.available: + tag_name_to_broadcast = tag_name_match.result + force_send = True + + mode_desc = "强制发送到标签" if force_send else "普通发送" + logger.debug( + f"广播模式: {mode_desc}, 标签名: {tag_name_to_broadcast}", + "广播", + ) + + target_groups_console, groups_to_actually_send = await get_broadcast_target_groups( + bot, session, tag_name_to_broadcast, force_send + ) + + if not target_groups_console: + if tag_name_to_broadcast: + await MessageUtils.build_message( + f"标签 '{tag_name_to_broadcast}' 中没有群组或标签不存在。" + ).send(reply_to=True) + return + + if not groups_to_actually_send: + if not force_send and target_groups_console: + await MessageUtils.build_message( + "没有启用了广播功能的目标群组可供立即发送。" + ).send(reply_to=True) return try: await send_broadcast_and_notify( - bot, event, broadcast_content_msg, enabled_groups, target_groups, session + bot, + event, + broadcast_content_msg, + groups_to_actually_send, + target_groups_console, + session, + force_send, ) except Exception as e: error_msg = "发送广播失败" BroadcastManager.log_error(error_msg, e, session) - await MessageUtils.build_message(f"{error_msg}。").send(reply_to=True) + await bot.send_private_msg( + user_id=str(event.get_user_id()), message=f"{error_msg}。" + ) @_recall_matcher.handle() @@ -178,5 +238,6 @@ async def handle_broadcast_recall( except Exception as e: error_msg = "撤回广播消息失败" BroadcastManager.log_error(error_msg, e, session) - user_id = str(event.get_user_id()) - await bot.send_private_msg(user_id=user_id, message=f"{error_msg}。") + await bot.send_private_msg( + user_id=str(event.get_user_id()), message=f"{error_msg}。" + ) diff --git a/zhenxun/builtin_plugins/superuser/broadcast/broadcast_manager.py b/zhenxun/builtin_plugins/superuser/broadcast/broadcast_manager.py index c3d7b5cc..fcc78b79 100644 --- a/zhenxun/builtin_plugins/superuser/broadcast/broadcast_manager.py +++ b/zhenxun/builtin_plugins/superuser/broadcast/broadcast_manager.py @@ -5,11 +5,12 @@ from typing import ClassVar from nonebot.adapters import Bot from nonebot.adapters.onebot.v11 import Bot as V11Bot -from nonebot.exception import ActionFailed +from nonebot.exception import ActionFailed, AdapterException from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna.uniseg import Receipt, Reference from nonebot_plugin_session import EventSession +from zhenxun.configs.config import Config from zhenxun.models.group_console import GroupConsole from zhenxun.services.log import logger from zhenxun.utils.common_utils import CommonUtils @@ -18,6 +19,8 @@ from zhenxun.utils.platform import PlatformUtils from .models import BroadcastDetailResult, BroadcastResult from .utils import custom_nodes_to_v11_nodes, uni_message_to_v11_list_of_dicts +BROADCAST_SEND_DELAY_RANGE = (1, 3) + class BroadcastManager: """广播管理器""" @@ -92,8 +95,16 @@ class BroadcastManager: logger.debug("清空上一次的广播消息ID记录", "广播", session=session) cls.clear_last_broadcast_msg_ids() + concurrency_limit = Config.get_config( + "_task", + "BROADCAST_CONCURRENCY_LIMIT", + 10, + ) + all_groups, _ = await cls.get_all_groups(bot) - return await cls.send_to_specific_groups(bot, message, all_groups, session) + return await cls.send_to_specific_groups( + bot, message, all_groups, session, concurrency_limit=concurrency_limit + ) @classmethod async def send_to_specific_groups( @@ -102,14 +113,17 @@ class BroadcastManager: message: UniMessage, target_groups: list[GroupConsole], session_info: EventSession | str | None = None, + force_send: bool = False, + concurrency_limit: int = 10, ) -> BroadcastResult: """发送广播到指定群组""" log_session = session_info or bot.self_id - logger.debug( - f"开始广播,目标 {len(target_groups)} 个群组,Bot ID: {bot.self_id}", - "广播", - session=log_session, + target_count = len(target_groups) + log_message = ( + f"开始广播,目标 {target_count} 个群组 (并发数: {concurrency_limit})," + f"Bot ID: {bot.self_id}, ForceSend: {force_send}" ) + logger.info(log_message, "广播", session=log_session) if not target_groups: logger.debug("目标群组列表为空,广播结束", "广播", session=log_session) @@ -165,7 +179,12 @@ class BroadcastManager: ) return 0, len(target_groups) success_count, error_count, skip_count = await cls._broadcast_forward( - bot, log_session, target_groups, v11_nodes + bot, + log_session, + target_groups, + v11_nodes, + force_send, + concurrency_limit, ) else: if is_forward_broadcast: @@ -175,7 +194,12 @@ class BroadcastManager: session=log_session, ) success_count, error_count, skip_count = await cls._broadcast_normal( - bot, log_session, target_groups, message + bot, + log_session, + target_groups, + message, + force_send, + concurrency_limit, ) total = len(target_groups) @@ -287,11 +311,16 @@ class BroadcastManager: ) @classmethod - async def _check_group_availability(cls, bot: Bot, group: GroupConsole) -> bool: + async def _check_group_availability( + cls, bot: Bot, group: GroupConsole, force_send: bool = False + ) -> bool: """检查群组是否可用""" if not group.group_id: return False + if force_send: + return True + if await CommonUtils.task_is_block(bot, "broadcast", group.group_id): return False @@ -304,54 +333,69 @@ class BroadcastManager: session_info: EventSession | str, group_list: list[GroupConsole], v11_nodes: list[dict], + force_send: bool = False, + concurrency_limit: int = 10, ) -> BroadcastDetailResult: """发送合并转发""" - success_count = 0 - error_count = 0 - skip_count = 0 + semaphore = asyncio.Semaphore(concurrency_limit) + msg_id_lock = asyncio.Lock() - for _, group in enumerate(group_list): + async def send_to_group(group: GroupConsole) -> GroupConsole: group_key = group.group_id or group.channel_id + async with semaphore: + try: + result = await bot.send_group_forward_msg( + group_id=int(group.group_id), messages=v11_nodes + ) + async with msg_id_lock: + await cls._extract_message_id_from_result( + result, group_key, session_info, "合并转发" + ) + await asyncio.sleep(random.uniform(*BROADCAST_SEND_DELAY_RANGE)) + return group + except (ActionFailed, AdapterException) as ae: + logger.error( + f"发送失败(合并转发) to {group_key}: {ae}", + "广播", + session=session_info, + e=ae, + ) + raise + except Exception as e: + logger.error( + f"发送失败(合并转发) to {group_key}: {e}", + "广播", + session=session_info, + e=e, + ) + raise - if not await cls._check_group_availability(bot, group): - skip_count += 1 - continue + tasks: list[asyncio.Task] = [] + skipped_groups: list[GroupConsole] = [] + for group in group_list: + if await cls._check_group_availability(bot, group, force_send): + tasks.append(asyncio.create_task(send_to_group(group))) + else: + skipped_groups.append(group) - try: - result = await bot.send_group_forward_msg( - group_id=int(group.group_id), messages=v11_nodes - ) + if skipped_groups: + logger.info( + f"跳过 {len(skipped_groups)} 个不符合条件的群组", + "广播", + session=session_info, + ) - logger.debug( - f"合并转发消息发送结果: {result}, 类型: {type(result)}", - "广播", - session=session_info, - ) + if not tasks: + return 0, 0, len(skipped_groups) - await cls._extract_message_id_from_result( - result, group_key, session_info, "合并转发" - ) + results = await asyncio.gather(*tasks, return_exceptions=True) - success_count += 1 - await asyncio.sleep(random.randint(1, 3)) - except ActionFailed as af_e: - error_count += 1 - logger.error( - f"发送失败(合并转发) to {group_key}: {af_e}", - "广播", - session=session_info, - e=af_e, - ) - except Exception as e: - error_count += 1 - logger.error( - f"发送失败(合并转发) to {group_key}: {e}", - "广播", - session=session_info, - e=e, - ) + success_count = sum( + 1 for result in results if not isinstance(result, Exception) + ) + error_count = len(results) - success_count - return success_count, error_count, skip_count + return success_count, error_count, len(skipped_groups) @classmethod async def _broadcast_normal( @@ -360,58 +404,83 @@ class BroadcastManager: session_info: EventSession | str, group_list: list[GroupConsole], message: UniMessage, + force_send: bool = False, + concurrency_limit: int = 10, ) -> BroadcastDetailResult: """发送普通消息""" - success_count = 0 - error_count = 0 - skip_count = 0 + semaphore = asyncio.Semaphore(concurrency_limit) + msg_id_lock = asyncio.Lock() - for _, group in enumerate(group_list): + async def send_to_group(group: GroupConsole) -> GroupConsole: group_key = ( f"{group.group_id}:{group.channel_id}" if group.channel_id else str(group.group_id) ) - - if not await cls._check_group_availability(bot, group): - skip_count += 1 - continue - - try: - target = PlatformUtils.get_target( - group_id=group.group_id, channel_id=group.channel_id - ) - - if target: - receipt: Receipt = await message.send(target, bot=bot) - - logger.debug( - f"广播消息发送结果: {receipt}, 类型: {type(receipt)}", - "广播", - session=session_info, - ) - - await cls._extract_message_id_from_result( - receipt, group_key, session_info - ) - - success_count += 1 - await asyncio.sleep(random.randint(1, 3)) - else: - logger.warning( - "target为空", "广播", session=session_info, target=group_key - ) - skip_count += 1 - except Exception as e: - error_count += 1 - logger.error( - f"发送失败(普通) to {group_key}: {e}", + target = PlatformUtils.get_target( + group_id=group.group_id, channel_id=group.channel_id + ) + if not target: + logger.warning( + "target为空", "广播", session=session_info, - e=e, + target=group_key, ) + raise ValueError(f"无法为群组 {group_key} 创建发送目标") - return success_count, error_count, skip_count + async with semaphore: + try: + receipt: Receipt = await message.send(target, bot=bot) + async with msg_id_lock: + await cls._extract_message_id_from_result( + receipt, group_key, session_info + ) + await asyncio.sleep(random.uniform(*BROADCAST_SEND_DELAY_RANGE)) + return group + except (ActionFailed, AdapterException) as ae: + logger.error( + f"发送失败(普通) to {group_key}: {ae}", + "广播", + session=session_info, + e=ae, + ) + raise + except Exception as e: + logger.error( + f"发送失败(普通) to {group_key}: {e}", + "广播", + session=session_info, + e=e, + ) + raise + + tasks: list[asyncio.Task] = [] + skipped_groups: list[GroupConsole] = [] + for group in group_list: + if await cls._check_group_availability(bot, group, force_send): + tasks.append(asyncio.create_task(send_to_group(group))) + else: + skipped_groups.append(group) + + if skipped_groups: + logger.info( + f"跳过 {len(skipped_groups)} 个不符合条件的群组", + "广播", + session=session_info, + ) + + if not tasks: + return 0, 0, len(skipped_groups) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + success_count = sum( + 1 for result in results if not isinstance(result, Exception) + ) + error_count = len(results) - success_count + + return success_count, error_count, len(skipped_groups) @classmethod async def recall_last_broadcast( diff --git a/zhenxun/builtin_plugins/superuser/broadcast/message_processor.py b/zhenxun/builtin_plugins/superuser/broadcast/message_processor.py index 809e3645..2e60fc37 100644 --- a/zhenxun/builtin_plugins/superuser/broadcast/message_processor.py +++ b/zhenxun/builtin_plugins/superuser/broadcast/message_processor.py @@ -21,8 +21,11 @@ from nonebot_plugin_alconna.uniseg.segment import ( from nonebot_plugin_alconna.uniseg.tools import reply_fetch from nonebot_plugin_session import EventSession +from zhenxun.models.group_console import GroupConsole from zhenxun.services.log import logger +from zhenxun.services.tags import tag_manager as TagManager from zhenxun.utils.common_utils import CommonUtils +from zhenxun.utils.http_utils import AsyncHttpx from zhenxun.utils.message import MessageUtils from .broadcast_manager import BroadcastManager @@ -399,22 +402,29 @@ async def _process_v11_segment( elif target_qq: result.append(At(flag="user", target=target_qq)) elif seg_type == "video": - video_seg = None - if data_dict.get("url"): - video_seg = Video(url=data_dict["url"]) - elif data_dict.get("file"): - file_val = data_dict["file"] + if url := data_dict.get("url"): + try: + logger.debug(f"[D{depth}] 正在下载视频用于广播: {url}", "广播") + video_bytes = await AsyncHttpx.get_content(url) + video_seg = Video(raw=video_bytes) + logger.debug( + f"[D{depth}] 视频下载成功, 大小: {len(video_bytes)} bytes", + "广播", + ) + result.append(video_seg) + except Exception as e: + logger.error(f"[D{depth}] 广播时下载视频失败: {url}", "广播", e=e) + result.append(Text(f"[视频下载失败: {url}]")) + elif file_val := data_dict.get("file"): if isinstance(file_val, str) and file_val.startswith("base64://"): b64_data = file_val[9:] raw_bytes = base64.b64decode(b64_data) video_seg = Video(raw=raw_bytes) + result.append(video_seg) else: video_seg = Video(path=file_val) - if video_seg: - result.append(video_seg) - logger.debug(f"[Depth {depth}] 处理视频消息成功", "广播") - else: - logger.warning(f"[Depth {depth}] V11 视频 {index} 缺少URL/文件", "广播") + result.append(video_seg) + return result elif seg_type == "forward": nested_forward_id = data_dict.get("id") or data_dict.get("resid") nested_forward_content = data_dict.get("content") @@ -515,70 +525,129 @@ async def _extract_content_from_message( async def get_broadcast_target_groups( - bot: Bot, session: EventSession + bot: Bot, + session: EventSession, + tag_name: str | None = None, + force_send: bool = False, ) -> tuple[list, list]: """获取广播目标群组和启用了广播功能的群组""" - target_groups = [] - all_groups, _ = await BroadcastManager.get_all_groups(bot) + target_groups_console: list[GroupConsole] = [] - current_group_id = None - if hasattr(session, "id2") and session.id2: - current_group_id = session.id2 + current_group_raw = getattr(session, "id2", None) or getattr( + session, "group_id", None + ) + current_group_id = str(current_group_raw) if current_group_raw else None - if current_group_id: - target_groups = [ - group for group in all_groups if group.group_id != current_group_id - ] - logger.info( - f"向除当前群组({current_group_id})外的所有群组广播", "广播", session=session - ) + logger.debug(f"当前群组ID: {current_group_id}", "广播") + + if tag_name: + tagged_group_ids = await TagManager.resolve_tag_to_group_ids(tag_name, bot=bot) + if not tagged_group_ids: + return [], [] + + valid_groups = await GroupConsole.filter(group_id__in=tagged_group_ids) + + if current_group_id: + target_groups_console = [ + group + for group in valid_groups + if str(group.group_id) != current_group_id + ] + excluded_msg = ( + f",已排除当前群组({current_group_id})" + if any( + str(group.group_id) == current_group_id for group in valid_groups + ) + else "" + ) + broadcast_msg = ( + f"向标签 '{tag_name}' 中的 {len(target_groups_console)} 个群组广播 " + f"(ForceSend: {force_send}){excluded_msg}" + ) + logger.info(broadcast_msg, "广播", session=session) + else: + target_groups_console = valid_groups + broadcast_msg = ( + f"向标签 '{tag_name}' 中的 {len(target_groups_console)} 个群组广播 " + f"(ForceSend: {force_send})" + ) + logger.info(broadcast_msg, "广播", session=session) else: - target_groups = all_groups - logger.info("向所有群组广播", "广播", session=session) + all_groups, _ = await BroadcastManager.get_all_groups(bot) - if not target_groups: - await MessageUtils.build_message("没有找到符合条件的广播目标群组。").send( - reply_to=True - ) + if current_group_id: + target_groups_console = [ + group for group in all_groups if str(group.group_id) != current_group_id + ] + logger.info( + ( + f"向除当前群组({current_group_id})外的所有群组广播 " + f"(ForceSend: {force_send})" + ), + "广播", + session=session, + ) + else: + target_groups_console = all_groups + logger.info( + f"向所有群组广播 (ForceSend: {force_send})", "广播", session=session + ) + + if not target_groups_console: + if not tag_name: + await MessageUtils.build_message("没有找到符合条件的广播目标群组。").send( + reply_to=True + ) return [], [] - enabled_groups = [] - for group in target_groups: - if not await CommonUtils.task_is_block(bot, "broadcast", group.group_id): - enabled_groups.append(group) + groups_to_actually_send = [] + if force_send: + groups_to_actually_send = target_groups_console + logger.debug( + f"强制发送模式,将向 {len(groups_to_actually_send)} 个目标群组尝试发送。", + "广播", + ) + else: + for group in target_groups_console: + if not await CommonUtils.task_is_block(bot, "broadcast", group.group_id): + groups_to_actually_send.append(group) + logger.debug( + f"普通发送模式,筛选后将向 {len(groups_to_actually_send)} " + f"个目标群组尝试发送", + "广播", + ) - if not enabled_groups: - await MessageUtils.build_message( - "没有启用了广播功能的目标群组可供立即发送。" - ).send(reply_to=True) - return target_groups, [] - - return target_groups, enabled_groups + return target_groups_console, groups_to_actually_send async def send_broadcast_and_notify( bot: Bot, event: Event, message: UniMessage, - enabled_groups: list, - target_groups: list, + groups_to_send: list, + all_target_groups_for_stats: list, session: EventSession, + force_send: bool = False, ) -> None: """发送广播并通知结果""" BroadcastManager.clear_last_broadcast_msg_ids() count, error_count = await BroadcastManager.send_to_specific_groups( - bot, message, enabled_groups, session + bot, message, groups_to_send, session, force_send ) result = f"成功广播 {count} 个群组" if error_count: result += f"\n发送失败 {error_count} 个群组" - result += f"\n有效: {len(enabled_groups)} / 总计: {len(target_groups)}" + + effective_sent_count = len(groups_to_send) + total_considered_count = len(all_target_groups_for_stats) + + result += f"\n有效: {effective_sent_count} / 总计目标: {total_considered_count}" user_id = str(event.get_user_id()) await bot.send_private_msg(user_id=user_id, message=f"发送广播完成!\n{result}") BroadcastManager.log_info( - f"广播完成,有效/总计: {len(enabled_groups)}/{len(target_groups)}", + f"广播完成,有效/总计目标: {effective_sent_count}/{total_considered_count}", session, ) diff --git a/zhenxun/builtin_plugins/superuser/broadcast/utils.py b/zhenxun/builtin_plugins/superuser/broadcast/utils.py index 748559fd..c3626f87 100644 --- a/zhenxun/builtin_plugins/superuser/broadcast/utils.py +++ b/zhenxun/builtin_plugins/superuser/broadcast/utils.py @@ -59,7 +59,7 @@ def uni_segment_to_v11_segment_dict( logger.warning(f"无法处理 Video.raw 的类型: {type(raw_data)}", "广播") elif getattr(seg, "path", None): logger.warning( - f"在合并转发中使用了本地视频路径,可能无法显示: {seg.path}", "广播" + f"在合并转发中使用了本地视频路径,可能无法发送: {seg.path}", "广播" ) return {"type": "video", "data": {"file": f"file:///{seg.path}"}} else: diff --git a/zhenxun/builtin_plugins/superuser/tag_manage.py b/zhenxun/builtin_plugins/superuser/tag_manage.py index f934531b..dfbbdb77 100644 --- a/zhenxun/builtin_plugins/superuser/tag_manage.py +++ b/zhenxun/builtin_plugins/superuser/tag_manage.py @@ -176,12 +176,30 @@ tag_cmd = on_alconna( help_text="删除标签", ), Subcommand("clear", help_text="清空所有标签"), + Subcommand("prune", alias=["check", "清理"], help_text="清理无效的群组关联"), + Subcommand( + "clone", + Args["source_name", str]["new_name", str], + Option("--add", Args["add_groups", MultiVar(str)]), + Option("--remove", Args["remove_groups", MultiVar(str)]), + Option("--as-dynamic", action=store_true), + Option("--desc", Args["description", str]), + Option("--mode", Args["mode", ["black", "white"]]), + help_text="克隆标签", + ), ), permission=SUPERUSER, priority=5, block=True, ) +tag_cmd.shortcut( + "清理标签", + command="tag", + arguments=["prune"], + prefix=True, +) + @tag_cmd.assign("list") async def handle_list(): @@ -269,17 +287,24 @@ async def handle_create( ).finish() try: + gids_to_create = None + unique_gids_count = 0 + if group_ids.available: + unique_gids = list(dict.fromkeys(group_ids.result)) + gids_to_create = unique_gids + unique_gids_count = len(unique_gids) + tag = await tag_manager.create_tag( name=name.result, is_blacklist=blacklist.result, description=description.result if description.available else None, - group_ids=group_ids.result if group_ids.available else None, + group_ids=gids_to_create, tag_type=ttype, dynamic_rule=rule.result if rule.available else None, ) msg = f"标签 '{tag.name}' 创建成功!" if group_ids.available: - msg += f"\n已同时关联 {len(group_ids.result)} 个群组。" + msg += f"\n已同时关联 {unique_gids_count} 个群组。" await MessageUtils.build_message(msg).finish() except IntegrityError: await MessageUtils.build_message( @@ -411,3 +436,48 @@ async def handle_clear(): await MessageUtils.build_message(f"操作完成,已清空 {count} 个标签。").finish() else: await MessageUtils.build_message("操作已取消。").finish() + + +@tag_cmd.assign("clone") +async def handle_clone( + bot: Bot, + source_name: Match[str], + new_name: Match[str], + add_groups: Query[list[str] | None] = AlconnaQuery("clone.add.add_groups", None), + remove_groups: Query[list[str] | None] = AlconnaQuery( + "clone.remove.remove_groups", None + ), + as_dynamic: Query[bool] = AlconnaQuery("clone.as-dynamic.value", False), + description: Query[str | None] = AlconnaQuery("clone.desc.description", None), + mode: Query[str | None] = AlconnaQuery("clone.mode.mode", None), +): + try: + new_tag = await tag_manager.clone_tag( + source_name=source_name.result, + new_name=new_name.result, + bot=bot, + add_groups=add_groups.result, + remove_groups=remove_groups.result, + as_dynamic=as_dynamic.result, + description=description.result, + mode=mode.result, + ) + + tag_type_str = "动态" if new_tag.tag_type == "DYNAMIC" else "静态" + group_count = 0 + if new_tag.tag_type == "STATIC": + group_count = await new_tag.groups.all().count() + + msg = f"✅ 成功克隆标签!\n- 新标签: {new_tag.name}\n- 类型: {tag_type_str}" + if new_tag.tag_type == "STATIC": + msg += f" (含 {group_count} 个群组)" + await MessageUtils.build_message(msg).finish() + except (ValueError, IntegrityError) as e: + await MessageUtils.build_message(f"克隆失败: {e}").finish() + + +@tag_cmd.assign("prune") +async def handle_prune(): + deleted_count = await tag_manager.prune_stale_group_links() + msg = f"清理完成!共移除了 {deleted_count} 个无效的群组关联。" + await MessageUtils.build_message(msg).finish() diff --git a/zhenxun/services/llm/adapters/gemini.py b/zhenxun/services/llm/adapters/gemini.py index b3fb109d..444a9023 100644 --- a/zhenxun/services/llm/adapters/gemini.py +++ b/zhenxun/services/llm/adapters/gemini.py @@ -354,6 +354,24 @@ class GeminiAdapter(BaseAdapter): return safety_settings if safety_settings else None + def validate_response(self, response_json: dict[str, Any]) -> None: + """验证 Gemini API 响应,增加对 promptFeedback 的检查""" + super().validate_response(response_json) + + if prompt_feedback := response_json.get("promptFeedback"): + if block_reason := prompt_feedback.get("blockReason"): + logger.warning( + f"Gemini 内容因 promptFeedback 被安全过滤: {block_reason}" + ) + raise LLMException( + f"内容被安全过滤: {block_reason}", + code=LLMErrorCode.CONTENT_FILTERED, + details={ + "block_reason": block_reason, + "safety_ratings": prompt_feedback.get("safetyRatings"), + }, + ) + def parse_response( self, model: "LLMModel", diff --git a/zhenxun/services/llm/config/providers.py b/zhenxun/services/llm/config/providers.py index 61f6189c..f3895481 100644 --- a/zhenxun/services/llm/config/providers.py +++ b/zhenxun/services/llm/config/providers.py @@ -192,10 +192,20 @@ def get_default_providers() -> list[dict[str, Any]]: "api_base": "https://generativelanguage.googleapis.com", "api_type": "gemini", "models": [ - {"model_name": "gemini-2.0-flash"}, {"model_name": "gemini-2.5-flash"}, {"model_name": "gemini-2.5-pro"}, - {"model_name": "gemini-2.5-flash-lite-preview-06-17"}, + {"model_name": "gemini-2.5-flash-lite"}, + ], + }, + { + "name": "OpenRouter", + "api_key": "YOUR_OPENROUTER_API_KEY", + "api_base": "https://openrouter.ai/api", + "api_type": "openrouter", + "models": [ + {"model_name": "google/gemini-2.5-pro"}, + {"model_name": "google/gemini-2.5-flash"}, + {"model_name": "x-ai/grok-4"}, ], }, ] diff --git a/zhenxun/services/llm/types/capabilities.py b/zhenxun/services/llm/types/capabilities.py index 51a7d8d1..2e083708 100644 --- a/zhenxun/services/llm/types/capabilities.py +++ b/zhenxun/services/llm/types/capabilities.py @@ -9,6 +9,8 @@ import fnmatch from pydantic import BaseModel, Field +from zhenxun.services.log import logger + class ModelModality(str, Enum): TEXT = "text" @@ -50,6 +52,46 @@ GEMINI_IMAGE_GEN_CAPABILITIES = ModelCapabilities( supports_tool_calling=True, ) +GPT_ADVANCED_TEXT_IMAGE_CAPABILITIES = ModelCapabilities( + input_modalities={ModelModality.TEXT, ModelModality.IMAGE}, + output_modalities={ModelModality.TEXT}, + supports_tool_calling=True, +) + +GPT_MULTIMODAL_IO_CAPABILITIES = ModelCapabilities( + input_modalities={ModelModality.TEXT, ModelModality.AUDIO, ModelModality.IMAGE}, + output_modalities={ModelModality.TEXT, ModelModality.AUDIO}, + supports_tool_calling=True, +) + +GPT_IMAGE_GENERATION_CAPABILITIES = ModelCapabilities( + input_modalities={ModelModality.TEXT, ModelModality.IMAGE}, + output_modalities={ModelModality.IMAGE}, + supports_tool_calling=True, +) + +GPT_VIDEO_GENERATION_CAPABILITIES = ModelCapabilities( + input_modalities={ModelModality.TEXT, ModelModality.IMAGE, ModelModality.VIDEO}, + output_modalities={ModelModality.VIDEO}, + supports_tool_calling=True, +) + +DEFAULT_PERMISSIVE_CAPABILITIES = ModelCapabilities( + input_modalities={ + ModelModality.TEXT, + ModelModality.IMAGE, + ModelModality.AUDIO, + ModelModality.VIDEO, + }, + output_modalities={ + ModelModality.TEXT, + ModelModality.IMAGE, + ModelModality.AUDIO, + ModelModality.VIDEO, + }, + supports_tool_calling=True, +) + DOUBAO_ADVANCED_MULTIMODAL_CAPABILITIES = ModelCapabilities( input_modalities={ModelModality.TEXT, ModelModality.IMAGE, ModelModality.VIDEO}, @@ -91,11 +133,8 @@ MODEL_CAPABILITIES_REGISTRY: dict[str, ModelCapabilities] = { is_embedding_model=True, ), "*gemini-*-image-preview*": GEMINI_IMAGE_GEN_CAPABILITIES, - "gemini-2.5-pro*": GEMINI_CAPABILITIES, - "gemini-1.5-pro*": GEMINI_CAPABILITIES, - "gemini-2.5-flash*": GEMINI_CAPABILITIES, - "gemini-2.0-flash*": GEMINI_CAPABILITIES, - "gemini-1.5-flash*": GEMINI_CAPABILITIES, + "gemini-*-pro*": GEMINI_CAPABILITIES, + "gemini-*-flash*": GEMINI_CAPABILITIES, "GLM-4V-Flash": ModelCapabilities( input_modalities={ModelModality.TEXT, ModelModality.IMAGE}, output_modalities={ModelModality.TEXT}, @@ -112,6 +151,13 @@ MODEL_CAPABILITIES_REGISTRY: dict[str, ModelCapabilities] = { "doubao-1-5-thinking-vision-pro": DOUBAO_ADVANCED_MULTIMODAL_CAPABILITIES, "deepseek-chat": STANDARD_TEXT_TOOL_CAPABILITIES, "deepseek-reasoner": STANDARD_TEXT_TOOL_CAPABILITIES, + "gpt-5*": GPT_ADVANCED_TEXT_IMAGE_CAPABILITIES, + "gpt-4.1*": GPT_ADVANCED_TEXT_IMAGE_CAPABILITIES, + "gpt-4o*": GPT_MULTIMODAL_IO_CAPABILITIES, + "o3*": GPT_ADVANCED_TEXT_IMAGE_CAPABILITIES, + "o4-mini*": GPT_ADVANCED_TEXT_IMAGE_CAPABILITIES, + "gpt image*": GPT_IMAGE_GENERATION_CAPABILITIES, + "sora*": GPT_VIDEO_GENERATION_CAPABILITIES, } @@ -126,11 +172,25 @@ def get_model_capabilities(model_name: str) -> ModelCapabilities: canonical_name = c_name break - if canonical_name in MODEL_CAPABILITIES_REGISTRY: - return MODEL_CAPABILITIES_REGISTRY[canonical_name] + parts = canonical_name.split("/") + names_to_check = ["/".join(parts[i:]) for i in range(len(parts))] - for pattern, capabilities in MODEL_CAPABILITIES_REGISTRY.items(): - if "*" in pattern and fnmatch.fnmatch(model_name, pattern): - return capabilities + logger.trace(f"为 '{model_name}' 生成的检查列表: {names_to_check}") - return ModelCapabilities() + for name in names_to_check: + if name in MODEL_CAPABILITIES_REGISTRY: + logger.debug(f"模型 '{model_name}' 通过精确匹配 '{name}' 找到能力定义。") + return MODEL_CAPABILITIES_REGISTRY[name] + + for pattern, capabilities in MODEL_CAPABILITIES_REGISTRY.items(): + if "*" in pattern and fnmatch.fnmatch(name, pattern): + logger.debug( + f"模型 '{model_name}' 通过通配符匹配 '{name}'(pattern: '{pattern}')" + f"找到能力定义。" + ) + return capabilities + + logger.warning( + f"模型 '{model_name}' 的能力定义未在注册表中找到,将使用默认的'全功能'回退配置" + ) + return DEFAULT_PERMISSIVE_CAPABILITIES diff --git a/zhenxun/services/renderer/protocols.py b/zhenxun/services/renderer/protocols.py index 3255b0d3..619cdc48 100644 --- a/zhenxun/services/renderer/protocols.py +++ b/zhenxun/services/renderer/protocols.py @@ -40,7 +40,7 @@ class Renderable(ABC): @abstractmethod def get_children(self) -> Iterable["Renderable"]: """ - [新增] 返回一个包含所有直接子组件的可迭代对象。 + 返回一个包含所有直接子组件的可迭代对象。 这使得渲染服务能够递归地遍历整个组件树,以执行依赖收集(CSS、JS)等任务。 非容器组件应返回一个空列表。 diff --git a/zhenxun/services/renderer/service.py b/zhenxun/services/renderer/service.py index 0e2a1413..2b470d2a 100644 --- a/zhenxun/services/renderer/service.py +++ b/zhenxun/services/renderer/service.py @@ -75,6 +75,7 @@ class RendererService: self._custom_globals: dict[str, Callable] = {} self.filter("dump_json")(self._pydantic_tojson_filter) + self.global_function("inline_asset")(self._inline_asset_global) def _create_jinja_env(self) -> Environment: """ @@ -176,9 +177,24 @@ class RendererService: return decorator + async def _inline_asset_global(self, namespaced_path: str) -> str: + """ + 一个Jinja2全局函数,用于读取并内联一个已注册命名空间下的资源文件内容。 + 主要用于内联SVG,以解决浏览器的跨域安全问题。 + """ + if not self._jinja_env or not self._jinja_env.loader: + return f"" + try: + source, _, _ = self._jinja_env.loader.get_source( + self._jinja_env, namespaced_path + ) + return source + except TemplateNotFound: + return f"" + async def initialize(self): """ - [新增] 延迟初始化方法,在 on_startup 钩子中调用。 + 延迟初始化方法,在 on_startup 钩子中调用。 负责初始化截图引擎和主题管理器,确保在首次渲染前所有依赖都已准备就绪。 使用锁来防止并发初始化。 @@ -223,27 +239,36 @@ class RendererService: ) style_paths_to_load = [] - if manifest and "styles" in manifest: - styles = ( - [manifest["styles"]] - if isinstance(manifest["styles"], str) - else manifest["styles"] - ) - for style_path in styles: - full_style_path = str(Path(component_path_base) / style_path).replace( - "\\", "/" + if manifest and manifest.get("styles"): + styles = manifest["styles"] + styles = [styles] if isinstance(styles, str) else styles + + resolution_base_path = Path(component_path_base) + if variant: + skin_manifest_path = str(Path(component_path_base) / "skins" / variant) + skin_manifest = await context.theme_manager._load_single_manifest( + skin_manifest_path ) - style_paths_to_load.append(full_style_path) + if skin_manifest and "styles" in skin_manifest: + resolution_base_path = Path(skin_manifest_path) + + style_paths_to_load.extend( + str(resolution_base_path / style).replace("\\", "/") for style in styles + ) else: - resolved_template_name = ( + base_template_path = ( await context.theme_manager._resolve_component_template( component, context ) ) - conventional_style_path = str( - Path(resolved_template_name).with_name("style.css") + base_style_path = str( + Path(base_template_path).with_name("style.css") ).replace("\\", "/") - style_paths_to_load.append(conventional_style_path) + style_paths_to_load.append(base_style_path) + + if variant: + skin_style_path = f"{component_path_base}/skins/{variant}/style.css" + style_paths_to_load.append(skin_style_path) for css_template_path in style_paths_to_load: try: diff --git a/zhenxun/services/renderer/theme.py b/zhenxun/services/renderer/theme.py index 410176b2..2e372668 100644 --- a/zhenxun/services/renderer/theme.py +++ b/zhenxun/services/renderer/theme.py @@ -172,24 +172,45 @@ class ResourceResolver: if asset_path.startswith("@"): try: - full_asset_path = self.theme_manager.jinja_env.join_path( - asset_path, current_template_name - ) - _source, file_abs_path, _uptodate = ( - self.theme_manager.jinja_env.loader.get_source( - self.theme_manager.jinja_env, full_asset_path + if "/" not in asset_path: + raise TemplateNotFound(f"无效的命名空间路径: {asset_path}") + + namespace, rel_path = asset_path.split("/", 1) + + loader = self.theme_manager.jinja_env.loader + if ( + isinstance(loader, ChoiceLoader) + and loader.loaders + and isinstance(loader.loaders[0], PrefixLoader) + ): + prefix_loader = loader.loaders[0] + if namespace in prefix_loader.mapping: + loader_for_namespace = prefix_loader.mapping[namespace] + if isinstance(loader_for_namespace, FileSystemLoader): + base_path = Path(loader_for_namespace.searchpath[0]) + file_abs_path = (base_path / rel_path).resolve() + + if file_abs_path.is_file(): + logger.debug( + f"Resolved namespaced asset" + f" '{asset_path}' -> '{file_abs_path}'" + ) + return file_abs_path.as_uri() + else: + raise TemplateNotFound(asset_path) + else: + raise TemplateNotFound( + f"Unsupported loader type for namespace '{namespace}'." + ) + else: + raise TemplateNotFound(f"Namespace '{namespace}' not found.") + else: + raise TemplateNotFound( + f"无法解析命名空间资源 '{asset_path}',加载器结构不符合预期。" ) - ) - if file_abs_path: - logger.debug( - f"Jinja Loader resolved asset '{asset_path}'->'{file_abs_path}'" - ) - return Path(file_abs_path).absolute().as_uri() + except TemplateNotFound: - logger.warning( - f"资源文件在命名空间中未找到: '{asset_path}'" - f"(在模板 '{current_template_name}' 中引用)" - ) + logger.warning(f"资源文件在命名空间中未找到: '{asset_path}'") return "" search_paths: list[tuple[str, Path]] = [] diff --git a/zhenxun/services/tags/manager.py b/zhenxun/services/tags/manager.py index 8c914baa..79147a21 100644 --- a/zhenxun/services/tags/manager.py +++ b/zhenxun/services/tags/manager.py @@ -9,6 +9,7 @@ from typing import Any, ClassVar from aiocache import Cache, cached from arclet.alconna import Alconna, Args +import nonebot from nonebot.adapters import Bot from tortoise.exceptions import IntegrityError from tortoise.expressions import Q @@ -156,8 +157,9 @@ class TagManager: dynamic_rule=dynamic_rule, ) if group_ids: + unique_group_ids = list(dict.fromkeys(group_ids)) await GroupTagLink.bulk_create( - [GroupTagLink(tag=tag, group_id=gid) for gid in group_ids] + [GroupTagLink(tag=tag, group_id=gid) for gid in unique_group_ids] ) return tag @@ -175,6 +177,49 @@ class TagManager: deleted_count = await GroupTag.filter(name=name).delete() return deleted_count > 0 + @invalidate_on_change + async def remove_group_from_all_tags(self, group_id: str) -> int: + """ + 从所有静态标签中移除一个指定的群组ID。 + 主要用于机器人退群时的实时清理。 + + 参数: + group_id: 要移除的群组ID。 + + 返回: + 被删除的关联数量。 + """ + deleted_count = await GroupTagLink.filter(group_id=group_id).delete() + if deleted_count > 0: + logger.info(f"已从 {deleted_count} 个标签中移除群组 {group_id} 的关联。") + return deleted_count + + @invalidate_on_change + async def prune_stale_group_links(self) -> int: + """ + 清理所有静态标签中无效的群组关联。 + 无效指的是机器人已不再任何一个已连接的Bot的群组列表中。 + + 返回: + 被清理的无效关联的总数。 + """ + all_bot_group_ids = set() + for bot in nonebot.get_bots().values(): + groups, _ = await PlatformUtils.get_group_list(bot) + all_bot_group_ids.update(g.group_id for g in groups if g.group_id) + + all_static_links = await GroupTagLink.filter(tag__tag_type="STATIC").all() + + stale_link_ids = [ + link.id + for link in all_static_links + if link.group_id not in all_bot_group_ids + ] + + if stale_link_ids: + return await GroupTagLink.filter(id__in=stale_link_ids).delete() + return 0 + @invalidate_on_change async def add_groups_to_tag(self, name: str, group_ids: list[str]) -> int: # type: ignore """ @@ -186,11 +231,12 @@ class TagManager: if tag.tag_type == "DYNAMIC": raise ValueError("不能向动态标签手动添加群组。") + unique_group_ids = list(dict.fromkeys(group_ids)) await GroupTagLink.bulk_create( - [GroupTagLink(tag=tag, group_id=gid) for gid in group_ids], + [GroupTagLink(tag=tag, group_id=gid) for gid in unique_group_ids], ignore_conflicts=True, ) - return len(group_ids) + return len(unique_group_ids) @invalidate_on_change async def remove_groups_from_tag(self, name: str, group_ids: list[str]) -> int: @@ -205,6 +251,72 @@ class TagManager: ).delete() return deleted_count + @invalidate_on_change + async def clone_tag( + self, + source_name: str, + new_name: str, + bot: Bot, + add_groups: list[str] | None = None, + remove_groups: list[str] | None = None, + as_dynamic: bool = False, + description: str | None = None, + mode: str | None = None, + ) -> GroupTag: + """ + 克隆一个标签,支持动态转静态、修改群组等。 + """ + source_tag = await GroupTag.get_or_none(name=source_name) + if not source_tag: + raise ValueError(f"源标签 '{source_name}' 不存在。") + + if await GroupTag.exists(name=new_name): + raise IntegrityError(f"目标标签 '{new_name}' 已存在。") + + tag_type = "STATIC" + group_ids_to_set: list[str] | None = None + dynamic_rule: str | dict | None = None + + if source_tag.tag_type == "STATIC": + if as_dynamic: + raise ValueError("不能将静态标签克隆为动态标签。") + group_ids_to_set = await GroupTagLink.filter(tag=source_tag).values_list( # type: ignore + "group_id", flat=True + ) + else: + if as_dynamic: + tag_type = "DYNAMIC" + dynamic_rule = source_tag.dynamic_rule + if add_groups or remove_groups: + raise ValueError( + "克隆为动态标签时,不支持 --add 或 --remove 操作。" + ) + else: + group_ids_to_set = await self.resolve_tag_to_group_ids( + source_name, bot=bot + ) + + if group_ids_to_set is not None: + final_group_set = set(group_ids_to_set) + if add_groups: + final_group_set.update(add_groups) + if remove_groups: + final_group_set.difference_update(remove_groups) + group_ids_to_set = list(final_group_set) + + is_blacklist = ( + (mode == "black") if mode is not None else source_tag.is_blacklist + ) + + return await self.create_tag( + name=new_name, + is_blacklist=is_blacklist, + description=description, + group_ids=group_ids_to_set, + tag_type=tag_type, + dynamic_rule=dynamic_rule, + ) + async def list_tags_with_counts(self) -> list[dict]: """列出所有标签及其关联的群组数量。""" tags = await GroupTag.all().prefetch_related("groups") @@ -514,11 +626,13 @@ class TagManager: raise ValueError("不能为动态标签设置静态群组列表。") async with in_transaction(): await GroupTagLink.filter(tag=tag).delete() - await GroupTagLink.bulk_create( - [GroupTagLink(tag=tag, group_id=gid) for gid in group_ids], - ignore_conflicts=True, - ) - return len(group_ids) + unique_group_ids = list(dict.fromkeys(group_ids)) + if unique_group_ids: + await GroupTagLink.bulk_create( + [GroupTagLink(tag=tag, group_id=gid) for gid in unique_group_ids], + ignore_conflicts=True, + ) + return len(unique_group_ids) @invalidate_on_change async def clear_all_tags(self) -> int: diff --git a/zhenxun/ui/models/core/base.py b/zhenxun/ui/models/core/base.py index 09d14862..aad9f942 100644 --- a/zhenxun/ui/models/core/base.py +++ b/zhenxun/ui/models/core/base.py @@ -64,13 +64,13 @@ class RenderableComponent(BaseModel, Renderable): @compat_computed_field def inline_style_str(self) -> str: - """[新增] 一个辅助属性,将内联样式字典转换为CSS字符串""" + """一个辅助属性,将内联样式字典转换为CSS字符串""" if not self.inline_style: return "" return "; ".join(f"{k}: {v}" for k, v in self.inline_style.items()) def get_extra_css(self, context: Any) -> str | Awaitable[str]: - return "" + return self.component_css or "" class ContainerComponent(RenderableComponent, ABC): @@ -86,7 +86,7 @@ class ContainerComponent(RenderableComponent, ABC): raise NotImplementedError def get_required_scripts(self) -> list[str]: - """[新增] 聚合所有子组件的脚本依赖。""" + """聚合所有子组件的脚本依赖。""" scripts = set(super().get_required_scripts()) for child in self.get_children(): if child: @@ -94,7 +94,7 @@ class ContainerComponent(RenderableComponent, ABC): return list(scripts) def get_required_styles(self) -> list[str]: - """[新增] 聚合所有子组件的样式依赖。""" + """聚合所有子组件的样式依赖。""" styles = set(super().get_required_styles()) for child in self.get_children(): if child: diff --git a/zhenxun/utils/image_utils.py b/zhenxun/utils/image_utils.py index d5affc7d..8ad9f01e 100644 --- a/zhenxun/utils/image_utils.py +++ b/zhenxun/utils/image_utils.py @@ -6,7 +6,7 @@ import random import re import imagehash -from nonebot.utils import is_coroutine_callable +from nonebot.utils import is_coroutine_callable, run_sync from PIL import Image from zhenxun.configs.path_config import TEMP_PATH @@ -378,7 +378,9 @@ async def get_download_image_hash(url: str, mark: str, use_proxy: bool = False) if await AsyncHttpx.download_file( url, TEMP_PATH / f"compare_download_{mark}_img.jpg", use_proxy=use_proxy ): - img_hash = get_img_hash(TEMP_PATH / f"compare_download_{mark}_img.jpg") + img_hash = await run_sync(get_img_hash)( + TEMP_PATH / f"compare_download_{mark}_img.jpg" + ) return str(img_hash) except Exception as e: logger.warning("下载读取图片Hash出错", e=e)