Merge branch 'main' into feature/webui-config

This commit is contained in:
HibiKier 2025-06-13 11:35:06 +08:00 committed by GitHub
commit d657f47698
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 317 additions and 105 deletions

View File

@ -121,7 +121,7 @@ AccessToken: PUBLIC_ZHENXUN_TEST
- 实现了许多功能,且提供了大量功能管理命令,进行了多平台适配,兼容 nb2 商店插件
- 拥有完善可用的 webui
- 通过 Config 配置项将所有插件配置统保存至 config.yaml利于统一用户修改
- 通过 Config 配置项将所有插件配置统保存至 config.yaml利于统一用户修改
- 方便增删插件,原生 nonebot2 matcher不需要额外修改仅仅通过简单的配置属性就可以生成`帮助图片`和`帮助信息`
- 提供了 cd阻塞每日次数等限制仅仅通过简单的属性就可以生成一个限制例如`PluginCdBlock` 等
- **更多详细请通过 [传送门](https://zhenxun-org.github.io/zhenxun_bot/) 查看文档!**
@ -172,7 +172,7 @@ poetry run python bot.py
1.在 .env.dev 文件中填写你的机器人配置项
2.在 configs/config.yaml 文件中修改你需要修改的插件配置项
2.在 data/config.yaml 文件中修改你需要修改的插件配置项
<details>
<summary>数据库地址DB_URL配置说明</summary>

View File

@ -14,13 +14,19 @@ from nonebot_plugin_alconna import (
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import BotConfig, Config
from zhenxun.configs.utils import PluginExtraData, RegisterConfig
from zhenxun.configs.utils import (
AICallableParam,
AICallableProperties,
AICallableTag,
PluginExtraData,
RegisterConfig,
)
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.rules import admin_check
from ._data_source import BanManage
from ._data_source import BanManage, call_ban
base_config = Config.get("ban")
@ -78,6 +84,22 @@ __plugin_meta__ = PluginMetadata(
type=int,
)
],
smart_tools=[
AICallableTag(
name="call_ban",
description="某人多次(至少三次)辱骂你,调用此方法进行封禁",
parameters=AICallableParam(
type="object",
properties={
"user_id": AICallableProperties(
type="string", description="用户的id"
),
},
required=["user_id"],
),
func=call_ban,
)
],
).to_dict(),
)

View File

@ -5,9 +5,20 @@ from nonebot_plugin_session import EventSession
from zhenxun.models.ban_console import BanConsole
from zhenxun.models.level_user import LevelUser
from zhenxun.services.log import logger
from zhenxun.utils.image_utils import BuildImage, ImageTemplate
async def call_ban(user_id: str):
"""调用ban
参数:
user_id: 用户id
"""
await BanConsole.ban(user_id, None, 9, 60 * 12)
logger.info("辱骂次数过多,已将用户加入黑名单...", "ban", session=user_id)
class BanManage:
@classmethod
async def build_ban_image(

View File

@ -21,7 +21,7 @@ from zhenxun.utils.message import MessageUtils
__plugin_meta__ = PluginMetadata(
name="笨蛋检测",
description="功能名称当命令检测",
usage="""被动""".strip(),
usage="""当一些笨蛋直接输入功能名称时,提示笨蛋使用帮助指令查看功能帮助""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",

View File

@ -1,4 +1,4 @@
from nonebot import on_notice, on_request
from nonebot import on_notice
from nonebot.adapters import Bot
from nonebot.adapters.onebot.v11 import (
GroupDecreaseNoticeEvent,
@ -14,9 +14,10 @@ from nonebot_plugin_uninfo import Uninfo
from zhenxun.builtin_plugins.platform.qq.exception import ForceAddGroupError
from zhenxun.configs.config import BotConfig, Config
from zhenxun.configs.utils import PluginExtraData, RegisterConfig, Task
from zhenxun.models.event_log import EventLog
from zhenxun.models.group_console import GroupConsole
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.enum import PluginType
from zhenxun.utils.enum import EventLogType, PluginType
from zhenxun.utils.platform import PlatformUtils
from zhenxun.utils.rules import notice_rule
@ -106,8 +107,6 @@ group_decrease_handle = on_notice(
rule=notice_rule([GroupMemberDecreaseEvent, GroupDecreaseNoticeEvent]),
)
"""群员减少处理"""
add_group = on_request(priority=1, block=False)
"""加群同意请求"""
@group_increase_handle.handle()
@ -142,7 +141,20 @@ async def _(
if event.sub_type == "kick_me":
"""踢出Bot"""
await GroupManager.kick_bot(bot, group_id, str(event.operator_id))
await EventLog.create(
user_id=user_id, group_id=group_id, event_type=EventLogType.KICK_BOT
)
elif event.sub_type in ["leave", "kick"]:
if event.sub_type == "leave":
"""主动退群"""
await EventLog.create(
user_id=user_id, group_id=group_id, event_type=EventLogType.LEAVE_MEMBER
)
else:
"""被踢出群"""
await EventLog.create(
user_id=user_id, group_id=group_id, event_type=EventLogType.KICK_MEMBER
)
result = await GroupManager.run_user(
bot, user_id, group_id, str(event.operator_id), event.sub_type
)

View File

@ -0,0 +1,100 @@
import asyncio
from datetime import datetime
import random
from nonebot.adapters import Bot
from nonebot.plugin import PluginMetadata
from nonebot.rule import to_me
from nonebot_plugin_alconna import Alconna, Args, Arparma, Field, on_alconna
from nonebot_plugin_uninfo import Uninfo
from zhenxun.configs.utils import PluginCdBlock, PluginExtraData
from zhenxun.models.fg_request import FgRequest
from zhenxun.services.log import logger
from zhenxun.utils.depends import UserName
from zhenxun.utils.enum import RequestHandleType, RequestType
from zhenxun.utils.platform import PlatformUtils
__plugin_meta__ = PluginMetadata(
name="群组申请",
description="""
一些小群直接邀请入群导致无法正常生成审核请求需要用该方法手动生成审核请求
当管理员同意同意时会发送消息进行提示之后再进行拉群不会退出
该消息会发送至管理员多次发送不存在的群组id或相同群组id可能导致ban
""".strip(),
usage="""
指令
申请入群 [群号]
示例: 申请入群 123123123
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
menu_type="其他",
limits=[PluginCdBlock(cd=300, result="每5分钟只能申请一次哦~")],
).to_dict(),
)
_matcher = on_alconna(
Alconna(
"申请入群",
Args[
"group_id",
int,
Field(
missing_tips=lambda: "请在命令后跟随群组id",
unmatch_tips=lambda _: "群组id必须为数字",
),
],
),
skip_for_unmatch=False,
priority=5,
block=True,
rule=to_me(),
)
@_matcher.handle()
async def _(
bot: Bot, session: Uninfo, arparma: Arparma, group_id: int, uname: str = UserName()
):
# 旧请求全部设置为过期
await FgRequest.filter(
request_type=RequestType.GROUP,
user_id=session.user.id,
group_id=str(group_id),
handle_type__isnull=True,
).update(handle_type=RequestHandleType.EXPIRE)
f = await FgRequest.create(
request_type=RequestType.GROUP,
platform=PlatformUtils.get_platform(session),
bot_id=bot.self_id,
flag="0",
user_id=session.user.id,
nickname=uname,
group_id=str(group_id),
)
results = await PlatformUtils.send_superuser(
bot,
f"*****一份入群申请*****\n"
f"ID{f.id}\n"
f"申请人:{uname}({session.user.id})\n群聊:"
f"{group_id}\n邀请日期:{datetime.now().replace(microsecond=0)}\n"
"注:该请求为手动申请入群",
)
if message_ids := [
str(r[1].msg_ids[0]["message_id"]) for r in results if r[1] and r[1].msg_ids
]:
f.message_ids = ",".join(message_ids)
await f.save(update_fields=["message_ids"])
await asyncio.sleep(random.randint(1, 5))
await bot.send_private_msg(
user_id=int(session.user.id),
message=f"已发送申请请等待管理员审核ID{f.id}",
)
logger.info(
f"用户 {uname}({session.user.id}) 申请入群 {group_id}ID{f.id}",
arparma.header_result,
session=session,
)

View File

@ -99,8 +99,8 @@ class ShopManage:
"plugins.json"
)
extra_github_url = await extra_github_repo.get_raw_download_urls("plugins.json")
res = await AsyncHttpx.get(default_github_url)
res2 = await AsyncHttpx.get(extra_github_url)
res = await AsyncHttpx.get(default_github_url, check_status_code=200)
res2 = await AsyncHttpx.get(extra_github_url, check_status_code=200)
# 检查请求结果
if res.status_code != 200 or res2.status_code != 200:

View File

@ -17,11 +17,12 @@ from nonebot_plugin_session import EventSession
from zhenxun.configs.config import BotConfig, Config
from zhenxun.configs.utils import PluginExtraData, RegisterConfig
from zhenxun.models.event_log import EventLog
from zhenxun.models.fg_request import FgRequest
from zhenxun.models.friend_user import FriendUser
from zhenxun.models.group_console import GroupConsole
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType, RequestHandleType, RequestType
from zhenxun.utils.enum import EventLogType, PluginType, RequestHandleType, RequestType
from zhenxun.utils.platform import PlatformUtils
base_config = Config.get("invite_manager")
@ -112,21 +113,29 @@ async def _(bot: v12Bot | v11Bot, event: FriendRequestEvent, session: EventSessi
nickname=nickname,
comment=comment,
)
await PlatformUtils.send_superuser(
results = await PlatformUtils.send_superuser(
bot,
f"*****一份好友申请*****\n"
f"ID: {f.id}\n"
f"昵称:{nickname}({event.user_id})\n"
f"自动同意:{'' if base_config.get('AUTO_ADD_FRIEND') else '×'}\n"
f"日期:{str(datetime.now()).split('.')[0]}\n"
f"日期:{datetime.now().replace(microsecond=0)}\n"
f"备注:{event.comment}",
)
if message_ids := [
str(r[1].msg_ids[0]["message_id"])
for r in results
if r[1] and r[1].msg_ids
]:
f.message_ids = ",".join(message_ids)
await f.save(update_fields=["message_ids"])
else:
logger.debug("好友请求五分钟内重复, 已忽略", "好友请求", target=event.user_id)
@group_req.handle()
async def _(bot: v12Bot | v11Bot, event: GroupRequestEvent, session: EventSession):
# sourcery skip: low-code-quality
if event.sub_type != "invite":
return
if str(event.user_id) in bot.config.superusers or base_config.get("AUTO_ADD_GROUP"):
@ -186,7 +195,7 @@ async def _(bot: v12Bot | v11Bot, event: GroupRequestEvent, session: EventSessio
group_id=str(event.group_id),
handle_type=RequestHandleType.APPROVE,
)
await PlatformUtils.send_superuser(
results = await PlatformUtils.send_superuser(
bot,
f"*****一份入群申请*****\n"
f"ID{f.id}\n"
@ -230,13 +239,27 @@ async def _(bot: v12Bot | v11Bot, event: GroupRequestEvent, session: EventSessio
nickname=nickname,
group_id=str(event.group_id),
)
await PlatformUtils.send_superuser(
kick_count = await EventLog.filter(
group_id=str(event.group_id), event_type=EventLogType.KICK_BOT
).count()
kick_message = (
f"\n该群累计踢出{BotConfig.self_nickname} <{kick_count}>次"
if kick_count
else ""
)
results = await PlatformUtils.send_superuser(
bot,
f"*****一份入群申请*****\n"
f"ID{f.id}\n"
f"申请人:{nickname}({event.user_id})\n群聊:"
f"{event.group_id}\n邀请日期:{datetime.now().replace(microsecond=0)}",
f"{event.group_id}\n邀请日期:{datetime.now().replace(microsecond=0)}"
f"{kick_message}",
)
if message_ids := [
str(r[1].msg_ids[0]["message_id"]) for r in results if r[1] and r[1].msg_ids
]:
f.message_ids = ",".join(message_ids)
await f.save(update_fields=["message_ids"])
else:
logger.debug(
"群聊请求五分钟内重复, 已忽略",

View File

@ -1,67 +1,11 @@
from asyncio.exceptions import TimeoutError
import aiofiles
import nonebot
from nonebot.drivers import Driver
from nonebot_plugin_apscheduler import scheduler
import ujson as json
from zhenxun.configs.path_config import TEXT_PATH
from zhenxun.models.group_console import GroupConsole
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.manager.priority_manager import PriorityLifecycle
driver: Driver = nonebot.get_driver()
@PriorityLifecycle.on_startup(priority=5)
async def update_city():
"""
部分插件需要中国省份城市
这里直接更新避免插件内代码重复
"""
china_city = TEXT_PATH / "china_city.json"
if not china_city.exists():
data = {}
try:
logger.debug("开始更新城市列表...")
res = await AsyncHttpx.get(
"http://www.weather.com.cn/data/city3jdata/china.html", timeout=5
)
res.encoding = "utf8"
provinces_data = json.loads(res.text)
for province in provinces_data.keys():
data[provinces_data[province]] = []
res = await AsyncHttpx.get(
f"http://www.weather.com.cn/data/city3jdata/provshi/{province}.html",
timeout=5,
)
res.encoding = "utf8"
city_data = json.loads(res.text)
for city in city_data.keys():
data[provinces_data[province]].append(city_data[city])
async with aiofiles.open(china_city, "w", encoding="utf8") as f:
json.dump(data, f, indent=4, ensure_ascii=False)
logger.info("自动更新城市列表完成.....")
except TimeoutError as e:
logger.warning("自动更新城市列表超时...", e=e)
except ValueError as e:
logger.warning("自动城市列表失败.....", e=e)
except Exception as e:
logger.error("自动城市列表未知错误", e=e)
# 自动更新城市列表
@scheduler.scheduled_job(
"cron",
hour=6,
minute=1,
)
async def _():
await update_city()
@PriorityLifecycle.on_startup(priority=5)
async def _():
"""开启/禁用插件格式修改"""

View File

@ -33,6 +33,7 @@ __plugin_meta__ = PluginMetadata(
usage="""
商品操作
指令
商店
我的金币
我的道具
使用道具 [名称/Id]

View File

@ -345,10 +345,11 @@ class ShopManage:
if num > param.max_num_limit:
return f"{goods_info.goods_name} 单次使用最大数量为{param.max_num_limit}..."
await cls.run_before_after(goods, param, session, message, "before", **kwargs)
result = await cls.__run(goods, param, session, message, **kwargs)
await UserConsole.use_props(
session.user.id, goods_info.uuid, num, PlatformUtils.get_platform(session)
)
result = await cls.__run(goods, param, session, message, **kwargs)
await cls.run_before_after(goods, param, session, message, "after", **kwargs)
if not result and param.send_success_msg:
result = f"使用道具 {goods.name} {num} 次成功!"

View File

@ -53,10 +53,7 @@ async def _(
)
@scheduler.scheduled_job(
"interval",
minutes=1,
)
@scheduler.scheduled_job("interval", minutes=1, max_instances=5)
async def _():
try:
call_list = TEMP_LIST.copy()

View File

@ -2,7 +2,7 @@ from io import BytesIO
from arclet.alconna import Args, Option
from arclet.alconna.typing import CommandMeta
from nonebot.adapters import Bot
from nonebot.adapters import Bot, Event
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata
from nonebot.rule import to_me
@ -10,10 +10,13 @@ from nonebot_plugin_alconna import (
Alconna,
AlconnaQuery,
Arparma,
Match,
Query,
Reply,
on_alconna,
store_true,
)
from nonebot_plugin_alconna.uniseg.tools import reply_fetch
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import BotConfig
@ -54,7 +57,7 @@ __plugin_meta__ = PluginMetadata(
_req_matcher = on_alconna(
Alconna(
"请求处理",
Args["handle", ["-fa", "-fr", "-fi", "-ga", "-gr", "-gi"]]["id", int],
Args["handle", ["-fa", "-fr", "-fi", "-ga", "-gr", "-gi"]]["id?", int],
meta=CommandMeta(
description="好友/群组请求处理",
usage=usage,
@ -105,12 +108,12 @@ _clear_matcher = on_alconna(
)
reg_arg_list = [
(r"同意好友请求", ["-fa", "{%0}"]),
(r"拒绝好友请求", ["-fr", "{%0}"]),
(r"忽略好友请求", ["-fi", "{%0}"]),
(r"同意群组请求", ["-ga", "{%0}"]),
(r"拒绝群组请求", ["-gr", "{%0}"]),
(r"忽略群组请求", ["-gi", "{%0}"]),
(r"同意好友请求\s*(?P<id>\d*)", ["-fa", "{id}"]),
(r"拒绝好友请求\s*(?P<id>\d*)", ["-fr", "{id}"]),
(r"忽略好友请求\s*(?P<id>\d*)", ["-fi", "{id}"]),
(r"同意群组请求\s*(?P<id>\d*)", ["-ga", "{id}"]),
(r"拒绝群组请求\s*(?P<id>\d*)", ["-gr", "{id}"]),
(r"忽略群组请求\s*(?P<id>\d*)", ["-gi", "{id}"]),
]
for r in reg_arg_list:
@ -125,32 +128,48 @@ for r in reg_arg_list:
@_req_matcher.handle()
async def _(
bot: Bot,
event: Event,
session: EventSession,
handle: str,
id: int,
id: Match[int],
arparma: Arparma,
):
reply: Reply | None = None
type_dict = {
"a": RequestHandleType.APPROVE,
"r": RequestHandleType.REFUSED,
"i": RequestHandleType.IGNORE,
}
if not id.available:
reply = await reply_fetch(event, bot)
if not reply:
await MessageUtils.build_message("请引用消息处理或添加处理Id.").finish()
handle_id = id.result
if reply:
db_data = await FgRequest.get_or_none(message_ids__contains=reply.id)
if not db_data:
await MessageUtils.build_message(
"未发现此消息的Id请使用Id进行处理..."
).finish(reply_to=True)
handle_id = db_data.id
req = None
handle_type = type_dict[handle[-1]]
try:
if handle_type == RequestHandleType.APPROVE:
req = await FgRequest.approve(bot, id)
req = await FgRequest.approve(bot, handle_id)
if handle_type == RequestHandleType.REFUSED:
req = await FgRequest.refused(bot, id)
req = await FgRequest.refused(bot, handle_id)
if handle_type == RequestHandleType.IGNORE:
req = await FgRequest.ignore(id)
req = await FgRequest.ignore(handle_id)
except NotFoundError:
await MessageUtils.build_message("未发现此id的请求...").finish(reply_to=True)
except Exception:
await MessageUtils.build_message("其他错误, 可能flag已失效...").finish(
reply_to=True
)
logger.info("处理请求", arparma.header_result, session=session)
logger.info(
f"处理请求 Id: {req.id if req else ''}", arparma.header_result, session=session
)
await MessageUtils.build_message("成功处理请求!").send(reply_to=True)
if req and handle_type == RequestHandleType.APPROVE:
await bot.send_private_msg(

View File

@ -0,0 +1,21 @@
from tortoise import fields
from zhenxun.services.db_context import Model
from zhenxun.utils.enum import EventLogType
class EventLog(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
user_id = fields.CharField(255, description="用户id")
"""用户id"""
group_id = fields.CharField(255, description="群组id")
"""群组id"""
event_type = fields.CharEnumField(EventLogType, default=None, description="类型")
"""类型"""
create_time = fields.DatetimeField(auto_now_add=True, description="创建时间")
"""创建时间"""
class Meta: # pyright: ignore [reportIncompatibleVariableOverride]
table = "event_log"
table_description = "各种请求通知记录表"

View File

@ -3,8 +3,10 @@ from typing_extensions import Self
from nonebot.adapters import Bot
from tortoise import fields
from zhenxun.configs.config import BotConfig
from zhenxun.models.group_console import GroupConsole
from zhenxun.services.db_context import Model
from zhenxun.utils.common_utils import SqlUtils
from zhenxun.utils.enum import RequestHandleType, RequestType
from zhenxun.utils.exception import NotFoundError
@ -34,6 +36,8 @@ class FgRequest(Model):
RequestHandleType, null=True, description="处理类型"
)
"""处理类型"""
message_ids = fields.CharField(max_length=255, null=True, description="消息id列表")
"""消息id列表"""
class Meta: # pyright: ignore [reportIncompatibleVariableOverride]
table = "fg_request"
@ -123,9 +127,24 @@ class FgRequest(Model):
await GroupConsole.update_or_create(
group_id=req.group_id, defaults={"group_flag": 1}
)
await bot.set_group_add_request(
flag=req.flag,
sub_type="invite",
approve=handle_type == RequestHandleType.APPROVE,
)
if req.flag == "0":
# 用户手动申请入群,创建群认证后提醒用户拉群
await bot.send_private_msg(
user_id=req.user_id,
message=f"已同意你对{BotConfig.self_nickname}的申请群组:"
f"{req.group_id},可以直接手动拉入群组,{BotConfig.self_nickname}会自动同意。",
)
else:
# 正常同意群组请求
await bot.set_group_add_request(
flag=req.flag,
sub_type="invite",
approve=handle_type == RequestHandleType.APPROVE,
)
return req
@classmethod
async def _run_script(cls):
return [
SqlUtils.add_column("fg_request", "message_ids", "character varying(255)")
]

View File

@ -26,6 +26,19 @@ class BankHandleType(StrEnum):
"""利息"""
class EventLogType(StrEnum):
GROUP_MEMBER_INCREASE = "GROUP_MEMBER_INCREASE"
"""群成员增加"""
GROUP_MEMBER_DECREASE = "GROUP_MEMBER_DECREASE"
"""群成员减少"""
KICK_MEMBER = "KICK_MEMBER"
"""踢出群成员"""
KICK_BOT = "KICK_BOT"
"""踢出Bot"""
LEAVE_MEMBER = "LEAVE_MEMBER"
"""主动退群"""
class GoldHandle(StrEnum):
"""
金币处理
@ -117,7 +130,9 @@ class RequestType(StrEnum):
"""
FRIEND = "FRIEND"
"""好友"""
GROUP = "GROUP"
"""群组"""
class RequestHandleType(StrEnum):

View File

@ -43,6 +43,7 @@ class AsyncHttpx:
use_proxy: bool = True,
proxy: dict[str, str] | None = None,
timeout: int = 30, # noqa: ASYNC109
check_status_code: int | None = None,
**kwargs,
) -> Response:
"""Get
@ -56,6 +57,7 @@ class AsyncHttpx:
use_proxy: 使用默认代理
proxy: 指定代理
timeout: 超时时间
check_status_code: 检查状态码
"""
urls = [url] if isinstance(url, str) else url
return await cls._get_first_successful(
@ -67,6 +69,7 @@ class AsyncHttpx:
use_proxy=use_proxy,
proxy=proxy,
timeout=timeout,
check_status_code=check_status_code,
**kwargs,
)
@ -74,12 +77,18 @@ class AsyncHttpx:
async def _get_first_successful(
cls,
urls: list[str],
check_status_code: int | None = None,
**kwargs,
) -> Response:
last_exception = None
for url in urls:
try:
return await cls._get_single(url, **kwargs)
logger.info(f"开始获取 {url}..")
response = await cls._get_single(url, **kwargs)
if check_status_code and response.status_code != check_status_code:
status_code = response.status_code
raise Exception(f"状态码错误:{status_code}!={check_status_code}")
return response
except Exception as e:
last_exception = e
if url != urls[-1]:

View File

@ -83,7 +83,7 @@ class PlatformUtils:
bot: Bot,
message: UniMessage | str,
superuser_id: str | None = None,
) -> Receipt | None:
) -> list[tuple[str, Receipt]]:
"""发送消息给超级用户
参数:
@ -97,15 +97,33 @@ class PlatformUtils:
返回:
Receipt | None: Receipt
"""
if not superuser_id:
if platform := cls.get_platform(bot):
if platform_superusers := BotConfig.get_superuser(platform):
superuser_id = random.choice(platform_superusers)
else:
raise NotFindSuperuser()
superuser_ids = []
if superuser_id:
superuser_ids.append(superuser_id)
elif platform := cls.get_platform(bot):
if platform_superusers := BotConfig.get_superuser(platform):
superuser_ids = platform_superusers
else:
raise NotFindSuperuser()
if isinstance(message, str):
message = MessageUtils.build_message(message)
return await cls.send_message(bot, superuser_id, None, message)
result = []
for superuser_id in superuser_ids:
try:
result.append(
(
superuser_id,
await cls.send_message(bot, superuser_id, None, message),
)
)
except Exception as e:
logger.error(
"发送消息给超级用户失败",
"PlatformUtils:send_superuser",
target=superuser_id,
e=e,
)
return result
@classmethod
async def get_group_member_list(cls, bot: Bot, group_id: str) -> list[UserData]: