diff --git a/zhenxun/plugins/open_cases/__init__.py b/zhenxun/plugins/open_cases/__init__.py new file mode 100644 index 00000000..e4ee738a --- /dev/null +++ b/zhenxun/plugins/open_cases/__init__.py @@ -0,0 +1,345 @@ +import asyncio +import random +from datetime import datetime, timedelta +from typing import List + +from nonebot.plugin import PluginMetadata +from nonebot_plugin_alconna import Arparma, Match +from nonebot_plugin_apscheduler import scheduler +from nonebot_plugin_saa import Image, MessageFactory, Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.utils import PluginCdBlock, PluginExtraData, RegisterConfig, Task +from zhenxun.services.log import logger +from zhenxun.utils.image_utils import text2image + +from .command import ( + _group_open_matcher, + _knifes_matcher, + _multiple_matcher, + _my_open_matcher, + _open_matcher, + _price_matcher, + _reload_matcher, + _show_case_matcher, + _update_image_matcher, + _update_matcher, +) +from .open_cases_c import ( + auto_update, + get_my_knifes, + group_statistics, + open_case, + open_multiple_case, + total_open_statistics, +) +from .utils import ( + CASE2ID, + KNIFE2ID, + CaseManager, + build_case_image, + download_image, + get_skin_case, + init_skin_trends, + reset_count_daily, + update_skin_data, +) + +__plugin_meta__ = PluginMetadata( + name="CSGO开箱", + description="csgo模拟开箱[戒赌]", + usage=""" + 指令: + 开箱 ?[武器箱] + [1-30]连开箱 ?[武器箱] + 我的开箱 + 我的金色 + 群开箱统计 + 查看武器箱?[武器箱] + * 不包含[武器箱]时随机开箱 * + 示例: 查看武器箱 + 示例: 查看武器箱英勇 + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + superuser_help=""" + 更新皮肤指令 + 重置开箱: 重置今日开箱所有次数 + 指令: + 更新武器箱 ?[武器箱/ALL] + 更新皮肤 ?[名称/ALL1] + 更新皮肤 ?[名称/ALL1] -S: (必定更新罕见皮肤所属箱子) + 更新武器箱图片 + * 不指定武器箱时则全部更新 * + * 过多的爬取会导致账号API被封 * + """.strip(), + menu_type="抽卡相关", + tasks=[Task(module="open_case_reset_remind", name="每日开箱重置提醒")], + limits=[PluginCdBlock(result="着什么急啊,慢慢来!")], + configs=[ + RegisterConfig( + key="INITIAL_OPEN_CASE_COUNT", + value=20, + help="初始每日开箱次数", + default_value=20, + type=int, + ), + RegisterConfig( + key="EACH_IMPRESSION_ADD_COUNT", + value=3, + help="每 * 点好感度额外增加开箱次数", + default_value=3, + type=int, + ), + RegisterConfig(key="COOKIE", value=None, help="BUFF的cookie"), + RegisterConfig( + key="DAILY_UPDATE", + value=None, + help="每日自动更新的武器箱,存在'ALL'时则更新全部武器箱", + type=List[str], + ), + RegisterConfig( + key="DEFAULT_OPEN_CASE_RESET_REMIND", + module="_task", + value=True, + help="被动 每日开箱重置提醒 进群默认开关状态", + default_value=True, + type=bool, + ), + ], + ).dict(), +) + + +# cases_matcher_group = MatcherGroup(priority=5, permission=GROUP, block=True) + + +# k_open_case = cases_matcher_group.on_command("开箱") +# reload_count = cases_matcher_group.on_command("重置开箱", permission=SUPERUSER) +# total_case_data = cases_matcher_group.on_command( +# "我的开箱", aliases={"开箱统计", "开箱查询", "查询开箱"} +# ) +# group_open_case_statistics = cases_matcher_group.on_command("群开箱统计") +# open_multiple = cases_matcher_group.on_regex("(.*)连开箱(.*)?") +# update_case = on_command( +# "更新武器箱", aliases={"更新皮肤"}, priority=1, permission=SUPERUSER, block=True +# ) +# update_case_image = on_command( +# "更新武器箱图片", priority=1, permission=SUPERUSER, block=True +# ) +# show_case = on_command("查看武器箱", priority=5, block=True) +# my_knifes = on_command("我的金色", priority=1, permission=GROUP, block=True) +# show_skin = on_command("查看皮肤", priority=5, block=True) +# price_trends = on_command("价格趋势", priority=5, block=True) + + +@_price_matcher.handle() +async def _( + session: EventSession, + arparma: Arparma, + name: str, + skin: str, + abrasion: str, + day: Match[int], +): + name = name.replace("武器箱", "").strip() + _day = 7 + if day.available: + _day = day.result + if _day > 180: + await Text("天数必须大于0且小于180").finish() + result = await init_skin_trends(name, skin, abrasion, _day) + if not result: + await Text("未查询到数据...").finish(reply=True) + await Image(result.pic2bytes()).send() + logger.info( + f"查看 [{name}:{skin}({abrasion})] 价格趋势", + arparma.header_result, + session=session, + ) + + +@_reload_matcher.handle() +async def _(session: EventSession, arparma: Arparma): + await reset_count_daily() + logger.info("重置开箱次数", arparma.header_result, session=session) + + +@_open_matcher.handle() +async def _(session: EventSession, arparma: Arparma, name: Match[str]): + gid = session.id3 or session.id2 + if not session.id1: + await Text("用户id为空...").finish() + if not gid: + await Text("群组id为空...").finish() + case_name = None + if name.available: + case_name = name.result.replace("武器箱", "").strip() + result = await open_case(session.id1, gid, case_name, session) + await result.finish(reply=True) + + +@_my_open_matcher.handle() +async def _(session: EventSession, arparma: Arparma): + gid = session.id3 or session.id2 + if not session.id1: + await Text("用户id为空...").finish() + if not gid: + await Text("群组id为空...").finish() + await Text( + await total_open_statistics(session.id1, gid), + ).send(reply=True) + logger.info("查询我的开箱", arparma.header_result, session=session) + + +@_group_open_matcher.handle() +async def _(session: EventSession, arparma: Arparma): + gid = session.id3 or session.id2 + if not gid: + await Text("群组id为空...").finish() + result = await group_statistics(gid) + await Text(result).send(reply=True) + logger.info("查询群开箱统计", arparma.header_result, session=session) + + +@_knifes_matcher.handle() +async def _(session: EventSession, arparma: Arparma): + gid = session.id3 or session.id2 + if not session.id1: + await Text("用户id为空...").finish() + if not gid: + await Text("群组id为空...").finish() + result = await get_my_knifes(session.id1, gid) + await result.send(reply=True) + logger.info("查询我的金色", arparma.header_result, session=session) + + +@_multiple_matcher.handle() +async def _(session: EventSession, arparma: Arparma, num: int, name: Match[str]): + gid = session.id3 or session.id2 + if not session.id1: + await Text("用户id为空...").finish() + if not gid: + await Text("群组id为空...").finish() + if num > 30: + await Text("开箱次数不要超过30啊笨蛋!").finish() + if num < 0: + await Text("再负开箱就扣你明天开箱数了!").finish() + case_name = None + if name.available: + case_name = name.result.replace("武器箱", "").strip() + result = await open_multiple_case(session.id1, gid, case_name, num, session) + await result.send(reply=True) + logger.info(f"{num}连开箱", arparma.header_result, session=session) + + +@_update_matcher.handle() +async def _(session: EventSession, arparma: Arparma, name: Match[str]): + case_name = None + if name.available: + case_name = name.result.strip() + if not case_name: + case_list = [] + skin_list = [] + for i, case_name in enumerate(CASE2ID): + if case_name in CaseManager.CURRENT_CASES: + case_list.append(f"{i+1}.{case_name} [已更新]") + else: + case_list.append(f"{i+1}.{case_name}") + for skin_name in KNIFE2ID: + skin_list.append(f"{skin_name}") + text = "武器箱:\n" + "\n".join(case_list) + "\n皮肤:\n" + ", ".join(skin_list) + img = await text2image(text, padding=20, color="#f9f6f2") + await MessageFactory( + [Text("未指定武器箱, 当前已包含武器箱/皮肤\n"), Image(img.pic2bytes())] + ).finish() + if case_name in ["ALL", "ALL1"]: + if case_name == "ALL": + case_list = list(CASE2ID.keys()) + type_ = "武器箱" + else: + case_list = list(KNIFE2ID.keys()) + type_ = "罕见皮肤" + await Text(f"即将更新所有{type_}, 请稍等").send() + for i, case_name in enumerate(case_list): + try: + info = await update_skin_data(case_name, arparma.find("s")) + if "请先登录" in info: + await Text(f"未登录, 已停止更新, 请配置BUFF token...").send() + return + rand = random.randint(300, 500) + result = f"更新全部{type_}完成" + if i < len(case_list) - 1: + next_case = case_list[i + 1] + result = f"将在 {rand} 秒后更新下一{type_}: {next_case}" + await Text(f"{info}, {result}").send() + logger.info(f"info, {result}", "更新武器箱", session=session) + await asyncio.sleep(rand) + except Exception as e: + logger.error(f"更新{type_}: {case_name}", session=session, e=e) + await Text(f"更新{type_}: {case_name} 发生错误: {type(e)}: {e}").send() + await Text(f"更新全部{type_}完成").send() + else: + await Text(f"开始{arparma.header_result}: {case_name}, 请稍等").send() + try: + await Text(await update_skin_data(case_name, arparma.find("s"))).send( + at_sender=True + ) + except Exception as e: + logger.error(f"{arparma.header_result}: {case_name}", session=session, e=e) + await Text( + f"成功{arparma.header_result}: {case_name} 发生错误: {type(e)}: {e}" + ).send() + + +@_show_case_matcher.handle() +async def _(session: EventSession, arparma: Arparma, name: Match[str]): + case_name = None + if name.available: + case_name = name.result.strip() + result = await build_case_image(case_name) + if isinstance(result, str): + await Text(result).send() + else: + await Image(result.pic2bytes()).send() + logger.info("查看武器箱", arparma.header_result, session=session) + + +@_update_image_matcher.handle() +async def _(session: EventSession, arparma: Arparma, name: Match[str]): + case_name = None + if name.available: + case_name = name.result.strip() + await Text("开始更新图片...").send(reply=True) + await download_image(case_name) + await Text("更新图片完成...").send(at_sender=True) + logger.info("更新武器箱图片", arparma.header_result, session=session) + + +# 重置开箱 +@scheduler.scheduled_job( + "cron", + hour=0, + minute=1, +) +async def _(): + await reset_count_daily() + + +@scheduler.scheduled_job( + "cron", + hour=0, + minute=10, +) +async def _(): + now = datetime.now() + hour = random.choice([0, 1, 2, 3]) + date = now + timedelta(hours=hour) + logger.debug(f"将在 {date} 时自动更新武器箱...", "更新武器箱") + scheduler.add_job( + auto_update, + "date", + run_date=date.replace(microsecond=0), + id=f"auto_update_csgo_cases", + ) diff --git a/zhenxun/plugins/open_cases/build_image.py b/zhenxun/plugins/open_cases/build_image.py new file mode 100644 index 00000000..8b8db8e2 --- /dev/null +++ b/zhenxun/plugins/open_cases/build_image.py @@ -0,0 +1,155 @@ +from datetime import timedelta, timezone + +from zhenxun.configs.path_config import IMAGE_PATH +from zhenxun.services.log import logger +from zhenxun.utils.image_utils import BuildImage +from zhenxun.utils.utils import cn2py + +from .config import COLOR2COLOR, COLOR2NAME +from .models.buff_skin import BuffSkin + +BASE_PATH = IMAGE_PATH / "csgo_cases" + +ICON_PATH = IMAGE_PATH / "_icon" + + +async def draw_card(skin: BuffSkin, rand: str) -> BuildImage: + """构造抽取图片 + + 参数: + skin (BuffSkin): BuffSkin + rand (str): 磨损 + + 返回: + BuildImage: BuildImage + """ + name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion + file_path = BASE_PATH / cn2py(skin.case_name.split(",")[0]) / f"{cn2py(name)}.jpg" + if not file_path.exists(): + logger.warning(f"皮肤图片: {name} 不存在", "开箱") + skin_bk = BuildImage( + 460, 200, color=(25, 25, 25, 100), font_size=25, font="CJGaoDeGuo.otf" + ) + if file_path.exists(): + skin_image = BuildImage(205, 153, background=file_path) + await skin_bk.paste(skin_image, (10, 30)) + await skin_bk.line((220, 10, 220, 180)) + await skin_bk.text((10, 10), skin.name, (255, 255, 255)) + name_icon = BuildImage(20, 20, background=ICON_PATH / "name_white.png") + await skin_bk.paste(name_icon, (240, 13)) + await skin_bk.text((265, 15), f"名称:", (255, 255, 255), font_size=20) + await skin_bk.text( + (310, 15), + f"{skin.skin_name + ('(St)' if skin.is_stattrak else '')}", + (255, 255, 255), + ) + tone_icon = BuildImage(20, 20, background=ICON_PATH / "tone_white.png") + await skin_bk.paste(tone_icon, (240, 45)) + await skin_bk.text((265, 45), "品质:", (255, 255, 255), font_size=20) + await skin_bk.text((310, 45), COLOR2NAME[skin.color][:2], COLOR2COLOR[skin.color]) + type_icon = BuildImage(20, 20, background=ICON_PATH / "type_white.png") + await skin_bk.paste(type_icon, (240, 73)) + await skin_bk.text((265, 75), "类型:", (255, 255, 255), font_size=20) + await skin_bk.text((310, 75), skin.weapon_type, (255, 255, 255)) + price_icon = BuildImage(20, 20, background=ICON_PATH / "price_white.png") + await skin_bk.paste(price_icon, (240, 103)) + await skin_bk.text((265, 105), "价格:", (255, 255, 255), font_size=20) + await skin_bk.text((310, 105), str(skin.sell_min_price), (0, 255, 98)) + abrasion_icon = BuildImage(20, 20, background=ICON_PATH / "abrasion_white.png") + await skin_bk.paste(abrasion_icon, (240, 133)) + await skin_bk.text((265, 135), "磨损:", (255, 255, 255), font_size=20) + await skin_bk.text((310, 135), skin.abrasion, (255, 255, 255)) + await skin_bk.text((228, 165), f"({rand})", (255, 255, 255)) + return skin_bk + + +async def generate_skin(skin: BuffSkin, update_count: int) -> BuildImage | None: + """构造皮肤图片 + + 参数: + skin (BuffSkin): BuffSkin + + 返回: + BuildImage | None: 图片 + """ + name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion + file_path = BASE_PATH / cn2py(skin.case_name.split(",")[0]) / f"{cn2py(name)}.jpg" + if not file_path.exists(): + logger.warning(f"皮肤图片: {name} 不存在", "查看武器箱") + if skin.color == "CASE": + case_bk = BuildImage( + 700, 200, color=(25, 25, 25, 100), font_size=25, font="CJGaoDeGuo.otf" + ) + if file_path.exists(): + skin_img = BuildImage(200, 200, background=file_path) + await case_bk.paste(skin_img, (10, 10)) + await case_bk.line((250, 10, 250, 190)) + await case_bk.line((280, 160, 660, 160)) + name_icon = BuildImage(30, 30, background=ICON_PATH / "box_white.png") + await case_bk.paste(name_icon, (260, 25)) + await case_bk.text((295, 30), "名称:", (255, 255, 255)) + await case_bk.text((345, 30), skin.case_name, (255, 0, 38), font_size=30) + + type_icon = BuildImage(30, 30, background=ICON_PATH / "type_white.png") + await case_bk.paste(type_icon, (260, 70)) + await case_bk.text((295, 75), "类型:", (255, 255, 255)) + await case_bk.text((345, 75), "武器箱", (0, 157, 255), font_size=30) + + price_icon = BuildImage(30, 30, background=ICON_PATH / "price_white.png") + await case_bk.paste(price_icon, (260, 114)) + await case_bk.text((295, 120), "单价:", (255, 255, 255)) + await case_bk.text( + (340, 120), str(skin.sell_min_price), (0, 255, 98), font_size=30 + ) + + update_count_icon = BuildImage( + 40, 40, background=ICON_PATH / "reload_white.png" + ) + await case_bk.paste(update_count_icon, (575, 10)) + await case_bk.text((625, 12), str(update_count), (255, 255, 255), font_size=45) + + num_icon = BuildImage(30, 30, background=ICON_PATH / "num_white.png") + await case_bk.paste(num_icon, (455, 70)) + await case_bk.text((490, 75), "在售:", (255, 255, 255)) + await case_bk.text((535, 75), str(skin.sell_num), (144, 0, 255), font_size=30) + + want_buy_icon = BuildImage(30, 30, background=ICON_PATH / "want_buy_white.png") + await case_bk.paste(want_buy_icon, (455, 114)) + await case_bk.text((490, 120), "求购:", (255, 255, 255)) + await case_bk.text((535, 120), str(skin.buy_num), (144, 0, 255), font_size=30) + + await case_bk.text((275, 165), "更新时间", (255, 255, 255), font_size=22) + date = str( + skin.update_time.replace(microsecond=0).astimezone( + timezone(timedelta(hours=8)) + ) + ).split("+")[0] + await case_bk.text( + (350, 165), + date, + (255, 255, 255), + font_size=30, + ) + return case_bk + else: + skin_bk = BuildImage( + 235, 250, color=(25, 25, 25, 100), font_size=25, font="CJGaoDeGuo.otf" + ) + if file_path.exists(): + skin_image = BuildImage(205, 153, background=file_path) + await skin_bk.paste(skin_image, (10, 30)) + update_count_icon = BuildImage( + 35, 35, background=ICON_PATH / "reload_white.png" + ) + await skin_bk.line((10, 180, 220, 180)) + await skin_bk.text((10, 10), skin.name, (255, 255, 255)) + await skin_bk.paste(update_count_icon, (140, 10)) + await skin_bk.text((175, 15), str(update_count), (255, 255, 255)) + await skin_bk.text((10, 185), f"{skin.skin_name}", (255, 255, 255), "width") + await skin_bk.text((10, 218), "品质:", (255, 255, 255)) + await skin_bk.text( + (55, 218), COLOR2NAME[skin.color][:2], COLOR2COLOR[skin.color] + ) + await skin_bk.text((100, 218), "类型:", (255, 255, 255)) + await skin_bk.text((145, 218), skin.weapon_type, (255, 255, 255)) + return skin_bk diff --git a/zhenxun/plugins/open_cases/command.py b/zhenxun/plugins/open_cases/command.py new file mode 100644 index 00000000..a2f85c38 --- /dev/null +++ b/zhenxun/plugins/open_cases/command.py @@ -0,0 +1,75 @@ +from nonebot.permission import SUPERUSER +from nonebot_plugin_alconna import Alconna, Args, Option, on_alconna, store_true + +from zhenxun.utils.rules import ensure_group + +_open_matcher = on_alconna( + Alconna("开箱", Args["name?", str]), priority=5, block=True, rule=ensure_group +) + +_reload_matcher = on_alconna( + Alconna("重置开箱"), priority=5, block=True, permission=SUPERUSER, rule=ensure_group +) + +_my_open_matcher = on_alconna( + Alconna("我的开箱"), + aliases={"开箱统计", "开箱查询", "查询开箱"}, + priority=5, + block=True, + rule=ensure_group, +) + +_group_open_matcher = on_alconna( + Alconna("群开箱统计"), priority=5, block=True, rule=ensure_group +) + +_multiple_matcher = on_alconna( + Alconna("multiple-open", Args["num", int]["name?", str]), + priority=5, + block=True, + rule=ensure_group, +) + +_multiple_matcher.shortcut( + r"(?P\d)连开箱(?P.*?)", + command="multiple-open", + arguments=["{num}", "{name}"], + prefix=True, +) + +_update_matcher = on_alconna( + Alconna( + "更新武器箱", + Args["name?", str], + Option("-s", action=store_true, help_text="是否必定更新所属箱子"), + ), + aliases={"更新皮肤"}, + priority=1, + permission=SUPERUSER, + block=True, +) + +_update_image_matcher = on_alconna( + Alconna("更新武器箱图片", Args["name?", str]), + priority=1, + permission=SUPERUSER, + block=True, +) + +_show_case_matcher = on_alconna( + Alconna("查看武器箱", Args["name?", str]), priority=5, block=True +) + +_knifes_matcher = on_alconna( + Alconna("我的金色"), priority=5, block=True, rule=ensure_group +) + +_show_skin_matcher = on_alconna(Alconna("查看皮肤"), priority=5, block=True) + +_price_matcher = on_alconna( + Alconna( + "价格趋势", Args["name", str]["skin", str]["abrasion", str]["day?", int, 7] + ), + priority=5, + block=True, +) diff --git a/zhenxun/plugins/open_cases/config.py b/zhenxun/plugins/open_cases/config.py new file mode 100644 index 00000000..cefa7384 --- /dev/null +++ b/zhenxun/plugins/open_cases/config.py @@ -0,0 +1,253 @@ +import random +from enum import Enum + +from zhenxun.configs.path_config import IMAGE_PATH +from zhenxun.services.log import logger + +from .models.buff_skin import BuffSkin + +BLUE = 0.7981 +BLUE_ST = 0.0699 +PURPLE = 0.1626 +PURPLE_ST = 0.0164 +PINK = 0.0315 +PINK_ST = 0.0048 +RED = 0.0057 +RED_ST = 0.00021 +KNIFE = 0.0021 +KNIFE_ST = 0.000041 + +# 崭新 +FACTORY_NEW_S = 0 +FACTORY_NEW_E = 0.0699999 +# 略磨 +MINIMAL_WEAR_S = 0.07 +MINIMAL_WEAR_E = 0.14999 +# 久经 +FIELD_TESTED_S = 0.15 +FIELD_TESTED_E = 0.37999 +# 破损 +WELL_WORN_S = 0.38 +WELL_WORN_E = 0.44999 +# 战痕 +BATTLE_SCARED_S = 0.45 +BATTLE_SCARED_E = 0.99999 + + +class UpdateType(Enum): + """ + 更新类型 + """ + + CASE = "case" + WEAPON_TYPE = "weapon_type" + + +NAME2COLOR = { + "消费级": "WHITE", + "工业级": "LIGHTBLUE", + "军规级": "BLUE", + "受限": "PURPLE", + "保密": "PINK", + "隐秘": "RED", + "非凡": "KNIFE", +} + +COLOR2NAME = { + "WHITE": "消费级", + "LIGHTBLUE": "工业级", + "BLUE": "军规级", + "PURPLE": "受限", + "PINK": "保密", + "RED": "隐秘", + "KNIFE": "非凡", +} + +COLOR2COLOR = { + "WHITE": (255, 255, 255), + "LIGHTBLUE": (0, 179, 255), + "BLUE": (0, 85, 255), + "PURPLE": (149, 0, 255), + "PINK": (255, 0, 162), + "RED": (255, 34, 0), + "KNIFE": (255, 225, 0), +} + +ABRASION_SORT = ["崭新出厂", "略有磨损", "久经沙场", "破损不堪", "战横累累"] + +CASE_BACKGROUND = IMAGE_PATH / "csgo_cases" / "_background" / "shu" + +# 刀 +KNIFE2ID = { + "鲍伊猎刀": "weapon_knife_survival_bowie", + "蝴蝶刀": "weapon_knife_butterfly", + "弯刀": "weapon_knife_falchion", + "折叠刀": "weapon_knife_flip", + "穿肠刀": "weapon_knife_gut", + "猎杀者匕首": "weapon_knife_tactical", + "M9刺刀": "weapon_knife_m9_bayonet", + "刺刀": "weapon_bayonet", + "爪子刀": "weapon_knife_karambit", + "暗影双匕": "weapon_knife_push", + "短剑": "weapon_knife_stiletto", + "熊刀": "weapon_knife_ursus", + "折刀": "weapon_knife_gypsy_jackknife", + "锯齿爪刀": "weapon_knife_widowmaker", + "海豹短刀": "weapon_knife_css", + "系绳匕首": "weapon_knife_cord", + "求生匕首": "weapon_knife_canis", + "流浪者匕首": "weapon_knife_outdoor", + "骷髅匕首": "weapon_knife_skeleton", + "血猎手套": "weapon_bloodhound_gloves", + "驾驶手套": "weapon_driver_gloves", + "手部束带": "weapon_hand_wraps", + "摩托手套": "weapon_moto_gloves", + "专业手套": "weapon_specialist_gloves", + "运动手套": "weapon_sport_gloves", + "九头蛇手套": "weapon_hydra_gloves", + "狂牙手套": "weapon_brokenfang_gloves", +} + +WEAPON2ID = {} + +# 武器箱 +CASE2ID = { + "变革": "set_community_32", + "反冲": "set_community_31", + "梦魇": "set_community_30", + "激流": "set_community_29", + "蛇噬": "set_community_28", + "狂牙大行动": "set_community_27", + "裂空": "set_community_26", + "棱彩2号": "set_community_25", + "CS20": "set_community_24", + "裂网大行动": "set_community_23", + "棱彩": "set_community_22", + "头号特训": "set_community_21", + "地平线": "set_community_20", + "命悬一线": "set_community_19", + "光谱2号": "set_community_18", + "九头蛇大行动": "set_community_17", + "光谱": "set_community_16", + "手套": "set_community_15", + "伽玛2号": "set_gamma_2", + "伽玛": "set_community_13", + "幻彩3号": "set_community_12", + "野火大行动": "set_community_11", + "左轮": "set_community_10", + "暗影": "set_community_9", + "弯曲猎手": "set_community_8", + "幻彩2号": "set_community_7", + "幻彩": "set_community_6", + "先锋": "set_community_5", + "电竞2014夏季": "set_esports_iii", + "突围大行动": "set_community_4", + "猎杀者": "set_community_3", + "凤凰": "set_community_2", + "电竞2013冬季": "set_esports_ii", + "冬季攻势": "set_community_1", + "军火交易3号": "set_weapons_iii", + "英勇": "set_bravo_i", + "电竞2013": "set_esports", + "军火交易2号": "set_weapons_ii", + "军火交易": "set_weapons_i", +} + + +def get_wear(rand: float) -> str: + """判断磨损度 + + Args: + rand (float): 随机rand + + Returns: + str: 磨损名称 + """ + if rand <= FACTORY_NEW_E: + return "崭新出厂" + if MINIMAL_WEAR_S <= rand <= MINIMAL_WEAR_E: + return "略有磨损" + if FIELD_TESTED_S <= rand <= FIELD_TESTED_E: + return "久经沙场" + if WELL_WORN_S <= rand <= WELL_WORN_E: + return "破损不堪" + return "战痕累累" + + +def random_color_and_st(rand: float) -> tuple[str, bool]: + """获取皮肤品质及是否暗金 + + 参数: + rand (float): 随机rand + + 返回: + tuple[str, bool]: 品质,是否暗金 + """ + if rand <= KNIFE: + if random.random() <= KNIFE_ST: + return ("KNIFE", True) + return ("KNIFE", False) + elif KNIFE < rand <= RED: + if random.random() <= RED_ST: + return ("RED", True) + return ("RED", False) + elif RED < rand <= PINK: + if random.random() <= PINK_ST: + return ("PINK", True) + return ("PINK", False) + elif PINK < rand <= PURPLE: + if random.random() <= PURPLE_ST: + return ("PURPLE", True) + return ("PURPLE", False) + else: + if random.random() <= BLUE_ST: + return ("BLUE", True) + return ("BLUE", False) + + +async def random_skin(num: int, case_name: str) -> list[tuple[BuffSkin, float]]: + """ + 随机抽取皮肤 + """ + case_name = case_name.replace("武器箱", "").replace(" ", "") + color_map = {} + for _ in range(num): + rand = random.random() + # 尝试降低磨损 + if rand > MINIMAL_WEAR_E: + for _ in range(2): + if random.random() < 0.5: + logger.debug(f"[START]开箱随机磨损触发降低磨损条件: {rand}") + if random.random() < 0.2: + rand /= 3 + else: + rand /= 2 + logger.debug(f"[END]开箱随机磨损触发降低磨损条件: {rand}") + break + abrasion = get_wear(rand) + logger.debug(f"开箱随机磨损: {rand} | {abrasion}") + color, is_stattrak = random_color_and_st(rand) + if not color_map.get(color): + color_map[color] = {} + if is_stattrak: + if not color_map[color].get(f"{abrasion}_st"): + color_map[color][f"{abrasion}_st"] = [] + color_map[color][f"{abrasion}_st"].append(rand) + else: + if not color_map[color].get(abrasion): + color_map[color][f"{abrasion}"] = [] + color_map[color][f"{abrasion}"].append(rand) + skin_list = [] + for color in color_map: + for abrasion in color_map[color]: + rand_list = color_map[color][abrasion] + is_stattrak = "_st" in abrasion + abrasion = abrasion.replace("_st", "") + skin_list_ = await BuffSkin.random_skin( + len(rand_list), color, abrasion, is_stattrak, case_name + ) + skin_list += [(skin, rand) for skin, rand in zip(skin_list_, rand_list)] + return skin_list + + +# M249(StatTrak™) | 等高线 diff --git a/zhenxun/plugins/open_cases/models/__init__.py b/zhenxun/plugins/open_cases/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/zhenxun/plugins/open_cases/models/buff_prices.py b/zhenxun/plugins/open_cases/models/buff_prices.py new file mode 100644 index 00000000..9f53de0e --- /dev/null +++ b/zhenxun/plugins/open_cases/models/buff_prices.py @@ -0,0 +1,22 @@ +from tortoise import fields + +from zhenxun.services.db_context import Model + +# 1.狂牙武器箱 + + +class BuffPrice(Model): + + id = fields.IntField(pk=True, generated=True, auto_increment=True) + """自增id""" + case_id = fields.IntField() + """箱子id""" + skin_name = fields.CharField(255, unique=True) + """皮肤名称""" + skin_price = fields.FloatField() + """皮肤价格""" + update_date = fields.DatetimeField() + + class Meta: + table = "buff_prices" + table_description = "Buff价格数据表" diff --git a/zhenxun/plugins/open_cases/models/buff_skin.py b/zhenxun/plugins/open_cases/models/buff_skin.py new file mode 100644 index 00000000..7f51221a --- /dev/null +++ b/zhenxun/plugins/open_cases/models/buff_skin.py @@ -0,0 +1,113 @@ +from tortoise import fields +from tortoise.contrib.postgres.functions import Random + +from zhenxun.services.db_context import Model + + +class BuffSkin(Model): + + id = fields.IntField(pk=True, generated=True, auto_increment=True) + """自增id""" + case_name: str = fields.CharField(255) # type: ignore + """箱子名称""" + name: str = fields.CharField(255) # type: ignore + """武器/手套/刀名称""" + skin_name: str = fields.CharField(255) # type: ignore + """皮肤名称""" + is_stattrak = fields.BooleanField(default=False) + """是否暗金(计数)""" + abrasion = fields.CharField(255) + """磨损度""" + color = fields.CharField(255) + """颜色(品质)""" + skin_id = fields.CharField(255, null=True, unique=True) + """皮肤id""" + + img_url = fields.CharField(255) + """图片url""" + steam_price = fields.FloatField(default=0) + """steam价格""" + weapon_type = fields.CharField(255) + """枪械类型""" + buy_max_price = fields.FloatField(default=0) + """最大求购价格""" + buy_num = fields.IntField(default=0) + """求购数量""" + sell_min_price = fields.FloatField(default=0) + """售卖最低价格""" + sell_num = fields.IntField(default=0) + """出售个数""" + sell_reference_price = fields.FloatField(default=0) + """参考价格""" + + create_time = fields.DatetimeField(auto_add_now=True) + """创建日期""" + update_time = fields.DatetimeField(auto_add=True) + """更新日期""" + + class Meta: + table = "buff_skin" + table_description = "Buff皮肤数据表" + # unique_together = ("case_name", "name", "skin_name", "abrasion", "is_stattrak") + + def __eq__(self, other: "BuffSkin"): + + return self.skin_id == other.skin_id + + def __hash__(self): + + return hash(self.case_name + self.name + self.skin_name + str(self.is_stattrak)) + + @classmethod + async def random_skin( + cls, + num: int, + color: str, + abrasion: str, + is_stattrak: bool = False, + case_name: str | None = None, + ) -> list["BuffSkin"]: # type: ignore + """随机皮肤 + + 参数: + num: 数量 + color: 品质 + abrasion: 磨损度 + is_stattrak: 是否暗金 + case_name: 箱子名称 + + 返回: + list["BuffSkin"]: 皮肤列表 + """ + query = cls + if case_name: + query = query.filter(case_name__contains=case_name) + query = query.filter(abrasion=abrasion, is_stattrak=is_stattrak, color=color) + skin_list = await query.annotate(rand=Random()).limit(num) # type:ignore + num_ = num + cnt = 0 + while len(skin_list) < num: + cnt += 1 + num_ = num - len(skin_list) + skin_list += await query.annotate(rand=Random()).limit(num_) + if cnt > 10: + break + return skin_list # type: ignore + + @classmethod + async def _run_script(cls): + return [ + "ALTER TABLE buff_skin ADD img_url varchar(255);", # 新增img_url + "ALTER TABLE buff_skin ADD skin_id varchar(255);", # 新增skin_id + "ALTER TABLE buff_skin ADD steam_price float DEFAULT 0;", # 新增steam_price + "ALTER TABLE buff_skin ADD weapon_type varchar(255);", # 新增type + "ALTER TABLE buff_skin ADD buy_max_price float DEFAULT 0;", # 新增buy_max_price + "ALTER TABLE buff_skin ADD buy_num Integer DEFAULT 0;", # 新增buy_max_price + "ALTER TABLE buff_skin ADD sell_min_price float DEFAULT 0;", # 新增sell_min_price + "ALTER TABLE buff_skin ADD sell_num Integer DEFAULT 0;", # 新增sell_num + "ALTER TABLE buff_skin ADD sell_reference_price float DEFAULT 0;", # 新增sell_reference_price + "ALTER TABLE buff_skin DROP COLUMN skin_price;", # 删除skin_price + "alter table buff_skin drop constraint if EXISTS uid_buff_skin_case_na_c35c93;", # 删除唯一约束 + "UPDATE buff_skin set case_name='手套' where case_name='手套武器箱'", + "UPDATE buff_skin set case_name='左轮' where case_name='左轮武器箱'", + ] diff --git a/zhenxun/plugins/open_cases/models/buff_skin_log.py b/zhenxun/plugins/open_cases/models/buff_skin_log.py new file mode 100644 index 00000000..ac9fec95 --- /dev/null +++ b/zhenxun/plugins/open_cases/models/buff_skin_log.py @@ -0,0 +1,50 @@ +from tortoise import fields + +from zhenxun.services.db_context import Model + + +class BuffSkinLog(Model): + + id = fields.IntField(pk=True, generated=True, auto_increment=True) + """自增id""" + case_name = fields.CharField(255) + """箱子名称""" + name = fields.CharField(255) + """武器/手套/刀名称""" + skin_name = fields.CharField(255) + """皮肤名称""" + is_stattrak = fields.BooleanField(default=False) + """是否暗金(计数)""" + abrasion = fields.CharField(255) + """磨损度""" + color = fields.CharField(255) + """颜色(品质)""" + + steam_price = fields.FloatField(default=0) + """steam价格""" + weapon_type = fields.CharField(255) + """枪械类型""" + buy_max_price = fields.FloatField(default=0) + """最大求购价格""" + buy_num = fields.IntField(default=0) + """求购数量""" + sell_min_price = fields.FloatField(default=0) + """售卖最低价格""" + sell_num = fields.IntField(default=0) + """出售个数""" + sell_reference_price = fields.FloatField(default=0) + """参考价格""" + + create_time = fields.DatetimeField(auto_add_now=True) + """创建日期""" + + class Meta: + table = "buff_skin_log" + table_description = "Buff皮肤更新日志表" + + @classmethod + async def _run_script(cls): + return [ + "UPDATE buff_skin_log set case_name='手套' where case_name='手套武器箱'", + "UPDATE buff_skin_log set case_name='左轮' where case_name='左轮武器箱'", + ] diff --git a/zhenxun/plugins/open_cases/models/open_cases_log.py b/zhenxun/plugins/open_cases/models/open_cases_log.py new file mode 100644 index 00000000..0c4f87bb --- /dev/null +++ b/zhenxun/plugins/open_cases/models/open_cases_log.py @@ -0,0 +1,44 @@ +from tortoise import fields +from tortoise.contrib.postgres.functions import Random + +from zhenxun.services.db_context import Model + + +class OpenCasesLog(Model): + + id = fields.IntField(pk=True, generated=True, auto_increment=True) + """自增id""" + user_id = fields.CharField(255) + """用户id""" + group_id = fields.CharField(255) + """群聊id""" + case_name = fields.CharField(255) + """箱子名称""" + name = fields.CharField(255) + """武器/手套/刀名称""" + skin_name = fields.CharField(255) + """皮肤名称""" + is_stattrak = fields.BooleanField(default=False) + """是否暗金(计数)""" + abrasion = fields.CharField(255) + """磨损度""" + abrasion_value = fields.FloatField() + """磨损数值""" + color = fields.CharField(255) + """颜色(品质)""" + price = fields.FloatField(default=0) + """价格""" + create_time = fields.DatetimeField(auto_add_now=True) + """创建日期""" + + class Meta: + table = "open_cases_log" + table_description = "开箱日志表" + + @classmethod + async def _run_script(cls): + return [ + "ALTER TABLE open_cases_log RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id + "ALTER TABLE open_cases_log ALTER COLUMN user_id TYPE character varying(255);", + "ALTER TABLE open_cases_log ALTER COLUMN group_id TYPE character varying(255);", + ] diff --git a/zhenxun/plugins/open_cases/models/open_cases_user.py b/zhenxun/plugins/open_cases/models/open_cases_user.py new file mode 100644 index 00000000..3ed43937 --- /dev/null +++ b/zhenxun/plugins/open_cases/models/open_cases_user.py @@ -0,0 +1,60 @@ +from tortoise import fields + +from zhenxun.services.db_context import Model + + +class OpenCasesUser(Model): + + id = fields.IntField(pk=True, generated=True, auto_increment=True) + """自增id""" + user_id = fields.CharField(255) + """用户id""" + group_id = fields.CharField(255) + """群聊id""" + total_count = fields.IntField(default=0) + """总开启次数""" + blue_count = fields.IntField(default=0) + """蓝色""" + blue_st_count = fields.IntField(default=0) + """蓝色暗金""" + purple_count = fields.IntField(default=0) + """紫色""" + purple_st_count = fields.IntField(default=0) + """紫色暗金""" + pink_count = fields.IntField(default=0) + """粉色""" + pink_st_count = fields.IntField(default=0) + """粉色暗金""" + red_count = fields.IntField(default=0) + """紫色""" + red_st_count = fields.IntField(default=0) + """紫色暗金""" + knife_count = fields.IntField(default=0) + """金色""" + knife_st_count = fields.IntField(default=0) + """金色暗金""" + spend_money = fields.IntField(default=0) + """花费金币""" + make_money = fields.FloatField(default=0) + """赚取金币""" + today_open_total = fields.IntField(default=0) + """今日开箱数量""" + open_cases_time_last = fields.DatetimeField() + """最后开箱日期""" + knifes_name = fields.TextField(default="") + """已获取金色""" + + class Meta: + table = "open_cases_users" + table_description = "开箱统计数据表" + unique_together = ("user_id", "group_id") + + @classmethod + async def _run_script(cls): + return [ + "alter table open_cases_users alter COLUMN make_money type float;", # 将make_money字段改为float + "alter table open_cases_users alter COLUMN spend_money type float;", # 将spend_money字段改为float + "ALTER TABLE open_cases_users RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id + "ALTER TABLE open_cases_users ALTER COLUMN user_id TYPE character varying(255);", + "ALTER TABLE open_cases_users ALTER COLUMN group_id TYPE character varying(255);", + ] diff --git a/zhenxun/plugins/open_cases/open_cases_c.py b/zhenxun/plugins/open_cases/open_cases_c.py new file mode 100644 index 00000000..8cdd5b32 --- /dev/null +++ b/zhenxun/plugins/open_cases/open_cases_c.py @@ -0,0 +1,501 @@ +import asyncio +import random +import re +from datetime import datetime + +from nonebot_plugin_saa import Image, MessageFactory, Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import Config +from zhenxun.configs.path_config import IMAGE_PATH +from zhenxun.models.sign_user import SignUser +from zhenxun.services.log import logger +from zhenxun.utils.image_utils import BuildImage +from zhenxun.utils.utils import cn2py + +from .build_image import draw_card +from .config import * +from .models.open_cases_log import OpenCasesLog +from .models.open_cases_user import OpenCasesUser +from .utils import CaseManager, update_skin_data + +RESULT_MESSAGE = { + "BLUE": ["这样看着才舒服", "是自己人,大伙把刀收好", "非常舒适~"], + "PURPLE": ["还行吧,勉强接受一下下", "居然不是蓝色,太假了", "运气-1-1-1-1-1..."], + "PINK": ["开始不适....", "你妈妈买菜必涨价!涨三倍!", "你最近不适合出门,真的"], + "RED": [ + "已经非常不适", + "好兄弟你开的什么箱子啊,一般箱子不是只有蓝色的吗", + "开始拿阳寿开箱子了?", + ], + "KNIFE": [ + "你的好运我收到了,你可以去喂鲨鱼了", + "最近该吃啥就迟点啥吧,哎,好好的一个人怎么就....哎", + "众所周知,欧皇寿命极短.", + ], +} + +COLOR2NAME = { + "BLUE": "军规", + "PURPLE": "受限", + "PINK": "保密", + "RED": "隐秘", + "KNIFE": "罕见", +} + +COLOR2CN = {"BLUE": "蓝", "PURPLE": "紫", "PINK": "粉", "RED": "红", "KNIFE": "金"} + + +def add_count(user: OpenCasesUser, skin: BuffSkin, case_price: float): + if skin.color == "BLUE": + if skin.is_stattrak: + user.blue_st_count += 1 + else: + user.blue_count += 1 + elif skin.color == "PURPLE": + if skin.is_stattrak: + user.purple_st_count += 1 + else: + user.purple_count += 1 + elif skin.color == "PINK": + if skin.is_stattrak: + user.pink_st_count += 1 + else: + user.pink_count += 1 + elif skin.color == "RED": + if skin.is_stattrak: + user.red_st_count += 1 + else: + user.red_count += 1 + elif skin.color == "KNIFE": + if skin.is_stattrak: + user.knife_st_count += 1 + else: + user.knife_count += 1 + user.make_money += skin.sell_min_price + user.spend_money += int(17 + case_price) + + +async def get_user_max_count(user_id: str) -> int: + """获取用户每日最大开箱次数 + + 参数: + user_id: 用户id + + 返回: + int: 最大开箱次数 + """ + user, _ = await SignUser.get_or_create(user_id=user_id) + impression = int(user.impression) + initial_open_case_count = Config.get_config("open_cases", "INITIAL_OPEN_CASE_COUNT") + each_impression_add_count = Config.get_config( + "open_cases", "EACH_IMPRESSION_ADD_COUNT" + ) + return int(initial_open_case_count + impression / each_impression_add_count) # type: ignore + + +async def open_case( + user_id: str, group_id: str, case_name: str | None, session: EventSession +) -> MessageFactory: + """开箱 + + 参数: + user_id: 用户id + group_id : 群号 + case_name: 武器箱名称. Defaults to "狂牙大行动". + session: EventSession + + 返回: + Union[str, Message]: 回复消息 + """ + user_id = str(user_id) + group_id = str(group_id) + if not CaseManager.CURRENT_CASES: + return MessageFactory([Text("未收录任何武器箱")]) + if not case_name: + case_name = random.choice(CaseManager.CURRENT_CASES) # type: ignore + if case_name not in CaseManager.CURRENT_CASES: + return "武器箱未收录, 当前可用武器箱:\n" + ", ".join(CaseManager.CURRENT_CASES) # type: ignore + logger.debug( + f"尝试开启武器箱: {case_name}", "开箱", session=user_id, group_id=group_id + ) + case = cn2py(case_name) # type: ignore + user = await OpenCasesUser.get_or_none(user_id=user_id, group_id=group_id) + if not user: + user = await OpenCasesUser.create( + user_id=user_id, group_id=group_id, open_cases_time_last=datetime.now() + ) + max_count = await get_user_max_count(user_id) + # 一天次数上限 + if user.today_open_total >= max_count: + return MessageFactory( + [ + Text( + f"今天已达开箱上限了喔,明天再来吧\n(提升好感度可以增加每日开箱数 #疯狂暗示)" + ) + ] + ) + skin_list = await random_skin(1, case_name) # type: ignore + if not skin_list: + return MessageFactory(Text("未抽取到任何皮肤")) + skin, rand = skin_list[0] + rand = str(rand)[:11] + case_price = 0 + if case_skin := await BuffSkin.get_or_none(case_name=case_name, color="CASE"): + case_price = case_skin.sell_min_price + user.today_open_total += 1 + user.total_count += 1 + user.open_cases_time_last = datetime.now() + await user.save( + update_fields=["today_open_total", "total_count", "open_cases_time_last"] + ) + add_count(user, skin, case_price) + ridicule_result = random.choice(RESULT_MESSAGE[skin.color]) + price_result = skin.sell_min_price + name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion + img_path = IMAGE_PATH / "csgo_cases" / case / f"{cn2py(name)}.jpg" + logger.info( + f"开启{case_name}武器箱获得 {skin.name}{'(StatTrak™)' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损: [{rand}] 价格: {skin.sell_min_price}", + "开箱", + session=session, + ) + await user.save() + await OpenCasesLog.create( + user_id=user_id, + group_id=group_id, + case_name=case_name, + name=skin.name, + skin_name=skin.skin_name, + is_stattrak=skin.is_stattrak, + abrasion=skin.abrasion, + color=skin.color, + price=skin.sell_min_price, + abrasion_value=rand, + create_time=datetime.now(), + ) + logger.debug(f"添加 1 条开箱日志", "开箱", session=session) + over_count = max_count - user.today_open_total + img = await draw_card(skin, rand) + return MessageFactory( + [ + Text(f"开启{case_name}武器箱.\n剩余开箱次数:{over_count}.\n"), + Image(img.pic2bytes()), + Text( + f"\n箱子单价:{case_price}\n花费:{17 + case_price:.2f}\n:{ridicule_result}" + ), + ] + ) + + +async def open_multiple_case( + user_id: str, + group_id: str, + case_name: str | None, + num: int = 10, + session: EventSession | None = None, +) -> MessageFactory: + """多连开箱 + + 参数: + user_id (int): 用户id + group_id (int): 群号 + case_name (str): 箱子名称 + num (int, optional): 数量. Defaults to 10. + session: EventSession + + 返回: + _type_: _description_ + """ + user_id = str(user_id) + group_id = str(group_id) + if not CaseManager.CURRENT_CASES: + return MessageFactory([Text("未收录任何武器箱")]) + if not case_name: + case_name = random.choice(CaseManager.CURRENT_CASES) # type: ignore + if case_name not in CaseManager.CURRENT_CASES: + return MessageFactory( + [ + Text( + "武器箱未收录, 当前可用武器箱:\n" + + ", ".join(CaseManager.CURRENT_CASES) + ) + ] + ) + user, _ = await OpenCasesUser.get_or_create( + user_id=user_id, + group_id=group_id, + defaults={"open_cases_time_last": datetime.now()}, + ) + max_count = await get_user_max_count(user_id) + if user.today_open_total >= max_count: + return MessageFactory( + [ + Text( + f"今天已达开箱上限了喔,明天再来吧\n(提升好感度可以增加每日开箱数 #疯狂暗示)" + ) + ] + ) + if max_count - user.today_open_total < num: + return MessageFactory( + [ + Text( + f"今天开箱次数不足{num}次噢,请单抽试试看(也许单抽运气更好?)" + f"\n剩余开箱次数:{max_count - user.today_open_total}" + ) + ] + ) + logger.debug(f"尝试开启武器箱: {case_name}", "开箱", session=session) + case = cn2py(case_name) # type: ignore + skin_count = {} + img_list = [] + skin_list = await random_skin(num, case_name) # type: ignore + if not skin_list: + return MessageFactory([Text("未抽取到任何皮肤...")]) + total_price = 0 + log_list = [] + now = datetime.now() + user.today_open_total += num + user.total_count += num + user.open_cases_time_last = datetime.now() + await user.save( + update_fields=["today_open_total", "total_count", "open_cases_time_last"] + ) + case_price = 0 + if case_skin := await BuffSkin.get_or_none(case_name=case_name, color="CASE"): + case_price = case_skin.sell_min_price + img_w, img_h = 0, 0 + for skin, rand in skin_list: + img = await draw_card(skin, str(rand)[:11]) + img_w, img_h = img.size + total_price += skin.sell_min_price + color_name = COLOR2CN[skin.color] + if not skin_count.get(color_name): + skin_count[color_name] = 0 + skin_count[color_name] += 1 + add_count(user, skin, case_price) + img_list.append(img) + logger.info( + f"开启{case_name}武器箱获得 {skin.name}{'(StatTrak™)' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损: [{rand:.11f}] 价格: {skin.sell_min_price}", + "开箱", + session=session, + ) + log_list.append( + OpenCasesLog( + user_id=user_id, + group_id=group_id, + case_name=case_name, + name=skin.name, + skin_name=skin.skin_name, + is_stattrak=skin.is_stattrak, + abrasion=skin.abrasion, + color=skin.color, + price=skin.sell_min_price, + abrasion_value=rand, + create_time=now, + ) + ) + await user.save() + if log_list: + await OpenCasesLog.bulk_create(log_list, 10) + logger.debug(f"添加 {len(log_list)} 条开箱日志", "开箱", session=session) + img_w += 10 + img_h += 10 + w = img_w * 5 + if num < 5: + h = img_h - 10 + w = img_w * num + elif not num % 5: + h = img_h * int(num / 5) + else: + h = img_h * int(num / 5) + img_h + mark_image = BuildImage(w - 10, h - 10, color=(255, 255, 255)) + mark_image = await mark_image.auto_paste(img_list, 5, padding=20) + over_count = max_count - user.today_open_total + result = "" + for color_name in skin_count: + result += f"[{color_name}:{skin_count[color_name]}] " + return MessageFactory( + [ + Text(f"开启{case_name}武器箱\n剩余开箱次数:{over_count}\n"), + Image(mark_image.pic2bytes()), + Text( + f"\nresult[:-1]\n箱子单价:{case_price}\n总获取金额:{total_price:.2f}\n总花费:{(17 + case_price) * num:.2f}" + ), + ] + ) + + +async def total_open_statistics(user_id: str, group_id: str) -> str: + user, _ = await OpenCasesUser.get_or_create(user_id=user_id, group_id=group_id) + return ( + f"开箱总数:{user.total_count}\n" + f"今日开箱:{user.today_open_total}\n" + f"蓝色军规:{user.blue_count}\n" + f"蓝色暗金:{user.blue_st_count}\n" + f"紫色受限:{user.purple_count}\n" + f"紫色暗金:{user.purple_st_count}\n" + f"粉色保密:{user.pink_count}\n" + f"粉色暗金:{user.pink_st_count}\n" + f"红色隐秘:{user.red_count}\n" + f"红色暗金:{user.red_st_count}\n" + f"金色罕见:{user.knife_count}\n" + f"金色暗金:{user.knife_st_count}\n" + f"花费金额:{user.spend_money}\n" + f"获取金额:{user.make_money:.2f}\n" + f"最后开箱日期:{user.open_cases_time_last.date()}" + ) + + +async def group_statistics(group_id: str): + user_list = await OpenCasesUser.filter(group_id=str(group_id)).all() + # lan zi fen hong jin pricei + uplist = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.0, 0, 0] + for user in user_list: + uplist[0] += user.blue_count + uplist[1] += user.blue_st_count + uplist[2] += user.purple_count + uplist[3] += user.purple_st_count + uplist[4] += user.pink_count + uplist[5] += user.pink_st_count + uplist[6] += user.red_count + uplist[7] += user.red_st_count + uplist[8] += user.knife_count + uplist[9] += user.knife_st_count + uplist[10] += user.make_money + uplist[11] += user.total_count + uplist[12] += user.today_open_total + return ( + f"群开箱总数:{uplist[11]}\n" + f"群今日开箱:{uplist[12]}\n" + f"蓝色军规:{uplist[0]}\n" + f"蓝色暗金:{uplist[1]}\n" + f"紫色受限:{uplist[2]}\n" + f"紫色暗金:{uplist[3]}\n" + f"粉色保密:{uplist[4]}\n" + f"粉色暗金:{uplist[5]}\n" + f"红色隐秘:{uplist[6]}\n" + f"红色暗金:{uplist[7]}\n" + f"金色罕见:{uplist[8]}\n" + f"金色暗金:{uplist[9]}\n" + f"花费金额:{uplist[11] * 17}\n" + f"获取金额:{uplist[10]:.2f}" + ) + + +async def get_my_knifes(user_id: str, group_id: str) -> MessageFactory: + """获取我的金色 + + 参数: + user_id (str): 用户id + group_id (str): 群号 + + 返回: + MessageFactory: 回复消息或图片 + """ + data_list = await get_old_knife(str(user_id), str(group_id)) + data_list += await OpenCasesLog.filter( + user_id=user_id, group_id=group_id, color="KNIFE" + ).all() + if not data_list: + return MessageFactory([Text("您木有开出金色级别的皮肤喔...")]) + length = len(data_list) + if length < 5: + h = 600 + w = length * 540 + elif length % 5 == 0: + h = 600 * int(length / 5) + w = 540 * 5 + else: + h = 600 * int(length / 5) + 600 + w = 540 * 5 + A = BuildImage(w, h) + image_list = [] + for skin in data_list: + name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion + img_path = ( + IMAGE_PATH / "csgo_cases" / cn2py(skin.case_name) / f"{cn2py(name)}.jpg" + ) + knife_img = BuildImage(470, 600, font_size=20) + await knife_img.paste( + BuildImage(470, 470, background=img_path if img_path.exists() else None), + (0, 0), + ) + await knife_img.text( + (5, 500), f"\t{skin.name}|{skin.skin_name}({skin.abrasion})" + ) + await knife_img.text((5, 530), f"\t磨损:{skin.abrasion_value}") + await knife_img.text((5, 560), f"\t价格:{skin.price}") + image_list.append(knife_img) + A = await A.auto_paste(image_list, 5) + return MessageFactory([Image(A.pic2bytes())]) + + +async def get_old_knife(user_id: str, group_id: str) -> list[OpenCasesLog]: + """获取旧数据字段 + + 参数: + user_id (str): 用户id + group_id (str): 群号 + + 返回: + list[OpenCasesLog]: 旧数据兼容 + """ + user, _ = await OpenCasesUser.get_or_create(user_id=user_id, group_id=group_id) + knifes_name = user.knifes_name + data_list = [] + if knifes_name: + knifes_list = knifes_name[:-1].split(",") + for knife in knifes_list: + try: + if r := re.search( + "(.*)\|\|(.*) \| (.*)\((.*)\) 磨损:(.*), 价格:(.*)", knife + ): + case_name_py = r.group(1) + name = r.group(2) + skin_name = r.group(3) + abrasion = r.group(4) + abrasion_value = r.group(5) + price = r.group(6) + name = name.replace("(StatTrak™)", "") + data_list.append( + OpenCasesLog( + user_id=user_id, + group_id=group_id, + name=name.strip(), + case_name=case_name_py.strip(), + skin_name=skin_name.strip(), + abrasion=abrasion.strip(), + abrasion_value=abrasion_value, + price=price, + ) + ) + except Exception as e: + logger.error( + f"获取兼容旧数据错误: {knife}", + "我的金色", + session=user_id, + group_id=group_id, + e=e, + ) + return data_list + + +async def auto_update(): + """自动更新武器箱""" + if case_list := Config.get_config("open_cases", "DAILY_UPDATE"): + logger.debug("尝试自动更新武器箱", "更新武器箱") + if "ALL" in case_list: + case_list = CASE2ID.keys() + logger.debug(f"预计自动更新武器箱 {len(case_list)} 个", "更新武器箱") + for case_name in case_list: + logger.debug(f"开始自动更新武器箱: {case_name}", "更新武器箱") + try: + await update_skin_data(case_name) + rand = random.randint(300, 500) + logger.info( + f"成功自动更新武器箱: {case_name}, 将在 {rand} 秒后再次更新下一武器箱", + "更新武器箱", + ) + await asyncio.sleep(rand) + except Exception as e: + logger.error(f"自动更新武器箱: {case_name}", e=e) diff --git a/zhenxun/plugins/open_cases/utils.py b/zhenxun/plugins/open_cases/utils.py new file mode 100644 index 00000000..212ef69e --- /dev/null +++ b/zhenxun/plugins/open_cases/utils.py @@ -0,0 +1,656 @@ +import asyncio +import os +import random +import re +import time +from datetime import datetime, timedelta + +import nonebot +from tortoise.functions import Count + +from zhenxun.configs.config import Config +from zhenxun.configs.path_config import IMAGE_PATH +from zhenxun.services.log import logger +from zhenxun.utils.http_utils import AsyncHttpx +from zhenxun.utils.image_utils import BuildImage, BuildMat, MatType + +from .build_image import generate_skin +from .config import ( + CASE2ID, + CASE_BACKGROUND, + COLOR2NAME, + KNIFE2ID, + NAME2COLOR, + UpdateType, +) +from .models.buff_skin import BuffSkin +from .models.buff_skin_log import BuffSkinLog +from .models.open_cases_user import OpenCasesUser + +# from zhenxun.utils.utils import broadcast_group, cn2py + + +URL = "https://buff.163.com/api/market/goods" + +SELL_URL = "https://buff.163.com/goods" + + +driver = nonebot.get_driver() + +BASE_PATH = IMAGE_PATH / "csgo_cases" + + +class CaseManager: + + CURRENT_CASES = [] + + @classmethod + async def reload(cls): + cls.CURRENT_CASES = [] + case_list = await BuffSkin.filter(color="CASE").values_list( + "case_name", flat=True + ) + for case_name in ( + await BuffSkin.filter(case_name__not="未知武器箱") + .annotate() + .distinct() + .values_list("case_name", flat=True) + ): + for name in case_name.split(","): # type: ignore + if name not in cls.CURRENT_CASES and name in case_list: + cls.CURRENT_CASES.append(name) + + +async def update_skin_data(name: str, is_update_case_name: bool = False) -> str: + """更新箱子内皮肤数据 + + 参数: + name (str): 箱子名称 + is_update_case_name (bool): 是否必定更新所属箱子 + + 返回: + str: 回复内容 + """ + type_ = None + if name in CASE2ID: + type_ = UpdateType.CASE + if name in KNIFE2ID: + type_ = UpdateType.WEAPON_TYPE + if not type_: + return "未在指定武器箱或指定武器类型内" + session = Config.get_config("open_cases", "COOKIE") + if not session: + return "BUFF COOKIE为空捏!" + weapon2case = {} + if type_ == UpdateType.WEAPON_TYPE: + db_data = await BuffSkin.filter(name__contains=name).all() + weapon2case = { + item.name + item.skin_name: item.case_name + for item in db_data + if item.case_name != "未知武器箱" + } + data_list, total = await search_skin_page(name, 1, type_) + if isinstance(data_list, str): + return data_list + for page in range(2, total + 1): + rand_time = random.randint(20, 50) + logger.debug(f"访问随机等待时间: {rand_time}", "开箱更新") + await asyncio.sleep(rand_time) + data_list_, total = await search_skin_page(name, page, type_) + if isinstance(data_list_, list): + data_list += data_list_ + create_list: list[BuffSkin] = [] + update_list: list[BuffSkin] = [] + log_list = [] + now = datetime.now() + exists_id_list = [] + new_weapon2case = {} + for skin in data_list: + if skin.skin_id in exists_id_list: + continue + if skin.case_name: + skin.case_name = ( + skin.case_name.replace("”", "") + .replace("“", "") + .replace("武器箱", "") + .replace(" ", "") + ) + skin.name = skin.name.replace("(★ StatTrak™)", "").replace("(★)", "") + exists_id_list.append(skin.skin_id) + key = skin.name + skin.skin_name + name_ = skin.name + skin.skin_name + skin.abrasion + skin.create_time = now + skin.update_time = now + if UpdateType.WEAPON_TYPE and not skin.case_name: + if is_update_case_name: + case_name = new_weapon2case.get(key) + else: + case_name = weapon2case.get(key) + if not case_name: + if case_list := await get_skin_case(skin.skin_id): + case_name = ",".join(case_list) + rand = random.randint(10, 20) + logger.debug( + f"获取 {skin.name} | {skin.skin_name} 皮肤所属武器箱: {case_name}, 访问随机等待时间: {rand}", + "开箱更新", + ) + await asyncio.sleep(rand) + if not case_name: + case_name = "未知武器箱" + else: + weapon2case[key] = case_name + new_weapon2case[key] = case_name + if skin.case_name == "反恐精英20周年": + skin.case_name = "CS20" + skin.case_name = case_name + if await BuffSkin.exists(skin_id=skin.skin_id): + update_list.append(skin) + else: + create_list.append(skin) + log_list.append( + BuffSkinLog( + name=skin.name, + case_name=skin.case_name, + skin_name=skin.skin_name, + is_stattrak=skin.is_stattrak, + abrasion=skin.abrasion, + color=skin.color, + steam_price=skin.steam_price, + weapon_type=skin.weapon_type, + buy_max_price=skin.buy_max_price, + buy_num=skin.buy_num, + sell_min_price=skin.sell_min_price, + sell_num=skin.sell_num, + sell_reference_price=skin.sell_reference_price, + create_time=now, + ) + ) + name_ = skin.name + "-" + skin.skin_name + "-" + skin.abrasion + for c_name_ in skin.case_name.split(","): + file_path = BASE_PATH / cn2py(c_name_) / f"{cn2py(name_)}.jpg" + if not file_path.exists(): + logger.debug(f"下载皮肤 {name} 图片: {skin.img_url}...", "开箱更新") + await AsyncHttpx.download_file(skin.img_url, file_path) + rand_time = random.randint(1, 10) + await asyncio.sleep(rand_time) + logger.debug(f"图片下载随机等待时间: {rand_time}", "开箱更新") + else: + logger.debug(f"皮肤 {name_} 图片已存在...", "开箱更新") + if create_list: + logger.debug( + f"更新武器箱/皮肤: [{name}], 创建 {len(create_list)} 个皮肤!" + ) + await BuffSkin.bulk_create(set(create_list), 10) + if update_list: + abrasion_list = [] + name_list = [] + skin_name_list = [] + for skin in update_list: + if skin.abrasion not in abrasion_list: + abrasion_list.append(skin.abrasion) + if skin.name not in name_list: + name_list.append(skin.name) + if skin.skin_name not in skin_name_list: + skin_name_list.append(skin.skin_name) + db_data = await BuffSkin.filter( + case_name__contains=name, + skin_name__in=skin_name_list, + name__in=name_list, + abrasion__in=abrasion_list, + ).all() + _update_list = [] + for data in db_data: + for skin in update_list: + if ( + data.name == skin.name + and data.skin_name == skin.skin_name + and data.abrasion == skin.abrasion + ): + data.steam_price = skin.steam_price + data.buy_max_price = skin.buy_max_price + data.buy_num = skin.buy_num + data.sell_min_price = skin.sell_min_price + data.sell_num = skin.sell_num + data.sell_reference_price = skin.sell_reference_price + data.update_time = skin.update_time + _update_list.append(data) + logger.debug( + f"更新武器箱/皮肤: [{name}], 更新 {len(create_list)} 个皮肤!" + ) + await BuffSkin.bulk_update( + _update_list, + [ + "steam_price", + "buy_max_price", + "buy_num", + "sell_min_price", + "sell_num", + "sell_reference_price", + "update_time", + ], + 10, + ) + if log_list: + logger.debug( + f"更新武器箱/皮肤: [{name}], 新增 {len(log_list)} 条皮肤日志!" + ) + await BuffSkinLog.bulk_create(log_list) + if name not in CaseManager.CURRENT_CASES: + CaseManager.CURRENT_CASES.append(name) # type: ignore + return f"更新武器箱/皮肤: [{name}] 成功, 共更新 {len(update_list)} 个皮肤, 新创建 {len(create_list)} 个皮肤!" + + +async def search_skin_page( + s_name: str, page_index: int, type_: UpdateType +) -> tuple[list[BuffSkin] | str, int]: + """查询箱子皮肤 + + 参数: + s_name (str): 箱子/皮肤名称 + page_index (int): 页数 + + 返回: + tuple[list[BuffSkin] | str, int]: BuffSkin + """ + logger.debug( + f"尝试访问武器箱/皮肤: [{s_name}] 页数: [{page_index}]", + "开箱更新", + ) + cookie = {"session": Config.get_config("open_cases", "COOKIE")} + params = { + "game": "csgo", + "page_num": page_index, + "page_size": 80, + "_": time.time(), + "use_suggestio": 0, + } + if type_ == UpdateType.CASE: + params["itemset"] = CASE2ID[s_name] + elif type_ == UpdateType.WEAPON_TYPE: + params["category"] = KNIFE2ID[s_name] + proxy = None + if ip := Config.get_config("open_cases", "BUFF_PROXY"): + proxy = {"http://": ip, "https://": ip} + response = None + error = "" + for i in range(3): + try: + response = await AsyncHttpx.get( + URL, + proxy=proxy, + params=params, + cookies=cookie, # type: ignore + ) + if response.status_code == 200: + break + rand = random.randint(3, 7) + logger.debug( + f"尝试访问武器箱/皮肤第 {i+1} 次访问异常, code: {response.status_code}", + "开箱更新", + ) + await asyncio.sleep(rand) + except Exception as e: + logger.debug( + f"尝试访问武器箱/皮肤第 {i+1} 次访问发生错误 {type(e)}: {e}", "开箱更新" + ) + error = f"{type(e)}: {e}" + if not response: + return f"访问发生异常: {error}", -1 + if response.status_code == 200: + # logger.debug(f"访问BUFF API: {response.text}", "开箱更新") + json_data = response.json() + update_data = [] + if json_data["code"] == "OK": + data_list = json_data["data"]["items"] + for data in data_list: + obj = {} + if type_ == UpdateType.CASE: + obj["case_name"] = s_name + name = data["name"] + try: + logger.debug( + f"武器箱: [{s_name}] 页数: [{page_index}] 正在收录皮肤: [{name}]...", + "开箱更新", + ) + obj["skin_id"] = str(data["id"]) + obj["buy_max_price"] = data["buy_max_price"] # 求购最大金额 + obj["buy_num"] = data["buy_num"] # 当前求购 + goods_info = data["goods_info"] + info = goods_info["info"] + tags = info["tags"] + obj["weapon_type"] = tags["type"]["localized_name"] # 枪械类型 + if obj["weapon_type"] in ["音乐盒", "印花", "探员"]: + continue + elif obj["weapon_type"] in ["匕首", "手套"]: + obj["color"] = "KNIFE" + obj["name"] = data["short_name"].split("(")[0].strip() # 名称 + elif obj["weapon_type"] in ["武器箱"]: + obj["color"] = "CASE" + obj["name"] = data["short_name"] + else: + obj["color"] = NAME2COLOR[tags["rarity"]["localized_name"]] + obj["name"] = tags["weapon"]["localized_name"] # 名称 + if obj["weapon_type"] not in ["武器箱"]: + obj["abrasion"] = tags["exterior"]["localized_name"] # 磨损 + obj["is_stattrak"] = "StatTrak" in tags["quality"]["localized_name"] # type: ignore # 是否暗金 + if not obj["color"]: + obj["color"] = NAME2COLOR[ + tags["rarity"]["localized_name"] + ] # 品质颜色 + else: + obj["abrasion"] = "CASE" + obj["skin_name"] = ( + data["short_name"].split("|")[-1].strip() + ) # 皮肤名称 + obj["img_url"] = goods_info["original_icon_url"] # 图片url + obj["steam_price"] = goods_info["steam_price_cny"] # steam价格 + obj["sell_min_price"] = data["sell_min_price"] # 售卖最低价格 + obj["sell_num"] = data["sell_num"] # 售卖数量 + obj["sell_reference_price"] = data[ + "sell_reference_price" + ] # 参考价格 + update_data.append(BuffSkin(**obj)) + except Exception as e: + logger.error( + f"更新武器箱: [{s_name}] 皮肤: [{s_name}] 错误", + e=e, + ) + logger.debug( + f"访问武器箱: [{s_name}] 页数: [{page_index}] 成功并收录完成", + "开箱更新", + ) + return update_data, json_data["data"]["total_page"] + else: + logger.warning(f'访问BUFF失败: {json_data["error"]}') + return f'访问失败: {json_data["error"]}', -1 + return f"访问失败, 状态码: {response.status_code}", -1 + + +async def build_case_image(case_name: str | None) -> BuildImage | str: + """构造武器箱图片 + + 参数: + case_name (str): 名称 + + 返回: + BuildImage | str: 图片 + """ + background = random.choice(os.listdir(CASE_BACKGROUND)) + background_img = BuildImage(0, 0, background=CASE_BACKGROUND / background) + if case_name: + log_list = ( + await BuffSkinLog.filter(case_name__contains=case_name) + .annotate(count=Count("id")) + .group_by("skin_name") + .values_list("skin_name", "count") + ) + skin_list_ = await BuffSkin.filter(case_name__contains=case_name).all() + skin2count = {item[0]: item[1] for item in log_list} + case = None + skin_list: list[BuffSkin] = [] + exists_name = [] + for skin in skin_list_: + if skin.color == "CASE": + case = skin + else: + name = skin.name + skin.skin_name + if name not in exists_name: + skin_list.append(skin) + exists_name.append(name) + generate_img = {} + for skin in skin_list: + skin_img = await generate_skin(skin, skin2count.get(skin.skin_name, 0)) + if skin_img: + if not generate_img.get(skin.color): + generate_img[skin.color] = [] + generate_img[skin.color].append(skin_img) + skin_image_list = [] + for color in COLOR2NAME: + if generate_img.get(color): + skin_image_list = skin_image_list + generate_img[color] + img = skin_image_list[0] + img_w, img_h = img.size + total_size = (img_w + 25) * (img_h + 10) * len(skin_image_list) # 总面积 + new_size = get_bk_image_size(total_size, background_img.size, img.size, 250) + A = BuildImage( + new_size[0] + 50, new_size[1], background=CASE_BACKGROUND / background + ) + await A.filter("GaussianBlur", 2) + if case: + case_img = await generate_skin( + case, skin2count.get(f"{case_name}武器箱", 0) + ) + if case_img: + await A.paste(case_img, (25, 25)) + w = 25 + h = 230 + skin_image_list.reverse() + for image in skin_image_list: + await A.paste(image, (w, h)) + w += image.width + 20 + if w + image.width - 25 > A.width: + h += image.height + 10 + w = 25 + if h + img_h + 100 < A.height: + await A.crop((0, 0, A.width, h + img_h + 100)) + return A + else: + log_list = ( + await BuffSkinLog.filter(color="CASE") + .annotate(count=Count("id")) + .group_by("case_name") + .values_list("case_name", "count") + ) + name2count = {item[0]: item[1] for item in log_list} + skin_list = await BuffSkin.filter(color="CASE").all() + image_list: list[BuildImage] = [] + for skin in skin_list: + if img := await generate_skin(skin, name2count[skin.case_name]): + image_list.append(img) + if not image_list: + return "未收录武器箱" + w = 25 + h = 150 + img = image_list[0] + img_w, img_h = img.size + total_size = (img_w + 25) * (img_h + 10) * len(image_list) # 总面积 + + new_size = get_bk_image_size(total_size, background_img.size, img.size, 155) + A = BuildImage( + new_size[0] + 50, new_size[1], background=CASE_BACKGROUND / background + ) + await A.filter("GaussianBlur", 2) + bk_img = BuildImage( + img_w, 120, color=(25, 25, 25, 100), font_size=60, font="CJGaoDeGuo.otf" + ) + await bk_img.text( + (0, 0), + f"已收录 {len(image_list)} 个武器箱", + (255, 255, 255), + center_type="center", + ) + await A.paste(bk_img, (10, 10), "width") + for image in image_list: + await A.paste(image, (w, h)) + w += image.width + 20 + if w + image.width - 25 > A.width: + h += image.height + 10 + w = 25 + if h + img_h + 100 < A.height: + await A.crop((0, 0, A.width, h + img_h + 100)) + return A + + +def get_bk_image_size( + total_size: int, + base_size: tuple[int, int], + img_size: tuple[int, int], + extra_height: int = 0, +) -> tuple[int, int]: + """获取所需背景大小且不改变图片长宽比 + + 参数: + total_size (int): 总面积 + base_size (Tuple[int, int]): 初始背景大小 + img_size (Tuple[int, int]): 贴图大小 + + 返回: + tuple[int, int]: 满足所有贴图大小 + """ + bk_w, bk_h = base_size + img_w, img_h = img_size + is_add_title_size = False + left_dis = 0 + right_dis = 0 + old_size = (0, 0) + new_size = (0, 0) + ratio = 1.1 + while 1: + w_ = int(ratio * bk_w) + h_ = int(ratio * bk_h) + size = w_ * h_ + if size < total_size: + left_dis = size + else: + right_dis = size + r = w_ / (img_w + 25) + if right_dis and r - int(r) < 0.1: + if not is_add_title_size and extra_height: + total_size = int(total_size + w_ * extra_height) + is_add_title_size = True + right_dis = 0 + continue + if total_size - left_dis > right_dis - total_size: + new_size = (w_, h_) + else: + new_size = old_size + break + old_size = (w_, h_) + ratio += 0.1 + return new_size + + +async def get_skin_case(id_: str) -> list[str] | None: + """获取皮肤所在箱子 + + 参数: + id_ (str): 皮肤id + + 返回: + list[str] | None: 武器箱名称 + """ + url = f"{SELL_URL}/{id_}" + proxy = None + if ip := Config.get_config("open_cases", "BUFF_PROXY"): + proxy = {"http://": ip, "https://": ip} + response = await AsyncHttpx.get( + url, + proxy=proxy, + ) + if response.status_code == 200: + text = response.text + if r := re.search('', text): + case_list = [] + for s in r.group(1).split(","): + if "武器箱" in s: + case_list.append( + s.replace("”", "") + .replace("“", "") + .replace('"', "") + .replace("'", "") + .replace("武器箱", "") + .replace(" ", "") + ) + return case_list + else: + logger.debug(f"访问皮肤所属武器箱异常 url: {url} code: {response.status_code}") + return None + + +async def init_skin_trends( + name: str, skin: str, abrasion: str, day: int = 7 +) -> BuildImage | None: + date = datetime.now() - timedelta(days=day) + log_list = ( + await BuffSkinLog.filter( + name__contains=name.upper(), + skin_name=skin, + abrasion__contains=abrasion, + create_time__gt=date, + is_stattrak=False, + ) + .order_by("create_time") + .limit(day * 5) + .all() + ) + if not log_list: + return None + date_list = [] + price_list = [] + for log in log_list: + date = str(log.create_time.date()) + if date not in date_list: + date_list.append(date) + price_list.append(log.sell_min_price) + graph = BuildMat(MatType.LINE) + graph.data = price_list + graph.title = f"{name}({skin})价格趋势({day})" + graph.x_index = date_list + return await graph.build() + + +async def reset_count_daily(): + """ + 重置每日开箱 + """ + try: + await OpenCasesUser.all().update(today_open_total=0) + # await broadcast_group( + # "[[_task|open_case_reset_remind]]今日开箱次数重置成功", + # log_cmd="开箱重置提醒", + # ) + except Exception as e: + logger.error(f"开箱重置错误", e=e) + + +async def download_image(case_name: str | None = None): + """下载皮肤图片 + + 参数: + case_name: 箱子名称. + """ + skin_list = ( + await BuffSkin.filter(case_name=case_name).all() + if case_name + else await BuffSkin.all() + ) + for skin in skin_list: + name_ = skin.name + "-" + skin.skin_name + "-" + skin.abrasion + for c_name_ in skin.case_name.split(","): + try: + pass + # file_path = BASE_PATH / cn2py(c_name_) / f"{cn2py(name_)}.jpg" + # if not file_path.exists(): + # logger.debug( + # f"下载皮肤 {c_name_}/{skin.name} 图片: {skin.img_url}...", + # "开箱图片更新", + # ) + # await AsyncHttpx.download_file(skin.img_url, file_path) + # rand_time = random.randint(1, 5) + # await asyncio.sleep(rand_time) + # logger.debug(f"图片下载随机等待时间: {rand_time}", "开箱图片更新") + # else: + # logger.debug( + # f"皮肤 {c_name_}/{skin.name} 图片已存在...", "开箱图片更新" + # ) + except Exception as e: + logger.error( + f"下载皮肤 {c_name_}/{skin.name} 图片: {skin.img_url}", + "开箱图片更新", + e=e, + ) + + +@driver.on_startup +async def _(): + await CaseManager.reload() diff --git a/zhenxun/utils/_build_mat.py b/zhenxun/utils/_build_mat.py index 65b6d371..1506e655 100644 --- a/zhenxun/utils/_build_mat.py +++ b/zhenxun/utils/_build_mat.py @@ -1,3 +1,469 @@ +import random +from io import BytesIO +from pathlib import Path +from re import S + +from pydantic import BaseModel +from strenum import StrEnum + +from ._build_image import BuildImage + + +class MatType(StrEnum): + + LINE = "LINE" + """折线图""" + BAR = "BAR" + """柱状图""" + BARH = "BARH" + """横向柱状图""" + + +class BuildMatData(BaseModel): + + mat_type: MatType + """类型""" + data: list[int | float] = [] + """数据""" + x_name: str | None = None + """X轴坐标名称""" + y_name: str | None = None + """Y轴坐标名称""" + x_index: list[str] = [] + """显示轴坐标值""" + y_index: list[int | float] = [] + """数据轴坐标值""" + space: tuple[int, int] = (15, 15) + """坐标值间隔(X, Y)""" + rotate: tuple[int, int] = (0, 0) + """坐标值旋转(X, Y)""" + title: str | None = None + """标题""" + font: str = "msyh.ttf" + """字体""" + font_size: int = 15 + """字体大小""" + display_num: bool = True + """是否在点与柱状图顶部显示数值""" + is_grid: bool = False + """是否添加栅格""" + background_color: tuple[int, int, int] | str = (255, 255, 255) + """背景颜色""" + background: Path | bytes | None = None + """背景图片""" + bar_color: list[str] = ["*"] + """柱状图柱子颜色, 多个时随机, 使用 * 时七色随机""" + padding: tuple[int, int] = (50, 50) + """图表上下左右边距""" + + class BuildMat: - def pic2bs4(self): - return "" + """ + 针对 折线图/柱状图,基于 BuildImage 编写的 非常难用的 自定义画图工具 + 目前仅支持 正整数 + """ + + class InitGraph(BaseModel): + + mark_image: BuildImage + """BuildImage""" + x_height: int + """横坐标高度""" + x_point: list[int] + """横坐标坐标""" + graph_height: int + """坐标轴高度""" + + class Config: + arbitrary_types_allowed = True + + def __init__(self, mat_type: MatType) -> None: + self.line_length = 760 + self._x_padding = 0 + self._y_padding = 0 + self.build_data = BuildMatData(mat_type=mat_type) + + @property + def x_name(self) -> str | None: + return self.build_data.x_name + + @x_name.setter + def x_name(self, data: str) -> str | None: + self.build_data.x_name = data + + @property + def y_name(self) -> str | None: + return self.build_data.y_name + + @y_name.setter + def y_name(self, data: str) -> str | None: + self.build_data.y_name = data + + @property + def data(self) -> list[int | float]: + return self.build_data.data + + @data.setter + def data(self, data: list[int | float]): + self._check_value(data, self.build_data.y_index) + self.build_data.data = data + + @property + def x_index(self) -> list[str]: + return self.build_data.x_index + + @x_index.setter + def x_index(self, data: list[str]): + self.build_data.x_index = data + + @property + def y_index(self) -> list[int | float]: + return self.build_data.y_index + + @y_index.setter + def y_index(self, data: list[int | float]): + # self._check_value(self.build_data.data, data) + data.sort() + self.build_data.y_index = data + + @property + def space(self) -> tuple[int, int]: + return self.build_data.space + + @space.setter + def space(self, data: tuple[int, int]): + self.build_data.space = data + + @property + def rotate(self) -> tuple[int, int]: + return self.build_data.rotate + + @rotate.setter + def rotate(self, data: tuple[int, int]): + self.build_data.rotate = data + + @property + def title(self) -> str | None: + return self.build_data.title + + @title.setter + def title(self, data: str): + self.build_data.title = data + + @property + def font(self) -> str: + return self.build_data.font + + @font.setter + def font(self, data: str): + self.build_data.font = data + + # @property + # def font_size(self) -> int: + # return self.build_data.font_size + + # @font_size.setter + # def font_size(self, data: int): + # self.build_data.font_size = data + + @property + def display_num(self) -> bool: + return self.build_data.display_num + + @display_num.setter + def display_num(self, data: bool): + self.build_data.display_num = data + + @property + def is_grid(self) -> bool: + return self.build_data.is_grid + + @is_grid.setter + def is_grid(self, data: bool): + self.build_data.is_grid = data + + @property + def background_color(self) -> tuple[int, int, int] | str: + return self.build_data.background_color + + @background_color.setter + def background_color(self, data: tuple[int, int, int] | str): + self.build_data.background_color = data + + @property + def background(self) -> Path | bytes | None: + return self.build_data.background + + @background.setter + def background(self, data: Path | bytes): + self.build_data.background = data + + @property + def bar_color(self) -> list[str]: + return self.build_data.bar_color + + @bar_color.setter + def bar_color(self, data: list[str]): + self.build_data.bar_color = data + + def _check_value( + self, + y: list[int | float], + y_index: list[int | float] | None = None, + x_index: list[int | float] | None = None, + ): + """检查值合法性 + + 参数: + y: 坐标值 + y_index: y轴坐标值 + x_index: x轴坐标值 + """ + if y_index: + _value = x_index if self.build_data.mat_type == "barh" else y_index + if not isinstance(y[0], str): + __y = [float(t_y) for t_y in y] + _y_index = [float(t_y) for t_y in y_index] + if max(__y) > max(_y_index): + raise ValueError("坐标点的值必须小于y轴坐标的最大值...") + i = -9999999999 + for _y in _y_index: + if _y > i: + i = _y + else: + raise ValueError("y轴坐标值必须有序...") + + async def build(self): + """构造图片""" + A = None + bar_color = self.build_data.bar_color + if "*" in bar_color: + bar_color = [ + "#FF0000", + "#FF7F00", + "#FFFF00", + "#00FF00", + "#00FFFF", + "#0000FF", + "#8B00FF", + ] + init_graph = await self._init_graph() + mark_image = None + if self.build_data.mat_type == MatType.LINE: + mark_image = await self._build_line_graph(init_graph, bar_color) + if self.build_data.mat_type == MatType.BAR: + pass + if self.build_data.mat_type == MatType.BARH: + pass + if mark_image: + padding_width, padding_height = self.build_data.padding + width = mark_image.width + padding_width * 2 + height = mark_image.height + padding_height * 2 + if self.build_data.background: + if isinstance(self.build_data.background, bytes): + A = BuildImage( + width, height, background=BytesIO(self.build_data.background) + ) + elif isinstance(self.build_data.background, Path): + A = BuildImage(width, height, background=self.build_data.background) + else: + A = BuildImage(width, height, self.build_data.background_color) + if A: + await A.paste(mark_image, (padding_width, padding_height)) + if self.build_data.title: + font = BuildImage.load_font( + self.build_data.font, self.build_data.font_size + 7 + ) + title_width, title_height = BuildImage.get_text_size( + self.build_data.title, font + ) + pos = ( + int(A.width / 2 - title_width / 2), + int(padding_height / 2 - title_height / 2), + ) + await A.text(pos, self.build_data.title) + if self.build_data.x_name: + font = BuildImage.load_font( + self.build_data.font, self.build_data.font_size + 4 + ) + title_width, title_height = BuildImage.get_text_size( + self.build_data.x_name, font # type: ignore + ) + pos = ( + A.width - title_width - 20, + A.height - int(padding_height / 2 + title_height), + ) + await A.text(pos, self.build_data.x_name) + return A + + async def _init_graph(self) -> InitGraph: + """构造初始化图表 + + 返回: + InitGraph: InitGraph + """ + padding_width = 0 + padding_height = 0 + font = BuildImage.load_font(self.build_data.font, self.build_data.font_size) + width_list = [] + height_list = [] + for x in self.build_data.x_index: + text_size = BuildImage.get_text_size(x, font) + if text_size[1] > padding_height: + padding_height = text_size[1] + width_list.append(text_size[0]) + if not self.build_data.y_index: + """没有指定y_index时,使用data自动生成""" + max_num = max(self.build_data.data) + s = int(max_num / 5) + _y_index = [max_num] + for _n in range(4): + max_num -= s + _y_index.append(max_num) + _y_index.sort() + self.build_data.y_index = _y_index + for item in self.build_data.y_index: + text_size = BuildImage.get_text_size(str(item), font) + if text_size[0] > padding_width: + padding_width = text_size[0] + height_list.append(text_size[1]) + width = ( + sum([w + self.build_data.space[0] for w in width_list]) + + height_list[0] + + self.build_data.space[0] * 2 + + 20 + ) + height = ( + sum([h + self.build_data.space[1] for h in height_list]) + + self.build_data.space[1] * 2 + + 30 + ) + if self.build_data.mat_type == MatType.BARH: + """横向柱状图时xy轴长度调换""" + _tmp = height + height = width + width = _tmp + A = BuildImage( + width, + (height + 10), + color=(255, 255, 255, 0), + ) + padding_height += 5 + await A.line( + ( + padding_width + 5, + padding_height, + padding_width + 5, + height - padding_height, + ), + width=2, + ) + await A.line( + ( + padding_width + 5, + height - padding_height, + width - padding_width + 5, + height - padding_height, + ), + width=2, + ) + _x_index = self.build_data.x_index + _y_index = self.build_data.y_index + if self.build_data.mat_type == MatType.BARH: + _tmp = _y_index + _y_index = _x_index + _x_index = _tmp + cur_width = padding_width + self.build_data.space[0] * 2 + cur_height = height - height_list[0] - 5 + x_point = [] + for i, _x in enumerate(_x_index): + """X轴数值""" + grid_height = cur_height + if self.build_data.is_grid: + grid_height = padding_height + await A.line((cur_width, cur_height - 1, cur_width, grid_height - 5)) + x_point.append(cur_width - 1) + mid_point = cur_width - int(width_list[i] / 2) + await A.text((mid_point, cur_height), str(_x), font=font) + cur_width += width_list[i] + self.build_data.space[0] + cur_width = padding_width + cur_height = height - self.build_data.padding[1] + for i, _y in enumerate(_y_index): + """Y轴数值""" + grid_width = cur_width + if self.build_data.is_grid: + grid_width = width - padding_width + 5 + await A.line((cur_width + 5, cur_height, grid_width + 11, cur_height)) + text_width = BuildImage.get_text_size(str(_y), font)[0] + await A.text( + (cur_width - text_width, cur_height - int(height_list[i] / 2) - 3), + str(_y), + font=font, + ) + cur_height -= height_list[i] + self.build_data.space[1] + graph_height = height - self.build_data.padding[1] - cur_height + 5 + return self.InitGraph( + mark_image=A, + x_height=height - height_list[0] - 5, + graph_height=graph_height, + x_point=x_point, + ) + + async def _build_line_graph( + self, init_graph: InitGraph, bar_color: list[str] + ) -> BuildImage: + """构建折线图 + + 参数: + init_graph: InitGraph + bar_color: 颜色列表 + + 返回: + BuildImage: 折线图 + """ + font = BuildImage.load_font(self.build_data.font, self.build_data.font_size) + mark_image = init_graph.mark_image + x_height = init_graph.x_height + graph_height = init_graph.graph_height + random_color = random.choice(bar_color) + _black_point = BuildImage(11, 11, color=random_color) + await _black_point.circle() + max_num = max(self.y_index) + point_list = [] + for x_p, y in zip(init_graph.x_point, self.build_data.data): + """折线图标点""" + y_height = int(y / max_num * init_graph.graph_height) + await mark_image.paste(_black_point, (x_p, x_height - y_height)) + point_list.append((x_p + 4, x_height - y_height + 4)) + for i in range(len(point_list) - 1): + """画线""" + a_x, a_y = point_list[i] + b_x, b_y = point_list[i + 1] + await mark_image.line((a_x, a_y, b_x, b_y), random_color) + if self.build_data.display_num: + """显示数值""" + value = self.build_data.data[i] + text_size = BuildImage.get_text_size(str(value), font) + await mark_image.text( + (a_x - int(text_size[0] / 2), a_y - text_size[1] - 5), + str(value), + font=font, + ) + """最后一个数值显示""" + value = self.build_data.data[-1] + text_size = BuildImage.get_text_size(str(value), font) + await mark_image.text( + ( + point_list[-1][0] - int(text_size[0] / 2), + point_list[-1][1] - text_size[1] - 5, + ), + str(value), + font=font, + ) + return mark_image + + async def _build_bar_graph(self): + pass + + async def _build_barh_graph(self): + pass diff --git a/zhenxun/utils/image_utils.py b/zhenxun/utils/image_utils.py index b09e9d92..ddb08407 100644 --- a/zhenxun/utils/image_utils.py +++ b/zhenxun/utils/image_utils.py @@ -7,7 +7,7 @@ from typing import Awaitable, Callable from nonebot.utils import is_coroutine_callable from ._build_image import BuildImage, ColorAlias -from ._build_mat import BuildMat +from ._build_mat import BuildMat, MatType from ._image_template import ImageTemplate, RowStyle # TODO: text2image 长度错误