feat: 数据迁移

This commit is contained in:
HibiKier 2024-03-04 23:27:05 +08:00
parent a2d6c7f951
commit db96f46dcb
12 changed files with 682 additions and 230 deletions

View File

@ -1,6 +1,13 @@
import os
from nonebot import require
from tortoise import Tortoise
from zhenxun.models.goods_info import GoodsInfo
from zhenxun.models.sign_user import SignUser
from zhenxun.models.user_console import UserConsole
from zhenxun.services.log import logger
from zhenxun.utils.decorator.shop import shop_register
require("nonebot_plugin_apscheduler")
require("nonebot_plugin_alconna")
@ -13,7 +20,105 @@ enable_auto_select_bot()
from pathlib import Path
import nonebot
import ujson as json
path = Path(__file__).parent / "platform"
for d in os.listdir(path):
nonebot.load_plugins(str((path / d).resolve()))
driver = nonebot.get_driver()
flag = True
SIGN_SQL = """
select distinct on("user_id") t1.user_id, t1.checkin_count, t1.add_probability, t1.specify_probability, t1.impression
from public.sign_group_users t1
join (
select user_id, max(t2.impression) as max_impression
from public.sign_group_users t2
group by user_id
) t on t.user_id = t1.user_id and t.max_impression = t1.impression
"""
BAG_SQL = """
select t1.user_id, t1.gold, t1.property
from public.bag_users t1
join (
select user_id, max(t2.gold) as max_gold
from public.bag_users t2
group by user_id
) t on t.user_id = t1.user_id and t.max_gold = t1.gold
"""
@driver.on_bot_connect
async def _():
global flag
await shop_register.load_register()
if (
flag
and not await UserConsole.annotate().count()
and not await SignUser.annotate().count()
):
flag = False
db = Tortoise.get_connection("default")
old_sign_list = await db.execute_query_dict(SIGN_SQL)
old_bag_list = await db.execute_query_dict(BAG_SQL)
goods = {
g["goods_name"]: g["uuid"]
for g in await GoodsInfo.annotate().values("goods_name", "uuid")
}
create_list = []
sign_id_list = []
uid = await UserConsole.get_new_uid()
for old_sign in old_sign_list:
sign_id_list.append(old_sign["user_id"])
old_bag = [b for b in old_bag_list if b["user_id"] == old_sign["user_id"]]
if old_bag:
old_bag = old_bag[0]
property = json.loads(old_bag["property"])
props = {}
if property:
for name, num in property.items():
if name in goods:
props[goods[name]] = num
create_list.append(
UserConsole(
user_id=old_sign["user_id"],
platform="qq",
uid=uid,
props=props,
gold=old_bag["gold"],
)
)
else:
create_list.append(
UserConsole(user_id=old_sign["user_id"], platform="qq", uid=uid)
)
uid += 1
if create_list:
logger.info("开始迁移用户数据...")
await UserConsole.bulk_create(create_list, 10)
logger.info("迁移用户数据完成!")
create_list.clear()
uc_dict = {u.user_id: u for u in await UserConsole.all()}
for old_sign in old_sign_list:
user_console = uc_dict.get(old_sign["user_id"])
if not user_console:
user_console = await UserConsole.get_user(old_sign["user_id"], "qq")
create_list.append(
SignUser(
user_id=old_sign["user_id"],
user_console=user_console,
platform="qq",
sign_count=old_sign["checkin_count"],
impression=old_sign["impression"],
add_probability=old_sign["add_probability"],
specify_probability=old_sign["specify_probability"],
)
)
if create_list:
logger.info("开始迁移签到数据...")
await SignUser.bulk_create(create_list, 10)
logger.info("迁移签到数据完成!")

View File

@ -40,4 +40,4 @@ Config.add_plugin_config(
type=int,
)
nonebot.load_plugins(str(Path(__file__).parent.resolve()))
# nonebot.load_plugins(str(Path(__file__).parent.resolve()))

View File

