diff --git a/zhenxun/builtin_plugins/superuser/broadcast/__init__.py b/zhenxun/builtin_plugins/superuser/broadcast/__init__.py index c025fd0c..d5b27e61 100644 --- a/zhenxun/builtin_plugins/superuser/broadcast/__init__.py +++ b/zhenxun/builtin_plugins/superuser/broadcast/__init__.py @@ -1,32 +1,73 @@ -from typing import Annotated - -from nonebot import on_command -from nonebot.adapters import Bot -from nonebot.params import Command +from arclet.alconna import AllParam +from nepattern import UnionPattern +from nonebot.adapters import Bot, Event from nonebot.permission import SUPERUSER from nonebot.plugin import PluginMetadata from nonebot.rule import to_me -from nonebot_plugin_alconna import Text as alcText -from nonebot_plugin_alconna import UniMsg +import nonebot_plugin_alconna as alc +from nonebot_plugin_alconna import ( + Alconna, + Args, + on_alconna, +) +from nonebot_plugin_alconna.uniseg.segment import ( + At, + AtAll, + Audio, + Button, + Emoji, + File, + Hyper, + Image, + Keyboard, + Reference, + Reply, + Text, + Video, + Voice, +) from nonebot_plugin_session import EventSession from zhenxun.configs.utils import PluginExtraData, RegisterConfig, Task -from zhenxun.services.log import logger from zhenxun.utils.enum import PluginType from zhenxun.utils.message import MessageUtils -from ._data_source import BroadcastManage +from .broadcast_manager import BroadcastManager +from .message_processor import ( + _extract_broadcast_content, + get_broadcast_target_groups, + send_broadcast_and_notify, +) + +BROADCAST_SEND_DELAY_RANGE = (1, 3) __plugin_meta__ = PluginMetadata( name="广播", description="昭告天下!", usage=""" - 广播 [消息] [图片] - 示例:广播 你们好! + 广播 [消息内容] + - 直接发送消息到所有群组 + - 支持文本、图片、@、表情、视频等多种消息类型 + - 示例:广播 你们好! + - 示例:广播 [图片] 新活动开始啦! + + 广播 + 引用消息 + - 将引用的消息作为广播内容发送 + - 支持引用普通消息或合并转发消息 + - 示例:(引用一条消息) 广播 + + 广播撤回 + - 撤回最近一次由您触发的广播消息 + - 仅能撤回短时间内的消息 + - 示例:广播撤回 + + 别名: + - bc (广播的简写) + - recall (广播撤回的别名) """.strip(), extra=PluginExtraData( author="HibiKier", - version="0.1", + version="1.2", plugin_type=PluginType.SUPERUSER, configs=[ RegisterConfig( @@ -42,26 +83,106 @@ __plugin_meta__ = PluginMetadata( ).to_dict(), ) -_matcher = on_command( - "广播", priority=1, permission=SUPERUSER, block=True, rule=to_me() +AnySeg = ( + UnionPattern( + [ + Text, + Image, + At, + AtAll, + Audio, + Video, + File, + Emoji, + Reply, + Reference, + Hyper, + Button, + Keyboard, + Voice, + ] + ) + @ "AnySeg" +) + +_matcher = on_alconna( + Alconna( + "广播", + Args["content?", AllParam], + ), + aliases={"bc"}, + priority=1, + permission=SUPERUSER, + block=True, + rule=to_me(), + use_origin=False, +) + +_recall_matcher = on_alconna( + Alconna("广播撤回"), + aliases={"recall"}, + priority=1, + permission=SUPERUSER, + block=True, + rule=to_me(), ) @_matcher.handle() -async def _( +async def handle_broadcast( bot: Bot, + event: Event, session: EventSession, - message: UniMsg, - command: Annotated[tuple[str, ...], Command()], + arp: alc.Arparma, ): - for msg in message: - if isinstance(msg, alcText) and msg.text.strip().startswith(command[0]): - msg.text = msg.text.replace(command[0], "", 1).strip() - break - await MessageUtils.build_message("正在发送..请等一下哦!").send() - count, error_count = await BroadcastManage.send(bot, message, session) - result = f"成功广播 {count} 个群组" - if error_count: - result += f"\n广播失败 {error_count} 个群组" - await MessageUtils.build_message(f"发送广播完成!\n{result}").send(reply_to=True) - logger.info(f"发送广播信息: {message}", "广播", session=session) + broadcast_content_msg = await _extract_broadcast_content(bot, event, arp, session) + if not broadcast_content_msg: + return + + target_groups, enabled_groups = await get_broadcast_target_groups(bot, session) + if not target_groups or not enabled_groups: + return + + try: + await send_broadcast_and_notify( + bot, event, broadcast_content_msg, enabled_groups, target_groups, session + ) + except Exception as e: + error_msg = "发送广播失败" + BroadcastManager.log_error(error_msg, e, session) + await MessageUtils.build_message(f"{error_msg}。").send(reply_to=True) + + +@_recall_matcher.handle() +async def handle_broadcast_recall( + bot: Bot, + event: Event, + session: EventSession, +): + """处理广播撤回命令""" + await MessageUtils.build_message("正在尝试撤回最近一次广播...").send() + + try: + success_count, error_count = await BroadcastManager.recall_last_broadcast( + bot, session + ) + + user_id = str(event.get_user_id()) + if success_count == 0 and error_count == 0: + await bot.send_private_msg( + user_id=user_id, + message="没有找到最近的广播消息记录,可能已经撤回或超过可撤回时间。", + ) + else: + result = f"广播撤回完成!\n成功撤回 {success_count} 条消息" + if error_count: + result += f"\n撤回失败 {error_count} 条消息 (可能已过期或无权限)" + await bot.send_private_msg(user_id=user_id, message=result) + BroadcastManager.log_info( + f"广播撤回完成: 成功 {success_count}, 失败 {error_count}", session + ) + except Exception as e: + error_msg = "撤回广播消息失败" + BroadcastManager.log_error(error_msg, e, session) + user_id = str(event.get_user_id()) + await bot.send_private_msg(user_id=user_id, message=f"{error_msg}。") diff --git a/zhenxun/builtin_plugins/superuser/broadcast/_data_source.py b/zhenxun/builtin_plugins/superuser/broadcast/_data_source.py deleted file mode 100644 index 1ee1a28c..00000000 --- a/zhenxun/builtin_plugins/superuser/broadcast/_data_source.py +++ /dev/null @@ -1,72 +0,0 @@ -import asyncio -import random - -from nonebot.adapters import Bot -import nonebot_plugin_alconna as alc -from nonebot_plugin_alconna import Image, UniMsg -from nonebot_plugin_session import EventSession - -from zhenxun.services.log import logger -from zhenxun.utils.common_utils import CommonUtils -from zhenxun.utils.message import MessageUtils -from zhenxun.utils.platform import PlatformUtils - - -class BroadcastManage: - @classmethod - async def send( - cls, bot: Bot, message: UniMsg, session: EventSession - ) -> tuple[int, int]: - """发送广播消息 - - 参数: - bot: Bot - message: 消息内容 - session: Session - - 返回: - tuple[int, int]: 发送成功的群组数量, 发送失败的群组数量 - """ - message_list = [] - for msg in message: - if isinstance(msg, alc.Image) and msg.url: - message_list.append(Image(url=msg.url)) - elif isinstance(msg, alc.Text): - message_list.append(msg.text) - group_list, _ = await PlatformUtils.get_group_list(bot) - if group_list: - error_count = 0 - for group in group_list: - try: - if not await CommonUtils.task_is_block( - bot, - "broadcast", # group.channel_id - group.group_id, - ): - target = PlatformUtils.get_target( - group_id=group.group_id, channel_id=group.channel_id - ) - if target: - await MessageUtils.build_message(message_list).send( - target, bot - ) - logger.debug( - "发送成功", - "广播", - session=session, - target=f"{group.group_id}:{group.channel_id}", - ) - await asyncio.sleep(random.randint(1, 3)) - else: - logger.warning("target为空", "广播", session=session) - except Exception as e: - error_count += 1 - logger.error( - "发送失败", - "广播", - session=session, - target=f"{group.group_id}:{group.channel_id}", - e=e, - ) - return len(group_list) - error_count, error_count - return 0, 0 diff --git a/zhenxun/builtin_plugins/superuser/broadcast/broadcast_manager.py b/zhenxun/builtin_plugins/superuser/broadcast/broadcast_manager.py new file mode 100644 index 00000000..c3d7b5cc --- /dev/null +++ b/zhenxun/builtin_plugins/superuser/broadcast/broadcast_manager.py @@ -0,0 +1,490 @@ +import asyncio +import random +import traceback +from typing import ClassVar + +from nonebot.adapters import Bot +from nonebot.adapters.onebot.v11 import Bot as V11Bot +from nonebot.exception import ActionFailed +from nonebot_plugin_alconna import UniMessage +from nonebot_plugin_alconna.uniseg import Receipt, Reference +from nonebot_plugin_session import EventSession + +from zhenxun.models.group_console import GroupConsole +from zhenxun.services.log import logger +from zhenxun.utils.common_utils import CommonUtils +from zhenxun.utils.platform import PlatformUtils + +from .models import BroadcastDetailResult, BroadcastResult +from .utils import custom_nodes_to_v11_nodes, uni_message_to_v11_list_of_dicts + + +class BroadcastManager: + """广播管理器""" + + _last_broadcast_msg_ids: ClassVar[dict[str, int]] = {} + + @staticmethod + def _get_session_info(session: EventSession | None) -> str: + """获取会话信息字符串""" + if not session: + return "" + + try: + platform = getattr(session, "platform", "unknown") + session_id = str(session) + return f"[{platform}:{session_id}]" + except Exception: + return "[session-info-error]" + + @staticmethod + def log_error( + message: str, error: Exception, session: EventSession | None = None, **kwargs + ): + """记录错误日志""" + session_info = BroadcastManager._get_session_info(session) + error_type = type(error).__name__ + stack_trace = traceback.format_exc() + error_details = f"\n类型: {error_type}\n信息: {error!s}\n堆栈: {stack_trace}" + + logger.error( + f"{session_info} {message}{error_details}", "广播", e=error, **kwargs + ) + + @staticmethod + def log_warning(message: str, session: EventSession | None = None, **kwargs): + """记录警告级别日志""" + session_info = BroadcastManager._get_session_info(session) + logger.warning(f"{session_info} {message}", "广播", **kwargs) + + @staticmethod + def log_info(message: str, session: EventSession | None = None, **kwargs): + """记录信息级别日志""" + session_info = BroadcastManager._get_session_info(session) + logger.info(f"{session_info} {message}", "广播", **kwargs) + + @classmethod + def get_last_broadcast_msg_ids(cls) -> dict[str, int]: + """获取最近广播消息ID""" + return cls._last_broadcast_msg_ids.copy() + + @classmethod + def clear_last_broadcast_msg_ids(cls) -> None: + """清空消息ID记录""" + cls._last_broadcast_msg_ids.clear() + + @classmethod + async def get_all_groups(cls, bot: Bot) -> tuple[list[GroupConsole], str]: + """获取群组列表""" + return await PlatformUtils.get_group_list(bot) + + @classmethod + async def send( + cls, bot: Bot, message: UniMessage, session: EventSession + ) -> BroadcastResult: + """发送广播到所有群组""" + logger.debug( + f"开始广播(send - 广播到所有群组),Bot ID: {bot.self_id}", + "广播", + session=session, + ) + + logger.debug("清空上一次的广播消息ID记录", "广播", session=session) + cls.clear_last_broadcast_msg_ids() + + all_groups, _ = await cls.get_all_groups(bot) + return await cls.send_to_specific_groups(bot, message, all_groups, session) + + @classmethod + async def send_to_specific_groups( + cls, + bot: Bot, + message: UniMessage, + target_groups: list[GroupConsole], + session_info: EventSession | str | None = None, + ) -> BroadcastResult: + """发送广播到指定群组""" + log_session = session_info or bot.self_id + logger.debug( + f"开始广播,目标 {len(target_groups)} 个群组,Bot ID: {bot.self_id}", + "广播", + session=log_session, + ) + + if not target_groups: + logger.debug("目标群组列表为空,广播结束", "广播", session=log_session) + return 0, 0 + + platform = PlatformUtils.get_platform(bot) + is_forward_broadcast = any( + isinstance(seg, Reference) and getattr(seg, "nodes", None) + for seg in message + ) + + if platform == "qq" and isinstance(bot, V11Bot) and is_forward_broadcast: + if ( + len(message) == 1 + and isinstance(message[0], Reference) + and getattr(message[0], "nodes", None) + ): + nodes_list = getattr(message[0], "nodes", []) + v11_nodes = custom_nodes_to_v11_nodes(nodes_list) + node_count = len(v11_nodes) + logger.debug( + f"从 UniMessage 构造转发节点数: {node_count}", + "广播", + session=log_session, + ) + else: + logger.warning( + "广播消息包含合并转发段和其他段,将尝试打平成一个节点发送", + "广播", + session=log_session, + ) + v11_content_list = uni_message_to_v11_list_of_dicts(message) + v11_nodes = ( + [ + { + "type": "node", + "data": { + "user_id": bot.self_id, + "nickname": "广播", + "content": v11_content_list, + }, + } + ] + if v11_content_list + else [] + ) + + if not v11_nodes: + logger.warning( + "构造出的 V11 合并转发节点为空,无法发送", + "广播", + session=log_session, + ) + return 0, len(target_groups) + success_count, error_count, skip_count = await cls._broadcast_forward( + bot, log_session, target_groups, v11_nodes + ) + else: + if is_forward_broadcast: + logger.warning( + f"合并转发消息在适配器 ({platform}) 不支持,将作为普通消息发送", + "广播", + session=log_session, + ) + success_count, error_count, skip_count = await cls._broadcast_normal( + bot, log_session, target_groups, message + ) + + total = len(target_groups) + stats = f"成功: {success_count}, 失败: {error_count}" + stats += f", 跳过: {skip_count}, 总计: {total}" + logger.debug( + f"广播统计 - {stats}", + "广播", + session=log_session, + ) + + msg_ids = cls.get_last_broadcast_msg_ids() + if msg_ids: + id_list_str = ", ".join([f"{k}:{v}" for k, v in msg_ids.items()]) + logger.debug( + f"广播结束,记录了 {len(msg_ids)} 条消息ID: {id_list_str}", + "广播", + session=log_session, + ) + else: + logger.warning( + "广播结束,但没有记录任何消息ID", + "广播", + session=log_session, + ) + + return success_count, error_count + + @classmethod + async def _extract_message_id_from_result( + cls, + result: dict | Receipt, + group_key: str, + session_info: EventSession | str, + msg_type: str = "普通", + ) -> None: + """提取消息ID并记录""" + if isinstance(result, dict) and "message_id" in result: + msg_id = result["message_id"] + try: + msg_id_int = int(msg_id) + cls._last_broadcast_msg_ids[group_key] = msg_id_int + logger.debug( + f"记录群 {group_key} 的{msg_type}消息ID: {msg_id_int}", + "广播", + session=session_info, + ) + except (ValueError, TypeError): + logger.warning( + f"{msg_type}结果中的 message_id 不是有效整数: {msg_id}", + "广播", + session=session_info, + ) + elif isinstance(result, Receipt) and result.msg_ids: + try: + first_id_info = result.msg_ids[0] + msg_id = None + if isinstance(first_id_info, dict) and "message_id" in first_id_info: + msg_id = first_id_info["message_id"] + logger.debug( + f"从 Receipt.msg_ids[0] 提取到 ID: {msg_id}", + "广播", + session=session_info, + ) + elif isinstance(first_id_info, int | str): + msg_id = first_id_info + logger.debug( + f"从 Receipt.msg_ids[0] 提取到原始ID: {msg_id}", + "广播", + session=session_info, + ) + + if msg_id is not None: + try: + msg_id_int = int(msg_id) + cls._last_broadcast_msg_ids[group_key] = msg_id_int + logger.debug( + f"记录群 {group_key} 的消息ID: {msg_id_int}", + "广播", + session=session_info, + ) + except (ValueError, TypeError): + logger.warning( + f"提取的ID ({msg_id}) 不是有效整数", + "广播", + session=session_info, + ) + else: + info_str = str(first_id_info) + logger.warning( + f"无法从 Receipt.msg_ids[0] 提取ID: {info_str}", + "广播", + session=session_info, + ) + except IndexError: + logger.warning("Receipt.msg_ids 为空", "广播", session=session_info) + except Exception as e_extract: + logger.error( + f"从 Receipt 提取 msg_id 时出错: {e_extract}", + "广播", + session=session_info, + e=e_extract, + ) + else: + logger.warning( + f"发送成功但无法从结果获取消息 ID. 结果: {result}", + "广播", + session=session_info, + ) + + @classmethod + async def _check_group_availability(cls, bot: Bot, group: GroupConsole) -> bool: + """检查群组是否可用""" + if not group.group_id: + return False + + if await CommonUtils.task_is_block(bot, "broadcast", group.group_id): + return False + + return True + + @classmethod + async def _broadcast_forward( + cls, + bot: V11Bot, + session_info: EventSession | str, + group_list: list[GroupConsole], + v11_nodes: list[dict], + ) -> BroadcastDetailResult: + """发送合并转发""" + success_count = 0 + error_count = 0 + skip_count = 0 + + for _, group in enumerate(group_list): + group_key = group.group_id or group.channel_id + + if not await cls._check_group_availability(bot, group): + skip_count += 1 + continue + + try: + result = await bot.send_group_forward_msg( + group_id=int(group.group_id), messages=v11_nodes + ) + + logger.debug( + f"合并转发消息发送结果: {result}, 类型: {type(result)}", + "广播", + session=session_info, + ) + + await cls._extract_message_id_from_result( + result, group_key, session_info, "合并转发" + ) + + success_count += 1 + await asyncio.sleep(random.randint(1, 3)) + except ActionFailed as af_e: + error_count += 1 + logger.error( + f"发送失败(合并转发) to {group_key}: {af_e}", + "广播", + session=session_info, + e=af_e, + ) + except Exception as e: + error_count += 1 + logger.error( + f"发送失败(合并转发) to {group_key}: {e}", + "广播", + session=session_info, + e=e, + ) + + return success_count, error_count, skip_count + + @classmethod + async def _broadcast_normal( + cls, + bot: Bot, + session_info: EventSession | str, + group_list: list[GroupConsole], + message: UniMessage, + ) -> BroadcastDetailResult: + """发送普通消息""" + success_count = 0 + error_count = 0 + skip_count = 0 + + for _, group in enumerate(group_list): + group_key = ( + f"{group.group_id}:{group.channel_id}" + if group.channel_id + else str(group.group_id) + ) + + if not await cls._check_group_availability(bot, group): + skip_count += 1 + continue + + try: + target = PlatformUtils.get_target( + group_id=group.group_id, channel_id=group.channel_id + ) + + if target: + receipt: Receipt = await message.send(target, bot=bot) + + logger.debug( + f"广播消息发送结果: {receipt}, 类型: {type(receipt)}", + "广播", + session=session_info, + ) + + await cls._extract_message_id_from_result( + receipt, group_key, session_info + ) + + success_count += 1 + await asyncio.sleep(random.randint(1, 3)) + else: + logger.warning( + "target为空", "广播", session=session_info, target=group_key + ) + skip_count += 1 + except Exception as e: + error_count += 1 + logger.error( + f"发送失败(普通) to {group_key}: {e}", + "广播", + session=session_info, + e=e, + ) + + return success_count, error_count, skip_count + + @classmethod + async def recall_last_broadcast( + cls, bot: Bot, session_info: EventSession | str + ) -> BroadcastResult: + """撤回最近广播""" + msg_ids_to_recall = cls.get_last_broadcast_msg_ids() + + if not msg_ids_to_recall: + logger.warning( + "没有找到最近的广播消息ID记录", "广播撤回", session=session_info + ) + return 0, 0 + + id_list_str = ", ".join([f"{k}:{v}" for k, v in msg_ids_to_recall.items()]) + logger.debug( + f"找到 {len(msg_ids_to_recall)} 条广播消息ID记录: {id_list_str}", + "广播撤回", + session=session_info, + ) + + success_count = 0 + error_count = 0 + + logger.info( + f"准备撤回 {len(msg_ids_to_recall)} 条广播消息", + "广播撤回", + session=session_info, + ) + + for group_key, msg_id in msg_ids_to_recall.items(): + try: + logger.debug( + f"尝试撤回消息 (ID: {msg_id}) in {group_key}", + "广播撤回", + session=session_info, + ) + await bot.call_api("delete_msg", message_id=msg_id) + success_count += 1 + except ActionFailed as af_e: + retcode = getattr(af_e, "retcode", None) + wording = getattr(af_e, "wording", "") + if retcode == 100 and "MESSAGE_NOT_FOUND" in wording.upper(): + logger.warning( + f"消息 (ID: {msg_id}) 可能已被撤回或不存在于 {group_key}", + "广播撤回", + session=session_info, + ) + elif retcode == 300 and "delete message" in wording.lower(): + logger.warning( + f"消息 (ID: {msg_id}) 可能已被撤回或不存在于 {group_key}", + "广播撤回", + session=session_info, + ) + else: + error_count += 1 + logger.error( + f"撤回消息失败 (ID: {msg_id}) in {group_key}: {af_e}", + "广播撤回", + session=session_info, + e=af_e, + ) + except Exception as e: + error_count += 1 + logger.error( + f"撤回消息时发生未知错误 (ID: {msg_id}) in {group_key}: {e}", + "广播撤回", + session=session_info, + e=e, + ) + await asyncio.sleep(0.2) + + logger.debug("撤回操作完成,清空消息ID记录", "广播撤回", session=session_info) + cls.clear_last_broadcast_msg_ids() + + return success_count, error_count diff --git a/zhenxun/builtin_plugins/superuser/broadcast/message_processor.py b/zhenxun/builtin_plugins/superuser/broadcast/message_processor.py new file mode 100644 index 00000000..f9e64208 --- /dev/null +++ b/zhenxun/builtin_plugins/superuser/broadcast/message_processor.py @@ -0,0 +1,571 @@ +import base64 +import json +from typing import Any + +from nonebot.adapters import Bot, Event +from nonebot.adapters.onebot.v11 import Message as V11Message +from nonebot.adapters.onebot.v11 import MessageSegment as V11MessageSegment +from nonebot.exception import ActionFailed +import nonebot_plugin_alconna as alc +from nonebot_plugin_alconna import UniMessage +from nonebot_plugin_alconna.uniseg.segment import ( + At, + AtAll, + CustomNode, + Image, + Reference, + Reply, + Text, + Video, +) +from nonebot_plugin_alconna.uniseg.tools import reply_fetch +from nonebot_plugin_session import EventSession + +from zhenxun.services.log import logger +from zhenxun.utils.common_utils import CommonUtils +from zhenxun.utils.message import MessageUtils + +from .broadcast_manager import BroadcastManager + +MAX_FORWARD_DEPTH = 3 + + +async def _process_forward_content( + forward_content: Any, forward_id: str | None, bot: Bot, depth: int +) -> list[CustomNode]: + """处理转发消息内容""" + nodes_for_alc = [] + content_parsed = False + + if forward_content: + nodes_from_content = None + if isinstance(forward_content, list): + nodes_from_content = forward_content + elif isinstance(forward_content, str): + try: + parsed_content = json.loads(forward_content) + if isinstance(parsed_content, list): + nodes_from_content = parsed_content + except Exception as json_e: + logger.debug( + f"[Depth {depth}] JSON解析失败: {json_e}", + "广播", + ) + + if nodes_from_content is not None: + logger.debug( + f"[D{depth}] 节点数: {len(nodes_from_content)}", + "广播", + ) + content_parsed = True + for node_data in nodes_from_content: + node = await _create_custom_node_from_data(node_data, bot, depth + 1) + if node: + nodes_for_alc.append(node) + + if not content_parsed and forward_id: + logger.debug( + f"[D{depth}] 尝试API调用ID: {forward_id}", + "广播", + ) + try: + forward_data = await bot.call_api("get_forward_msg", id=forward_id) + nodes_list = None + + if isinstance(forward_data, dict) and "messages" in forward_data: + nodes_list = forward_data["messages"] + elif ( + isinstance(forward_data, dict) + and "data" in forward_data + and isinstance(forward_data["data"], dict) + and "message" in forward_data["data"] + ): + nodes_list = forward_data["data"]["message"] + elif isinstance(forward_data, list): + nodes_list = forward_data + + if nodes_list: + node_count = len(nodes_list) + logger.debug( + f"[D{depth + 1}] 节点:{node_count}", + "广播", + ) + for node_data in nodes_list: + node = await _create_custom_node_from_data( + node_data, bot, depth + 1 + ) + if node: + nodes_for_alc.append(node) + else: + logger.warning( + f"[D{depth + 1}] ID:{forward_id}无节点", + "广播", + ) + nodes_for_alc.append( + CustomNode( + uid="0", + name="错误", + content="[嵌套转发消息获取失败]", + ) + ) + except ActionFailed as af_e: + logger.error( + f"[D{depth + 1}] API失败: {af_e}", + "广播", + e=af_e, + ) + nodes_for_alc.append( + CustomNode( + uid="0", + name="错误", + content="[嵌套转发消息获取失败]", + ) + ) + except Exception as e: + logger.error( + f"[D{depth + 1}] 处理出错: {e}", + "广播", + e=e, + ) + nodes_for_alc.append( + CustomNode( + uid="0", + name="错误", + content="[处理嵌套转发时出错]", + ) + ) + elif not content_parsed and not forward_id: + logger.warning( + f"[D{depth}] 转发段无内容也无ID", + "广播", + ) + nodes_for_alc.append( + CustomNode( + uid="0", + name="错误", + content="[嵌套转发消息无法解析]", + ) + ) + elif content_parsed and not nodes_for_alc: + logger.warning( + f"[D{depth}] 解析成功但无有效节点", + "广播", + ) + nodes_for_alc.append( + CustomNode( + uid="0", + name="信息", + content="[嵌套转发内容为空]", + ) + ) + + return nodes_for_alc + + +async def _create_custom_node_from_data( + node_data: dict, bot: Bot, depth: int +) -> CustomNode | None: + """从节点数据创建CustomNode""" + node_content_raw = node_data.get("message") or node_data.get("content") + if not node_content_raw: + logger.warning(f"[D{depth}] 节点缺少消息内容", "广播") + return None + + sender = node_data.get("sender", {}) + uid = str(sender.get("user_id", "10000")) + name = sender.get("nickname", f"用户{uid[:4]}") + + extracted_uni_msg = await _extract_content_from_message( + node_content_raw, bot, depth + ) + if not extracted_uni_msg: + return None + + return CustomNode(uid=uid, name=name, content=extracted_uni_msg) + + +async def _extract_broadcast_content( + bot: Bot, + event: Event, + arp: alc.Arparma, + session: EventSession, +) -> UniMessage | None: + """从命令参数或引用消息中提取广播内容""" + broadcast_content_msg: UniMessage | None = None + + command_content_list = arp.all_matched_args.get("content", []) + + processed_command_list = [] + has_command_content = False + + if command_content_list: + for item in command_content_list: + if isinstance(item, alc.Segment): + processed_command_list.append(item) + if not (isinstance(item, Text) and not item.text.strip()): + has_command_content = True + elif isinstance(item, str): + if item.strip(): + processed_command_list.append(Text(item.strip())) + has_command_content = True + else: + logger.warning( + f"Unexpected type in command content: {type(item)}", "广播" + ) + + if has_command_content: + logger.debug("检测到命令参数内容,优先使用参数内容", "广播", session=session) + broadcast_content_msg = UniMessage(processed_command_list) + + if not broadcast_content_msg.filter( + lambda x: not (isinstance(x, Text) and not x.text.strip()) + ): + logger.warning( + "命令参数内容解析后为空或只包含空白", "广播", session=session + ) + broadcast_content_msg = None + + if not broadcast_content_msg: + reply_segment_obj: Reply | None = await reply_fetch(event, bot) + if ( + reply_segment_obj + and hasattr(reply_segment_obj, "msg") + and reply_segment_obj.msg + ): + logger.debug( + "未检测到有效命令参数,检测到引用消息", "广播", session=session + ) + raw_quoted_content = reply_segment_obj.msg + is_forward = False + forward_id = None + + if isinstance(raw_quoted_content, V11Message): + for seg in raw_quoted_content: + if isinstance(seg, V11MessageSegment): + if seg.type == "forward": + forward_id = seg.data.get("id") + is_forward = bool(forward_id) + break + elif seg.type == "json": + try: + json_data_str = seg.data.get("data", "{}") + if isinstance(json_data_str, str): + import json + + json_data = json.loads(json_data_str) + if ( + json_data.get("app") == "com.tencent.multimsg" + or json_data.get("view") == "Forward" + ) and json_data.get("meta", {}).get( + "detail", {} + ).get("resid"): + forward_id = json_data["meta"]["detail"][ + "resid" + ] + is_forward = True + break + except Exception: + pass + + if is_forward and forward_id: + logger.info( + f"尝试获取并构造合并转发内容 (ID: {forward_id})", + "广播", + session=session, + ) + nodes_to_forward: list[CustomNode] = [] + try: + forward_data = await bot.call_api("get_forward_msg", id=forward_id) + nodes_list = None + if isinstance(forward_data, dict) and "messages" in forward_data: + nodes_list = forward_data["messages"] + elif ( + isinstance(forward_data, dict) + and "data" in forward_data + and isinstance(forward_data["data"], dict) + and "message" in forward_data["data"] + ): + nodes_list = forward_data["data"]["message"] + elif isinstance(forward_data, list): + nodes_list = forward_data + + if nodes_list is not None: + for node_data in nodes_list: + node_sender = node_data.get("sender", {}) + node_user_id = str(node_sender.get("user_id", "10000")) + node_nickname = node_sender.get( + "nickname", f"用户{node_user_id[:4]}" + ) + node_content_raw = node_data.get( + "message" + ) or node_data.get("content") + if node_content_raw: + extracted_node_uni_msg = ( + await _extract_content_from_message( + node_content_raw, bot + ) + ) + if extracted_node_uni_msg: + nodes_to_forward.append( + CustomNode( + uid=node_user_id, + name=node_nickname, + content=extracted_node_uni_msg, + ) + ) + if nodes_to_forward: + broadcast_content_msg = UniMessage( + Reference(nodes=nodes_to_forward) + ) + except ActionFailed: + await MessageUtils.build_message( + "获取合并转发消息失败,可能不支持此 API。" + ).send(reply_to=True) + return None + except Exception as api_e: + logger.error(f"处理合并转发时出错: {api_e}", "广播", e=api_e) + await MessageUtils.build_message( + "处理合并转发消息时发生内部错误。" + ).send(reply_to=True) + return None + else: + broadcast_content_msg = await _extract_content_from_message( + raw_quoted_content, bot + ) + else: + logger.debug("未检测到命令参数和引用消息", "广播", session=session) + await MessageUtils.build_message("请提供广播内容或引用要广播的消息").send( + reply_to=True + ) + return None + + if not broadcast_content_msg: + logger.error( + "未能从命令参数或引用消息中获取有效的广播内容", "广播", session=session + ) + await MessageUtils.build_message("错误:未能获取有效的广播内容。").send( + reply_to=True + ) + return None + + return broadcast_content_msg + + +async def _process_v11_segment( + seg_obj: V11MessageSegment | dict, depth: int, index: int, bot: Bot +) -> list[alc.Segment]: + """处理V11消息段""" + result = [] + seg_type = None + data_dict = None + + if isinstance(seg_obj, V11MessageSegment): + seg_type = seg_obj.type + data_dict = seg_obj.data + elif isinstance(seg_obj, dict): + seg_type = seg_obj.get("type") + data_dict = seg_obj.get("data") + else: + return result + + if not (seg_type and data_dict is not None): + logger.warning(f"[D{depth}] 跳过无效数据: {type(seg_obj)}", "广播") + return result + + if seg_type == "text": + text_content = data_dict.get("text", "") + if isinstance(text_content, str) and text_content.strip(): + result.append(Text(text_content)) + elif seg_type == "image": + img_seg = None + if data_dict.get("url"): + img_seg = Image(url=data_dict["url"]) + elif data_dict.get("file"): + file_val = data_dict["file"] + if isinstance(file_val, str) and file_val.startswith("base64://"): + b64_data = file_val[9:] + raw_bytes = base64.b64decode(b64_data) + img_seg = Image(raw=raw_bytes) + else: + img_seg = Image(path=file_val) + if img_seg: + result.append(img_seg) + else: + logger.warning(f"[Depth {depth}] V11 图片 {index} 缺少URL/文件", "广播") + elif seg_type == "at": + target_qq = data_dict.get("qq", "") + if target_qq.lower() == "all": + result.append(AtAll()) + elif target_qq: + result.append(At(flag="user", target=target_qq)) + elif seg_type == "video": + video_seg = None + if data_dict.get("url"): + video_seg = Video(url=data_dict["url"]) + elif data_dict.get("file"): + file_val = data_dict["file"] + if isinstance(file_val, str) and file_val.startswith("base64://"): + b64_data = file_val[9:] + raw_bytes = base64.b64decode(b64_data) + video_seg = Video(raw=raw_bytes) + else: + video_seg = Video(path=file_val) + if video_seg: + result.append(video_seg) + logger.debug(f"[Depth {depth}] 处理视频消息成功", "广播") + else: + logger.warning(f"[Depth {depth}] V11 视频 {index} 缺少URL/文件", "广播") + elif seg_type == "forward": + nested_forward_id = data_dict.get("id") or data_dict.get("resid") + nested_forward_content = data_dict.get("content") + + logger.debug(f"[D{depth}] 嵌套转发ID: {nested_forward_id}", "广播") + + nested_nodes = await _process_forward_content( + nested_forward_content, nested_forward_id, bot, depth + ) + + if nested_nodes: + result.append(Reference(nodes=nested_nodes)) + else: + logger.warning(f"[D{depth}] 跳过类型: {seg_type}", "广播") + + return result + + +async def _extract_content_from_message( + message_content: Any, bot: Bot, depth: int = 0 +) -> UniMessage: + """提取消息内容到UniMessage""" + temp_msg = UniMessage() + input_type_str = str(type(message_content)) + + if depth >= MAX_FORWARD_DEPTH: + logger.warning( + f"[Depth {depth}] 达到最大递归深度 {MAX_FORWARD_DEPTH},停止解析嵌套转发。", + "广播", + ) + temp_msg.append(Text("[嵌套转发层数过多,内容已省略]")) + return temp_msg + + segments_to_process = [] + + if isinstance(message_content, UniMessage): + segments_to_process = list(message_content) + elif isinstance(message_content, V11Message): + segments_to_process = list(message_content) + elif isinstance(message_content, list): + segments_to_process = message_content + elif ( + isinstance(message_content, dict) + and "type" in message_content + and "data" in message_content + ): + segments_to_process = [message_content] + elif isinstance(message_content, str): + if message_content.strip(): + temp_msg.append(Text(message_content)) + return temp_msg + else: + logger.warning(f"[Depth {depth}] 无法处理的输入类型: {input_type_str}", "广播") + return temp_msg + + if segments_to_process: + for index, seg_obj in enumerate(segments_to_process): + try: + if isinstance(seg_obj, Text): + text_content = getattr(seg_obj, "text", None) + if isinstance(text_content, str) and text_content.strip(): + temp_msg.append(seg_obj) + elif isinstance(seg_obj, Image): + if ( + getattr(seg_obj, "url", None) + or getattr(seg_obj, "path", None) + or getattr(seg_obj, "raw", None) + ): + temp_msg.append(seg_obj) + elif isinstance(seg_obj, At): + temp_msg.append(seg_obj) + elif isinstance(seg_obj, AtAll): + temp_msg.append(seg_obj) + elif isinstance(seg_obj, Video): + if ( + getattr(seg_obj, "url", None) + or getattr(seg_obj, "path", None) + or getattr(seg_obj, "raw", None) + ): + temp_msg.append(seg_obj) + logger.debug(f"[D{depth}] 处理Video对象成功", "广播") + else: + processed_segments = await _process_v11_segment( + seg_obj, depth, index, bot + ) + temp_msg.extend(processed_segments) + except Exception as e_conv_seg: + logger.warning( + f"[D{depth}] 处理段 {index} 出错: {e_conv_seg}", + "广播", + e=e_conv_seg, + ) + + if not temp_msg and message_content: + logger.warning(f"未能从类型 {input_type_str} 中提取内容", "广播") + + return temp_msg + + +async def get_broadcast_target_groups( + bot: Bot, session: EventSession +) -> tuple[list, list]: + """获取广播目标群组和启用了广播功能的群组""" + target_groups = [] + all_groups, _ = await BroadcastManager.get_all_groups(bot) + target_groups = all_groups + logger.info("向所有群组广播", "广播", session=session) + + if not target_groups: + await MessageUtils.build_message("没有找到符合条件的广播目标群组。").send( + reply_to=True + ) + return [], [] + + enabled_groups = [] + for group in target_groups: + if not await CommonUtils.task_is_block(bot, "broadcast", group.group_id): + enabled_groups.append(group) + + if not enabled_groups: + await MessageUtils.build_message( + "没有启用了广播功能的目标群组可供立即发送。" + ).send(reply_to=True) + return target_groups, [] + + return target_groups, enabled_groups + + +async def send_broadcast_and_notify( + bot: Bot, + event: Event, + message: UniMessage, + enabled_groups: list, + target_groups: list, + session: EventSession, +) -> None: + """发送广播并通知结果""" + BroadcastManager.clear_last_broadcast_msg_ids() + count, error_count = await BroadcastManager.send_to_specific_groups( + bot, message, enabled_groups, session + ) + + result = f"成功广播 {count} 个群组" + if error_count: + result += f"\n发送失败 {error_count} 个群组" + result += f"\n有效: {len(enabled_groups)} / 总计: {len(target_groups)}" + + user_id = str(event.get_user_id()) + await bot.send_private_msg(user_id=user_id, message=f"发送广播完成!\n{result}") + + BroadcastManager.log_info( + f"广播完成,有效/总计: {len(enabled_groups)}/{len(target_groups)}", + session, + ) diff --git a/zhenxun/builtin_plugins/superuser/broadcast/models.py b/zhenxun/builtin_plugins/superuser/broadcast/models.py new file mode 100644 index 00000000..4bcdf936 --- /dev/null +++ b/zhenxun/builtin_plugins/superuser/broadcast/models.py @@ -0,0 +1,64 @@ +from datetime import datetime +from typing import Any + +from nonebot_plugin_alconna import UniMessage + +from zhenxun.models.group_console import GroupConsole + +GroupKey = str +MessageID = int +BroadcastResult = tuple[int, int] +BroadcastDetailResult = tuple[int, int, int] + + +class BroadcastTarget: + """广播目标""" + + def __init__(self, group_id: str, channel_id: str | None = None): + self.group_id = group_id + self.channel_id = channel_id + + def to_dict(self) -> dict[str, str | None]: + """转换为字典格式""" + return {"group_id": self.group_id, "channel_id": self.channel_id} + + @classmethod + def from_group_console(cls, group: GroupConsole) -> "BroadcastTarget": + """从 GroupConsole 对象创建""" + return cls(group_id=group.group_id, channel_id=group.channel_id) + + @property + def key(self) -> str: + """获取群组的唯一标识""" + if self.channel_id: + return f"{self.group_id}:{self.channel_id}" + return str(self.group_id) + + +class BroadcastTask: + """广播任务""" + + def __init__( + self, + bot_id: str, + message: UniMessage, + targets: list[BroadcastTarget], + scheduled_time: datetime | None = None, + task_id: str | None = None, + ): + self.bot_id = bot_id + self.message = message + self.targets = targets + self.scheduled_time = scheduled_time + self.task_id = task_id + + def to_dict(self) -> dict[str, Any]: + """转换为字典格式,用于序列化""" + return { + "bot_id": self.bot_id, + "targets": [t.to_dict() for t in self.targets], + "scheduled_time": self.scheduled_time.isoformat() + if self.scheduled_time + else None, + "task_id": self.task_id, + } diff --git a/zhenxun/builtin_plugins/superuser/broadcast/utils.py b/zhenxun/builtin_plugins/superuser/broadcast/utils.py new file mode 100644 index 00000000..748559fd --- /dev/null +++ b/zhenxun/builtin_plugins/superuser/broadcast/utils.py @@ -0,0 +1,175 @@ +import base64 + +import nonebot_plugin_alconna as alc +from nonebot_plugin_alconna import UniMessage +from nonebot_plugin_alconna.uniseg import Reference +from nonebot_plugin_alconna.uniseg.segment import CustomNode, Video + +from zhenxun.services.log import logger + + +def uni_segment_to_v11_segment_dict( + seg: alc.Segment, depth: int = 0 +) -> dict | list[dict] | None: + """UniSeg段转V11字典""" + if isinstance(seg, alc.Text): + return {"type": "text", "data": {"text": seg.text}} + elif isinstance(seg, alc.Image): + if getattr(seg, "url", None): + return { + "type": "image", + "data": {"file": seg.url}, + } + elif getattr(seg, "raw", None): + raw_data = seg.raw + if isinstance(raw_data, str): + if len(raw_data) >= 9 and raw_data[:9] == "base64://": + return {"type": "image", "data": {"file": raw_data}} + elif isinstance(raw_data, bytes): + b64_str = base64.b64encode(raw_data).decode() + return {"type": "image", "data": {"file": f"base64://{b64_str}"}} + else: + logger.warning(f"无法处理 Image.raw 的类型: {type(raw_data)}", "广播") + elif getattr(seg, "path", None): + logger.warning( + f"在合并转发中使用了本地图片路径,可能无法显示: {seg.path}", "广播" + ) + return {"type": "image", "data": {"file": f"file:///{seg.path}"}} + else: + logger.warning(f"alc.Image 缺少有效数据,无法转换为 V11 段: {seg}", "广播") + elif isinstance(seg, alc.At): + return {"type": "at", "data": {"qq": seg.target}} + elif isinstance(seg, alc.AtAll): + return {"type": "at", "data": {"qq": "all"}} + elif isinstance(seg, Video): + if getattr(seg, "url", None): + return { + "type": "video", + "data": {"file": seg.url}, + } + elif getattr(seg, "raw", None): + raw_data = seg.raw + if isinstance(raw_data, str): + if len(raw_data) >= 9 and raw_data[:9] == "base64://": + return {"type": "video", "data": {"file": raw_data}} + elif isinstance(raw_data, bytes): + b64_str = base64.b64encode(raw_data).decode() + return {"type": "video", "data": {"file": f"base64://{b64_str}"}} + else: + logger.warning(f"无法处理 Video.raw 的类型: {type(raw_data)}", "广播") + elif getattr(seg, "path", None): + logger.warning( + f"在合并转发中使用了本地视频路径,可能无法显示: {seg.path}", "广播" + ) + return {"type": "video", "data": {"file": f"file:///{seg.path}"}} + else: + logger.warning(f"Video 缺少有效数据,无法转换为 V11 段: {seg}", "广播") + elif isinstance(seg, Reference) and getattr(seg, "nodes", None): + if depth >= 3: + logger.warning( + f"嵌套转发深度超过限制 (depth={depth}),不再继续解析", "广播" + ) + return {"type": "text", "data": {"text": "[嵌套转发层数过多,内容已省略]"}} + + nested_v11_content_list = [] + nodes_list = getattr(seg, "nodes", []) + for node in nodes_list: + if isinstance(node, CustomNode): + node_v11_content = [] + if isinstance(node.content, UniMessage): + for nested_seg in node.content: + converted_dict = uni_segment_to_v11_segment_dict( + nested_seg, depth + 1 + ) + if isinstance(converted_dict, list): + node_v11_content.extend(converted_dict) + elif converted_dict: + node_v11_content.append(converted_dict) + elif isinstance(node.content, str): + node_v11_content.append( + {"type": "text", "data": {"text": node.content}} + ) + if node_v11_content: + separator = { + "type": "text", + "data": { + "text": f"\n--- 来自 {node.name} ({node.uid}) 的消息 ---\n" + }, + } + nested_v11_content_list.insert(0, separator) + nested_v11_content_list.extend(node_v11_content) + nested_v11_content_list.append( + {"type": "text", "data": {"text": "\n---\n"}} + ) + + return nested_v11_content_list + + else: + logger.warning(f"广播时跳过不支持的 UniSeg 段类型: {type(seg)}", "广播") + return None + + +def uni_message_to_v11_list_of_dicts(uni_msg: UniMessage | str | list) -> list[dict]: + """UniMessage转V11字典列表""" + try: + if isinstance(uni_msg, str): + return [{"type": "text", "data": {"text": uni_msg}}] + + if isinstance(uni_msg, list): + if not uni_msg: + return [] + + if all(isinstance(item, str) for item in uni_msg): + return [{"type": "text", "data": {"text": item}} for item in uni_msg] + + result = [] + for item in uni_msg: + if hasattr(item, "__iter__") and not isinstance(item, str | bytes): + result.extend(uni_message_to_v11_list_of_dicts(item)) + elif hasattr(item, "text") and not isinstance(item, str | bytes): + text_value = getattr(item, "text", "") + result.append({"type": "text", "data": {"text": str(text_value)}}) + elif hasattr(item, "url") and not isinstance(item, str | bytes): + url_value = getattr(item, "url", "") + if isinstance(item, Video): + result.append( + {"type": "video", "data": {"file": str(url_value)}} + ) + else: + result.append( + {"type": "image", "data": {"file": str(url_value)}} + ) + else: + try: + result.append({"type": "text", "data": {"text": str(item)}}) + except Exception as e: + logger.warning(f"无法转换列表元素: {item}, 错误: {e}", "广播") + return result + except Exception as e: + logger.warning(f"消息转换过程中出错: {e}", "广播") + + return [{"type": "text", "data": {"text": str(uni_msg)}}] + + +def custom_nodes_to_v11_nodes(custom_nodes: list[CustomNode]) -> list[dict]: + """CustomNode列表转V11节点""" + v11_nodes = [] + for node in custom_nodes: + v11_content_list = uni_message_to_v11_list_of_dicts(node.content) + + if v11_content_list: + v11_nodes.append( + { + "type": "node", + "data": { + "user_id": str(node.uid), + "nickname": node.name, + "content": v11_content_list, + }, + } + ) + else: + logger.warning( + f"CustomNode (uid={node.uid}) 内容转换后为空,跳过此节点", "广播" + ) + return v11_nodes