perf👌: 道具使用与签到

This commit is contained in:
HibiKier 2024-03-05 08:29:46 +08:00
parent db96f46dcb
commit 5a50a2bff4
9 changed files with 342 additions and 178 deletions

View File

@ -1,6 +1,7 @@
import os import os
from nonebot import require from nonebot import require
from nonebot.drivers import Driver
from tortoise import Tortoise from tortoise import Tortoise
from zhenxun.models.goods_info import GoodsInfo from zhenxun.models.goods_info import GoodsInfo
@ -27,7 +28,7 @@ for d in os.listdir(path):
nonebot.load_plugins(str((path / d).resolve())) nonebot.load_plugins(str((path / d).resolve()))
driver = nonebot.get_driver() driver: Driver = nonebot.get_driver()
flag = True flag = True
@ -52,7 +53,7 @@ from public.bag_users t1
""" """
@driver.on_bot_connect @driver.on_startup
async def _(): async def _():
global flag global flag
await shop_register.load_register() await shop_register.load_register()

View File

@ -1,6 +1,14 @@
from nonebot.adapters import Bot, Event
from nonebot.plugin import PluginMetadata from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args, Arparma, Subcommand, on_alconna from nonebot_plugin_alconna import (
from nonebot_plugin_saa import Image, Text Alconna,
Args,
Arparma,
Subcommand,
UniMsg,
on_alconna,
)
from nonebot_plugin_saa import Image, MessageFactory, Text
from nonebot_plugin_session import EventSession from nonebot_plugin_session import EventSession
from nonebot_plugin_userinfo import EventUserInfo, UserInfo from nonebot_plugin_userinfo import EventUserInfo, UserInfo
@ -121,5 +129,18 @@ async def _(session: EventSession, arparma: Arparma, name: str, num: int):
@_matcher.assign("use") @_matcher.assign("use")
async def _(session: EventSession, arparma: Arparma, name: str, num: int): async def _(
pass bot: Bot,
event: Event,
message: UniMsg,
session: EventSession,
arparma: Arparma,
name: str,
num: int,
):
result = await ShopManage.use(bot, event, session, message, name, num, "")
logger.info(f"使用道具 {name}, 数量: {num}", arparma.header_result, session=session)
if isinstance(result, str):
await Text(result).send(reply=True)
elif isinstance(result, MessageFactory):
await result.finish(reply=True)

View File

@ -1,7 +1,13 @@
import asyncio
import inspect
import time import time
from typing import Any, Callable, Dict from types import MappingProxyType
from typing import Any, Callable, Dict, Literal
from nonebot.adapters import Event from nonebot.adapters import Bot, Event
from nonebot_plugin_alconna import UniMsg
from nonebot_plugin_saa import MessageFactory
from nonebot_plugin_session import EventSession
from pydantic import BaseModel, create_model from pydantic import BaseModel, create_model
from zhenxun.configs.path_config import IMAGE_PATH from zhenxun.configs.path_config import IMAGE_PATH
@ -18,13 +24,24 @@ ICON_PATH = IMAGE_PATH / "shop_icon"
class Goods(BaseModel): class Goods(BaseModel):
name: str
"""商品名称"""
before_handle: list[Callable] = [] before_handle: list[Callable] = []
"""使用前函数"""
after_handle: list[Callable] = [] after_handle: list[Callable] = []
"""使用后函数"""
func: Callable | None = None func: Callable | None = None
params: Any | None = None """使用函数"""
params: Any = None
"""参数"""
send_success_msg: bool = True send_success_msg: bool = True
"""使用成功是否发送消息"""
max_num_limit: int = 1 max_num_limit: int = 1
model: Any | None = None """单次使用最大次数"""
model: Any = None
"""model"""
session: EventSession | None = None
"""EventSession"""
class ShopParam(BaseModel): class ShopParam(BaseModel):
@ -41,23 +58,221 @@ class ShopParam(BaseModel):
"""event""" """event"""
num: int num: int
"""道具单次使用数量""" """道具单次使用数量"""
message: str
"""message"""
text: str text: str
"""text""" """text"""
send_success_msg: bool = True send_success_msg: bool = True
"""是否发送使用成功信息""" """是否发送使用成功信息"""
max_num_limit: int = 1 max_num_limit: int = 1
"""单次使用最大次数""" """单次使用最大次数"""
session: EventSession | None = None
"""EventSession"""
class ShopManage: class ShopManage:
uuid2goods: Dict[str, Goods] = {} uuid2goods: Dict[str, Goods] = {}
@classmethod
def __build_params(
cls,
bot: Bot,
event: Event,
session: EventSession,
message: UniMsg,
goods: Goods,
num: int,
text: str,
) -> tuple[ShopParam, Dict[str, Any]]:
"""构造参数
参数:
bot: bot
event: event
goods_name: 商品名称
num: 数量
text: 其他信息
"""
_kwargs = goods.params
model = goods.model(
**{
"goods_name": goods.name,
"bot": bot,
"event": event,
"user_id": session.id1,
"group_id": session.id3 or session.id2,
"num": num,
"text": text,
"session": session,
}
)
return model, {
**_kwargs,
"_bot": bot,
"event": event,
"user_id": session.id1,
"group_id": session.id3 or session.id2,
"num": num,
"text": text,
"goods_name": goods.name,
}
@classmethod
def __parse_args(
cls,
args: MappingProxyType,
param: ShopParam,
session: EventSession,
message: UniMsg,
**kwargs,
) -> list[Any]:
"""解析参数
参数:
args: MappingProxyType
param: ShopParam
返回:
list[Any]: 参数
"""
param_list = []
_bot = param.bot
param.bot = None
param_json = param.dict()
param_json["bot"] = _bot
for par in args.keys():
if par in ["shop_param"]:
param_list.append(param)
elif par in ["session"]:
param_list.append(session)
elif par in ["message"]:
param_list.append(message)
elif par not in ["args", "kwargs"]:
param_list.append(param_json.get(par))
if kwargs.get(par) is not None:
del kwargs[par]
return param_list
@classmethod
async def run_before_after(
cls,
goods: Goods,
param: ShopParam,
run_type: Literal["after", "before"],
**kwargs,
):
"""运行使用前使用后函数
参数:
goods: Goods
param: 参数
run_type: 运行类型
"""
fun_list = goods.before_handle if run_type == "before" else goods.after_handle
if fun_list:
for func in fun_list:
args = inspect.signature(func).parameters
if args and list(args.keys())[0] != "kwargs":
if asyncio.iscoroutinefunction(func):
await func(*cls.__parse_args(args, param, **kwargs))
else:
func(*cls.__parse_args(args, param, **kwargs))
else:
if asyncio.iscoroutinefunction(func):
await func(**kwargs)
else:
func(**kwargs)
@classmethod
async def __run(
cls,
goods: Goods,
param: ShopParam,
session: EventSession,
message: UniMsg,
**kwargs,
) -> str | MessageFactory | None:
"""运行道具函数
参数:
goods: Goods
param: ShopParam
返回:
str | MessageFactory | None: 使用完成后返回信息
"""
args = inspect.signature(goods.func).parameters # type: ignore
if goods.func:
if args and list(args.keys())[0] != "kwargs":
if asyncio.iscoroutinefunction(goods.func):
return await goods.func(
*cls.__parse_args(args, param, session, message, **kwargs)
)
else:
return goods.func(
*cls.__parse_args(args, param, session, message, **kwargs)
)
else:
if asyncio.iscoroutinefunction(goods.func):
return await goods.func(
**kwargs,
)
else:
return goods.func(**kwargs)
@classmethod
async def use(
cls,
bot: Bot,
event: Event,
session: EventSession,
message: UniMsg,
goods_name: str,
num: int,
text: str,
) -> str | MessageFactory | None:
"""使用道具
参数:
bot: Bot
event: Event
session: Session
message: 消息
goods_name: 商品名称
num: 使用数量
text: 其他信息
返回:
str | MessageFactory | None: 使用完成后返回信息
"""
if goods_name.isdigit():
user = await UserConsole.get_user(user_id=session.id1) # type: ignore
uuid = list(user.props.keys())[int(goods_name)]
goods_info = await GoodsInfo.get_or_none(uuid=uuid)
else:
goods_info = await GoodsInfo.get_or_none(goods_name=goods_name)
if not goods_info:
return f"{goods_name} 不存在..."
if goods_info.is_passive:
return f"{goods_name} 是被动道具, 无法使用..."
goods = cls.uuid2goods.get(goods_info.uuid)
if not goods or not goods.func:
return f"{goods_name} 未注册使用函数, 无法使用..."
param, kwargs = cls.__build_params(
bot, event, session, message, goods, num, text
)
if num > param.max_num_limit:
return f"{goods_name} 单次使用最大数量为{param.max_num_limit}..."
await cls.run_before_after(goods, param, "before", **kwargs)
result = await cls.__run(goods, param, session, message, **kwargs)
await cls.run_before_after(goods, param, "after", **kwargs)
if not result and param.send_success_msg:
result = f"使用道具 {goods.name} {num} 次成功!"
return result
@classmethod @classmethod
async def register_use( async def register_use(
cls, cls,
name: str,
uuid: str, uuid: str,
func: Callable, func: Callable,
send_success_msg: bool = True, send_success_msg: bool = True,
@ -83,22 +298,35 @@ class ShopManage:
raise ValueError("该商品使用函数已被注册!") raise ValueError("该商品使用函数已被注册!")
kwargs["send_success_msg"] = send_success_msg kwargs["send_success_msg"] = send_success_msg
kwargs["max_num_limit"] = max_num_limit kwargs["max_num_limit"] = max_num_limit
cls.uuid2func = Goods( cls.uuid2goods[uuid] = Goods(
model=create_model(f"{uuid}_model", __base__=ShopParam, **kwargs), model=create_model(f"{uuid}_model", __base__=ShopParam, **kwargs),
params=kwargs, params=kwargs,
before_handle=before_handle, before_handle=before_handle,
after_handle=after_handle, after_handle=after_handle,
name=name,
func=func,
) )
@classmethod @classmethod
async def buy_prop( async def buy_prop(
cls, user_id: str, name: str, num: int = 1, platform: str | None = None cls, user_id: str, name: str, num: int = 1, platform: str | None = None
) -> str: ) -> str:
"""购买道具
参数:
user_id: 用户id
name: 道具名称
num: 购买数量.
platform: 平台.
返回:
str: 返回小
"""
if name == "神秘药水": if name == "神秘药水":
return "你们看看就好啦,这是不可能卖给你们的~" return "你们看看就好啦,这是不可能卖给你们的~"
if num < 0: if num < 0:
return "购买的数量要大于0!" return "购买的数量要大于0!"
goods_list = await GoodsInfo.annotate().order_by("-id").all() goods_list = await GoodsInfo.annotate().order_by("id").all()
goods_list = [ goods_list = [
goods goods
for goods in goods_list for goods in goods_list

View File

@ -19,6 +19,7 @@ from zhenxun.utils.image_utils import BuildImage, ImageTemplate
from zhenxun.utils.utils import get_user_avatar from zhenxun.utils.utils import get_user_avatar
from ._random_event import random_event from ._random_event import random_event
from .goods_register import driver
from .utils import SIGN_TODAY_CARD_PATH, get_card from .utils import SIGN_TODAY_CARD_PATH, get_card
ICON_PATH = IMAGE_PATH / "_icon" ICON_PATH = IMAGE_PATH / "_icon"

View File

@ -12,13 +12,14 @@ from zhenxun.utils.decorator.shop import NotMeetUseConditionsException, shop_reg
driver: Driver = nonebot.get_driver() driver: Driver = nonebot.get_driver()
@driver.on_startup # @driver.on_startup
async def _(): # async def _():
""" # """
导入内置的三个商品 # 导入内置的三个商品
""" # """
@shop_register(
@shop_register(
name=("好感度双倍加持卡Ⅰ", "好感度双倍加持卡Ⅱ", "好感度双倍加持卡Ⅲ"), name=("好感度双倍加持卡Ⅰ", "好感度双倍加持卡Ⅱ", "好感度双倍加持卡Ⅲ"),
price=(30, 150, 250), price=(30, 150, 250),
des=( des=(
@ -26,15 +27,15 @@ async def _():
"下次签到双倍好感度概率 + 20%(平平庸庸)(同类商品将覆盖)", "下次签到双倍好感度概率 + 20%(平平庸庸)(同类商品将覆盖)",
"下次签到双倍好感度概率 + 30%(金币才是真命天子!)(同类商品将覆盖)", "下次签到双倍好感度概率 + 30%(金币才是真命天子!)(同类商品将覆盖)",
), ),
load_status=bool(Config.get_config("shop", "IMPORT_DEFAULT_SHOP_GOODS")), load_status=True,
icon=( icon=(
"favorability_card_1.png", "favorability_card_1.png",
"favorability_card_2.png", "favorability_card_2.png",
"favorability_card_3.png", "favorability_card_3.png",
), ),
**{"好感度双倍加持卡_prob": 0.1, "好感度双倍加持卡Ⅱ_prob": 0.2, "好感度双倍加持卡Ⅲ_prob": 0.3}, # type: ignore **{"好感度双倍加持卡_prob": 0.1, "好感度双倍加持卡Ⅱ_prob": 0.2, "好感度双倍加持卡Ⅲ_prob": 0.3}, # type: ignore
) )
async def _(session: EventSession, user_id: int, group_id: int, prob: float): async def _(session: EventSession, user_id: int, group_id: int, prob: float):
if session.id1: if session.id1:
user_console = await UserConsole.get_user(session.id1, session.platform) user_console = await UserConsole.get_user(session.id1, session.platform)
user, _ = await SignUser.get_or_create( user, _ = await SignUser.get_or_create(
@ -44,27 +45,29 @@ async def _():
user.add_probability = Decimal(prob) user.add_probability = Decimal(prob)
await user.save(update_fields=["add_probability"]) await user.save(update_fields=["add_probability"])
@shop_register(
@shop_register(
name="测试道具A", name="测试道具A",
price=99, price=99,
des="随便侧而出", des="随便侧而出",
load_status=False, load_status=False,
icon="sword.png", icon="sword.png",
) )
async def _(user_id: int, group_id: int): async def _(user_id: int, group_id: int):
print(user_id, group_id, "使用测试道具") print(user_id, group_id, "使用测试道具")
@shop_register.before_handle(name="测试道具A", load_status=False)
async def _(user_id: int, group_id: int): @shop_register.before_handle(name="测试道具A", load_status=False)
async def _(user_id: int, group_id: int):
print(user_id, group_id, "第一个使用前函数before handle") print(user_id, group_id, "第一个使用前函数before handle")
@shop_register.before_handle(name="测试道具A", load_status=False)
async def _(user_id: int, group_id: int):
print(user_id, group_id, "第二个使用前函数before handle222")
raise NotMeetUseConditionsException(
"太笨了!"
) # 抛出异常,阻断使用,并返回信息
@shop_register.after_handle(name="测试道具A", load_status=False) @shop_register.before_handle(name="测试道具A", load_status=False)
async def _(user_id: int, group_id: int): async def _(user_id: int, group_id: int):
print(user_id, group_id, "第二个使用前函数before handle222")
raise NotMeetUseConditionsException("太笨了!") # 抛出异常,阻断使用,并返回信息
@shop_register.after_handle(name="测试道具A", load_status=False)
async def _(user_id: int, group_id: int):
print(user_id, group_id, "第一个使用后函数after handle") print(user_id, group_id, "第一个使用后函数after handle")

View File

@ -46,7 +46,7 @@ class GoodsInfo(Model):
daily_limit: int = 0, daily_limit: int = 0,
is_passive: bool = False, is_passive: bool = False,
icon: str | None = None, icon: str | None = None,
) -> str | None: ) -> str:
"""添加商品 """添加商品
参数: 参数:
@ -73,7 +73,8 @@ class GoodsInfo(Model):
icon=icon, icon=icon,
) )
return str(uuid_) return str(uuid_)
return None else:
return (await cls.get(goods_name=goods_name)).uuid
@classmethod @classmethod
async def delete_goods(cls, goods_name: str) -> bool: async def delete_goods(cls, goods_name: str) -> bool:

View File

@ -43,11 +43,15 @@ class UserConsole(Model):
返回: 返回:
UserConsole: UserConsole UserConsole: UserConsole
""" """
user, _ = await UserConsole.get_or_create( if not await cls.exists(user_id=user_id):
user_id=user_id, await cls.create(
defaults={"platform": platform, "uid": await cls.get_new_uid()}, user_id=user_id, platform=platform, uid=await cls.get_new_uid()
) )
return user # user, _ = await UserConsole.get_or_create(
# user_id=user_id,
# defaults={"platform": platform, "uid": await cls.get_new_uid()},
# )
return await cls.get(user_id=user_id)
@classmethod @classmethod
async def get_new_uid(cls) -> int: async def get_new_uid(cls) -> int:
@ -56,7 +60,7 @@ class UserConsole(Model):
返回: 返回:
int: 最新uid int: 最新uid
""" """
if user := await cls.annotate().order_by("uid").first(): if user := await cls.annotate().order_by("-uid").first():
return user.uid + 1 return user.uid + 1
return 1 return 1

View File

@ -1,105 +0,0 @@
import os
import nonebot
import ujson as json
from tortoise import Tortoise
from zhenxun.models.goods_info import GoodsInfo
from zhenxun.models.sign_user import SignUser
from zhenxun.models.user_console import UserConsole
from zhenxun.services.log import logger
driver = nonebot.get_driver()
flag = True
SIGN_SQL = """
select distinct on("user_id") t1.user_id, t1.checkin_count, t1.add_probability, t1.specify_probability, t1.impression
from public.sign_group_users t1
join (
select user_id, max(t2.impression) as max_impression
from public.sign_group_users t2
group by user_id
) t on t.user_id = t1.user_id and t.max_impression = t1.impression
"""
BAG_SQL = """
select t1.user_id, t1.gold, t1.property
from public.bag_users t1
join (
select user_id, max(t2.gold) as max_gold
from public.bag_users t2
group by user_id
) t on t.user_id = t1.user_id and t.max_gold = t1.gold
"""
@driver.on_startup
async def _test():
global flag
if (
flag
and not await UserConsole.annotate().count()
and not await SignUser.annotate().count()
):
flag = False
db = Tortoise.get_connection("default")
old_sign_list = await db.execute_query_dict(SIGN_SQL)
old_bag_list = await db.execute_query_dict(BAG_SQL)
goods = {
g["goods_name"]: g["uuid"]
for g in await GoodsInfo.annotate().values("goods_name", "uuid")
}
create_list = []
sign_id_list = []
uid = await UserConsole.get_new_uid()
for old_sign in old_sign_list:
sign_id_list.append(old_sign["user_id"])
old_bag = [b for b in old_bag_list if b["user_id"] == old_sign["user_id"]]
if old_bag:
old_bag = old_bag[0]
property = json.loads(old_bag["property"])
props = {}
if property:
for name, num in property.items():
if name in goods:
props[goods[name]] = num
create_list.append(
UserConsole(
user_id=old_sign["user_id"],
platform="qq",
uid=uid,
props=props,
gold=old_bag["gold"],
)
)
else:
create_list.append(
UserConsole(user_id=old_sign["user_id"], platform="qq", uid=uid)
)
uid += 1
if create_list:
logger.info("开始迁移用户数据...")
await UserConsole.bulk_create(create_list, 10)
logger.info("迁移用户数据完成!")
create_list.clear()
uc_dict = {u.user_id: u for u in await UserConsole.all()}
for old_sign in old_sign_list:
user_console = uc_dict.get(old_sign["user_id"])
if not user_console:
user_console = await UserConsole.get_user(old_sign["user_id"], "qq")
create_list.append(
SignUser(
user_id=old_sign["user_id"],
user_console=user_console,
platform="qq",
sign_count=old_sign["checkin_count"],
impression=old_sign["impression"],
add_probability=old_sign["add_probability"],
specify_probability=old_sign["specify_probability"],
)
)
if create_list:
logger.info("开始迁移签到数据...")
await SignUser.bulk_create(create_list, 10)
logger.info("迁移签到数据完成!")

View File

@ -141,6 +141,7 @@ class ShopRegister(dict):
goods.kwargs = _temp_kwargs goods.kwargs = _temp_kwargs
goods.send_success_msg = ssm goods.send_success_msg = ssm
goods.max_num_limit = mnl goods.max_num_limit = mnl
self._data[n] = goods
return func return func
return lambda func: add_register_item(func) return lambda func: add_register_item(func)
@ -167,6 +168,7 @@ class ShopRegister(dict):
) )
if uuid: if uuid:
await ShopManage.register_use( await ShopManage.register_use(
name,
uuid, uuid,
goods.func, goods.func,
goods.send_success_msg, goods.send_success_msg,
@ -187,6 +189,8 @@ class ShopRegister(dict):
daily_limit: int | tuple[int, ...] = 0, daily_limit: int | tuple[int, ...] = 0,
is_passive: bool | tuple[bool, ...] = False, is_passive: bool | tuple[bool, ...] = False,
icon: str | tuple[str, ...] = "", icon: str | tuple[str, ...] = "",
send_success_msg: bool | tuple[bool, ...] = True,
max_num_limit: int | tuple[int, ...] = 1,
**kwargs, **kwargs,
): ):
"""注册商品 """注册商品
@ -201,6 +205,8 @@ class ShopRegister(dict):
daily_limit: 每日限购 daily_limit: 每日限购
is_passive: 是否被动道具 is_passive: 是否被动道具
icon: 图标 icon: 图标
send_success_msg: 成功时发送消息
max_num_limit: 单次最大使用次数
""" """
_tuple_list = [] _tuple_list = []
_current_len = -1 _current_len = -1
@ -222,6 +228,8 @@ class ShopRegister(dict):
_daily_limit = self.__get(daily_limit, _current_len) _daily_limit = self.__get(daily_limit, _current_len)
_is_passive = self.__get(is_passive, _current_len) _is_passive = self.__get(is_passive, _current_len)
_icon = self.__get(icon, _current_len) _icon = self.__get(icon, _current_len)
_send_success_msg = self.__get(send_success_msg, _current_len)
_max_num_limit = self.__get(max_num_limit, _current_len)
return self.register( return self.register(
_name, _name,
_price, _price,
@ -232,6 +240,8 @@ class ShopRegister(dict):
_daily_limit, _daily_limit,
_is_passive, _is_passive,
_icon, _icon,
_send_success_msg,
_max_num_limit,
**kwargs, **kwargs,
) )