mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-14 21:52:56 +08:00
update v0.1.5.1
This commit is contained in:
parent
56573d1d34
commit
901a90ff13
@ -242,6 +242,10 @@ __Docker 最新版本由 [Sakuracio](https://github.com/Sakuracio) 提供__
|
||||
|
||||
## 更新
|
||||
|
||||
### 2022/5/
|
||||
|
||||
* 商品使用函数可以添加参数ShopParam,从中获取user_id等以及自己提供的参数
|
||||
|
||||
### 2022/5/1
|
||||
|
||||
* 删除了`group_last_chat`插件(该功能可由`chat_history`替代
|
||||
|
||||
@ -1 +1 @@
|
||||
__version__: v0.1.5.0
|
||||
__version__: v0.1.5.1
|
||||
@ -7,6 +7,7 @@ from typing import Optional, Union
|
||||
from configs.config import Config
|
||||
from nonebot import Driver
|
||||
from nonebot.plugin import require
|
||||
# from utils.decorator.shop import shop_register
|
||||
import nonebot
|
||||
import time
|
||||
|
||||
@ -20,10 +21,14 @@ async def init_default_shop_goods():
|
||||
"""
|
||||
导入内置的三个商品
|
||||
"""
|
||||
async def sign_card(**kwargs):
|
||||
user_id = kwargs['user_id']
|
||||
group_id = kwargs['group_id']
|
||||
prob = kwargs["prob"]
|
||||
|
||||
# @shop_register(
|
||||
# name="好感度双倍加持卡Ⅰ",
|
||||
# price=30,
|
||||
# des="下次签到双倍好感度概率 + 10%(谁才是真命天子?)(同类商品将覆盖)",
|
||||
# ** {"prob": 0.1}
|
||||
# )
|
||||
async def sign_card(user_id: int, group_id: int, prob: float):
|
||||
user = await SignGroupUser.ensure(user_id, group_id)
|
||||
await user.update(add_probability=prob).apply()
|
||||
|
||||
@ -151,12 +156,13 @@ async def register_goods(
|
||||
des = kwargs.get("des")
|
||||
discount = kwargs.get("discount")
|
||||
limit_time = kwargs.get("time_limit")
|
||||
limit_time = float(limit_time) if limit_time else limit_time
|
||||
discount = discount if discount is None else 1
|
||||
limit_time = int(time.time() + limit_time * 60 * 60) if limit_time is not None and limit_time != 0 else 0
|
||||
return await GoodsInfo.add_goods(
|
||||
name, int(price), des, float(discount), limit_time
|
||||
)
|
||||
if await GoodsInfo.get_goods_info(name):
|
||||
limit_time = float(limit_time) if limit_time else limit_time
|
||||
discount = discount if discount is None else 1
|
||||
limit_time = int(time.time() + limit_time * 60 * 60) if limit_time is not None and limit_time != 0 else 0
|
||||
return await GoodsInfo.add_goods(
|
||||
name, int(price), des, float(discount), limit_time
|
||||
)
|
||||
|
||||
|
||||
# 删除商品
|
||||
|
||||
@ -1,7 +1,11 @@
|
||||
from nonebot.adapters.onebot.v11 import GroupMessageEvent, MessageSegment
|
||||
from services.log import logger
|
||||
from nonebot.adapters.onebot.v11 import Bot
|
||||
from pydantic import create_model
|
||||
from utils.models import ShopParam
|
||||
from typing import Optional, Union
|
||||
from types import MappingProxyType
|
||||
import inspect
|
||||
import asyncio
|
||||
|
||||
|
||||
@ -33,21 +37,50 @@ class GoodsUseFuncManager:
|
||||
return self._data[goods_name]["kwargs"]["_max_num_limit"]
|
||||
return 1
|
||||
|
||||
async def use(self, **kwargs) -> Optional[Union[str, MessageSegment]]:
|
||||
async def use(
|
||||
self, param: ShopParam, **kwargs
|
||||
) -> Optional[Union[str, MessageSegment]]:
|
||||
"""
|
||||
使用道具
|
||||
:param param: BaseModel
|
||||
:param kwargs: kwargs
|
||||
"""
|
||||
goods_name = kwargs.get("goods_name")
|
||||
def parse_args(args_: MappingProxyType):
|
||||
param_list_ = []
|
||||
_bot = param.bot
|
||||
param.bot = None
|
||||
param_json = param.dict()
|
||||
param_json["bot"] = _bot
|
||||
for par in args_.keys():
|
||||
if par in ["shop_param"]:
|
||||
param_list_.append(param)
|
||||
elif par not in ["args", "kwargs"]:
|
||||
param_list_.append(param_json.get(par))
|
||||
if kwargs.get(par) is not None:
|
||||
del kwargs[par]
|
||||
return param_list_
|
||||
goods_name = param.goods_name
|
||||
if self.exists(goods_name):
|
||||
if asyncio.iscoroutinefunction(self._data[goods_name]["func"]):
|
||||
return await self._data[goods_name]["func"](
|
||||
**kwargs,
|
||||
)
|
||||
if args := inspect.signature(self._data[goods_name]["func"]).parameters:
|
||||
print(parse_args(args))
|
||||
print(kwargs)
|
||||
if asyncio.iscoroutinefunction(self._data[goods_name]["func"]):
|
||||
return await self._data[goods_name]["func"](
|
||||
*parse_args(args)
|
||||
)
|
||||
else:
|
||||
return self._data[goods_name]["func"](
|
||||
**kwargs,
|
||||
)
|
||||
else:
|
||||
return self._data[goods_name]["func"](
|
||||
**kwargs,
|
||||
)
|
||||
if asyncio.iscoroutinefunction(self._data[goods_name]["func"]):
|
||||
return await self._data[goods_name]["func"](
|
||||
**kwargs,
|
||||
)
|
||||
else:
|
||||
return self._data[goods_name]["func"](
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def check_send_success_message(self, goods_name: str) -> bool:
|
||||
"""
|
||||
@ -67,6 +100,18 @@ class GoodsUseFuncManager:
|
||||
return self._data[goods_name]["kwargs"]
|
||||
return {}
|
||||
|
||||
def init_model(self, goods_name: str, bot: Bot, event: GroupMessageEvent, num: int):
|
||||
return self._data[goods_name]["model"](
|
||||
**{
|
||||
"goods_name": goods_name,
|
||||
"bot": bot,
|
||||
"event": event,
|
||||
"user_id": event.user_id,
|
||||
"group_id": event.group_id,
|
||||
"num": num,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
func_manager = GoodsUseFuncManager()
|
||||
|
||||
@ -83,22 +128,23 @@ async def effect(
|
||||
:return: 使用是否成功
|
||||
"""
|
||||
# 优先使用注册的商品插件
|
||||
try:
|
||||
if func_manager.exists(goods_name):
|
||||
_kwargs = func_manager.get_kwargs(goods_name)
|
||||
return await func_manager.use(
|
||||
**{
|
||||
**_kwargs,
|
||||
"_bot": bot,
|
||||
"event": event,
|
||||
"group_id": event.group_id,
|
||||
"user_id": event.user_id,
|
||||
"num": num,
|
||||
"goods_name": goods_name,
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"use 商品生效函数effect 发生错误 {type(e)}:{e}")
|
||||
# try:
|
||||
if func_manager.exists(goods_name):
|
||||
_kwargs = func_manager.get_kwargs(goods_name)
|
||||
return await func_manager.use(
|
||||
func_manager.init_model(goods_name, bot, event, num),
|
||||
**{
|
||||
**_kwargs,
|
||||
"_bot": bot,
|
||||
"event": event,
|
||||
"group_id": event.group_id,
|
||||
"user_id": event.user_id,
|
||||
"num": num,
|
||||
"goods_name": goods_name,
|
||||
},
|
||||
)
|
||||
# except Exception as e:
|
||||
# logger.error(f"use 商品生效函数effect 发生错误 {type(e)}:{e}")
|
||||
return None
|
||||
|
||||
|
||||
@ -112,10 +158,14 @@ def register_use(goods_name: str, func, **kwargs):
|
||||
if func_manager.exists(goods_name):
|
||||
raise ValueError("该商品使用函数已被注册!")
|
||||
# 发送使用成功信息
|
||||
if kwargs.get("send_success_msg") is None:
|
||||
kwargs["send_success_msg"] = True
|
||||
kwargs["_max_num_limit"] = (
|
||||
kwargs.get("_max_num_limit") if kwargs.get("_max_num_limit") else 1
|
||||
kwargs["send_success_msg"] = kwargs.get("send_success_msg", True)
|
||||
kwargs["_max_num_limit"] = kwargs.get("_max_num_limit", 1)
|
||||
func_manager.register_use(
|
||||
goods_name,
|
||||
**{
|
||||
"func": func,
|
||||
"model": create_model(f"{goods_name}_model", __base__=ShopParam, **kwargs),
|
||||
"kwargs": kwargs,
|
||||
},
|
||||
)
|
||||
func_manager.register_use(goods_name, **{"func": func, "kwargs": kwargs})
|
||||
logger.info(f"register_use 成功注册商品:{goods_name} 的使用函数")
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
from typing import Tuple, Any
|
||||
|
||||
from .group_user_checkin import (
|
||||
group_user_check_in,
|
||||
group_user_check,
|
||||
@ -8,9 +10,9 @@ from .group_user_checkin import (
|
||||
from nonebot.adapters.onebot.v11 import GroupMessageEvent, Message
|
||||
from nonebot.adapters.onebot.v11.permission import GROUP
|
||||
from utils.message_builder import image
|
||||
from nonebot import on_command
|
||||
from nonebot import on_command, on_regex
|
||||
from utils.utils import scheduler
|
||||
from nonebot.params import CommandArg
|
||||
from nonebot.params import CommandArg, RegexGroup
|
||||
from pathlib import Path
|
||||
from configs.path_config import DATA_PATH
|
||||
from services.log import logger
|
||||
@ -68,7 +70,7 @@ except (FileNotFoundError, ValueError, TypeError):
|
||||
data = {"0": []}
|
||||
|
||||
|
||||
sign = on_command("签到", priority=5, permission=GROUP, block=True)
|
||||
sign = on_regex("^签到(all)?$", priority=5, permission=GROUP, block=True)
|
||||
my_sign = on_command(
|
||||
cmd="我的签到", aliases={"好感度"}, priority=5, permission=GROUP, block=True
|
||||
)
|
||||
@ -85,13 +87,13 @@ total_sign_rank = on_command(
|
||||
|
||||
|
||||
@sign.handle()
|
||||
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||
async def _(event: GroupMessageEvent, reg_group: Tuple[Any, ...] = RegexGroup()):
|
||||
nickname = event.sender.card or event.sender.nickname
|
||||
await sign.send(
|
||||
await group_user_check_in(nickname, event.user_id, event.group_id),
|
||||
at_sender=True,
|
||||
)
|
||||
if arg.extract_plain_text().strip() == "all":
|
||||
if reg_group[0]:
|
||||
await check_in_all(nickname, event.user_id)
|
||||
|
||||
|
||||
|
||||
0
utils/decorator/__init__.py
Normal file
0
utils/decorator/__init__.py
Normal file
60
utils/decorator/shop.py
Normal file
60
utils/decorator/shop.py
Normal file
@ -0,0 +1,60 @@
|
||||
#
|
||||
# from functools import wraps
|
||||
# from typing import Union, List, Callable
|
||||
# from nonebot.plugin import require
|
||||
# from nonebot.adapters.onebot.v11 import Bot
|
||||
# import asyncio
|
||||
# import nonebot
|
||||
#
|
||||
# driver = nonebot.get_driver()
|
||||
#
|
||||
# use = require("use")
|
||||
# shop = require("shop_handle")
|
||||
#
|
||||
# flag = False
|
||||
#
|
||||
# name_list = []
|
||||
#
|
||||
# func_list = []
|
||||
#
|
||||
#
|
||||
# def shop_register(
|
||||
# name: Union[str, List[str]],
|
||||
# price: Union[int, List[int]],
|
||||
# des: Union[str, List[str]],
|
||||
# discount: Union[float, List[float]] = 1,
|
||||
# limit_time: Union[int, List[int]] = 0,
|
||||
# status: bool = True,
|
||||
# **kwargs_
|
||||
# ):
|
||||
# print("---------")
|
||||
# print("name:", name)
|
||||
# print("price:", price)
|
||||
# print("des:", des)
|
||||
# print("discount:", discount)
|
||||
# print("limit_time:", limit_time)
|
||||
# print("status:", status)
|
||||
# print("kwargs:", kwargs_)
|
||||
# asyncio.run(shop.register_goods(
|
||||
# name, 30, price, discount, limit_time
|
||||
# ))
|
||||
#
|
||||
# def _register_use(goods_func: Callable):
|
||||
# def _wrapper(**kwargs):
|
||||
# # print(*args)
|
||||
# print(**kwargs)
|
||||
# print(1111111111111111)
|
||||
# use.register_use(name, goods_func, **kwargs)
|
||||
# # func_list.append({"name": name, "func": goods_func, "args": args, "kwargs": kwargs})
|
||||
# return _wrapper
|
||||
#
|
||||
# return _register_use
|
||||
#
|
||||
#
|
||||
# @driver.on_bot_connect
|
||||
# async def do_something(bot: Bot):
|
||||
# for func in func_list:
|
||||
# if asyncio.iscoroutinefunction(func):
|
||||
# await func()
|
||||
# else:
|
||||
# func()
|
||||
14
utils/models/__init__.py
Normal file
14
utils/models/__init__.py
Normal file
@ -0,0 +1,14 @@
|
||||
from nonebot.adapters.onebot.v11 import Bot, MessageEvent
|
||||
from pydantic import BaseModel, create_model
|
||||
from typing import Any
|
||||
|
||||
|
||||
class ShopParam(BaseModel):
|
||||
goods_name: str
|
||||
user_id: int
|
||||
group_id: int
|
||||
bot: Any
|
||||
event: MessageEvent
|
||||
num: int # 道具单次使用数量
|
||||
send_success_msg: bool = True # 是否发送使用成功信息
|
||||
_max_num_limit: int = 1 # 单次使用最大次数
|
||||
Loading…
Reference in New Issue
Block a user