diff --git a/zhenxun/builtin_plugins/__init__.py b/zhenxun/builtin_plugins/__init__.py index 89e9e62b..b0ae260a 100644 --- a/zhenxun/builtin_plugins/__init__.py +++ b/zhenxun/builtin_plugins/__init__.py @@ -3,6 +3,7 @@ import os from nonebot import require from nonebot.drivers import Driver from tortoise import Tortoise +from tortoise.exceptions import OperationalError from zhenxun.models.goods_info import GoodsInfo from zhenxun.models.sign_user import SignUser @@ -62,64 +63,69 @@ async def _(): and not await UserConsole.annotate().count() and not await SignUser.annotate().count() ): - flag = False - db = Tortoise.get_connection("default") - old_sign_list = await db.execute_query_dict(SIGN_SQL) - old_bag_list = await db.execute_query_dict(BAG_SQL) - goods = { - g["goods_name"]: g["uuid"] - for g in await GoodsInfo.annotate().values("goods_name", "uuid") - } - create_list = [] - sign_id_list = [] - uid = await UserConsole.get_new_uid() - for old_sign in old_sign_list: - sign_id_list.append(old_sign["user_id"]) - old_bag = [b for b in old_bag_list if b["user_id"] == old_sign["user_id"]] - if old_bag: - old_bag = old_bag[0] - property = json.loads(old_bag["property"]) - props = {} - if property: - for name, num in property.items(): - if name in goods: - props[goods[name]] = num + try: + flag = False + db = Tortoise.get_connection("default") + old_sign_list = await db.execute_query_dict(SIGN_SQL) + old_bag_list = await db.execute_query_dict(BAG_SQL) + goods = { + g["goods_name"]: g["uuid"] + for g in await GoodsInfo.annotate().values("goods_name", "uuid") + } + create_list = [] + sign_id_list = [] + uid = await UserConsole.get_new_uid() + for old_sign in old_sign_list: + sign_id_list.append(old_sign["user_id"]) + old_bag = [ + b for b in old_bag_list if b["user_id"] == old_sign["user_id"] + ] + if old_bag: + old_bag = old_bag[0] + property = json.loads(old_bag["property"]) + props = {} + if property: + for name, num in property.items(): + if name in goods: + props[goods[name]] = num + create_list.append( + UserConsole( + user_id=old_sign["user_id"], + platform="qq", + uid=uid, + props=props, + gold=old_bag["gold"], + ) + ) + else: + create_list.append( + UserConsole(user_id=old_sign["user_id"], platform="qq", uid=uid) + ) + uid += 1 + if create_list: + logger.info("开始迁移用户数据...") + await UserConsole.bulk_create(create_list, 10) + logger.info("迁移用户数据完成!") + create_list.clear() + uc_dict = {u.user_id: u for u in await UserConsole.all()} + for old_sign in old_sign_list: + user_console = uc_dict.get(old_sign["user_id"]) + if not user_console: + user_console = await UserConsole.get_user(old_sign["user_id"], "qq") create_list.append( - UserConsole( + SignUser( user_id=old_sign["user_id"], + user_console=user_console, platform="qq", - uid=uid, - props=props, - gold=old_bag["gold"], + sign_count=old_sign["checkin_count"], + impression=old_sign["impression"], + add_probability=old_sign["add_probability"], + specify_probability=old_sign["specify_probability"], ) ) - else: - create_list.append( - UserConsole(user_id=old_sign["user_id"], platform="qq", uid=uid) - ) - uid += 1 - if create_list: - logger.info("开始迁移用户数据...") - await UserConsole.bulk_create(create_list, 10) - logger.info("迁移用户数据完成!") - create_list.clear() - uc_dict = {u.user_id: u for u in await UserConsole.all()} - for old_sign in old_sign_list: - user_console = uc_dict.get(old_sign["user_id"]) - if not user_console: - user_console = await UserConsole.get_user(old_sign["user_id"], "qq") - create_list.append( - SignUser( - user_id=old_sign["user_id"], - user_console=user_console, - platform="qq", - sign_count=old_sign["checkin_count"], - impression=old_sign["impression"], - add_probability=old_sign["add_probability"], - specify_probability=old_sign["specify_probability"], - ) - ) - if create_list: - logger.info("开始迁移签到数据...") - await SignUser.bulk_create(create_list, 10) - logger.info("迁移签到数据完成!") + if create_list: + logger.info("开始迁移签到数据...") + await SignUser.bulk_create(create_list, 10) + logger.info("迁移签到数据完成!") + except OperationalError as e: + logger.warning("数据迁移", e=e) diff --git a/zhenxun/configs/utils/__init__.py b/zhenxun/configs/utils/__init__.py index 7ef9e0f4..abe5d03c 100644 --- a/zhenxun/configs/utils/__init__.py +++ b/zhenxun/configs/utils/__init__.py @@ -161,6 +161,8 @@ class PluginExtraData(BaseModel): """插件限制""" tasks: list[Task] | None = None """技能被动""" + superuser_help: str | None = None + """超级用户帮助""" class NoSuchConfig(Exception): diff --git a/zhenxun/models/user_console.py b/zhenxun/models/user_console.py index a492c317..c743dd54 100644 --- a/zhenxun/models/user_console.py +++ b/zhenxun/models/user_console.py @@ -77,7 +77,8 @@ class UserConsole(Model): platform: 平台. """ user, _ = await cls.get_or_create( - user_id=user_id, defaults={"platform": platform, "uid": cls.get_new_uid()} + user_id=user_id, + defaults={"platform": platform, "uid": await cls.get_new_uid()}, ) user.gold += gold await user.save(update_fields=["gold"]) diff --git a/zhenxun/plugins/epic/__init__.py b/zhenxun/plugins/epic/__init__.py index ea32b277..bc756244 100644 --- a/zhenxun/plugins/epic/__init__.py +++ b/zhenxun/plugins/epic/__init__.py @@ -7,9 +7,8 @@ from nonebot_plugin_apscheduler import scheduler from nonebot_plugin_saa import MessageFactory, Text from nonebot_plugin_session import EventSession -from zhenxun.configs.utils import PluginExtraData, RegisterConfig +from zhenxun.configs.utils import PluginExtraData from zhenxun.services.log import logger -from zhenxun.utils.platform import broadcast_group from .data_source import get_epic_free @@ -22,16 +21,6 @@ __plugin_meta__ = PluginMetadata( extra=PluginExtraData( author="AkashiCoin", version="0.1", - configs=[ - RegisterConfig( - module="_task", - key="DEFAULT_EPIC_FREE_GAME", - value=True, - help="被动 epic免费游戏 进群默认开关状态", - default_value=True, - type=bool, - ), - ], ).dict(), ) @@ -39,7 +28,7 @@ _matcher = on_alconna(Alconna("epic"), priority=5, block=True) @_matcher.handle() -async def handle(bot: Bot, session: EventSession, arparma: Arparma): +async def _(bot: Bot, session: EventSession, arparma: Arparma): gid = session.id3 or session.id2 type_ = "Group" if gid else "Private" msg_list, code = await get_epic_free(bot, type_) diff --git a/zhenxun/plugins/gold_redbag/__init__.py b/zhenxun/plugins/gold_redbag/__init__.py new file mode 100644 index 00000000..6cc29f22 --- /dev/null +++ b/zhenxun/plugins/gold_redbag/__init__.py @@ -0,0 +1,355 @@ +import time +import uuid +from datetime import datetime, timedelta +from typing import List + +from apscheduler.jobstores.base import JobLookupError +from nonebot.adapters import Bot +from nonebot.exception import ActionFailed +from nonebot.permission import SUPERUSER +from nonebot.plugin import PluginMetadata +from nonebot.rule import to_me +from nonebot_plugin_alconna import Alconna, Args, Arparma +from nonebot_plugin_alconna import At as alcAt +from nonebot_plugin_alconna import Match, Option, on_alconna +from nonebot_plugin_apscheduler import scheduler +from nonebot_plugin_saa import Image, Mention, MessageFactory, Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import NICKNAME +from zhenxun.configs.utils import PluginCdBlock, PluginExtraData, RegisterConfig +from zhenxun.services.log import logger +from zhenxun.utils.depends import GetConfig, UserName +from zhenxun.utils.platform import PlatformUtils +from zhenxun.utils.rules import ensure_group + +from .config import FESTIVE_KEY, FestiveRedBagManage +from .data_source import RedBagManager + +__plugin_meta__ = PluginMetadata( + name="金币红包", + description="运气项目又来了", + usage=""" + 塞红包 [金币数] ?[红包数=5] ?[at指定人]: 塞入红包 + 开/抢: 打开红包 + 退回红包: 退回未开完的红包,必须在一分钟后使用 + + * 不同群组同一个节日红包用户只能开一次 + + 示例: + 塞红包 1000 + 塞红包 1000 10 + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + superuser_help=""" + 节日红包 [金额] [红包数] ?[指定主题文字] ? -g [群id] + + * 不同群组同一个节日红包用户只能开一次 + + 示例: + 节日红包 10000 20 今日出道贺金 + 节日红包 10000 20 明日出道贺金 -g 123123123 + + """, + configs=[ + RegisterConfig( + key="DEFAULT_TIMEOUT", + value=600, + help="普通红包默认超时时间", + default_value=600, + type=int, + ), + RegisterConfig( + key="DEFAULT_INTERVAL", + value=60, + help="用户发送普通红包最小间隔时间", + default_value=60, + type=int, + ), + RegisterConfig( + key="RANK_NUM", + value=10, + help="结算排行显示前N位", + default_value=10, + type=int, + ), + ], + limits=[PluginCdBlock(result="急什么急什么,待会再发!")], + ).dict(), +) + + +# def rule(session: EventSession) -> bool: +# if gid := session.id3 or session.id2: +# if group_red_bag := RedBagManager.get_group_data(gid): +# return group_red_bag.check_open(gid) +# return False + + +# async def rule_group(session: EventSession): +# return rule(session) and ensure_group(session) + + +_red_bag_matcher = on_alconna( + Alconna("塞红包", Args["amount", int]["num", int, 5]["user?", alcAt]), + aliases={"金币红包"}, + priority=5, + block=True, + rule=ensure_group, +) + +_open_matcher = on_alconna( + Alconna("开"), + aliases={"抢", "开红包", "抢红包"}, + priority=5, + block=True, + rule=ensure_group, +) + +_return_matcher = on_alconna( + Alconna("退回红包"), aliases={"退还红包"}, priority=5, block=True, rule=ensure_group +) + +_festive_matcher = on_alconna( + Alconna( + "节日红包", + Args["amount", int]["num", int]["text?", str], + Option("-g|--group", Args["group_list", str], help_text="指定群"), + ), + priority=1, + block=True, + permission=SUPERUSER, + rule=to_me(), +) + + +@_red_bag_matcher.handle() +async def _( + session: EventSession, + arparma: Arparma, + amount: int, + num: int, + user: Match[alcAt], + default_interval: int = GetConfig(config="DEFAULT_INTERVAL"), + user_name: str = UserName(), +): + at_user = None + if user.available: + at_user = user.result.target + # group_id = session.id3 or session.id2 + group_id = session.id2 + """以频道id为键""" + user_id = session.id1 + if not user_id: + await Text("用户id为空").finish() + if not group_id: + await Text("群组id为空").finish() + group_red_bag = RedBagManager.get_group_data(group_id) + # 剩余过期时间 + time_remaining = group_red_bag.check_timeout(user_id) + if time_remaining != -1: + # 判断用户红包是否存在且是否过时覆盖 + if user_red_bag := group_red_bag.get_user_red_bag(user_id): + now = time.time() + if now < user_red_bag.start_time + default_interval: + await Text( + f"你的红包还没消化完捏...还剩下 {user_red_bag.num - len(user_red_bag.open_user)} 个! 请等待红包领取完毕..." + f"(或等待{time_remaining}秒红包cd)" + ).finish() + result = await RedBagManager.check_gold(user_id, amount, session.platform) + if result: + await Text(result).finish(at_sender=True) + await group_red_bag.add_red_bag( + f"{user_name}的红包", + int(amount), + 1 if at_user else num, + user_name, + user_id, + assigner=at_user, + platform=session.platform, + ) + image = await RedBagManager.random_red_bag_background( + user_id, platform=session.platform + ) + message_list: list = [ + Text(f"{user_name}发起了金币红包\n金额: {amount}\n数量: {num}\n") + ] + if at_user: + message_list.append(Text("指定人: ")) + message_list.append(Mention(at_user)) + message_list.append(Text("\n")) + message_list.append(Image(image.pic2bytes())) + await MessageFactory(message_list).send() + + logger.info( + f"塞入 {num} 个红包,共 {amount} 金币", arparma.header_result, session=session + ) + + +@_open_matcher.handle() +async def _( + session: EventSession, + rank_num: int = GetConfig(config="RANK_NUM"), +): + # group_id = session.id3 or session.id2 + group_id = session.id2 + """以频道id为键""" + user_id = session.id1 + if not user_id: + await Text("用户id为空").finish() + if not group_id: + await Text("群组id为空").finish() + if group_red_bag := RedBagManager.get_group_data(group_id): + open_data, settlement_list = await group_red_bag.open(user_id, session.platform) + # send_msg = Text("没有红包给你开!") + send_msg = [] + for _, item in open_data.items(): + amount, red_bag = item + result_image = await RedBagManager.build_open_result_image( + red_bag, user_id, amount, session.platform + ) + send_msg.append( + Text(f"开启了 {red_bag.promoter} 的红包, 获取 {amount} 个金币\n") + ) + send_msg.append(Image(result_image.pic2bytes())) + send_msg.append(Text("\n")) + logger.info( + f"抢到了 {red_bag.promoter}({red_bag.promoter_id}) 的红包,获取了{amount}个金币", + "开红包", + session=session, + ) + send_msg = ( + MessageFactory(send_msg[:-1]) if send_msg else Text("没有红包给你开!") + ) + await send_msg.send(reply=True) + if settlement_list: + for red_bag in settlement_list: + result_image = await red_bag.build_amount_rank( + rank_num, session.platform + ) + await MessageFactory( + [Text(f"{red_bag.name}已结算\n"), Image(result_image.pic2bytes())] + ).send() + + +@_return_matcher.handle() +async def _( + session: EventSession, + default_interval: int = GetConfig(config="DEFAULT_INTERVAL"), + rank_num: int = GetConfig(config="RANK_NUM"), +): + group_id = session.id3 or session.id2 + user_id = session.id1 + if not user_id: + await Text("用户id为空").finish() + if not group_id: + await Text("群组id为空").finish() + if group_red_bag := RedBagManager.get_group_data(group_id): + if user_red_bag := group_red_bag.get_user_red_bag(user_id): + now = time.time() + if now - user_red_bag.start_time < default_interval: + await Text( + f"你的红包还没有过时, 在 {int(default_interval - now + user_red_bag.start_time)} " + f"秒后可以退回..." + ).finish(reply=True) + user_red_bag = group_red_bag.get_user_red_bag(user_id) + if user_red_bag and ( + data := await group_red_bag.settlement(user_id, session.platform) + ): + image_result = await user_red_bag.build_amount_rank( + rank_num, session.platform + ) + logger.info(f"退回了红包 {data[0]} 金币", "红包退回", session=session) + await MessageFactory( + [ + Text(f"已成功退还了 " f"{data[0]} 金币\n"), + Image(image_result.pic2bytes()), + ] + ).finish(reply=True) + await Text("目前没有红包可以退回...").finish(reply=True) + + +@_festive_matcher.handle() +async def _( + bot: Bot, + session: EventSession, + arparma: Arparma, + amount: int, + num: int, + text: Match[str], + group_list: Match[str], + user_name: str = UserName(), +): + # TODO: 指定多个群 + greetings = "恭喜发财 大吉大利" + if text.available: + greetings = text.result + gl = [] + if group_list.available: + gl = [group_list.result] + else: + g_l, platform = await PlatformUtils.get_group_list(bot) + gl = [g.channel_id or g.group_id for g in g_l] + _uuid = str(uuid.uuid1()) + FestiveRedBagManage.add(_uuid) + for g in gl: + if target := PlatformUtils.get_target(bot, group_id=g): + group_red_bag = RedBagManager.get_group_data(g) + if festive_red_bag := group_red_bag.get_festive_red_bag(): + group_red_bag.remove_festive_red_bag() + if festive_red_bag.uuid: + FestiveRedBagManage.remove(festive_red_bag.uuid) + rank_image = await festive_red_bag.build_amount_rank(10, platform) + try: + await MessageFactory( + [ + Text( + f"{NICKNAME}的节日红包过时了,一共开启了 " + f"{len(festive_red_bag.open_user)}" + f" 个红包,共 {sum(festive_red_bag.open_user.values())} 金币\n" + ), + Image(rank_image.pic2bytes()), + ] + ).send_to(target=target, bot=bot) + except ActionFailed: + pass + try: + scheduler.remove_job(f"{FESTIVE_KEY}_{g}") + await RedBagManager.end_red_bag( + g, is_festive=True, platform=session.platform + ) + except JobLookupError: + pass + await group_red_bag.add_red_bag( + f"{NICKNAME}的红包", + amount, + num, + NICKNAME, + FESTIVE_KEY, + _uuid, + platform=session.platform, + ) + scheduler.add_job( + RedBagManager._auto_end_festive_red_bag, + "date", + run_date=(datetime.now() + timedelta(hours=24)).replace(microsecond=0), + id=f"{FESTIVE_KEY}_{g}", + args=[bot, g, session.platform], + ) + try: + image_result = await RedBagManager.random_red_bag_background( + bot.self_id, greetings, session.platform + ) + await MessageFactory( + [ + Text( + f"{NICKNAME}发起了节日金币红包\n金额: {amount}\n数量: {num}\n" + ), + Image(image_result.pic2bytes()), + ] + ).send_to(target=target, bot=bot) + logger.debug("节日红包图片信息发送成功...", "节日红包", group_id=g) + except ActionFailed: + logger.warning(f"节日红包图片信息发送失败...", "节日红包", group_id=g) diff --git a/zhenxun/plugins/gold_redbag/config.py b/zhenxun/plugins/gold_redbag/config.py new file mode 100644 index 00000000..da8c0d39 --- /dev/null +++ b/zhenxun/plugins/gold_redbag/config.py @@ -0,0 +1,372 @@ +import random +import time +from io import BytesIO +from typing import Dict + +from pydantic import BaseModel + +from zhenxun.models.group_member_info import GroupInfoUser +from zhenxun.models.user_console import UserConsole +from zhenxun.utils.image_utils import BuildImage +from zhenxun.utils.platform import PlatformUtils +from zhenxun.utils.utils import get_user_avatar + +from .model import RedbagUser + +FESTIVE_KEY = "FESTIVE" +"""节日红包KEY""" + + +class FestiveRedBagManage: + + _data: Dict[str, list[str]] = {} + + @classmethod + def add(cls, uuid: str): + cls._data[uuid] = [] + + @classmethod + def open(cls, uuid: str, uid: str): + if uuid in cls._data and uid not in cls._data[uuid]: + cls._data[uuid].append(uid) + + @classmethod + def remove(cls, uuid: str): + if uuid in cls._data: + del cls._data[uuid] + + @classmethod + def check(cls, uuid: str, uid: str): + if uuid in cls._data: + return uid not in cls._data[uuid] + return False + + +class RedBag(BaseModel): + """ + 红包 + """ + + group_id: str + """所属群聊""" + name: str + """红包名称""" + amount: int + """总金币""" + num: int + """红包数量""" + promoter: str + """发起人昵称""" + promoter_id: str + """发起人id""" + is_festival: bool + """是否为节日红包""" + timeout: int + """过期时间""" + assigner: str | None = None + """指定人id""" + start_time: float + """红包发起时间""" + open_user: Dict[str, int] = {} + """开启用户""" + red_bag_list: list[int] + """红包金额列表""" + uuid: str | None + """uuid""" + + async def build_amount_rank(self, num: int, platform: str) -> BuildImage: + """生成结算红包图片 + + 参数: + num: 查看的排名数量. + platform: 平台. + + 返回: + BuildImage: 结算红包图片 + """ + user_image_list = [] + if self.open_user: + sort_data = sorted( + self.open_user.items(), key=lambda item: item[1], reverse=True + ) + num = num if num < len(self.open_user) else len(self.open_user) + user_id_list = [sort_data[i][0] for i in range(num)] + group_user_list = await GroupInfoUser.filter( + group_id=self.group_id, user_id__in=user_id_list + ).all() + for i in range(num): + user_background = BuildImage(600, 100, font_size=30) + user_id, amount = sort_data[i] + user_ava_bytes = await PlatformUtils.get_user_avatar(user_id, platform) + user_ava = None + if user_ava_bytes: + user_ava = BuildImage(80, 80, background=BytesIO(user_ava_bytes)) + else: + user_ava = BuildImage(80, 80) + await user_ava.circle_corner(10) + await user_background.paste(user_ava, (130, 10)) + no_image = BuildImage(100, 100, font_size=65, font="CJGaoDeGuo.otf") + await no_image.text((0, 0), f"{i+1}", center_type="center") + await no_image.line((99, 10, 99, 90), "#b9b9b9") + await user_background.paste(no_image) + name = [ + user.user_name + for user in group_user_list + if user_id == user.user_id + ] + await user_background.text((225, 15), name[0] if name else "") + amount_image = await BuildImage.build_text_image( + f"{amount} 元", size=30, font_color="#cdac72" + ) + await user_background.paste( + amount_image, (user_background.width - amount_image.width - 20, 50) + ) + await user_background.line((225, 99, 590, 99), "#b9b9b9") + user_image_list.append(user_background) + background = BuildImage(600, 150 + len(user_image_list) * 100) + top = BuildImage(600, 100, color="#f55545", font_size=30) + promoter_ava_bytes = await PlatformUtils.get_user_avatar( + self.promoter_id, platform + ) + promoter_ava = None + if promoter_ava_bytes: + promoter_ava = BuildImage(60, 60, background=BytesIO(promoter_ava_bytes)) + else: + promoter_ava = BuildImage(60, 60) + await promoter_ava.circle() + await top.paste(promoter_ava, (10, 0), "height") + await top.text((80, 33), self.name, (255, 255, 255)) + right_text = BuildImage(150, 100, color="#f55545", font_size=30) + await right_text.text((10, 33), "结算排行", (255, 255, 255)) + await right_text.line((4, 10, 4, 90), (255, 255, 255), 2) + await top.paste(right_text, (460, 0)) + await background.paste(top) + cur_h = 110 + for user_image in user_image_list: + await background.paste(user_image, (0, cur_h)) + cur_h += user_image.height + return background + + +class GroupRedBag: + """ + 群组红包管理 + """ + + def __init__(self, group_id: str): + self.group_id = group_id + self._data: Dict[str, RedBag] = {} + """红包列表""" + + def remove_festive_red_bag(self): + """删除节日红包""" + _key = None + for k, red_bag in self._data.items(): + if red_bag.is_festival: + _key = k + break + if _key: + del self._data[_key] + + def get_festive_red_bag(self) -> RedBag | None: + """获取节日红包 + + 返回: + RedBag | None: 节日红包 + """ + for _, red_bag in self._data.items(): + if red_bag.is_festival: + return red_bag + return None + + def get_user_red_bag(self, user_id: str) -> RedBag | None: + """获取用户塞红包数据 + + 参数: + user_id: 用户id + + 返回: + RedBag | None: RedBag + """ + return self._data.get(str(user_id)) + + def check_open(self, user_id: str) -> bool: + """检查是否有可开启的红包 + + 参数: + user_id: 用户id + + 返回: + bool: 是否有可开启的红包 + """ + user_id = str(user_id) + for _, red_bag in self._data.items(): + if red_bag.assigner: + if red_bag.assigner == user_id: + return True + else: + if user_id not in red_bag.open_user: + return True + return False + + def check_timeout(self, user_id: str) -> int: + """判断用户红包是否过期 + + 参数: + user_id: 用户id + + 返回: + int: 距离过期时间 + """ + if user_id in self._data: + reg_bag = self._data[user_id] + now = time.time() + if now < reg_bag.timeout + reg_bag.start_time: + return int(reg_bag.timeout + reg_bag.start_time - now) + return -1 + + async def open( + self, user_id: str, platform: str | None = None + ) -> tuple[Dict[str, tuple[int, RedBag]], list[RedBag]]: + """开启红包 + + 参数: + user_id: 用户id + platform: 所属平台 + + 返回: + Dict[str, tuple[int, RedBag]]: 键为发起者id, 值为开启金额以及对应RedBag + list[RedBag]: 开完的红包 + """ + open_data = {} + settlement_list: list[RedBag] = [] + for _, red_bag in self._data.items(): + if red_bag.num > len(red_bag.open_user): + if red_bag.is_festival and red_bag.uuid: + if not FestiveRedBagManage.check(red_bag.uuid, user_id): + continue + FestiveRedBagManage.open(red_bag.uuid, user_id) + is_open = False + if red_bag.assigner: + is_open = red_bag.assigner == user_id + else: + is_open = user_id not in red_bag.open_user + if is_open: + random_amount = red_bag.red_bag_list.pop() + await RedbagUser.add_redbag_data( + user_id, self.group_id, "get", random_amount + ) + await UserConsole.add_gold( + user_id, random_amount, "gold_redbag", platform + ) + red_bag.open_user[user_id] = random_amount + open_data[red_bag.promoter_id] = (random_amount, red_bag) + if red_bag.num == len(red_bag.open_user): + # 红包开完,结算 + settlement_list.append(red_bag) + if settlement_list: + for uid in [red_bag.promoter_id for red_bag in settlement_list]: + if uid in self._data: + del self._data[uid] + return open_data, settlement_list + + def festive_red_bag_expire(self) -> RedBag | None: + """节日红包过期 + + 返回: + RedBag | None: 过期的节日红包 + """ + if FESTIVE_KEY in self._data: + red_bag = self._data[FESTIVE_KEY] + del self._data[FESTIVE_KEY] + return red_bag + return None + + async def settlement( + self, user_id: str, platform: str | None = None + ) -> tuple[int | None, RedBag | None]: + """红包退回 + + 参数: + user_id: 用户id, 指定id时结算指定用户红包. + platform: 用户平台 + + 返回: + tuple[int | None, RedBag | None]: 退回金币, 红包 + """ + if red_bag := self._data.get(user_id): + del self._data[user_id] + if red_bag.is_festival and red_bag.uuid: + FestiveRedBagManage.remove(red_bag.uuid) + if red_bag.red_bag_list: + """退还剩余金币""" + if amount := sum(red_bag.red_bag_list): + await UserConsole.add_gold(user_id, amount, "gold_redbag", platform) + return amount, red_bag + return None, None + + async def add_red_bag( + self, + name: str, + amount: int, + num: int, + promoter: str, + promoter_id: str, + festival_uuid: str | None = None, + timeout: int = 60, + assigner: str | None = None, + platform: str | None = None, + ): + """添加红包 + + 参数: + name: 红包名称 + amount: 金币数量 + num: 红包数量 + promoter: 发起人昵称 + promoter_id: 发起人id + festival_uuid: 节日红包uuid. + timeout: 超时时间. + assigner: 指定人. + platform: 用户平台. + """ + user = await UserConsole.get_user(promoter_id, platform) + if not festival_uuid and (amount < 1 or user.gold < amount): + raise ValueError("红包金币不足或用户金币不足") + red_bag_list = self._random_red_bag(amount, num) + if not festival_uuid: + user.gold -= amount + await RedbagUser.add_redbag_data(promoter_id, self.group_id, "send", amount) + await user.save(update_fields=["gold"]) + self._data[promoter_id] = RedBag( + group_id=self.group_id, + name=name, + amount=amount, + num=num, + promoter=promoter, + promoter_id=promoter_id, + is_festival=bool(festival_uuid), + timeout=timeout, + start_time=time.time(), + assigner=assigner, + red_bag_list=red_bag_list, + uuid=festival_uuid, + ) + + def _random_red_bag(self, amount: int, num: int) -> list[int]: + """初始化红包金币 + + 参数: + amount: 金币数量 + num: 红包数量 + + 返回: + list[int]: 红包列表 + """ + red_bag_list = [] + for _ in range(num - 1): + tmp = int(amount / random.choice(range(3, num + 3))) + red_bag_list.append(tmp) + amount -= tmp + red_bag_list.append(amount) + return red_bag_list diff --git a/zhenxun/plugins/gold_redbag/data_source.py b/zhenxun/plugins/gold_redbag/data_source.py new file mode 100644 index 00000000..b35dee24 --- /dev/null +++ b/zhenxun/plugins/gold_redbag/data_source.py @@ -0,0 +1,238 @@ +import asyncio +import os +import random +from io import BytesIO +from typing import Dict + +from nonebot.adapters import Bot +from nonebot.exception import ActionFailed +from nonebot_plugin_saa import Image, MessageFactory, Text + +from zhenxun.configs.config import NICKNAME, Config +from zhenxun.configs.path_config import IMAGE_PATH +from zhenxun.models.user_console import UserConsole +from zhenxun.utils.image_utils import BuildImage +from zhenxun.utils.platform import PlatformUtils +from zhenxun.utils.utils import get_user_avatar + +from .config import FESTIVE_KEY, FestiveRedBagManage, GroupRedBag, RedBag + + +class RedBagManager: + + _data: Dict[str, GroupRedBag] = {} + + @classmethod + def get_group_data(cls, group_id: str) -> GroupRedBag: + """获取群组红包数据 + + 参数: + group_id: 群组id + + 返回: + GroupRedBag | None: GroupRedBag + """ + if group_id not in cls._data: + cls._data[group_id] = GroupRedBag(group_id) + return cls._data[group_id] + + @classmethod + async def _auto_end_festive_red_bag(cls, bot: Bot, group_id: str, platform: str): + """自动结算节日红包 + + 参数: + bot: Bot + group_id: 群组id + platform: 平台 + """ + if target := PlatformUtils.get_target(bot, group_id=group_id): + rank_num = Config.get_config("gold_redbag", "RANK_NUM") or 10 + group_red_bag = cls.get_group_data(group_id) + red_bag = group_red_bag.get_festive_red_bag() + if not red_bag: + return + rank_image = await red_bag.build_amount_rank(rank_num, platform) + if red_bag.is_festival and red_bag.uuid: + FestiveRedBagManage.remove(red_bag.uuid) + await asyncio.sleep(random.randint(1, 5)) + try: + await MessageFactory( + [ + Text( + f"{NICKNAME}的节日红包过时了,一共开启了 " + f"{len(red_bag.open_user)}" + f" 个红包,共 {sum(red_bag.open_user.values())} 金币\n" + ), + Image(rank_image.pic2bytes()), + ] + ).send_to(target=target, bot=bot) + except ActionFailed: + pass + + @classmethod + async def end_red_bag( + cls, + group_id: str, + user_id: str | None = None, + is_festive: bool = False, + platform: str = "", + ) -> MessageFactory | None: + """结算红包 + + 参数: + group_id: 群组id或频道id + user_id: 用户id + is_festive: 是否节日红包 + platform: 用户平台 + """ + rank_num = Config.get_config("gold_redbag", "RANK_NUM") or 10 + group_red_bag = cls.get_group_data(group_id) + if not group_red_bag: + return None + if is_festive: + if festive_red_bag := group_red_bag.festive_red_bag_expire(): + rank_image = await festive_red_bag.build_amount_rank(rank_num, platform) + return MessageFactory( + [ + Text( + f"{NICKNAME}的节日红包过时了,一共开启了 " + f"{len(festive_red_bag.open_user)}" + f" 个红包,共 {sum(festive_red_bag.open_user.values())} 金币\n" + ), + Image(rank_image.pic2bytes()), + ] + ) + else: + if not user_id: + return None + return_gold, red_bag = await group_red_bag.settlement(user_id, platform) + if red_bag: + rank_image = await red_bag.build_amount_rank(rank_num, platform) + return MessageFactory( + [ + Text(f"已成功退还了 " f"{return_gold} 金币\n"), + Image(rank_image.pic2bytes()), + ] + ) + + @classmethod + async def check_gold(cls, user_id: str, amount: int, platform: str) -> str | None: + """检查金币数量是否合法 + + 参数: + user_id: 用户id + amount: 金币数量 + platform: 所属平台 + + 返回: + tuple[bool, str]: 是否合法以及提示语 + """ + user = await UserConsole.get_user(user_id, platform) + if amount < 1: + return "小气鬼,要别人倒贴金币给你嘛!" + if user.gold < amount: + return "没有金币的话请不要发红包..." + return None + + @classmethod + async def random_red_bag_background( + cls, user_id: str, msg: str = "恭喜发财 大吉大利", platform: str = "" + ) -> BuildImage: + """构造发送红包图片 + + 参数: + user_id: 用户id + msg: 红包消息. + platform: 平台. + + 异常: + ValueError: 图片背景列表为空 + + 返回: + BuildImage: 构造后的图片 + """ + background_list = os.listdir(f"{IMAGE_PATH}/prts/redbag_2") + if not background_list: + raise ValueError("prts/redbag_1 背景图列表为空...") + random_redbag = random.choice(background_list) + redbag = BuildImage( + 0, + 0, + font_size=38, + background=IMAGE_PATH / "prts" / "redbag_2" / random_redbag, + ) + ava_byte = await PlatformUtils.get_user_avatar(user_id, platform) + ava = None + if ava_byte: + ava = BuildImage(65, 65, background=BytesIO(ava_byte)) + else: + ava = BuildImage(65, 65, color=(0, 0, 0)) + await ava.circle() + await redbag.text( + (int((redbag.size[0] - redbag.getsize(msg)[0]) / 2), 210), + msg, + (240, 218, 164), + ) + await redbag.paste(ava, (int((redbag.size[0] - ava.size[0]) / 2), 130)) + return redbag + + @classmethod + async def build_open_result_image( + cls, red_bag: RedBag, user_id: str, amount: int, platform: str + ) -> BuildImage: + """构造红包开启图片 + + 参数: + red_bag: RedBag + user_id: 开启红包用户id + amount: 开启红包获取的金额 + platform: 平台 + + 异常: + ValueError: 图片背景列表为空 + + 返回: + BuildImage: 构造后的图片 + """ + background_list = os.listdir(f"{IMAGE_PATH}/prts/redbag_1") + if not background_list: + raise ValueError("prts/redbag_1 背景图列表为空...") + random_redbag = random.choice(background_list) + head = BuildImage( + 1000, + 980, + font_size=30, + background=IMAGE_PATH / "prts" / "redbag_1" / random_redbag, + ) + size = BuildImage.get_text_size(red_bag.name, font_size=50) + ava_bk = BuildImage(100 + size[0], 66, (255, 255, 255, 0), font_size=50) + + ava_byte = await PlatformUtils.get_user_avatar(user_id, platform) + ava = None + if ava_byte: + ava = BuildImage(66, 66, background=BytesIO(ava_byte)) + else: + ava = BuildImage(66, 66, color=(0, 0, 0)) + await ava_bk.paste(ava) + await ava_bk.text((100, 7), red_bag.name) + ava_bk_w, ava_bk_h = ava_bk.size + await head.paste(ava_bk, (int((1000 - ava_bk_w) / 2), 300)) + size = BuildImage.get_text_size(str(amount), font_size=150) + amount_image = BuildImage(size[0], size[1], (255, 255, 255, 0), font_size=150) + await amount_image.text((0, 0), str(amount), fill=(209, 171, 108)) + # 金币中文 + await head.paste(amount_image, (int((1000 - size[0]) / 2) - 50, 460)) + await head.text( + (int((1000 - size[0]) / 2 + size[0]) - 50, 500 + size[1] - 70), + "金币", + fill=(209, 171, 108), + ) + # 剩余数量和金额 + text = ( + f"已领取" + f"{red_bag.num - len(red_bag.open_user)}" + f"/{red_bag.num}个," + f"共{sum(red_bag.open_user.values())}/{red_bag.amount}金币" + ) + await head.text((350, 900), text, (198, 198, 198)) + return head diff --git a/zhenxun/plugins/gold_redbag/model.py b/zhenxun/plugins/gold_redbag/model.py new file mode 100644 index 00000000..a8e9359a --- /dev/null +++ b/zhenxun/plugins/gold_redbag/model.py @@ -0,0 +1,63 @@ +from tortoise import fields + +from zhenxun.services.db_context import Model + + +class RedbagUser(Model): + + id = fields.IntField(pk=True, generated=True, auto_increment=True) + """自增id""" + user_id = fields.CharField(255) + """用户id""" + group_id = fields.CharField(255) + """群聊id""" + send_redbag_count = fields.IntField(default=0) + """发送红包次数""" + get_redbag_count = fields.IntField(default=0) + """开启红包次数""" + spend_gold = fields.IntField(default=0) + """发送红包花费金额""" + get_gold = fields.IntField(default=0) + """开启红包获取金额""" + + class Meta: + table = "redbag_users" + table_description = "红包统计数据表" + unique_together = ("user_id", "group_id") + + @classmethod + async def add_redbag_data( + cls, user_id: str, group_id: str, i_type: str, money: int + ): + """添加收发红包数据 + + 参数: + user_id: 用户id + group_id: 群号 + i_type: 收或发 + money: 金钱数量 + """ + + user, _ = await cls.get_or_create(user_id=user_id, group_id=group_id) + if i_type == "get": + user.get_redbag_count = user.get_redbag_count + 1 + user.get_gold = user.get_gold + money + else: + user.send_redbag_count = user.send_redbag_count + 1 + user.spend_gold = user.spend_gold + money + await user.save( + update_fields=[ + "get_redbag_count", + "get_gold", + "send_redbag_count", + "spend_gold", + ] + ) + + @classmethod + async def _run_script(cls): + return [ + "ALTER TABLE redbag_users RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id + "ALTER TABLE redbag_users ALTER COLUMN user_id TYPE character varying(255);", + "ALTER TABLE redbag_users ALTER COLUMN group_id TYPE character varying(255);", + ] diff --git a/zhenxun/utils/depends/__init__.py b/zhenxun/utils/depends/__init__.py index a993ee45..addcabe6 100644 --- a/zhenxun/utils/depends/__init__.py +++ b/zhenxun/utils/depends/__init__.py @@ -1,7 +1,35 @@ +from typing import Any + from nonebot.internal.params import Depends +from nonebot.matcher import Matcher from nonebot.params import Command +from nonebot_plugin_saa import Text +from nonebot_plugin_session import EventSession from nonebot_plugin_userinfo import EventUserInfo, UserInfo +from zhenxun.configs.config import Config + + +def CheckUg(check_user: bool = True, check_group: bool = True): + """检测群组id和用户id是否存在 + + 参数: + check_user: 检查用户id. + check_group: 检查群组id. + """ + + async def dependency(session: EventSession): + if check_user: + user_id = session.id1 + if not user_id: + await Text("用户id为空").finish() + if check_group: + group_id = session.id3 or session.id2 + if not group_id: + await Text("群组id为空").finish() + + return Depends(dependency) + def OneCommand(): """ @@ -24,6 +52,57 @@ def UserName(): async def dependency(user_info: UserInfo = EventUserInfo()): return ( user_info.user_displayname or user_info.user_remark or user_info.user_name - ) + ) or "" + + return Depends(dependency) + + +def GetConfig( + module: str | None = None, + config: str = "", + default_value: Any = None, + prompt: str | None = None, +): + """获取配置项 + + 参数: + module: 模块名,为空时默认使用当前插件模块名 + config: 配置项名称 + default_value: 默认值 + prompt: 为空时提示 + """ + + async def dependency(matcher: Matcher): + module_ = module or matcher.plugin_name + if module_: + value = Config.get_config(module_, config, default_value) + if value is None and prompt: + # await matcher.finish(prompt or f"配置项 {config} 未填写!") + await matcher.finish(prompt) + return value + + return Depends(dependency) + + +def CheckConfig( + module: str | None = None, + config: str | list[str] = "", + prompt: str | None = None, +): + """检测配置项在配置文件中是否填写 + + 参数: + module: 模块名,为空时默认使用当前插件模块名 + config: 需要检查的配置项名称 + prompt: 为空时提示 + """ + + async def dependency(matcher: Matcher): + module_ = module or matcher.plugin_name + if module_: + config_list = [config] if isinstance(config, str) else config + for c in config_list: + if Config.get_config(module_, c) is None: + await matcher.finish(prompt or f"配置项 {c} 未填写!") return Depends(dependency) diff --git a/zhenxun/utils/enum.py b/zhenxun/utils/enum.py index 437c4f0d..681f9768 100644 --- a/zhenxun/utils/enum.py +++ b/zhenxun/utils/enum.py @@ -88,7 +88,7 @@ class RequestType(StrEnum): class RequestHandleType(StrEnum): """ - 请求类型 + 请求处理类型 """ APPROVE = "APPROVE" diff --git a/zhenxun/utils/platform.py b/zhenxun/utils/platform.py index 014e026b..19163cc5 100644 --- a/zhenxun/utils/platform.py +++ b/zhenxun/utils/platform.py @@ -1,5 +1,6 @@ from typing import Awaitable, Callable, Literal, Set +import httpx import nonebot from nonebot.adapters import Bot from nonebot.adapters.discord import Bot as DiscordBot @@ -27,6 +28,53 @@ from zhenxun.services.log import logger class PlatformUtils: + @classmethod + async def get_user_avatar(cls, user_id: str, platform: str) -> bytes | None: + """快捷获取用户头像 + + 参数: + user_id: 用户id + platform: 平台 + """ + if platform == "qq": + url = f"http://q1.qlogo.cn/g?b=qq&nk={user_id}&s=160" + async with httpx.AsyncClient() as client: + for _ in range(3): + try: + return (await client.get(url)).content + except Exception as e: + logger.error( + "获取用户头像错误", + "Util", + target=user_id, + platform=platform, + ) + else: + pass + return None + + @classmethod + async def get_group_avatar(cls, gid: str, platform: str) -> bytes | None: + """快捷获取用群头像 + + 参数: + gid: 群组id + platform: 平台 + """ + if platform == "qq": + url = f"http://p.qlogo.cn/gh/{gid}/{gid}/640/" + async with httpx.AsyncClient() as client: + for _ in range(3): + try: + return (await client.get(url)).content + except Exception as e: + logger.error( + "获取群头像错误", "Util", target=gid, platform=platform + ) + else: + pass + return None + @classmethod async def send_message( cls, @@ -109,7 +157,7 @@ class PlatformUtils: bot: Bot 返回: - list[GroupConsole]: 群组列表 + tuple[list[GroupConsole], str]: 群组列表, 平台 """ if isinstance(bot, v11Bot): group_list = await bot.get_group_list() @@ -239,8 +287,8 @@ class PlatformUtils: 参数: bot: Bot - group_id: 群组id - channel_id: 频道id或群组id + user_id: 用户id + group_id: 频道id或群组id 返回: target: 对应平台Target diff --git a/zhenxun/utils/utils.py b/zhenxun/utils/utils.py index 76395fd1..9ef46d7d 100644 --- a/zhenxun/utils/utils.py +++ b/zhenxun/utils/utils.py @@ -99,8 +99,7 @@ class UserBlockLimiter: def check(self, key: Any) -> bool: if time.time() - self.time > 30: self.set_false(key) - return False - return self.flag_data[key] + return not self.flag_data[key] class FreqLimiter: @@ -156,7 +155,7 @@ async def get_group_avatar(gid: int | str) -> bytes | None: """快捷获取用群头像 参数: - :param gid: 群号 + gid: 群号 """ url = f"http://p.qlogo.cn/gh/{gid}/{gid}/640/" async with httpx.AsyncClient() as client: