使用道具允许at目标用户 (#1877)

*  使用道具允许at目标用户

*  修改帮助配置项默认值

*  修改帮助配置项默认值

* 🎨 优化广播方法

* 💡 添加广播方法注释
This commit is contained in:
HibiKier 2025-06-16 09:18:58 +08:00 committed by GitHub
parent 99f1388e23
commit 13579f5842
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 180 additions and 97 deletions

View File

@ -37,8 +37,8 @@ __plugin_meta__ = PluginMetadata(
configs=[
RegisterConfig(
key="type",
value="normal",
help="帮助图片样式 ['normal', 'HTML', 'zhenxun']",
value="zhenxun",
help="帮助图片样式 [normal, HTML, zhenxun]",
default_value="zhenxun",
)
],

View File

@ -5,7 +5,9 @@ from nonebot_plugin_alconna import (
AlconnaQuery,
Args,
Arparma,
At,
Match,
MultiVar,
Option,
Query,
Subcommand,
@ -47,6 +49,7 @@ __plugin_meta__ = PluginMetadata(
plugin_type=PluginType.NORMAL,
menu_type="商店",
commands=[
Command(command="商店"),
Command(command="我的金币"),
Command(command="我的道具"),
Command(command="购买道具"),
@ -75,13 +78,21 @@ _matcher = on_alconna(
Subcommand("my-cost", help_text="我的金币"),
Subcommand("my-props", help_text="我的道具"),
Subcommand("buy", Args["name?", str]["num?", int], help_text="购买道具"),
Subcommand("use", Args["name?", str]["num?", int], help_text="使用道具"),
Subcommand("gold-list", Args["num?", int], help_text="金币排行"),
),
priority=5,
block=True,
)
_use_matcher = on_alconna(
Alconna(
"使用道具",
Args["name?", str]["num?", int]["at_users?", MultiVar(At)],
),
priority=5,
block=True,
)
_matcher.shortcut(
"我的金币",
command="商店",
@ -103,13 +114,6 @@ _matcher.shortcut(
prefix=True,
)
_matcher.shortcut(
"使用道具(?P<name>.*?)",
command="商店",
arguments=["use", "{name}"],
prefix=True,
)
_matcher.shortcut(
"金币排行",
command="商店",
@ -173,7 +177,7 @@ async def _(
await MessageUtils.build_message(result).send(reply_to=True)
@_matcher.assign("use")
@_use_matcher.handle()
async def _(
bot: Bot,
event: Event,
@ -182,6 +186,7 @@ async def _(
arparma: Arparma,
name: Match[str],
num: Query[int] = AlconnaQuery("num", 1),
at_users: Query[list[At]] = AlconnaQuery("at_users", []),
):
if not name.available:
await MessageUtils.build_message(
@ -189,7 +194,7 @@ async def _(
).finish(reply_to=True)
try:
result = await ShopManage.use(
bot, event, session, message, name.result, num.result, ""
bot, event, session, message, name.result, num.result, "", at_users.result
)
logger.info(
f"使用道具 {name.result}, 数量: {num.result}",

View File

@ -8,7 +8,7 @@ from typing import Any, Literal
from nonebot.adapters import Bot, Event
from nonebot.compat import model_dump
from nonebot_plugin_alconna import UniMessage, UniMsg
from nonebot_plugin_alconna import At, UniMessage, UniMsg
from nonebot_plugin_uninfo import Uninfo
from pydantic import BaseModel, Field, create_model
from tortoise.expressions import Q
@ -48,6 +48,10 @@ class Goods(BaseModel):
"""model"""
session: Uninfo | None = None
"""Uninfo"""
at_user: str | None = None
"""At对象"""
at_users: list[str] = []
"""At对象列表"""
class ShopParam(BaseModel):
@ -73,6 +77,10 @@ class ShopParam(BaseModel):
"""Uninfo"""
message: UniMsg
"""UniMessage"""
at_user: str | None = None
"""At对象"""
at_users: list[str] = []
"""At对象列表"""
extra_data: dict[str, Any] = Field(default_factory=dict)
"""额外数据"""
@ -156,6 +164,7 @@ class ShopManage:
goods: Goods,
num: int,
text: str,
at_users: list[str] = [],
) -> tuple[ShopParam, dict[str, Any]]:
"""构造参数
@ -165,6 +174,7 @@ class ShopManage:
goods_name: 商品名称
num: 数量
text: 其他信息
at_users: at用户
"""
group_id = None
if session.group:
@ -172,6 +182,7 @@ class ShopManage:
session.group.parent.id if session.group.parent else session.group.id
)
_kwargs = goods.params
at_user = at_users[0] if at_users else None
model = goods.model(
**{
"goods_name": goods.name,
@ -183,6 +194,8 @@ class ShopManage:
"text": text,
"session": session,
"message": message,
"at_user": at_user,
"at_users": at_users,
}
)
return model, {
@ -194,6 +207,8 @@ class ShopManage:
"num": num,
"text": text,
"goods_name": goods.name,
"at_user": at_user,
"at_users": at_users,
}
@classmethod
@ -223,6 +238,7 @@ class ShopManage:
**param.extra_data,
"session": session,
"message": message,
"shop_param": ShopParam,
}
for key in list(param_json.keys()):
if key not in args:
@ -308,6 +324,7 @@ class ShopManage:
goods_name: str,
num: int,
text: str,
at_users: list[At] = [],
) -> str | UniMessage | None:
"""使用道具
@ -319,6 +336,7 @@ class ShopManage:
goods_name: 商品名称
num: 使用数量
text: 其他信息
at_users: at用户
返回:
str | MessageFactory | None: 使用完成后返回信息
@ -339,8 +357,9 @@ class ShopManage:
goods = cls.uuid2goods.get(goods_info.uuid)
if not goods or not goods.func:
return f"{goods_info.goods_name} 未注册使用函数, 无法使用..."
at_user_ids = [at.target for at in at_users]
param, kwargs = cls.__build_params(
bot, event, session, message, goods, num, text
bot, event, session, message, goods, num, text, at_user_ids
)
if num > param.max_num_limit:
return f"{goods_info.goods_name} 单次使用最大数量为{param.max_num_limit}..."
@ -480,10 +499,13 @@ class ShopManage:
if not user.props:
return None
user.props = {uuid: count for uuid, count in user.props.items() if count > 0}
goods_list = await GoodsInfo.filter(uuid__in=user.props.keys()).all()
goods_by_uuid = {item.uuid: item for item in goods_list}
user.props = {
uuid: count
for uuid, count in user.props.items()
if count > 0 and goods_by_uuid.get(uuid)
}
table_rows = []
for i, prop_uuid in enumerate(user.props):

View File

@ -1,7 +1,7 @@
import asyncio
from collections.abc import Awaitable, Callable
import random
from typing import Literal
from typing import cast
import httpx
import nonebot
@ -486,15 +486,134 @@ class PlatformUtils:
return target
class BroadcastEngine:
def __init__(
self,
message: str | UniMessage,
bot: Bot | list[Bot] | None = None,
bot_id: str | set[str] | None = None,
ignore_group: list[str] | None = None,
check_func: Callable[[Bot, str], Awaitable] | None = None,
log_cmd: str | None = None,
platform: str | None = None,
):
"""广播引擎
参数:
message: 广播消息内容
bot: 指定bot对象.
bot_id: 指定bot id.
ignore_group: 忽略群聊列表.
check_func: 发送前对群聊检测方法判断是否发送.
log_cmd: 日志标记.
platform: 指定平台.
异常:
ValueError: 没有可用的Bot对象
"""
if ignore_group is None:
ignore_group = []
self.message = MessageUtils.build_message(message)
self.ignore_group = ignore_group
self.check_func = check_func
self.log_cmd = log_cmd
self.platform = platform
self.bot_list = []
self.count = 0
if bot:
self.bot_list = [bot] if isinstance(bot, Bot) else bot
if isinstance(bot_id, str):
bot_id = set(bot_id)
if bot_id:
for i in bot_id:
try:
self.bot_list.append(nonebot.get_bot(i))
except KeyError:
logger.warning(f"Bot:{i} 对象未连接或不存在")
if not self.bot_list:
raise ValueError("当前没有可用的Bot对象...", log_cmd)
async def call_check(self, bot: Bot, group_id: str) -> bool:
"""运行发送检测函数
参数:
bot: Bot
group_id: 群组id
返回:
bool: 是否发送
"""
if not self.check_func:
return True
if is_coroutine_callable(self.check_func):
is_run = await self.check_func(bot, group_id)
else:
is_run = self.check_func(bot, group_id)
return cast(bool, is_run)
async def __send_message(self, bot: Bot, group: GroupConsole):
"""群组发送消息
参数:
bot: Bot
group: GroupConsole
"""
key = f"{group.group_id}:{group.channel_id}"
if not await self.call_check(bot, group.group_id):
logger.debug(
"广播方法检测运行方法为 False, 已跳过该群组...",
self.log_cmd,
group_id=group.group_id,
)
return
if target := PlatformUtils.get_target(
group_id=group.group_id,
channel_id=group.channel_id,
):
self.ignore_group.append(key)
await MessageUtils.build_message(self.message).send(target, bot)
logger.debug("广播消息发送成功...", self.log_cmd, target=key)
else:
logger.warning("广播消息获取Target失败...", self.log_cmd, target=key)
async def broadcast(self) -> int:
"""广播消息
返回:
int: 成功发送次数
"""
for bot in self.bot_list:
if self.platform and self.platform != PlatformUtils.get_platform(bot):
continue
group_list, _ = await PlatformUtils.get_group_list(bot)
if not group_list:
continue
for group in group_list:
if (
group.group_id in self.ignore_group
or group.channel_id in self.ignore_group
):
continue
try:
await self.__send_message(bot, group)
await asyncio.sleep(random.randint(1, 3))
self.count += 1
except Exception as e:
logger.warning(
"广播消息发送失败", self.log_cmd, target=group.group_id, e=e
)
return self.count
async def broadcast_group(
message: str | UniMessage,
bot: Bot | list[Bot] | None = None,
bot_id: str | set[str] | None = None,
ignore_group: set[int] | None = None,
ignore_group: list[str] = [],
check_func: Callable[[Bot, str], Awaitable] | None = None,
log_cmd: str | None = None,
platform: Literal["qq", "dodo", "kaiheila"] | None = None,
):
platform: str | None = None,
) -> int:
"""获取所有Bot或指定Bot对象广播群聊
参数:
@ -505,81 +624,18 @@ async def broadcast_group(
check_func: 发送前对群聊检测方法判断是否发送.
log_cmd: 日志标记.
platform: 指定平台
返回:
int: 成功发送次数
"""
if platform and platform not in ["qq", "dodo", "kaiheila"]:
raise ValueError("指定平台不支持")
if not message:
raise ValueError("群聊广播消息不能为空")
bot_dict = nonebot.get_bots()
bot_list: list[Bot] = []
if bot:
if isinstance(bot, list):
bot_list = bot
else:
bot_list.append(bot)
elif bot_id:
_bot_id_list = bot_id
if isinstance(bot_id, str):
_bot_id_list = [bot_id]
for id_ in _bot_id_list:
if bot_id in bot_dict:
bot_list.append(bot_dict[bot_id])
else:
logger.warning(f"Bot:{id_} 对象未连接或不存在")
else:
bot_list = list(bot_dict.values())
_used_group = []
for _bot in bot_list:
try:
if platform and platform != PlatformUtils.get_platform(_bot):
continue
group_list, _ = await PlatformUtils.get_group_list(_bot)
if group_list:
for group in group_list:
key = f"{group.group_id}:{group.channel_id}"
try:
if (
ignore_group
and (
group.group_id in ignore_group
or group.channel_id in ignore_group
)
) or key in _used_group:
logger.debug(
"广播方法群组重复, 已跳过...",
log_cmd,
group_id=group.group_id,
)
continue
is_run = False
if check_func:
if is_coroutine_callable(check_func):
is_run = await check_func(_bot, group.group_id)
else:
is_run = check_func(_bot, group.group_id)
if not is_run:
logger.debug(
"广播方法检测运行方法为 False, 已跳过...",
log_cmd,
group_id=group.group_id,
)
continue
target = PlatformUtils.get_target(
user_id=None,
group_id=group.group_id,
channel_id=group.channel_id,
)
if target:
_used_group.append(key)
message_list = message
await MessageUtils.build_message(message_list).send(
target, _bot
)
logger.debug("发送成功", log_cmd, target=key)
await asyncio.sleep(random.randint(1, 3))
else:
logger.warning("target为空", log_cmd, target=key)
except Exception as e:
logger.error("发送失败", log_cmd, target=key, e=e)
except Exception as e:
logger.error(f"Bot: {_bot.self_id} 获取群聊列表失败", command=log_cmd, e=e)
if not message.strip():
raise ValueError("群聊广播消息不能为空...")
return await BroadcastEngine(
message=message,
bot=bot,
bot_id=bot_id,
ignore_group=ignore_group,
check_func=check_func,
log_cmd=log_cmd,
platform=platform,
).broadcast()