feat(broadcast): 实现标签定向广播、强制发送及并发控制

- 【新功能】
  - 新增标签定向广播功能,支持通过 `-t <标签名>` 或 `广播到 <标签名>` 命令向指定标签的群组发送消息
  - 引入广播强制发送模式,允许绕过群组的任务阻断设置
  - 实现广播并发控制,通过配置限制同时发送任务数量,避免API速率限制
  - 优化视频消息处理,支持从URL下载视频内容并作为原始数据发送,提高跨平台兼容性
- 【配置】
  - 添加 `DEFAULT_BROADCAST` 配置项,用于设置群组进群时广播功能的默认开关状态
  - 添加 `BROADCAST_CONCURRENCY_LIMIT` 配置项,用于控制广播时的最大并发任务数
This commit is contained in:
webjoin111 2025-11-12 16:12:37 +08:00
parent 412654d165
commit b4df0c0fe9
4 changed files with 359 additions and 160 deletions

View File

@ -28,7 +28,8 @@ from nonebot_plugin_alconna.uniseg.segment import (
)
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData, Task
from zhenxun.configs.utils import PluginExtraData, RegisterConfig, Task
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
@ -45,34 +46,52 @@ __plugin_meta__ = PluginMetadata(
name="广播",
description="昭告天下!",
usage="""
广播 [消息内容]
- 直接发送消息到除当前群组外的所有群组
- 支持文本图片@表情视频等多种消息类型
- 示例广播 你们好
- 示例广播 [图片] 新活动开始啦
向所有群组或指定标签的群组发送广播消息
广播 + 引用消息
- 将引用的消息作为广播内容发送
- 支持引用普通消息或合并转发消息
- 示例(引用一条消息) 广播
**基础用法**
- `广播 [消息内容]`向所有群组发送广播
- `广播` (并引用一条消息)将引用的消息作为内容进行广播
广播撤回
- 撤回最近一次由您触发的广播消息
- 仅能撤回短时间内的消息
- 示例广播撤回
**高级定向广播**
- `广播 -t <标签名> [消息内容]`向指定标签下的所有群组广播
- `广播到 <标签名> [消息内容]` `-t` 等效的快捷方式
特性
- 在群组中使用广播时不会将消息发送到当前群组
- 在私聊中使用广播时会发送到所有群组
**标签可以是静态的也可以是动态的例如**
- `广播到 核心群 通知...`
- `广播到 成员数>500的群 通知...`
别名
- bc (广播的简写)
- recall (广播撤回的别名)
**其他命令**
- `广播撤回` (别名: `recall`)撤回最近一次发送的广播
特性
- 在群组中使用广播时不会将消息发送到当前群组
- 在私聊中使用广播时会发送到所有群组
别名
- bc (广播的简写)
- recall (广播撤回的别名)
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="1.2",
version="1.3",
plugin_type=PluginType.SUPERUSER,
configs=[
RegisterConfig(
module="_task",
key="DEFAULT_BROADCAST",
value=True,
help="被动 广播 进群默认开关状态",
default_value=True,
type=bool,
),
RegisterConfig(
module="_task",
key="BROADCAST_CONCURRENCY_LIMIT",
value=10,
help="广播时的最大并发任务数以避免API速率限制",
default_value=10,
),
],
tasks=[Task(module="broadcast", name="广播")],
).to_dict(),
)
@ -103,6 +122,9 @@ _matcher = on_alconna(
Alconna(
"广播",
Args["content?", AllParam],
alc.Option(
"-t|--tag", Args["tag_name_bc", str], help_text="向指定标签的群组广播"
),
),
aliases={"bc"},
priority=1,
@ -112,6 +134,8 @@ _matcher = on_alconna(
use_origin=False,
)
_matcher.shortcut("广播到 {tag}", command="广播 -t {tag} {%*}")
_recall_matcher = on_alconna(
Alconna("广播撤回"),
aliases={"recall"},
@ -128,23 +152,59 @@ async def handle_broadcast(
event: Event,
session: EventSession,
arp: alc.Arparma,
tag_name_match: alc.Match[str] = alc.AlconnaMatch("tag_name_bc"),
):
broadcast_content_msg = await _extract_broadcast_content(bot, event, arp, session)
if not broadcast_content_msg:
return
target_groups, enabled_groups = await get_broadcast_target_groups(bot, session)
if not target_groups or not enabled_groups:
tag_name_to_broadcast = None
force_send = False
if tag_name_match.available:
tag_name_to_broadcast = tag_name_match.result
force_send = True
mode_desc = "强制发送到标签" if force_send else "普通发送"
logger.debug(
f"广播模式: {mode_desc}, 标签名: {tag_name_to_broadcast}",
"广播",
)
target_groups_console, groups_to_actually_send = await get_broadcast_target_groups(
bot, session, tag_name_to_broadcast, force_send
)
if not target_groups_console:
if tag_name_to_broadcast:
await MessageUtils.build_message(
f"标签 '{tag_name_to_broadcast}' 中没有群组或标签不存在。"
).send(reply_to=True)
return
if not groups_to_actually_send:
if not force_send and target_groups_console:
await MessageUtils.build_message(
"没有启用了广播功能的目标群组可供立即发送。"
).send(reply_to=True)
return
try:
await send_broadcast_and_notify(
bot, event, broadcast_content_msg, enabled_groups, target_groups, session
bot,
event,
broadcast_content_msg,
groups_to_actually_send,
target_groups_console,
session,
force_send,
)
except Exception as e:
error_msg = "发送广播失败"
BroadcastManager.log_error(error_msg, e, session)
await MessageUtils.build_message(f"{error_msg}").send(reply_to=True)
await bot.send_private_msg(
user_id=str(event.get_user_id()), message=f"{error_msg}"
)
@_recall_matcher.handle()
@ -178,5 +238,6 @@ async def handle_broadcast_recall(
except Exception as e:
error_msg = "撤回广播消息失败"
BroadcastManager.log_error(error_msg, e, session)
user_id = str(event.get_user_id())
await bot.send_private_msg(user_id=user_id, message=f"{error_msg}")
await bot.send_private_msg(
user_id=str(event.get_user_id()), message=f"{error_msg}"
)

View File

@ -5,11 +5,12 @@ from typing import ClassVar
from nonebot.adapters import Bot
from nonebot.adapters.onebot.v11 import Bot as V11Bot
from nonebot.exception import ActionFailed
from nonebot.exception import ActionFailed, AdapterException
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_alconna.uniseg import Receipt, Reference
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import Config
from zhenxun.models.group_console import GroupConsole
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
@ -18,6 +19,8 @@ from zhenxun.utils.platform import PlatformUtils
from .models import BroadcastDetailResult, BroadcastResult
from .utils import custom_nodes_to_v11_nodes, uni_message_to_v11_list_of_dicts
BROADCAST_SEND_DELAY_RANGE = (1, 3)
class BroadcastManager:
"""广播管理器"""
@ -92,8 +95,16 @@ class BroadcastManager:
logger.debug("清空上一次的广播消息ID记录", "广播", session=session)
cls.clear_last_broadcast_msg_ids()
concurrency_limit = Config.get_config(
"_task",
"BROADCAST_CONCURRENCY_LIMIT",
10,
)
all_groups, _ = await cls.get_all_groups(bot)
return await cls.send_to_specific_groups(bot, message, all_groups, session)
return await cls.send_to_specific_groups(
bot, message, all_groups, session, concurrency_limit=concurrency_limit
)
@classmethod
async def send_to_specific_groups(
@ -102,14 +113,17 @@ class BroadcastManager:
message: UniMessage,
target_groups: list[GroupConsole],
session_info: EventSession | str | None = None,
force_send: bool = False,
concurrency_limit: int = 10,
) -> BroadcastResult:
"""发送广播到指定群组"""
log_session = session_info or bot.self_id
logger.debug(
f"开始广播,目标 {len(target_groups)} 个群组Bot ID: {bot.self_id}",
"广播",
session=log_session,
target_count = len(target_groups)
log_message = (
f"开始广播,目标 {target_count} 个群组 (并发数: {concurrency_limit})"
f"Bot ID: {bot.self_id}, ForceSend: {force_send}"
)
logger.info(log_message, "广播", session=log_session)
if not target_groups:
logger.debug("目标群组列表为空,广播结束", "广播", session=log_session)
@ -165,7 +179,12 @@ class BroadcastManager:
)
return 0, len(target_groups)
success_count, error_count, skip_count = await cls._broadcast_forward(
bot, log_session, target_groups, v11_nodes
bot,
log_session,
target_groups,
v11_nodes,
force_send,
concurrency_limit,
)
else:
if is_forward_broadcast:
@ -175,7 +194,12 @@ class BroadcastManager:
session=log_session,
)
success_count, error_count, skip_count = await cls._broadcast_normal(
bot, log_session, target_groups, message
bot,
log_session,
target_groups,
message,
force_send,
concurrency_limit,
)
total = len(target_groups)
@ -287,11 +311,16 @@ class BroadcastManager:
)
@classmethod
async def _check_group_availability(cls, bot: Bot, group: GroupConsole) -> bool:
async def _check_group_availability(
cls, bot: Bot, group: GroupConsole, force_send: bool = False
) -> bool:
"""检查群组是否可用"""
if not group.group_id:
return False
if force_send:
return True
if await CommonUtils.task_is_block(bot, "broadcast", group.group_id):
return False
@ -304,54 +333,69 @@ class BroadcastManager:
session_info: EventSession | str,
group_list: list[GroupConsole],
v11_nodes: list[dict],
force_send: bool = False,
concurrency_limit: int = 10,
) -> BroadcastDetailResult:
"""发送合并转发"""
success_count = 0
error_count = 0
skip_count = 0
semaphore = asyncio.Semaphore(concurrency_limit)
msg_id_lock = asyncio.Lock()
for _, group in enumerate(group_list):
async def send_to_group(group: GroupConsole) -> GroupConsole:
group_key = group.group_id or group.channel_id
async with semaphore:
try:
result = await bot.send_group_forward_msg(
group_id=int(group.group_id), messages=v11_nodes
)
async with msg_id_lock:
await cls._extract_message_id_from_result(
result, group_key, session_info, "合并转发"
)
await asyncio.sleep(random.uniform(*BROADCAST_SEND_DELAY_RANGE))
return group
except (ActionFailed, AdapterException) as ae:
logger.error(
f"发送失败(合并转发) to {group_key}: {ae}",
"广播",
session=session_info,
e=ae,
)
raise
except Exception as e:
logger.error(
f"发送失败(合并转发) to {group_key}: {e}",
"广播",
session=session_info,
e=e,
)
raise
if not await cls._check_group_availability(bot, group):
skip_count += 1
continue
tasks: list[asyncio.Task] = []
skipped_groups: list[GroupConsole] = []
for group in group_list:
if await cls._check_group_availability(bot, group, force_send):
tasks.append(asyncio.create_task(send_to_group(group)))
else:
skipped_groups.append(group)
try:
result = await bot.send_group_forward_msg(
group_id=int(group.group_id), messages=v11_nodes
)
if skipped_groups:
logger.info(
f"跳过 {len(skipped_groups)} 个不符合条件的群组",
"广播",
session=session_info,
)
logger.debug(
f"合并转发消息发送结果: {result}, 类型: {type(result)}",
"广播",
session=session_info,
)
if not tasks:
return 0, 0, len(skipped_groups)
await cls._extract_message_id_from_result(
result, group_key, session_info, "合并转发"
)
results = await asyncio.gather(*tasks, return_exceptions=True)
success_count += 1
await asyncio.sleep(random.randint(1, 3))
except ActionFailed as af_e:
error_count += 1
logger.error(
f"发送失败(合并转发) to {group_key}: {af_e}",
"广播",
session=session_info,
e=af_e,
)
except Exception as e:
error_count += 1
logger.error(
f"发送失败(合并转发) to {group_key}: {e}",
"广播",
session=session_info,
e=e,
)
success_count = sum(
1 for result in results if not isinstance(result, Exception)
)
error_count = len(results) - success_count
return success_count, error_count, skip_count
return success_count, error_count, len(skipped_groups)
@classmethod
async def _broadcast_normal(
@ -360,58 +404,83 @@ class BroadcastManager:
session_info: EventSession | str,
group_list: list[GroupConsole],
message: UniMessage,
force_send: bool = False,
concurrency_limit: int = 10,
) -> BroadcastDetailResult:
"""发送普通消息"""
success_count = 0
error_count = 0
skip_count = 0
semaphore = asyncio.Semaphore(concurrency_limit)
msg_id_lock = asyncio.Lock()
for _, group in enumerate(group_list):
async def send_to_group(group: GroupConsole) -> GroupConsole:
group_key = (
f"{group.group_id}:{group.channel_id}"
if group.channel_id
else str(group.group_id)
)
if not await cls._check_group_availability(bot, group):
skip_count += 1
continue
try:
target = PlatformUtils.get_target(
group_id=group.group_id, channel_id=group.channel_id
)
if target:
receipt: Receipt = await message.send(target, bot=bot)
logger.debug(
f"广播消息发送结果: {receipt}, 类型: {type(receipt)}",
"广播",
session=session_info,
)
await cls._extract_message_id_from_result(
receipt, group_key, session_info
)
success_count += 1
await asyncio.sleep(random.randint(1, 3))
else:
logger.warning(
"target为空", "广播", session=session_info, target=group_key
)
skip_count += 1
except Exception as e:
error_count += 1
logger.error(
f"发送失败(普通) to {group_key}: {e}",
target = PlatformUtils.get_target(
group_id=group.group_id, channel_id=group.channel_id
)
if not target:
logger.warning(
"target为空",
"广播",
session=session_info,
e=e,
target=group_key,
)
raise ValueError(f"无法为群组 {group_key} 创建发送目标")
return success_count, error_count, skip_count
async with semaphore:
try:
receipt: Receipt = await message.send(target, bot=bot)
async with msg_id_lock:
await cls._extract_message_id_from_result(
receipt, group_key, session_info
)
await asyncio.sleep(random.uniform(*BROADCAST_SEND_DELAY_RANGE))
return group
except (ActionFailed, AdapterException) as ae:
logger.error(
f"发送失败(普通) to {group_key}: {ae}",
"广播",
session=session_info,
e=ae,
)
raise
except Exception as e:
logger.error(
f"发送失败(普通) to {group_key}: {e}",
"广播",
session=session_info,
e=e,
)
raise
tasks: list[asyncio.Task] = []
skipped_groups: list[GroupConsole] = []
for group in group_list:
if await cls._check_group_availability(bot, group, force_send):
tasks.append(asyncio.create_task(send_to_group(group)))
else:
skipped_groups.append(group)
if skipped_groups:
logger.info(
f"跳过 {len(skipped_groups)} 个不符合条件的群组",
"广播",
session=session_info,
)
if not tasks:
return 0, 0, len(skipped_groups)
results = await asyncio.gather(*tasks, return_exceptions=True)
success_count = sum(
1 for result in results if not isinstance(result, Exception)
)
error_count = len(results) - success_count
return success_count, error_count, len(skipped_groups)
@classmethod
async def recall_last_broadcast(

View File

@ -21,8 +21,11 @@ from nonebot_plugin_alconna.uniseg.segment import (
from nonebot_plugin_alconna.uniseg.tools import reply_fetch
from nonebot_plugin_session import EventSession
from zhenxun.models.group_console import GroupConsole
from zhenxun.services.log import logger
from zhenxun.services.tags import tag_manager as TagManager
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.message import MessageUtils
from .broadcast_manager import BroadcastManager
@ -399,22 +402,29 @@ async def _process_v11_segment(
elif target_qq:
result.append(At(flag="user", target=target_qq))
elif seg_type == "video":
video_seg = None
if data_dict.get("url"):
video_seg = Video(url=data_dict["url"])
elif data_dict.get("file"):
file_val = data_dict["file"]
if url := data_dict.get("url"):
try:
logger.debug(f"[D{depth}] 正在下载视频用于广播: {url}", "广播")
video_bytes = await AsyncHttpx.get_content(url)
video_seg = Video(raw=video_bytes)
logger.debug(
f"[D{depth}] 视频下载成功, 大小: {len(video_bytes)} bytes",
"广播",
)
result.append(video_seg)
except Exception as e:
logger.error(f"[D{depth}] 广播时下载视频失败: {url}", "广播", e=e)
result.append(Text(f"[视频下载失败: {url}]"))
elif file_val := data_dict.get("file"):
if isinstance(file_val, str) and file_val.startswith("base64://"):
b64_data = file_val[9:]
raw_bytes = base64.b64decode(b64_data)
video_seg = Video(raw=raw_bytes)
result.append(video_seg)
else:
video_seg = Video(path=file_val)
if video_seg:
result.append(video_seg)
logger.debug(f"[Depth {depth}] 处理视频消息成功", "广播")
else:
logger.warning(f"[Depth {depth}] V11 视频 {index} 缺少URL/文件", "广播")
result.append(video_seg)
return result
elif seg_type == "forward":
nested_forward_id = data_dict.get("id") or data_dict.get("resid")
nested_forward_content = data_dict.get("content")
@ -515,70 +525,129 @@ async def _extract_content_from_message(
async def get_broadcast_target_groups(
bot: Bot, session: EventSession
bot: Bot,
session: EventSession,
tag_name: str | None = None,
force_send: bool = False,
) -> tuple[list, list]:
"""获取广播目标群组和启用了广播功能的群组"""
target_groups = []
all_groups, _ = await BroadcastManager.get_all_groups(bot)
target_groups_console: list[GroupConsole] = []
current_group_id = None
if hasattr(session, "id2") and session.id2:
current_group_id = session.id2
current_group_raw = getattr(session, "id2", None) or getattr(
session, "group_id", None
)
current_group_id = str(current_group_raw) if current_group_raw else None
if current_group_id:
target_groups = [
group for group in all_groups if group.group_id != current_group_id
]
logger.info(
f"向除当前群组({current_group_id})外的所有群组广播", "广播", session=session
)
logger.debug(f"当前群组ID: {current_group_id}", "广播")
if tag_name:
tagged_group_ids = await TagManager.resolve_tag_to_group_ids(tag_name, bot=bot)
if not tagged_group_ids:
return [], []
valid_groups = await GroupConsole.filter(group_id__in=tagged_group_ids)
if current_group_id:
target_groups_console = [
group
for group in valid_groups
if str(group.group_id) != current_group_id
]
excluded_msg = (
f",已排除当前群组({current_group_id})"
if any(
str(group.group_id) == current_group_id for group in valid_groups
)
else ""
)
broadcast_msg = (
f"向标签 '{tag_name}' 中的 {len(target_groups_console)} 个群组广播 "
f"(ForceSend: {force_send}){excluded_msg}"
)
logger.info(broadcast_msg, "广播", session=session)
else:
target_groups_console = valid_groups
broadcast_msg = (
f"向标签 '{tag_name}' 中的 {len(target_groups_console)} 个群组广播 "
f"(ForceSend: {force_send})"
)
logger.info(broadcast_msg, "广播", session=session)
else:
target_groups = all_groups
logger.info("向所有群组广播", "广播", session=session)
all_groups, _ = await BroadcastManager.get_all_groups(bot)
if not target_groups:
await MessageUtils.build_message("没有找到符合条件的广播目标群组。").send(
reply_to=True
)
if current_group_id:
target_groups_console = [
group for group in all_groups if str(group.group_id) != current_group_id
]
logger.info(
(
f"向除当前群组({current_group_id})外的所有群组广播 "
f"(ForceSend: {force_send})"
),
"广播",
session=session,
)
else:
target_groups_console = all_groups
logger.info(
f"向所有群组广播 (ForceSend: {force_send})", "广播", session=session
)
if not target_groups_console:
if not tag_name:
await MessageUtils.build_message("没有找到符合条件的广播目标群组。").send(
reply_to=True
)
return [], []
enabled_groups = []
for group in target_groups:
if not await CommonUtils.task_is_block(bot, "broadcast", group.group_id):
enabled_groups.append(group)
groups_to_actually_send = []
if force_send:
groups_to_actually_send = target_groups_console
logger.debug(
f"强制发送模式,将向 {len(groups_to_actually_send)} 个目标群组尝试发送。",
"广播",
)
else:
for group in target_groups_console:
if not await CommonUtils.task_is_block(bot, "broadcast", group.group_id):
groups_to_actually_send.append(group)
logger.debug(
f"普通发送模式,筛选后将向 {len(groups_to_actually_send)} "
f"个目标群组尝试发送",
"广播",
)
if not enabled_groups:
await MessageUtils.build_message(
"没有启用了广播功能的目标群组可供立即发送。"
).send(reply_to=True)
return target_groups, []
return target_groups, enabled_groups
return target_groups_console, groups_to_actually_send
async def send_broadcast_and_notify(
bot: Bot,
event: Event,
message: UniMessage,
enabled_groups: list,
target_groups: list,
groups_to_send: list,
all_target_groups_for_stats: list,
session: EventSession,
force_send: bool = False,
) -> None:
"""发送广播并通知结果"""
BroadcastManager.clear_last_broadcast_msg_ids()
count, error_count = await BroadcastManager.send_to_specific_groups(
bot, message, enabled_groups, session
bot, message, groups_to_send, session, force_send
)
result = f"成功广播 {count} 个群组"
if error_count:
result += f"\n发送失败 {error_count} 个群组"
result += f"\n有效: {len(enabled_groups)} / 总计: {len(target_groups)}"
effective_sent_count = len(groups_to_send)
total_considered_count = len(all_target_groups_for_stats)
result += f"\n有效: {effective_sent_count} / 总计目标: {total_considered_count}"
user_id = str(event.get_user_id())
await bot.send_private_msg(user_id=user_id, message=f"发送广播完成!\n{result}")
BroadcastManager.log_info(
f"广播完成,有效/总计: {len(enabled_groups)}/{len(target_groups)}",
f"广播完成,有效/总计目标: {effective_sent_count}/{total_considered_count}",
session,
)

View File

@ -59,7 +59,7 @@ def uni_segment_to_v11_segment_dict(
logger.warning(f"无法处理 Video.raw 的类型: {type(raw_data)}", "广播")
elif getattr(seg, "path", None):
logger.warning(
f"在合并转发中使用了本地视频路径,可能无法显示: {seg.path}", "广播"
f"在合并转发中使用了本地视频路径,可能无法发送: {seg.path}", "广播"
)
return {"type": "video", "data": {"file": f"file:///{seg.path}"}}
else: