2022-02-19 18:20:19 +08:00
|
|
|
|
from nonebot.adapters.onebot.v11 import GroupMessageEvent, MessageSegment
|
2022-01-05 22:32:59 +08:00
|
|
|
|
from services.log import logger
|
2022-02-19 18:20:19 +08:00
|
|
|
|
from nonebot.adapters.onebot.v11 import Bot
|
2022-05-03 02:03:06 +08:00
|
|
|
|
from pydantic import create_model
|
|
|
|
|
|
from utils.models import ShopParam
|
2022-10-15 19:49:53 +08:00
|
|
|
|
from typing import Optional, Union, Callable, List, Tuple, Dict, Any
|
2022-05-03 02:03:06 +08:00
|
|
|
|
from types import MappingProxyType
|
|
|
|
|
|
import inspect
|
2022-01-16 14:52:50 +08:00
|
|
|
|
import asyncio
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GoodsUseFuncManager:
    """Registry of per-goods "use" callbacks and their optional hooks.

    ``self._data`` maps a goods name to a record dict. The methods of this
    class read the following keys from a record:

    * ``"func"``          - the callable run by :meth:`use`
    * ``"model"``         - the callable used by :meth:`init_model` to build
      the parameter object
    * ``"kwargs"``        - extra options (``"max_num_limit"``,
      ``"send_success_msg"``, ...)
    * ``"before_handle"`` / ``"after_handle"`` - lists of hook callables run
      by :meth:`run_handle`
    """

    def __init__(self):
        # goods name -> record dict (see the class docstring for the keys).
        self._data = {}

    def register_use_before_handle(self, goods_name: str, fun_list: List[Callable]):
        """Attach the hooks that run before a goods item is used.

        Nothing is stored or logged when ``fun_list`` is empty.

        :param goods_name: goods name; must already be registered
        :param fun_list: hook callables
        :raises KeyError: if ``goods_name`` has no record yet
        """
        if fun_list:
            self._data[goods_name]["before_handle"] = fun_list
            logger.info(f"register_use_before_handle 成功注册商品:{goods_name} 的{len(fun_list)}个使用前函数")

    def register_use_after_handle(self, goods_name: str, fun_list: List[Callable]):
        """Attach the hooks that run after a goods item is used.

        Nothing is stored or logged when ``fun_list`` is empty.

        :param goods_name: goods name; must already be registered
        :param fun_list: hook callables
        :raises KeyError: if ``goods_name`` has no record yet
        """
        if fun_list:
            self._data[goods_name]["after_handle"] = fun_list
            logger.info(f"register_use_after_handle 成功注册商品:{goods_name} 的{len(fun_list)}个使用后函数")

    def register_use(self, goods_name: str, **kwargs):
        """Store ``kwargs`` as the record for ``goods_name``.

        Any existing record for the same name (including hooks attached to
        it) is replaced.

        :param goods_name: goods name
        :param kwargs: the record contents
        """
        self._data[goods_name] = kwargs

    def exists(self, goods_name: str) -> bool:
        """Return True when a non-empty record is stored for ``goods_name``.

        :param goods_name: goods name
        """
        return bool(self._data.get(goods_name))

    def get_max_num_limit(self, goods_name: str) -> int:
        """Return how many items may be used in a single call.

        :param goods_name: goods name
        :return: the registered ``max_num_limit``, or 1 for unknown goods
        """
        if self.exists(goods_name):
            return self._data[goods_name]["kwargs"]["max_num_limit"]
        return 1

    def _parse_args(self, args_: MappingProxyType, param: "ShopParam", **kwargs) -> List[Any]:
        """Build the positional argument list for a callback.

        Each parameter name in ``args_`` is resolved as follows:
        ``shop_param`` receives ``param`` itself, ``args`` / ``kwargs`` are
        skipped, and every other name is looked up in ``param.dict()``
        (``None`` when absent).

        :param args_: the callback's ``inspect.signature(...).parameters``
        :param param: parameter object of the current use
        :param kwargs: accepted for call compatibility; not used
        :return: positional arguments in signature order
        """
        bot = param.bot
        # The bot is detached while the model is dumped (presumably so that
        # dict() does not process the bot object - TODO confirm) and is then
        # always put back, so ``param`` still carries it for the callback
        # and for every later hook that shares the same object.
        param.bot = None
        try:
            values = param.dict()
        finally:
            param.bot = bot
        values["bot"] = bot

        positional: List[Any] = []
        for name in args_:
            if name == "shop_param":
                positional.append(param)
            elif name not in ("args", "kwargs"):
                positional.append(values.get(name))
        return positional

    async def _invoke(self, func: Callable, param: "ShopParam", **kwargs):
        """Call ``func`` with the arguments its signature asks for.

        A callable whose first parameter is not named ``kwargs`` is called
        positionally with the values from :meth:`_parse_args`; otherwise it
        is called as ``func(**kwargs)``. An awaitable result is awaited.
        """
        parameters = inspect.signature(func).parameters
        if parameters and next(iter(parameters)) != "kwargs":
            outcome = func(*self._parse_args(parameters, param, **kwargs))
        else:
            outcome = func(**kwargs)
        # Checking the result instead of the callable also covers objects
        # with an ``async def __call__``.
        if inspect.isawaitable(outcome):
            outcome = await outcome
        return outcome

    async def use(
        self, param: "ShopParam", **kwargs
    ) -> "Optional[Union[str, MessageSegment]]":
        """Run the registered use callback of ``param.goods_name``.

        :param param: parameter object of the current use
        :param kwargs: keyword arguments for ``**kwargs``-style callbacks
        :return: the callback's result, or None when nothing is registered
        """
        goods_name = param.goods_name
        if not self.exists(goods_name):
            return None
        return await self._invoke(self._data[goods_name]["func"], param, **kwargs)

    async def run_handle(self, goods_name: str, type_: str, param: "ShopParam", **kwargs):
        """Run every hook stored under ``type_`` for ``goods_name``, in order.

        Does nothing when the goods item or the hook list is missing.

        :param goods_name: goods name
        :param type_: record key of the hook list, e.g. ``"before_handle"``
        :param param: parameter object of the current use
        :param kwargs: keyword arguments for ``**kwargs``-style hooks
        """
        for func in self._data.get(goods_name, {}).get(type_) or ():
            await self._invoke(func, param, **kwargs)

    def check_send_success_message(self, goods_name: str) -> bool:
        """Return whether a success message should be sent after use.

        :param goods_name: goods name
        :return: truthiness of the registered ``send_success_msg`` option,
            False for unknown goods
        """
        if self.exists(goods_name):
            return bool(self._data[goods_name]["kwargs"]["send_success_msg"])
        return False

    def get_kwargs(self, goods_name: str) -> dict:
        """Return the options stored under ``"kwargs"`` for ``goods_name``.

        :param goods_name: goods name
        :return: the stored dict itself (not a copy), or ``{}`` for unknown
            goods
        """
        if self.exists(goods_name):
            return self._data[goods_name]["kwargs"]
        return {}

    def init_model(self, goods_name: str, bot: "Bot", event: "GroupMessageEvent", num: int):
        """Instantiate the parameter model registered for ``goods_name``.

        :param goods_name: goods name; must already be registered
        :param bot: bot instance
        :param event: event supplying ``user_id`` and ``group_id``
        :param num: number of items being used
        :raises KeyError: if ``goods_name`` has no record or no ``"model"``
        """
        return self._data[goods_name]["model"](
            **{
                "goods_name": goods_name,
                "bot": bot,
                "event": event,
                "user_id": event.user_id,
                "group_id": event.group_id,
                "num": num,
            }
        )
|
|
|
|
|
|
|
2022-01-05 22:32:59 +08:00
|
|
|
|
|
2022-01-16 14:52:50 +08:00
|
|
|
|
# Module-level registry instance shared by the helper functions below.
func_manager = GoodsUseFuncManager()
|
2022-01-05 22:32:59 +08:00
|
|
|
|
|
|
|
|
|
|
|
2022-10-15 19:49:53 +08:00
|
|
|
|
def build_params(
    bot: Bot, event: GroupMessageEvent, goods_name: str, num: int
) -> Tuple[ShopParam, Dict[str, Any]]:
    """Build the parameter model and keyword arguments for using a goods item.

    The keyword dict starts from the options registered for ``goods_name``
    and is then overlaid with the call context (``_bot``, ``event``,
    ``group_id``, ``user_id``, ``num``, ``goods_name``), so the context
    values win on a key clash.

    :param bot: bot instance
    :param event: group message event supplying ``group_id`` / ``user_id``
    :param goods_name: goods name
    :param num: number of items being used
    :return: ``(model, kwargs)``
    """
    registered_options = func_manager.get_kwargs(goods_name)
    shop_model = func_manager.init_model(goods_name, bot, event, num)

    call_kwargs: Dict[str, Any] = dict(registered_options)
    call_kwargs.update(
        _bot=bot,
        event=event,
        group_id=event.group_id,
        user_id=event.user_id,
        num=num,
        goods_name=goods_name,
    )
    return shop_model, call_kwargs
|
|
|
|
|
|
|
|
|
|
|
|
|
2022-02-09 20:05:49 +08:00
|
|
|
|
async def effect(
    bot: Bot, event: GroupMessageEvent, goods_name: str, num: int
) -> Optional[Union[str, MessageSegment]]:
    """Apply a goods item by running its registered use callback.

    :param bot: Bot
    :param event: GroupMessageEvent
    :param goods_name: goods name
    :param num: number of items being used
    :return: whatever the registered use callback returns, or None when no
        callback is registered for ``goods_name``
    """
    # Only goods with a registered use callback have an effect.
    if not func_manager.exists(goods_name):
        return None
    model, kwargs = build_params(bot, event, goods_name, num)
    return await func_manager.use(model, **kwargs)
|
2022-01-05 22:32:59 +08:00
|
|
|
|
|
|
|
|
|
|
|
2022-10-15 19:49:53 +08:00
|
|
|
|
def register_use(goods_name: str, func: Callable, **kwargs):
    """Register the use callback of a goods item.

    ``send_success_msg`` (default True) and ``max_num_limit`` (default 1)
    are filled into ``kwargs`` when the caller did not supply them. A
    parameter model named ``"<goods_name>_model"`` is derived from
    ``ShopParam`` with ``kwargs`` as its field definitions, and callback,
    model and ``kwargs`` are stored in the shared registry.

    :param goods_name: goods name
    :param func: use callback
    :param kwargs: extra options, also used as the model's field definitions
    :raises ValueError: if a callback is already registered for the name
    """
    if func_manager.exists(goods_name):
        raise ValueError("该商品使用函数已被注册!")

    # Whether a "used successfully" message is sent afterwards.
    kwargs.setdefault("send_success_msg", True)
    kwargs.setdefault("max_num_limit", 1)

    goods_model = create_model(f"{goods_name}_model", __base__=ShopParam, **kwargs)
    func_manager.register_use(goods_name, func=func, model=goods_model, kwargs=kwargs)
    logger.info(f"register_use 成功注册商品:{goods_name} 的使用函数")
|