@ -39,6 +39,7 @@ async def _handle_setting(
setting = extra_data.setting or PluginSetting()
if metadata.type == "library":
extra_data.plugin_type = PluginType.HIDDEN
extra_data.menu_type = ""
plugin_list.append(
PluginInfo(
module=plugin.name,

View File

@ -1,7 +1,6 @@
from asyncio.exceptions import TimeoutError
import nonebot
import ujson as json
from nonebot.drivers import Driver
from nonebot_plugin_apscheduler import scheduler
@ -9,6 +8,12 @@ from zhenxun.configs.path_config import TEXT_PATH
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
try:
import ujson as json
except ModuleNotFoundError:
import json
driver: Driver = nonebot.get_driver()
@ -40,13 +45,13 @@ async def update_city():
data[provinces_data[province]].append(city_data[city])
with open(china_city, "w", encoding="utf8") as f:
json.dump(data, f, indent=4, ensure_ascii=False)
logger.info("自动更新城市列表完成...")
logger.info("自动更新城市列表完成.....")
except TimeoutError as e:
logger.warning("自动更新城市列表超时...", e=e)
except ValueError as e:
logger.warning("自动城市列表失败...", e=e)
logger.warning("自动城市列表失败.....", e=e)
except Exception as e:
logger.error(f"自动城市列表未知错误...", e=e)
logger.error(f"自动城市列表未知错误", e=e)
# 自动更新城市列表

View File

@ -1,5 +1,8 @@
import time
from typing import Dict
from typing import Any, Callable, Dict
from nonebot.adapters import Event
from pydantic import BaseModel, create_model
from zhenxun.configs.path_config import IMAGE_PATH
from zhenxun.models.goods_info import GoodsInfo
@ -13,8 +16,80 @@ from zhenxun.utils.image_utils import BuildImage, ImageTemplate, text2image
ICON_PATH = IMAGE_PATH / "shop_icon"
class Goods(BaseModel):
before_handle: list[Callable] = []
after_handle: list[Callable] = []
func: Callable | None = None
params: Any | None = None
send_success_msg: bool = True
max_num_limit: int = 1
model: Any | None = None
class ShopParam(BaseModel):
goods_name: str
"""商品名称"""
user_id: int
"""用户id"""
group_id: int
"""群聊id"""
bot: Any
"""bot"""
event: Event
"""event"""
num: int
"""道具单次使用数量"""
message: str
"""message"""
text: str
"""text"""
send_success_msg: bool = True
"""是否发送使用成功信息"""
max_num_limit: int = 1
"""单次使用最大次数"""
class ShopManage:
uuid2goods: Dict[str, Goods] = {}
@classmethod
async def register_use(
cls,
uuid: str,
func: Callable,
send_success_msg: bool = True,
max_num_limit: int = 1,
before_handle: list[Callable] = [],
after_handle: list[Callable] = [],
**kwargs,
):
"""注册使用方法
参数:
uuid: uuid
func: 使用函数
send_success_msg: 使用成功时发送消息.
max_num_limit: 单次最大使用限制.
before_handle: 使用前函数.
after_handle: 使用后函数.
异常:
ValueError: 该商品使用函数已被注册
"""
if uuid in cls.uuid2goods:
raise ValueError("该商品使用函数已被注册!")
kwargs["send_success_msg"] = send_success_msg
kwargs["max_num_limit"] = max_num_limit
cls.uuid2func = Goods(
model=create_model(f"{uuid}_model", __base__=ShopParam, **kwargs),
params=kwargs,
before_handle=before_handle,
after_handle=after_handle,
)
@classmethod
async def buy_prop(
cls, user_id: str, name: str, num: int = 1, platform: str | None = None

View File

@ -1,160 +1,161 @@
# from typing import Dict
from typing import Dict
# from services.db_context import Model
# from tortoise import fields
from tortoise import fields
# from .goods_info import GoodsInfo
from zhenxun.services.db_context import Model
from .goods_info import GoodsInfo
# class BagUser(Model):
class BagUser(Model):
# id = fields.IntField(pk=True, generated=True, auto_increment=True)
# """自增id"""
# user_id = fields.CharField(255)
# """用户id"""
# group_id = fields.CharField(255)
# """群聊id"""
# gold = fields.IntField(default=100)
# """金币数量"""
# spend_total_gold = fields.IntField(default=0)
# """花费金币总数"""
# get_total_gold = fields.IntField(default=0)
# """获取金币总数"""
# get_today_gold = fields.IntField(default=0)
# """今日获取金币"""
# spend_today_gold = fields.IntField(default=0)
# """今日获取金币"""
# property: Dict[str, int] = fields.JSONField(default={}) # type: ignore
# """道具"""
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
user_id = fields.CharField(255)
"""用户id"""
group_id = fields.CharField(255)
"""群聊id"""
gold = fields.IntField(default=100)
"""金币数量"""
spend_total_gold = fields.IntField(default=0)
"""花费金币总数"""
get_total_gold = fields.IntField(default=0)
"""获取金币总数"""
get_today_gold = fields.IntField(default=0)
"""今日获取金币"""
spend_today_gold = fields.IntField(default=0)
"""今日获取金币"""
property: Dict[str, int] = fields.JSONField(default={}) # type: ignore
"""道具"""
# class Meta:
# table = "bag_users"
# table_description = "用户道具数据表"
# unique_together = ("user_id", "group_id")
class Meta:
table = "bag_users"
table_description = "用户道具数据表"
unique_together = ("user_id", "group_id")
# @classmethod
# async def get_gold(cls, user_id: str, group_id: str) -> int:
# """获取当前金币
@classmethod
async def get_gold(cls, user_id: str, group_id: str) -> int:
"""获取当前金币
# 参数:
# user_id: 用户id
# group_id: 所在群组id
参数:
user_id: 用户id
group_id: 所在群组id
# 返回:
# int: 金币数量
# """
# user, _ = await cls.get_or_create(user_id=user_id, group_id=group_id)
# return user.gold
返回:
int: 金币数量
"""
user, _ = await cls.get_or_create(user_id=user_id, group_id=group_id)
return user.gold
# @classmethod
# async def get_property(
# cls, user_id: str, group_id: str, only_active: bool = False
# ) -> Dict[str, int]:
# """获取当前道具
@classmethod
async def get_property(
cls, user_id: str, group_id: str, only_active: bool = False
) -> Dict[str, int]:
"""获取当前道具
# 参数:
# user_id: 用户id
# group_id: 所在群组id
# only_active: 仅仅获取主动使用的道具
参数:
user_id: 用户id
group_id: 所在群组id
only_active: 仅仅获取主动使用的道具
# 返回:
# Dict[str, int]: 道具名称与数量
# """
# user, _ = await cls.get_or_create(user_id=user_id, group_id=group_id)
# if only_active and user.property:
# data = {}
# name_list = [
# x.goods_name
# for x in await GoodsInfo.get_all_goods()
# if not x.is_passive
# ]
# for key in [x for x in user.property if x in name_list]:
# data[key] = user.property[key]
# return data
# return user.property
返回:
Dict[str, int]: 道具名称与数量
"""
user, _ = await cls.get_or_create(user_id=user_id, group_id=group_id)
if only_active and user.property:
data = {}
name_list = [
x.goods_name
for x in await GoodsInfo.get_all_goods()
if not x.is_passive
]
for key in [x for x in user.property if x in name_list]:
data[key] = user.property[key]
return data
return user.property
# @classmethod
# async def add_gold(cls, user_id: str, group_id: str, num: int):
# """增加金币
@classmethod
async def add_gold(cls, user_id: str, group_id: str, num: int):
"""增加金币
# 参数:
# user_id: 用户id
# group_id: 所在群组id
# num: 金币数量
# """
# user, _ = await cls.get_or_create(user_id=user_id, group_id=group_id)
# user.gold = user.gold + num
# user.get_total_gold = user.get_total_gold + num
# user.get_today_gold = user.get_today_gold + num
# await user.save(update_fields=["gold", "get_today_gold", "get_total_gold"])
参数:
user_id: 用户id
group_id: 所在群组id
num: 金币数量
"""
user, _ = await cls.get_or_create(user_id=user_id, group_id=group_id)
user.gold = user.gold + num
user.get_total_gold = user.get_total_gold + num
user.get_today_gold = user.get_today_gold + num
await user.save(update_fields=["gold", "get_today_gold", "get_total_gold"])
# @classmethod
# async def spend_gold(cls, user_id: str, group_id: str, num: int):
# """花费金币
@classmethod
async def spend_gold(cls, user_id: str, group_id: str, num: int):
"""花费金币
# 参数:
# user_id: 用户id
# group_id: 所在群组id
# num: 金币数量
# """
# user, _ = await cls.get_or_create(user_id=str(user_id), group_id=str(group_id))
# user.gold = user.gold - num
# user.spend_total_gold = user.spend_total_gold + num
# user.spend_today_gold = user.spend_today_gold + num
# await user.save(update_fields=["gold", "spend_total_gold", "spend_today_gold"])
参数:
user_id: 用户id
group_id: 所在群组id
num: 金币数量
"""
user, _ = await cls.get_or_create(user_id=str(user_id), group_id=str(group_id))
user.gold = user.gold - num
user.spend_total_gold = user.spend_total_gold + num
user.spend_today_gold = user.spend_today_gold + num
await user.save(update_fields=["gold", "spend_total_gold", "spend_today_gold"])
# @classmethod
# async def add_property(cls, user_id: str, group_id: str, name: str, num: int = 1):
# """增加道具
@classmethod
async def add_property(cls, user_id: str, group_id: str, name: str, num: int = 1):
"""增加道具
# 参数:
# user_id: 用户id
# group_id: 所在群组id
# name: 道具名称
# num: 道具数量
# """
# user, _ = await cls.get_or_create(user_id=str(user_id), group_id=str(group_id))
# property_ = user.property
# if property_.get(name) is None:
# property_[name] = 0
# property_[name] += num
# user.property = property_
# await user.save(update_fields=["property"])
参数:
user_id: 用户id
group_id: 所在群组id
name: 道具名称
num: 道具数量
"""
user, _ = await cls.get_or_create(user_id=str(user_id), group_id=str(group_id))
property_ = user.property
if property_.get(name) is None:
property_[name] = 0
property_[name] += num
user.property = property_
await user.save(update_fields=["property"])
# @classmethod
# async def delete_property(
# cls, user_id: str, group_id: str, name: str, num: int = 1
# ) -> bool:
# """使用/删除 道具
@classmethod
async def delete_property(
cls, user_id: str, group_id: str, name: str, num: int = 1
) -> bool:
"""使用/删除 道具
# 参数:
# user_id: 用户id
# group_id: 所在群组id
# name: 道具名称
# num: 使用个数
参数:
user_id: 用户id
group_id: 所在群组id
name: 道具名称
num: 使用个数
# 返回:
# bool: 是否使用/删除成功
# """
# user, _ = await cls.get_or_create(user_id=str(user_id), group_id=str(group_id))
# property_ = user.property
# if name in property_:
# if (n := property_.get(name, 0)) < num:
# return False
# if n == num:
# del property_[name]
# else:
# property_[name] -= num
# await user.save(update_fields=["property"])
# return True
# return False
返回:
bool: 是否使用/删除成功
"""
user, _ = await cls.get_or_create(user_id=str(user_id), group_id=str(group_id))
property_ = user.property
if name in property_:
if (n := property_.get(name, 0)) < num:
return False
if n == num:
del property_[name]
else:
property_[name] -= num
await user.save(update_fields=["property"])
return True
return False
# @classmethod
# async def _run_script(cls):
# return [
# "ALTER TABLE bag_users DROP props;", # 删除 props 字段
# "ALTER TABLE bag_users RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id
# "ALTER TABLE bag_users ALTER COLUMN user_id TYPE character varying(255);",
# # 将user_id字段类型改为character varying(255)
# "ALTER TABLE bag_users ALTER COLUMN group_id TYPE character varying(255);",
# ]
@classmethod
async def _run_script(cls):
return [
"ALTER TABLE bag_users DROP props;", # 删除 props 字段
"ALTER TABLE bag_users RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id
"ALTER TABLE bag_users ALTER COLUMN user_id TYPE character varying(255);",
# 将user_id字段类型改为character varying(255)
"ALTER TABLE bag_users ALTER COLUMN group_id TYPE character varying(255);",
]

View File

@ -46,7 +46,7 @@ class GoodsInfo(Model):
daily_limit: int = 0,
is_passive: bool = False,
icon: str | None = None,
):
) -> str | None:
"""添加商品
参数:
@ -60,8 +60,9 @@ class GoodsInfo(Model):
icon: 图标
"""
if not await cls.exists(goods_name=goods_name):
uuid_ = uuid.uuid1()
await cls.create(
uuid=uuid.uuid1(),
uuid=uuid_,
goods_name=goods_name,
goods_price=goods_price,
goods_description=goods_description,
@ -71,6 +72,8 @@ class GoodsInfo(Model):
is_passive=is_passive,
icon=icon,
)
return str(uuid_)
return None
@classmethod
async def delete_goods(cls, goods_name: str) -> bool:

View File

@ -0,0 +1,81 @@
from datetime import datetime
from typing import List, Literal, Optional, Tuple, Union
from tortoise import fields
from zhenxun.services.db_context import Model
class SignGroupUser(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
user_id = fields.CharField(255)
"""用户id"""
group_id = fields.CharField(255)
"""群聊id"""
checkin_count = fields.IntField(default=0)
"""签到次数"""
checkin_time_last = fields.DatetimeField(default=datetime.min)
"""最后签到时间"""
impression = fields.DecimalField(10, 3, default=0)
"""好感度"""
add_probability = fields.DecimalField(10, 3, default=0)
"""双倍签到增加概率"""
specify_probability = fields.DecimalField(10, 3, default=0)
"""使用指定双倍概率"""
# specify_probability = fields.DecimalField(10, 3, default=0)
class Meta:
table = "sign_group_users"
table_description = "群员签到数据表"
unique_together = ("user_id", "group_id")
@classmethod
async def sign(cls, user: "SignGroupUser", impression: float):
"""
说明:
签到
说明:
:param user: 用户
:param impression: 增加的好感度
"""
user.checkin_time_last = datetime.now()
user.checkin_count = user.checkin_count + 1
user.add_probability = 0
user.specify_probability = 0
user.impression = float(user.impression) + impression
await user.save()
@classmethod
async def get_all_impression(
cls, group_id: Union[int, str]
) -> Tuple[List[str], List[float], List[str]]:
"""
说明:
获取该群所有用户 id 及对应 好感度
参数:
:param group_id: 群号
"""
if group_id:
query = cls.filter(group_id=str(group_id))
else:
query = cls
value_list = await query.all().values_list("user_id", "group_id", "impression") # type: ignore
user_list = []
group_list = []
impression_list = []
for value in value_list:
user_list.append(value[0])
group_list.append(value[1])
impression_list.append(float(value[2]))
return user_list, impression_list, group_list
@classmethod
async def _run_script(cls):
return [
"ALTER TABLE sign_group_users RENAME COLUMN user_qq TO user_id;", # 将user_id改为user_id
"ALTER TABLE sign_group_users ALTER COLUMN user_id TYPE character varying(255);",
# 将user_id字段类型改为character varying(255)
"ALTER TABLE sign_group_users ALTER COLUMN group_id TYPE character varying(255);",
]

View File

@ -15,7 +15,7 @@ class UserConsole(Model):
"""自增id"""
user_id = fields.CharField(255, unique=True, description="用户id")
"""用户id"""
uid = fields.IntField(description="UID")
uid = fields.IntField(description="UID", unique=True)
"""UID"""
gold = fields.IntField(default=100, description="金币数量")
"""金币数量"""

105
zhenxun/plugins/__init__.py Normal file
View File

@ -0,0 +1,105 @@
import os
import nonebot
import ujson as json
from tortoise import Tortoise
from zhenxun.models.goods_info import GoodsInfo
from zhenxun.models.sign_user import SignUser
from zhenxun.models.user_console import UserConsole
from zhenxun.services.log import logger
driver = nonebot.get_driver()
flag = True
SIGN_SQL = """
select distinct on("user_id") t1.user_id, t1.checkin_count, t1.add_probability, t1.specify_probability, t1.impression
from public.sign_group_users t1
join (
select user_id, max(t2.impression) as max_impression
from public.sign_group_users t2
group by user_id
) t on t.user_id = t1.user_id and t.max_impression = t1.impression
"""
BAG_SQL = """
select t1.user_id, t1.gold, t1.property
from public.bag_users t1
join (
select user_id, max(t2.gold) as max_gold
from public.bag_users t2
group by user_id
) t on t.user_id = t1.user_id and t.max_gold = t1.gold
"""
@driver.on_startup
async def _test():
global flag
if (
flag
and not await UserConsole.annotate().count()
and not await SignUser.annotate().count()
):
flag = False
db = Tortoise.get_connection("default")
old_sign_list = await db.execute_query_dict(SIGN_SQL)
old_bag_list = await db.execute_query_dict(BAG_SQL)
goods = {
g["goods_name"]: g["uuid"]
for g in await GoodsInfo.annotate().values("goods_name", "uuid")
}
create_list = []
sign_id_list = []
uid = await UserConsole.get_new_uid()
for old_sign in old_sign_list:
sign_id_list.append(old_sign["user_id"])
old_bag = [b for b in old_bag_list if b["user_id"] == old_sign["user_id"]]
if old_bag:
old_bag = old_bag[0]
property = json.loads(old_bag["property"])
props = {}
if property:
for name, num in property.items():
if name in goods:
props[goods[name]] = num
create_list.append(
UserConsole(
user_id=old_sign["user_id"],
platform="qq",
uid=uid,
props=props,
gold=old_bag["gold"],
)
)
else:
create_list.append(
UserConsole(user_id=old_sign["user_id"], platform="qq", uid=uid)
)
uid += 1
if create_list:
logger.info("开始迁移用户数据...")
await UserConsole.bulk_create(create_list, 10)
logger.info("迁移用户数据完成!")
create_list.clear()
uc_dict = {u.user_id: u for u in await UserConsole.all()}
for old_sign in old_sign_list:
user_console = uc_dict.get(old_sign["user_id"])
if not user_console:
user_console = await UserConsole.get_user(old_sign["user_id"], "qq")
create_list.append(
SignUser(
user_id=old_sign["user_id"],
user_console=user_console,
platform="qq",
sign_count=old_sign["checkin_count"],
impression=old_sign["impression"],
add_probability=old_sign["add_probability"],
specify_probability=old_sign["specify_probability"],
)
)
if create_list:
logger.info("开始迁移签到数据...")
await SignUser.bulk_create(create_list, 10)
logger.info("迁移签到数据完成!")

View File

@ -47,7 +47,7 @@ class TestSQL(Model):
async def init():
if not bind and not any([user, password, address, port, database]):
raise ValueError("\n数据库配置未填写.......")
raise ValueError("\n数据库配置未填写...")
i_bind = bind
if not i_bind:
i_bind = f"{sql_name}://{user}:{password}@{address}:{port}/{database}"

View File

@ -1,70 +1,116 @@
from typing import Callable, Union, Tuple, Optional
from nonebot.adapters.onebot.v11 import MessageSegment, Message
from typing import Any, Callable, Dict
from nonebot.adapters.onebot.v11 import Message, MessageSegment
from nonebot.plugin import require
from pydantic import BaseModel
from zhenxun.models.goods_info import GoodsInfo
class Goods(BaseModel):
before_handle: list[Callable] = []
after_handle: list[Callable] = []
price: int
des: str = ""
discount: float
limit_time: int
daily_limit: int
icon: str | None = None
is_passive: bool
func: Callable
kwargs: Dict[str, str] = {}
send_success_msg: bool
max_num_limit: int
class ShopRegister(dict):
def __init__(self, *args, **kwargs):
super(ShopRegister, self).__init__(*args, **kwargs)
self._data = {}
self._data: Dict[str, Goods] = {}
self._flag = True
def before_handle(self, name: Union[str, Tuple[str, ...]], load_status: bool = True):
"""
说明:
使用前检查方法
def before_handle(self, name: str | tuple[str, ...], load_status: bool = True):
"""使用前检查方法
参数:
:param name: 道具名称
:param load_status: 加载状态
name: 道具名称
load_status: 加载状态
"""
def register_before_handle(name_list: Tuple[str, ...], func: Callable):
def register_before_handle(name_list: tuple[str, ...], func: Callable):
if load_status:
for name_ in name_list:
if not self._data[name_]:
self._data[name_] = {}
if not self._data[name_].get('before_handle'):
self._data[name_]['before_handle'] = []
self._data[name]['before_handle'].append(func)
if goods := self._data.get(name_):
self._data[name_].before_handle.append(func)
_name = (name,) if isinstance(name, str) else name
return lambda func: register_before_handle(_name, func)
def after_handle(self, name: Union[str, Tuple[str, ...]], load_status: bool = True):
"""
说明:
使用后执行方法
def after_handle(self, name: str | tuple[str, ...], load_status: bool = True):
"""使用后执行方法
参数:
:param name: 道具名称
:param load_status: 加载状态
name: 道具名称
load_status: 加载状态
"""
def register_after_handle(name_list: Tuple[str, ...], func: Callable):
def register_after_handle(name_list: tuple[str, ...], func: Callable):
if load_status:
for name_ in name_list:
if not self._data[name_]:
self._data[name_] = {}
if not self._data[name_].get('after_handle'):
self._data[name_]['after_handle'] = []
self._data[name_]['after_handle'].append(func)
if goods := self._data.get(name_):
self._data[name_].after_handle.append(func)
_name = (name,) if isinstance(name, str) else name
return lambda func: register_after_handle(_name, func)
def register(
self,
name: Tuple[str, ...],
price: Tuple[float, ...],
des: Tuple[str, ...],
discount: Tuple[float, ...],
limit_time: Tuple[int, ...],
load_status: Tuple[bool, ...],
daily_limit: Tuple[int, ...],
is_passive: Tuple[bool, ...],
icon: Tuple[str, ...],
name: tuple[str, ...],
price: tuple[float, ...],
des: tuple[str, ...],
discount: tuple[float, ...],
limit_time: tuple[int, ...],
load_status: tuple[bool, ...],
daily_limit: tuple[int, ...],
is_passive: tuple[bool, ...],
icon: tuple[str, ...],
send_success_msg: tuple[bool, ...],
max_num_limit: tuple[int, ...],
**kwargs,
):
"""注册商品
参数:
name: 商品名称
price: 价格
des: 简介
discount: 折扣
limit_time: 售卖限时时间
load_status: 是否加载
daily_limit: 每日限购
is_passive: 是否被动道具
icon: 图标
send_success_msg: 成功时发送消息
max_num_limit: 单次最大使用次数
"""
def add_register_item(func: Callable):
if name in self._data.keys():
raise ValueError("该商品已注册,请替换其他名称!")
for n, p, d, dd, l, s, dl, pa, i in zip(
name, price, des, discount, limit_time, load_status, daily_limit, is_passive, icon
for n, p, d, dd, l, s, dl, pa, i, ssm, mnl in zip(
name,
price,
des,
discount,
limit_time,
load_status,
daily_limit,
is_passive,
icon,
send_success_msg,
max_num_limit,
):
if s:
_temp_kwargs = {}
@ -73,62 +119,89 @@ class ShopRegister(dict):
_temp_kwargs[key.split("_", maxsplit=1)[-1]] = value
else:
_temp_kwargs[key] = value
temp = self._data.get(n, {})
temp.update({
"price": p,
"des": d,
"discount": dd,
"limit_time": l,
"daily_limit": dl,
"icon": i,
"is_passive": pa,
"func": func,
"kwargs": _temp_kwargs,
})
self._data[n] = temp
goods = self._data.get(n) or Goods(
price=p,
des=d,
discount=dd,
limit_time=l,
daily_limit=dl,
is_passive=pa,
func=func,
send_success_msg=ssm,
max_num_limit=mnl,
)
goods.price = p
goods.des = d
goods.discount = dd
goods.limit_time = l
goods.daily_limit = dl
goods.icon = i
goods.is_passive = pa
goods.func = func
goods.kwargs = _temp_kwargs
goods.send_success_msg = ssm
goods.max_num_limit = mnl
return func
return lambda func: add_register_item(func)
async def load_register(self):
require("use")
require("shop_handle")
from basic_plugins.shop.use.data_source import register_use, func_manager
from basic_plugins.shop.shop_handle.data_source import register_goods
require("shop")
from zhenxun.builtin_plugins.shop._data_source import ShopManage
# 统一进行注册
if self._flag:
# 只进行一次注册
self._flag = False
for name in self._data.keys():
await register_goods(
name,
self._data[name]["price"],
self._data[name]["des"],
self._data[name]["discount"],
self._data[name]["limit_time"],
self._data[name]["daily_limit"],
self._data[name]["is_passive"],
self._data[name]["icon"],
)
register_use(
name, self._data[name]["func"], **self._data[name]["kwargs"]
)
func_manager.register_use_before_handle(name, self._data[name].get('before_handle', []))
func_manager.register_use_after_handle(name, self._data[name].get('after_handle', []))
if goods := self._data.get(name):
uuid = await GoodsInfo.add_goods(
name,
goods.price,
goods.des,
goods.discount,
goods.limit_time,
goods.daily_limit,
goods.is_passive,
goods.icon,
)
if uuid:
await ShopManage.register_use(
uuid,
goods.func,
goods.send_success_msg,
goods.max_num_limit,
goods.before_handle,
goods.after_handle,
**self._data[name].kwargs,
)
def __call__(
self,
name: Union[str, Tuple[str, ...]], # 名称
price: Union[float, Tuple[float, ...]], # 价格
des: Union[str, Tuple[str, ...]], # 简介
discount: Union[float, Tuple[float, ...]] = 1, # 折扣
limit_time: Union[int, Tuple[int, ...]] = 0, # 限时
load_status: Union[bool, Tuple[bool, ...]] = True, # 加载状态
daily_limit: Union[int, Tuple[int, ...]] = 0, # 每日限购
is_passive: Union[bool, Tuple[bool, ...]] = False, # 被动道具(无法被'使用道具'命令消耗)
icon: Union[str, Tuple[str, ...]] = False, # 图标
name: str | tuple[str, ...],
price: float | tuple[float, ...],
des: str | tuple[str, ...],
discount: float | tuple[float, ...] = 1,
limit_time: int | tuple[int, ...] = 0,
load_status: bool | tuple[bool, ...] = True,
daily_limit: int | tuple[int, ...] = 0,
is_passive: bool | tuple[bool, ...] = False,
icon: str | tuple[str, ...] = "",
**kwargs,
):
"""注册商品
参数:
name: 商品名称
price: 价格
des: 简介
discount: 折扣
limit_time: 售卖限时时间
load_status: 是否加载
daily_limit: 每日限购
is_passive: 是否被动道具
icon: 图标
"""
_tuple_list = []
_current_len = -1
for x in [name, price, des, discount, limit_time, load_status]:
@ -163,7 +236,11 @@ class ShopRegister(dict):
)
def __get(self, value, _current_len):
return value if isinstance(value, tuple) else tuple([value for _ in range(_current_len)])
return (
value
if isinstance(value, tuple)
else tuple([value for _ in range(_current_len)])
)
def __setitem__(self, key, value):
self._data[key] = value
@ -188,12 +265,11 @@ class ShopRegister(dict):
class NotMeetUseConditionsException(Exception):
"""
不满足条件异常类
"""
def __init__(self, info: Optional[Union[str, MessageSegment, Message]]):
def __init__(self, info: str | MessageSegment | Message | None):
super().__init__(self)
self._info = info