diff --git a/zhenxun/builtin_plugins/superuser/broadcast/__init__.py b/zhenxun/builtin_plugins/superuser/broadcast/__init__.py index 0b9c15ba..008e377b 100644 --- a/zhenxun/builtin_plugins/superuser/broadcast/__init__.py +++ b/zhenxun/builtin_plugins/superuser/broadcast/__init__.py @@ -28,7 +28,8 @@ from nonebot_plugin_alconna.uniseg.segment import ( ) from nonebot_plugin_session import EventSession -from zhenxun.configs.utils import PluginExtraData, Task +from zhenxun.configs.utils import PluginExtraData, RegisterConfig, Task +from zhenxun.services.log import logger from zhenxun.utils.enum import PluginType from zhenxun.utils.message import MessageUtils @@ -45,34 +46,52 @@ __plugin_meta__ = PluginMetadata( name="广播", description="昭告天下!", usage=""" - 广播 [消息内容] - - 直接发送消息到除当前群组外的所有群组 - - 支持文本、图片、@、表情、视频等多种消息类型 - - 示例:广播 你们好! - - 示例:广播 [图片] 新活动开始啦! + 向所有群组或指定标签的群组发送广播消息。 - 广播 + 引用消息 - - 将引用的消息作为广播内容发送 - - 支持引用普通消息或合并转发消息 - - 示例:(引用一条消息) 广播 +**基础用法** +- `广播 [消息内容]`:向所有群组发送广播。 +- `广播` (并引用一条消息):将引用的消息作为内容进行广播。 - 广播撤回 - - 撤回最近一次由您触发的广播消息 - - 仅能撤回短时间内的消息 - - 示例:广播撤回 +**高级定向广播** +- `广播 -t <标签名> [消息内容]`:向指定标签下的所有群组广播。 +- `广播到 <标签名> [消息内容]`:与 `-t` 等效的快捷方式。 - 特性: - - 在群组中使用广播时,不会将消息发送到当前群组 - - 在私聊中使用广播时,会发送到所有群组 +**标签可以是静态的,也可以是动态的,例如:** +- `广播到 核心群 通知:...` +- `广播到 成员数>500的群 通知:...` - 别名: - - bc (广播的简写) - - recall (广播撤回的别名) +**其他命令** +- `广播撤回` (别名: `recall`):撤回最近一次发送的广播。 + +特性: +- 在群组中使用广播时,不会将消息发送到当前群组 +- 在私聊中使用广播时,会发送到所有群组 + +别名: +- bc (广播的简写) +- recall (广播撤回的别名) """.strip(), extra=PluginExtraData( author="HibiKier", - version="1.2", + version="1.3", plugin_type=PluginType.SUPERUSER, + configs=[ + RegisterConfig( + module="_task", + key="DEFAULT_BROADCAST", + value=True, + help="被动 广播 进群默认开关状态", + default_value=True, + type=bool, + ), + RegisterConfig( + module="_task", + key="BROADCAST_CONCURRENCY_LIMIT", + value=10, + help="广播时的最大并发任务数,以避免API速率限制", + default_value=10, + ), + ], tasks=[Task(module="broadcast", name="广播")], ).to_dict(), ) @@ -103,6 +122,9 @@ _matcher = on_alconna( Alconna( "广播", Args["content?", AllParam], + alc.Option( + "-t|--tag", Args["tag_name_bc", str], help_text="向指定标签的群组广播" + ), ), aliases={"bc"}, priority=1, @@ -112,6 +134,8 @@ _matcher = on_alconna( use_origin=False, ) +_matcher.shortcut("广播到 {tag}", command="广播 -t {tag} {%*}") + _recall_matcher = on_alconna( Alconna("广播撤回"), aliases={"recall"}, @@ -128,23 +152,59 @@ async def handle_broadcast( event: Event, session: EventSession, arp: alc.Arparma, + tag_name_match: alc.Match[str] = alc.AlconnaMatch("tag_name_bc"), ): broadcast_content_msg = await _extract_broadcast_content(bot, event, arp, session) if not broadcast_content_msg: return - target_groups, enabled_groups = await get_broadcast_target_groups(bot, session) - if not target_groups or not enabled_groups: + tag_name_to_broadcast = None + force_send = False + + if tag_name_match.available: + tag_name_to_broadcast = tag_name_match.result + force_send = True + + mode_desc = "强制发送到标签" if force_send else "普通发送" + logger.debug( + f"广播模式: {mode_desc}, 标签名: {tag_name_to_broadcast}", + "广播", + ) + + target_groups_console, groups_to_actually_send = await get_broadcast_target_groups( + bot, session, tag_name_to_broadcast, force_send + ) + + if not target_groups_console: + if tag_name_to_broadcast: + await MessageUtils.build_message( + f"标签 '{tag_name_to_broadcast}' 中没有群组或标签不存在。" + ).send(reply_to=True) + return + + if not groups_to_actually_send: + if not force_send and target_groups_console: + await MessageUtils.build_message( + "没有启用了广播功能的目标群组可供立即发送。" + ).send(reply_to=True) return try: await send_broadcast_and_notify( - bot, event, broadcast_content_msg, enabled_groups, target_groups, session + bot, + event, + broadcast_content_msg, + groups_to_actually_send, + target_groups_console, + session, + force_send, ) except Exception as e: error_msg = "发送广播失败" BroadcastManager.log_error(error_msg, e, session) - await MessageUtils.build_message(f"{error_msg}。").send(reply_to=True) + await bot.send_private_msg( + user_id=str(event.get_user_id()), message=f"{error_msg}。" + ) @_recall_matcher.handle() @@ -178,5 +238,6 @@ async def handle_broadcast_recall( except Exception as e: error_msg = "撤回广播消息失败" BroadcastManager.log_error(error_msg, e, session) - user_id = str(event.get_user_id()) - await bot.send_private_msg(user_id=user_id, message=f"{error_msg}。") + await bot.send_private_msg( + user_id=str(event.get_user_id()), message=f"{error_msg}。" + ) diff --git a/zhenxun/builtin_plugins/superuser/broadcast/broadcast_manager.py b/zhenxun/builtin_plugins/superuser/broadcast/broadcast_manager.py index c3d7b5cc..fcc78b79 100644 --- a/zhenxun/builtin_plugins/superuser/broadcast/broadcast_manager.py +++ b/zhenxun/builtin_plugins/superuser/broadcast/broadcast_manager.py @@ -5,11 +5,12 @@ from typing import ClassVar from nonebot.adapters import Bot from nonebot.adapters.onebot.v11 import Bot as V11Bot -from nonebot.exception import ActionFailed +from nonebot.exception import ActionFailed, AdapterException from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna.uniseg import Receipt, Reference from nonebot_plugin_session import EventSession +from zhenxun.configs.config import Config from zhenxun.models.group_console import GroupConsole from zhenxun.services.log import logger from zhenxun.utils.common_utils import CommonUtils @@ -18,6 +19,8 @@ from zhenxun.utils.platform import PlatformUtils from .models import BroadcastDetailResult, BroadcastResult from .utils import custom_nodes_to_v11_nodes, uni_message_to_v11_list_of_dicts +BROADCAST_SEND_DELAY_RANGE = (1, 3) + class BroadcastManager: """广播管理器""" @@ -92,8 +95,16 @@ class BroadcastManager: logger.debug("清空上一次的广播消息ID记录", "广播", session=session) cls.clear_last_broadcast_msg_ids() + concurrency_limit = Config.get_config( + "_task", + "BROADCAST_CONCURRENCY_LIMIT", + 10, + ) + all_groups, _ = await cls.get_all_groups(bot) - return await cls.send_to_specific_groups(bot, message, all_groups, session) + return await cls.send_to_specific_groups( + bot, message, all_groups, session, concurrency_limit=concurrency_limit + ) @classmethod async def send_to_specific_groups( @@ -102,14 +113,17 @@ class BroadcastManager: message: UniMessage, target_groups: list[GroupConsole], session_info: EventSession | str | None = None, + force_send: bool = False, + concurrency_limit: int = 10, ) -> BroadcastResult: """发送广播到指定群组""" log_session = session_info or bot.self_id - logger.debug( - f"开始广播,目标 {len(target_groups)} 个群组,Bot ID: {bot.self_id}", - "广播", - session=log_session, + target_count = len(target_groups) + log_message = ( + f"开始广播,目标 {target_count} 个群组 (并发数: {concurrency_limit})," + f"Bot ID: {bot.self_id}, ForceSend: {force_send}" ) + logger.info(log_message, "广播", session=log_session) if not target_groups: logger.debug("目标群组列表为空,广播结束", "广播", session=log_session) @@ -165,7 +179,12 @@ class BroadcastManager: ) return 0, len(target_groups) success_count, error_count, skip_count = await cls._broadcast_forward( - bot, log_session, target_groups, v11_nodes + bot, + log_session, + target_groups, + v11_nodes, + force_send, + concurrency_limit, ) else: if is_forward_broadcast: @@ -175,7 +194,12 @@ class BroadcastManager: session=log_session, ) success_count, error_count, skip_count = await cls._broadcast_normal( - bot, log_session, target_groups, message + bot, + log_session, + target_groups, + message, + force_send, + concurrency_limit, ) total = len(target_groups) @@ -287,11 +311,16 @@ class BroadcastManager: ) @classmethod - async def _check_group_availability(cls, bot: Bot, group: GroupConsole) -> bool: + async def _check_group_availability( + cls, bot: Bot, group: GroupConsole, force_send: bool = False + ) -> bool: """检查群组是否可用""" if not group.group_id: return False + if force_send: + return True + if await CommonUtils.task_is_block(bot, "broadcast", group.group_id): return False @@ -304,54 +333,69 @@ class BroadcastManager: session_info: EventSession | str, group_list: list[GroupConsole], v11_nodes: list[dict], + force_send: bool = False, + concurrency_limit: int = 10, ) -> BroadcastDetailResult: """发送合并转发""" - success_count = 0 - error_count = 0 - skip_count = 0 + semaphore = asyncio.Semaphore(concurrency_limit) + msg_id_lock = asyncio.Lock() - for _, group in enumerate(group_list): + async def send_to_group(group: GroupConsole) -> GroupConsole: group_key = group.group_id or group.channel_id + async with semaphore: + try: + result = await bot.send_group_forward_msg( + group_id=int(group.group_id), messages=v11_nodes + ) + async with msg_id_lock: + await cls._extract_message_id_from_result( + result, group_key, session_info, "合并转发" + ) + await asyncio.sleep(random.uniform(*BROADCAST_SEND_DELAY_RANGE)) + return group + except (ActionFailed, AdapterException) as ae: + logger.error( + f"发送失败(合并转发) to {group_key}: {ae}", + "广播", + session=session_info, + e=ae, + ) + raise + except Exception as e: + logger.error( + f"发送失败(合并转发) to {group_key}: {e}", + "广播", + session=session_info, + e=e, + ) + raise - if not await cls._check_group_availability(bot, group): - skip_count += 1 - continue + tasks: list[asyncio.Task] = [] + skipped_groups: list[GroupConsole] = [] + for group in group_list: + if await cls._check_group_availability(bot, group, force_send): + tasks.append(asyncio.create_task(send_to_group(group))) + else: + skipped_groups.append(group) - try: - result = await bot.send_group_forward_msg( - group_id=int(group.group_id), messages=v11_nodes - ) + if skipped_groups: + logger.info( + f"跳过 {len(skipped_groups)} 个不符合条件的群组", + "广播", + session=session_info, + ) - logger.debug( - f"合并转发消息发送结果: {result}, 类型: {type(result)}", - "广播", - session=session_info, - ) + if not tasks: + return 0, 0, len(skipped_groups) - await cls._extract_message_id_from_result( - result, group_key, session_info, "合并转发" - ) + results = await asyncio.gather(*tasks, return_exceptions=True) - success_count += 1 - await asyncio.sleep(random.randint(1, 3)) - except ActionFailed as af_e: - error_count += 1 - logger.error( - f"发送失败(合并转发) to {group_key}: {af_e}", - "广播", - session=session_info, - e=af_e, - ) - except Exception as e: - error_count += 1 - logger.error( - f"发送失败(合并转发) to {group_key}: {e}", - "广播", - session=session_info, - e=e, - ) + success_count = sum( + 1 for result in results if not isinstance(result, Exception) + ) + error_count = len(results) - success_count - return success_count, error_count, skip_count + return success_count, error_count, len(skipped_groups) @classmethod async def _broadcast_normal( @@ -360,58 +404,83 @@ class BroadcastManager: session_info: EventSession | str, group_list: list[GroupConsole], message: UniMessage, + force_send: bool = False, + concurrency_limit: int = 10, ) -> BroadcastDetailResult: """发送普通消息""" - success_count = 0 - error_count = 0 - skip_count = 0 + semaphore = asyncio.Semaphore(concurrency_limit) + msg_id_lock = asyncio.Lock() - for _, group in enumerate(group_list): + async def send_to_group(group: GroupConsole) -> GroupConsole: group_key = ( f"{group.group_id}:{group.channel_id}" if group.channel_id else str(group.group_id) ) - - if not await cls._check_group_availability(bot, group): - skip_count += 1 - continue - - try: - target = PlatformUtils.get_target( - group_id=group.group_id, channel_id=group.channel_id - ) - - if target: - receipt: Receipt = await message.send(target, bot=bot) - - logger.debug( - f"广播消息发送结果: {receipt}, 类型: {type(receipt)}", - "广播", - session=session_info, - ) - - await cls._extract_message_id_from_result( - receipt, group_key, session_info - ) - - success_count += 1 - await asyncio.sleep(random.randint(1, 3)) - else: - logger.warning( - "target为空", "广播", session=session_info, target=group_key - ) - skip_count += 1 - except Exception as e: - error_count += 1 - logger.error( - f"发送失败(普通) to {group_key}: {e}", + target = PlatformUtils.get_target( + group_id=group.group_id, channel_id=group.channel_id + ) + if not target: + logger.warning( + "target为空", "广播", session=session_info, - e=e, + target=group_key, ) + raise ValueError(f"无法为群组 {group_key} 创建发送目标") - return success_count, error_count, skip_count + async with semaphore: + try: + receipt: Receipt = await message.send(target, bot=bot) + async with msg_id_lock: + await cls._extract_message_id_from_result( + receipt, group_key, session_info + ) + await asyncio.sleep(random.uniform(*BROADCAST_SEND_DELAY_RANGE)) + return group + except (ActionFailed, AdapterException) as ae: + logger.error( + f"发送失败(普通) to {group_key}: {ae}", + "广播", + session=session_info, + e=ae, + ) + raise + except Exception as e: + logger.error( + f"发送失败(普通) to {group_key}: {e}", + "广播", + session=session_info, + e=e, + ) + raise + + tasks: list[asyncio.Task] = [] + skipped_groups: list[GroupConsole] = [] + for group in group_list: + if await cls._check_group_availability(bot, group, force_send): + tasks.append(asyncio.create_task(send_to_group(group))) + else: + skipped_groups.append(group) + + if skipped_groups: + logger.info( + f"跳过 {len(skipped_groups)} 个不符合条件的群组", + "广播", + session=session_info, + ) + + if not tasks: + return 0, 0, len(skipped_groups) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + success_count = sum( + 1 for result in results if not isinstance(result, Exception) + ) + error_count = len(results) - success_count + + return success_count, error_count, len(skipped_groups) @classmethod async def recall_last_broadcast( diff --git a/zhenxun/builtin_plugins/superuser/broadcast/message_processor.py b/zhenxun/builtin_plugins/superuser/broadcast/message_processor.py index 809e3645..2e60fc37 100644 --- a/zhenxun/builtin_plugins/superuser/broadcast/message_processor.py +++ b/zhenxun/builtin_plugins/superuser/broadcast/message_processor.py @@ -21,8 +21,11 @@ from nonebot_plugin_alconna.uniseg.segment import ( from nonebot_plugin_alconna.uniseg.tools import reply_fetch from nonebot_plugin_session import EventSession +from zhenxun.models.group_console import GroupConsole from zhenxun.services.log import logger +from zhenxun.services.tags import tag_manager as TagManager from zhenxun.utils.common_utils import CommonUtils +from zhenxun.utils.http_utils import AsyncHttpx from zhenxun.utils.message import MessageUtils from .broadcast_manager import BroadcastManager @@ -399,22 +402,29 @@ async def _process_v11_segment( elif target_qq: result.append(At(flag="user", target=target_qq)) elif seg_type == "video": - video_seg = None - if data_dict.get("url"): - video_seg = Video(url=data_dict["url"]) - elif data_dict.get("file"): - file_val = data_dict["file"] + if url := data_dict.get("url"): + try: + logger.debug(f"[D{depth}] 正在下载视频用于广播: {url}", "广播") + video_bytes = await AsyncHttpx.get_content(url) + video_seg = Video(raw=video_bytes) + logger.debug( + f"[D{depth}] 视频下载成功, 大小: {len(video_bytes)} bytes", + "广播", + ) + result.append(video_seg) + except Exception as e: + logger.error(f"[D{depth}] 广播时下载视频失败: {url}", "广播", e=e) + result.append(Text(f"[视频下载失败: {url}]")) + elif file_val := data_dict.get("file"): if isinstance(file_val, str) and file_val.startswith("base64://"): b64_data = file_val[9:] raw_bytes = base64.b64decode(b64_data) video_seg = Video(raw=raw_bytes) + result.append(video_seg) else: video_seg = Video(path=file_val) - if video_seg: - result.append(video_seg) - logger.debug(f"[Depth {depth}] 处理视频消息成功", "广播") - else: - logger.warning(f"[Depth {depth}] V11 视频 {index} 缺少URL/文件", "广播") + result.append(video_seg) + return result elif seg_type == "forward": nested_forward_id = data_dict.get("id") or data_dict.get("resid") nested_forward_content = data_dict.get("content") @@ -515,70 +525,129 @@ async def _extract_content_from_message( async def get_broadcast_target_groups( - bot: Bot, session: EventSession + bot: Bot, + session: EventSession, + tag_name: str | None = None, + force_send: bool = False, ) -> tuple[list, list]: """获取广播目标群组和启用了广播功能的群组""" - target_groups = [] - all_groups, _ = await BroadcastManager.get_all_groups(bot) + target_groups_console: list[GroupConsole] = [] - current_group_id = None - if hasattr(session, "id2") and session.id2: - current_group_id = session.id2 + current_group_raw = getattr(session, "id2", None) or getattr( + session, "group_id", None + ) + current_group_id = str(current_group_raw) if current_group_raw else None - if current_group_id: - target_groups = [ - group for group in all_groups if group.group_id != current_group_id - ] - logger.info( - f"向除当前群组({current_group_id})外的所有群组广播", "广播", session=session - ) + logger.debug(f"当前群组ID: {current_group_id}", "广播") + + if tag_name: + tagged_group_ids = await TagManager.resolve_tag_to_group_ids(tag_name, bot=bot) + if not tagged_group_ids: + return [], [] + + valid_groups = await GroupConsole.filter(group_id__in=tagged_group_ids) + + if current_group_id: + target_groups_console = [ + group + for group in valid_groups + if str(group.group_id) != current_group_id + ] + excluded_msg = ( + f",已排除当前群组({current_group_id})" + if any( + str(group.group_id) == current_group_id for group in valid_groups + ) + else "" + ) + broadcast_msg = ( + f"向标签 '{tag_name}' 中的 {len(target_groups_console)} 个群组广播 " + f"(ForceSend: {force_send}){excluded_msg}" + ) + logger.info(broadcast_msg, "广播", session=session) + else: + target_groups_console = valid_groups + broadcast_msg = ( + f"向标签 '{tag_name}' 中的 {len(target_groups_console)} 个群组广播 " + f"(ForceSend: {force_send})" + ) + logger.info(broadcast_msg, "广播", session=session) else: - target_groups = all_groups - logger.info("向所有群组广播", "广播", session=session) + all_groups, _ = await BroadcastManager.get_all_groups(bot) - if not target_groups: - await MessageUtils.build_message("没有找到符合条件的广播目标群组。").send( - reply_to=True - ) + if current_group_id: + target_groups_console = [ + group for group in all_groups if str(group.group_id) != current_group_id + ] + logger.info( + ( + f"向除当前群组({current_group_id})外的所有群组广播 " + f"(ForceSend: {force_send})" + ), + "广播", + session=session, + ) + else: + target_groups_console = all_groups + logger.info( + f"向所有群组广播 (ForceSend: {force_send})", "广播", session=session + ) + + if not target_groups_console: + if not tag_name: + await MessageUtils.build_message("没有找到符合条件的广播目标群组。").send( + reply_to=True + ) return [], [] - enabled_groups = [] - for group in target_groups: - if not await CommonUtils.task_is_block(bot, "broadcast", group.group_id): - enabled_groups.append(group) + groups_to_actually_send = [] + if force_send: + groups_to_actually_send = target_groups_console + logger.debug( + f"强制发送模式,将向 {len(groups_to_actually_send)} 个目标群组尝试发送。", + "广播", + ) + else: + for group in target_groups_console: + if not await CommonUtils.task_is_block(bot, "broadcast", group.group_id): + groups_to_actually_send.append(group) + logger.debug( + f"普通发送模式,筛选后将向 {len(groups_to_actually_send)} " + f"个目标群组尝试发送", + "广播", + ) - if not enabled_groups: - await MessageUtils.build_message( - "没有启用了广播功能的目标群组可供立即发送。" - ).send(reply_to=True) - return target_groups, [] - - return target_groups, enabled_groups + return target_groups_console, groups_to_actually_send async def send_broadcast_and_notify( bot: Bot, event: Event, message: UniMessage, - enabled_groups: list, - target_groups: list, + groups_to_send: list, + all_target_groups_for_stats: list, session: EventSession, + force_send: bool = False, ) -> None: """发送广播并通知结果""" BroadcastManager.clear_last_broadcast_msg_ids() count, error_count = await BroadcastManager.send_to_specific_groups( - bot, message, enabled_groups, session + bot, message, groups_to_send, session, force_send ) result = f"成功广播 {count} 个群组" if error_count: result += f"\n发送失败 {error_count} 个群组" - result += f"\n有效: {len(enabled_groups)} / 总计: {len(target_groups)}" + + effective_sent_count = len(groups_to_send) + total_considered_count = len(all_target_groups_for_stats) + + result += f"\n有效: {effective_sent_count} / 总计目标: {total_considered_count}" user_id = str(event.get_user_id()) await bot.send_private_msg(user_id=user_id, message=f"发送广播完成!\n{result}") BroadcastManager.log_info( - f"广播完成,有效/总计: {len(enabled_groups)}/{len(target_groups)}", + f"广播完成,有效/总计目标: {effective_sent_count}/{total_considered_count}", session, ) diff --git a/zhenxun/builtin_plugins/superuser/broadcast/utils.py b/zhenxun/builtin_plugins/superuser/broadcast/utils.py index 748559fd..c3626f87 100644 --- a/zhenxun/builtin_plugins/superuser/broadcast/utils.py +++ b/zhenxun/builtin_plugins/superuser/broadcast/utils.py @@ -59,7 +59,7 @@ def uni_segment_to_v11_segment_dict( logger.warning(f"无法处理 Video.raw 的类型: {type(raw_data)}", "广播") elif getattr(seg, "path", None): logger.warning( - f"在合并转发中使用了本地视频路径,可能无法显示: {seg.path}", "广播" + f"在合并转发中使用了本地视频路径,可能无法发送: {seg.path}", "广播" ) return {"type": "video", "data": {"file": f"file:///{seg.path}"}} else: