feat: CSGO开箱

This commit is contained in:
HibiKier 2024-05-18 20:56:23 +08:00
parent f03604d3bc
commit c2fd8661b5
14 changed files with 2743 additions and 3 deletions

View File

@ -0,0 +1,345 @@
import asyncio
import random
from datetime import datetime, timedelta
from typing import List
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Arparma, Match
from nonebot_plugin_apscheduler import scheduler
from nonebot_plugin_saa import Image, MessageFactory, Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginCdBlock, PluginExtraData, RegisterConfig, Task
from zhenxun.services.log import logger
from zhenxun.utils.image_utils import text2image
from .command import (
_group_open_matcher,
_knifes_matcher,
_multiple_matcher,
_my_open_matcher,
_open_matcher,
_price_matcher,
_reload_matcher,
_show_case_matcher,
_update_image_matcher,
_update_matcher,
)
from .open_cases_c import (
auto_update,
get_my_knifes,
group_statistics,
open_case,
open_multiple_case,
total_open_statistics,
)
from .utils import (
CASE2ID,
KNIFE2ID,
CaseManager,
build_case_image,
download_image,
get_skin_case,
init_skin_trends,
reset_count_daily,
update_skin_data,
)
__plugin_meta__ = PluginMetadata(
name="CSGO开箱",
description="csgo模拟开箱[戒赌]",
usage="""
指令
开箱 ?[武器箱]
[1-30]连开箱 ?[武器箱]
我的开箱
我的金色
群开箱统计
查看武器箱?[武器箱]
* 不包含[武器箱]时随机开箱 *
示例: 查看武器箱
示例: 查看武器箱英勇
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
superuser_help="""
更新皮肤指令
重置开箱 重置今日开箱所有次数
指令
更新武器箱 ?[武器箱/ALL]
更新皮肤 ?[名称/ALL1]
更新皮肤 ?[名称/ALL1] -S: (必定更新罕见皮肤所属箱子)
更新武器箱图片
* 不指定武器箱时则全部更新 *
* 过多的爬取会导致账号API被封 *
""".strip(),
menu_type="抽卡相关",
tasks=[Task(module="open_case_reset_remind", name="每日开箱重置提醒")],
limits=[PluginCdBlock(result="着什么急啊,慢慢来!")],
configs=[
RegisterConfig(
key="INITIAL_OPEN_CASE_COUNT",
value=20,
help="初始每日开箱次数",
default_value=20,
type=int,
),
RegisterConfig(
key="EACH_IMPRESSION_ADD_COUNT",
value=3,
help="每 * 点好感度额外增加开箱次数",
default_value=3,
type=int,
),
RegisterConfig(key="COOKIE", value=None, help="BUFF的cookie"),
RegisterConfig(
key="DAILY_UPDATE",
value=None,
help="每日自动更新的武器箱,存在'ALL'时则更新全部武器箱",
type=List[str],
),
RegisterConfig(
key="DEFAULT_OPEN_CASE_RESET_REMIND",
module="_task",
value=True,
help="被动 每日开箱重置提醒 进群默认开关状态",
default_value=True,
type=bool,
),
],
).dict(),
)
# cases_matcher_group = MatcherGroup(priority=5, permission=GROUP, block=True)
# k_open_case = cases_matcher_group.on_command("开箱")
# reload_count = cases_matcher_group.on_command("重置开箱", permission=SUPERUSER)
# total_case_data = cases_matcher_group.on_command(
# "我的开箱", aliases={"开箱统计", "开箱查询", "查询开箱"}
# )
# group_open_case_statistics = cases_matcher_group.on_command("群开箱统计")
# open_multiple = cases_matcher_group.on_regex("(.*)连开箱(.*)?")
# update_case = on_command(
# "更新武器箱", aliases={"更新皮肤"}, priority=1, permission=SUPERUSER, block=True
# )
# update_case_image = on_command(
# "更新武器箱图片", priority=1, permission=SUPERUSER, block=True
# )
# show_case = on_command("查看武器箱", priority=5, block=True)
# my_knifes = on_command("我的金色", priority=1, permission=GROUP, block=True)
# show_skin = on_command("查看皮肤", priority=5, block=True)
# price_trends = on_command("价格趋势", priority=5, block=True)
@_price_matcher.handle()
async def _(
session: EventSession,
arparma: Arparma,
name: str,
skin: str,
abrasion: str,
day: Match[int],
):
name = name.replace("武器箱", "").strip()
_day = 7
if day.available:
_day = day.result
if _day > 180:
await Text("天数必须大于0且小于180").finish()
result = await init_skin_trends(name, skin, abrasion, _day)
if not result:
await Text("未查询到数据...").finish(reply=True)
await Image(result.pic2bytes()).send()
logger.info(
f"查看 [{name}:{skin}({abrasion})] 价格趋势",
arparma.header_result,
session=session,
)
@_reload_matcher.handle()
async def _(session: EventSession, arparma: Arparma):
await reset_count_daily()
logger.info("重置开箱次数", arparma.header_result, session=session)
@_open_matcher.handle()
async def _(session: EventSession, arparma: Arparma, name: Match[str]):
gid = session.id3 or session.id2
if not session.id1:
await Text("用户id为空...").finish()
if not gid:
await Text("群组id为空...").finish()
case_name = None
if name.available:
case_name = name.result.replace("武器箱", "").strip()
result = await open_case(session.id1, gid, case_name, session)
await result.finish(reply=True)
@_my_open_matcher.handle()
async def _(session: EventSession, arparma: Arparma):
gid = session.id3 or session.id2
if not session.id1:
await Text("用户id为空...").finish()
if not gid:
await Text("群组id为空...").finish()
await Text(
await total_open_statistics(session.id1, gid),
).send(reply=True)
logger.info("查询我的开箱", arparma.header_result, session=session)
@_group_open_matcher.handle()
async def _(session: EventSession, arparma: Arparma):
gid = session.id3 or session.id2
if not gid:
await Text("群组id为空...").finish()
result = await group_statistics(gid)
await Text(result).send(reply=True)
logger.info("查询群开箱统计", arparma.header_result, session=session)
@_knifes_matcher.handle()
async def _(session: EventSession, arparma: Arparma):
gid = session.id3 or session.id2
if not session.id1:
await Text("用户id为空...").finish()
if not gid:
await Text("群组id为空...").finish()
result = await get_my_knifes(session.id1, gid)
await result.send(reply=True)
logger.info("查询我的金色", arparma.header_result, session=session)
@_multiple_matcher.handle()
async def _(session: EventSession, arparma: Arparma, num: int, name: Match[str]):
gid = session.id3 or session.id2
if not session.id1:
await Text("用户id为空...").finish()
if not gid:
await Text("群组id为空...").finish()
if num > 30:
await Text("开箱次数不要超过30啊笨蛋").finish()
if num < 0:
await Text("再负开箱就扣你明天开箱数了!").finish()
case_name = None
if name.available:
case_name = name.result.replace("武器箱", "").strip()
result = await open_multiple_case(session.id1, gid, case_name, num, session)
await result.send(reply=True)
logger.info(f"{num}连开箱", arparma.header_result, session=session)
@_update_matcher.handle()
async def _(session: EventSession, arparma: Arparma, name: Match[str]):
case_name = None
if name.available:
case_name = name.result.strip()
if not case_name:
case_list = []
skin_list = []
for i, case_name in enumerate(CASE2ID):
if case_name in CaseManager.CURRENT_CASES:
case_list.append(f"{i+1}.{case_name} [已更新]")
else:
case_list.append(f"{i+1}.{case_name}")
for skin_name in KNIFE2ID:
skin_list.append(f"{skin_name}")
text = "武器箱:\n" + "\n".join(case_list) + "\n皮肤:\n" + ", ".join(skin_list)
img = await text2image(text, padding=20, color="#f9f6f2")
await MessageFactory(
[Text("未指定武器箱, 当前已包含武器箱/皮肤\n"), Image(img.pic2bytes())]
).finish()
if case_name in ["ALL", "ALL1"]:
if case_name == "ALL":
case_list = list(CASE2ID.keys())
type_ = "武器箱"
else:
case_list = list(KNIFE2ID.keys())
type_ = "罕见皮肤"
await Text(f"即将更新所有{type_}, 请稍等").send()
for i, case_name in enumerate(case_list):
try:
info = await update_skin_data(case_name, arparma.find("s"))
if "请先登录" in info:
await Text(f"未登录, 已停止更新, 请配置BUFF token...").send()
return
rand = random.randint(300, 500)
result = f"更新全部{type_}完成"
if i < len(case_list) - 1:
next_case = case_list[i + 1]
result = f"将在 {rand} 秒后更新下一{type_}: {next_case}"
await Text(f"{info}, {result}").send()
logger.info(f"info, {result}", "更新武器箱", session=session)
await asyncio.sleep(rand)
except Exception as e:
logger.error(f"更新{type_}: {case_name}", session=session, e=e)
await Text(f"更新{type_}: {case_name} 发生错误: {type(e)}: {e}").send()
await Text(f"更新全部{type_}完成").send()
else:
await Text(f"开始{arparma.header_result}: {case_name}, 请稍等").send()
try:
await Text(await update_skin_data(case_name, arparma.find("s"))).send(
at_sender=True
)
except Exception as e:
logger.error(f"{arparma.header_result}: {case_name}", session=session, e=e)
await Text(
f"成功{arparma.header_result}: {case_name} 发生错误: {type(e)}: {e}"
).send()
@_show_case_matcher.handle()
async def _(session: EventSession, arparma: Arparma, name: Match[str]):
case_name = None
if name.available:
case_name = name.result.strip()
result = await build_case_image(case_name)
if isinstance(result, str):
await Text(result).send()
else:
await Image(result.pic2bytes()).send()
logger.info("查看武器箱", arparma.header_result, session=session)
@_update_image_matcher.handle()
async def _(session: EventSession, arparma: Arparma, name: Match[str]):
case_name = None
if name.available:
case_name = name.result.strip()
await Text("开始更新图片...").send(reply=True)
await download_image(case_name)
await Text("更新图片完成...").send(at_sender=True)
logger.info("更新武器箱图片", arparma.header_result, session=session)
# 重置开箱
@scheduler.scheduled_job(
"cron",
hour=0,
minute=1,
)
async def _():
await reset_count_daily()
@scheduler.scheduled_job(
"cron",
hour=0,
minute=10,
)
async def _():
now = datetime.now()
hour = random.choice([0, 1, 2, 3])
date = now + timedelta(hours=hour)
logger.debug(f"将在 {date} 时自动更新武器箱...", "更新武器箱")
scheduler.add_job(
auto_update,
"date",
run_date=date.replace(microsecond=0),
id=f"auto_update_csgo_cases",
)

View File

@ -0,0 +1,155 @@
from datetime import timedelta, timezone
from zhenxun.configs.path_config import IMAGE_PATH
from zhenxun.services.log import logger
from zhenxun.utils.image_utils import BuildImage
from zhenxun.utils.utils import cn2py
from .config import COLOR2COLOR, COLOR2NAME
from .models.buff_skin import BuffSkin
BASE_PATH = IMAGE_PATH / "csgo_cases"
ICON_PATH = IMAGE_PATH / "_icon"
async def draw_card(skin: BuffSkin, rand: str) -> BuildImage:
"""构造抽取图片
参数:
skin (BuffSkin): BuffSkin
rand (str): 磨损
返回:
BuildImage: BuildImage
"""
name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
file_path = BASE_PATH / cn2py(skin.case_name.split(",")[0]) / f"{cn2py(name)}.jpg"
if not file_path.exists():
logger.warning(f"皮肤图片: {name} 不存在", "开箱")
skin_bk = BuildImage(
460, 200, color=(25, 25, 25, 100), font_size=25, font="CJGaoDeGuo.otf"
)
if file_path.exists():
skin_image = BuildImage(205, 153, background=file_path)
await skin_bk.paste(skin_image, (10, 30))
await skin_bk.line((220, 10, 220, 180))
await skin_bk.text((10, 10), skin.name, (255, 255, 255))
name_icon = BuildImage(20, 20, background=ICON_PATH / "name_white.png")
await skin_bk.paste(name_icon, (240, 13))
await skin_bk.text((265, 15), f"名称:", (255, 255, 255), font_size=20)
await skin_bk.text(
(310, 15),
f"{skin.skin_name + ('(St)' if skin.is_stattrak else '')}",
(255, 255, 255),
)
tone_icon = BuildImage(20, 20, background=ICON_PATH / "tone_white.png")
await skin_bk.paste(tone_icon, (240, 45))
await skin_bk.text((265, 45), "品质:", (255, 255, 255), font_size=20)
await skin_bk.text((310, 45), COLOR2NAME[skin.color][:2], COLOR2COLOR[skin.color])
type_icon = BuildImage(20, 20, background=ICON_PATH / "type_white.png")
await skin_bk.paste(type_icon, (240, 73))
await skin_bk.text((265, 75), "类型:", (255, 255, 255), font_size=20)
await skin_bk.text((310, 75), skin.weapon_type, (255, 255, 255))
price_icon = BuildImage(20, 20, background=ICON_PATH / "price_white.png")
await skin_bk.paste(price_icon, (240, 103))
await skin_bk.text((265, 105), "价格:", (255, 255, 255), font_size=20)
await skin_bk.text((310, 105), str(skin.sell_min_price), (0, 255, 98))
abrasion_icon = BuildImage(20, 20, background=ICON_PATH / "abrasion_white.png")
await skin_bk.paste(abrasion_icon, (240, 133))
await skin_bk.text((265, 135), "磨损:", (255, 255, 255), font_size=20)
await skin_bk.text((310, 135), skin.abrasion, (255, 255, 255))
await skin_bk.text((228, 165), f"({rand})", (255, 255, 255))
return skin_bk
async def generate_skin(skin: BuffSkin, update_count: int) -> BuildImage | None:
"""构造皮肤图片
参数:
skin (BuffSkin): BuffSkin
返回:
BuildImage | None: 图片
"""
name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
file_path = BASE_PATH / cn2py(skin.case_name.split(",")[0]) / f"{cn2py(name)}.jpg"
if not file_path.exists():
logger.warning(f"皮肤图片: {name} 不存在", "查看武器箱")
if skin.color == "CASE":
case_bk = BuildImage(
700, 200, color=(25, 25, 25, 100), font_size=25, font="CJGaoDeGuo.otf"
)
if file_path.exists():
skin_img = BuildImage(200, 200, background=file_path)
await case_bk.paste(skin_img, (10, 10))
await case_bk.line((250, 10, 250, 190))
await case_bk.line((280, 160, 660, 160))
name_icon = BuildImage(30, 30, background=ICON_PATH / "box_white.png")
await case_bk.paste(name_icon, (260, 25))
await case_bk.text((295, 30), "名称:", (255, 255, 255))
await case_bk.text((345, 30), skin.case_name, (255, 0, 38), font_size=30)
type_icon = BuildImage(30, 30, background=ICON_PATH / "type_white.png")
await case_bk.paste(type_icon, (260, 70))
await case_bk.text((295, 75), "类型:", (255, 255, 255))
await case_bk.text((345, 75), "武器箱", (0, 157, 255), font_size=30)
price_icon = BuildImage(30, 30, background=ICON_PATH / "price_white.png")
await case_bk.paste(price_icon, (260, 114))
await case_bk.text((295, 120), "单价:", (255, 255, 255))
await case_bk.text(
(340, 120), str(skin.sell_min_price), (0, 255, 98), font_size=30
)
update_count_icon = BuildImage(
40, 40, background=ICON_PATH / "reload_white.png"
)
await case_bk.paste(update_count_icon, (575, 10))
await case_bk.text((625, 12), str(update_count), (255, 255, 255), font_size=45)
num_icon = BuildImage(30, 30, background=ICON_PATH / "num_white.png")
await case_bk.paste(num_icon, (455, 70))
await case_bk.text((490, 75), "在售:", (255, 255, 255))
await case_bk.text((535, 75), str(skin.sell_num), (144, 0, 255), font_size=30)
want_buy_icon = BuildImage(30, 30, background=ICON_PATH / "want_buy_white.png")
await case_bk.paste(want_buy_icon, (455, 114))
await case_bk.text((490, 120), "求购:", (255, 255, 255))
await case_bk.text((535, 120), str(skin.buy_num), (144, 0, 255), font_size=30)
await case_bk.text((275, 165), "更新时间", (255, 255, 255), font_size=22)
date = str(
skin.update_time.replace(microsecond=0).astimezone(
timezone(timedelta(hours=8))
)
).split("+")[0]
await case_bk.text(
(350, 165),
date,
(255, 255, 255),
font_size=30,
)
return case_bk
else:
skin_bk = BuildImage(
235, 250, color=(25, 25, 25, 100), font_size=25, font="CJGaoDeGuo.otf"
)
if file_path.exists():
skin_image = BuildImage(205, 153, background=file_path)
await skin_bk.paste(skin_image, (10, 30))
update_count_icon = BuildImage(
35, 35, background=ICON_PATH / "reload_white.png"
)
await skin_bk.line((10, 180, 220, 180))
await skin_bk.text((10, 10), skin.name, (255, 255, 255))
await skin_bk.paste(update_count_icon, (140, 10))
await skin_bk.text((175, 15), str(update_count), (255, 255, 255))
await skin_bk.text((10, 185), f"{skin.skin_name}", (255, 255, 255), "width")
await skin_bk.text((10, 218), "品质:", (255, 255, 255))
await skin_bk.text(
(55, 218), COLOR2NAME[skin.color][:2], COLOR2COLOR[skin.color]
)
await skin_bk.text((100, 218), "类型:", (255, 255, 255))
await skin_bk.text((145, 218), skin.weapon_type, (255, 255, 255))
return skin_bk

View File

@ -0,0 +1,75 @@
from nonebot.permission import SUPERUSER
from nonebot_plugin_alconna import Alconna, Args, Option, on_alconna, store_true
from zhenxun.utils.rules import ensure_group
_open_matcher = on_alconna(
Alconna("开箱", Args["name?", str]), priority=5, block=True, rule=ensure_group
)
_reload_matcher = on_alconna(
Alconna("重置开箱"), priority=5, block=True, permission=SUPERUSER, rule=ensure_group
)
_my_open_matcher = on_alconna(
Alconna("我的开箱"),
aliases={"开箱统计", "开箱查询", "查询开箱"},
priority=5,
block=True,
rule=ensure_group,
)
_group_open_matcher = on_alconna(
Alconna("群开箱统计"), priority=5, block=True, rule=ensure_group
)
_multiple_matcher = on_alconna(
Alconna("multiple-open", Args["num", int]["name?", str]),
priority=5,
block=True,
rule=ensure_group,
)
_multiple_matcher.shortcut(
r"(?P<num>\d)连开箱(?P<name>.*?)",
command="multiple-open",
arguments=["{num}", "{name}"],
prefix=True,
)
_update_matcher = on_alconna(
Alconna(
"更新武器箱",
Args["name?", str],
Option("-s", action=store_true, help_text="是否必定更新所属箱子"),
),
aliases={"更新皮肤"},
priority=1,
permission=SUPERUSER,
block=True,
)
_update_image_matcher = on_alconna(
Alconna("更新武器箱图片", Args["name?", str]),
priority=1,
permission=SUPERUSER,
block=True,
)
_show_case_matcher = on_alconna(
Alconna("查看武器箱", Args["name?", str]), priority=5, block=True
)
_knifes_matcher = on_alconna(
Alconna("我的金色"), priority=5, block=True, rule=ensure_group
)
_show_skin_matcher = on_alconna(Alconna("查看皮肤"), priority=5, block=True)
_price_matcher = on_alconna(
Alconna(
"价格趋势", Args["name", str]["skin", str]["abrasion", str]["day?", int, 7]
),
priority=5,
block=True,
)

View File

@ -0,0 +1,253 @@
import random
from enum import Enum
from zhenxun.configs.path_config import IMAGE_PATH
from zhenxun.services.log import logger
from .models.buff_skin import BuffSkin
BLUE = 0.7981
BLUE_ST = 0.0699
PURPLE = 0.1626
PURPLE_ST = 0.0164
PINK = 0.0315
PINK_ST = 0.0048
RED = 0.0057
RED_ST = 0.00021
KNIFE = 0.0021
KNIFE_ST = 0.000041
# 崭新
FACTORY_NEW_S = 0
FACTORY_NEW_E = 0.0699999
# 略磨
MINIMAL_WEAR_S = 0.07
MINIMAL_WEAR_E = 0.14999
# 久经
FIELD_TESTED_S = 0.15
FIELD_TESTED_E = 0.37999
# 破损
WELL_WORN_S = 0.38
WELL_WORN_E = 0.44999
# 战痕
BATTLE_SCARED_S = 0.45
BATTLE_SCARED_E = 0.99999
class UpdateType(Enum):
"""
更新类型
"""
CASE = "case"
WEAPON_TYPE = "weapon_type"
NAME2COLOR = {
"消费级": "WHITE",
"工业级": "LIGHTBLUE",
"军规级": "BLUE",
"受限": "PURPLE",
"保密": "PINK",
"隐秘": "RED",
"非凡": "KNIFE",
}
COLOR2NAME = {
"WHITE": "消费级",
"LIGHTBLUE": "工业级",
"BLUE": "军规级",
"PURPLE": "受限",
"PINK": "保密",
"RED": "隐秘",
"KNIFE": "非凡",
}
COLOR2COLOR = {
"WHITE": (255, 255, 255),
"LIGHTBLUE": (0, 179, 255),
"BLUE": (0, 85, 255),
"PURPLE": (149, 0, 255),
"PINK": (255, 0, 162),
"RED": (255, 34, 0),
"KNIFE": (255, 225, 0),
}
ABRASION_SORT = ["崭新出厂", "略有磨损", "久经沙场", "破损不堪", "战横累累"]
CASE_BACKGROUND = IMAGE_PATH / "csgo_cases" / "_background" / "shu"
# 刀
KNIFE2ID = {
"鲍伊猎刀": "weapon_knife_survival_bowie",
"蝴蝶刀": "weapon_knife_butterfly",
"弯刀": "weapon_knife_falchion",
"折叠刀": "weapon_knife_flip",
"穿肠刀": "weapon_knife_gut",
"猎杀者匕首": "weapon_knife_tactical",
"M9刺刀": "weapon_knife_m9_bayonet",
"刺刀": "weapon_bayonet",
"爪子刀": "weapon_knife_karambit",
"暗影双匕": "weapon_knife_push",
"短剑": "weapon_knife_stiletto",
"熊刀": "weapon_knife_ursus",
"折刀": "weapon_knife_gypsy_jackknife",
"锯齿爪刀": "weapon_knife_widowmaker",
"海豹短刀": "weapon_knife_css",
"系绳匕首": "weapon_knife_cord",
"求生匕首": "weapon_knife_canis",
"流浪者匕首": "weapon_knife_outdoor",
"骷髅匕首": "weapon_knife_skeleton",
"血猎手套": "weapon_bloodhound_gloves",
"驾驶手套": "weapon_driver_gloves",
"手部束带": "weapon_hand_wraps",
"摩托手套": "weapon_moto_gloves",
"专业手套": "weapon_specialist_gloves",
"运动手套": "weapon_sport_gloves",
"九头蛇手套": "weapon_hydra_gloves",
"狂牙手套": "weapon_brokenfang_gloves",
}
WEAPON2ID = {}
# 武器箱
CASE2ID = {
"变革": "set_community_32",
"反冲": "set_community_31",
"梦魇": "set_community_30",
"激流": "set_community_29",
"蛇噬": "set_community_28",
"狂牙大行动": "set_community_27",
"裂空": "set_community_26",
"棱彩2号": "set_community_25",
"CS20": "set_community_24",
"裂网大行动": "set_community_23",
"棱彩": "set_community_22",
"头号特训": "set_community_21",
"地平线": "set_community_20",
"命悬一线": "set_community_19",
"光谱2号": "set_community_18",
"九头蛇大行动": "set_community_17",
"光谱": "set_community_16",
"手套": "set_community_15",
"伽玛2号": "set_gamma_2",
"伽玛": "set_community_13",
"幻彩3号": "set_community_12",
"野火大行动": "set_community_11",
"左轮": "set_community_10",
"暗影": "set_community_9",
"弯曲猎手": "set_community_8",
"幻彩2号": "set_community_7",
"幻彩": "set_community_6",
"先锋": "set_community_5",
"电竞2014夏季": "set_esports_iii",
"突围大行动": "set_community_4",
"猎杀者": "set_community_3",
"凤凰": "set_community_2",
"电竞2013冬季": "set_esports_ii",
"冬季攻势": "set_community_1",
"军火交易3号": "set_weapons_iii",
"英勇": "set_bravo_i",
"电竞2013": "set_esports",
"军火交易2号": "set_weapons_ii",
"军火交易": "set_weapons_i",
}
def get_wear(rand: float) -> str:
"""判断磨损度
Args:
rand (float): 随机rand
Returns:
str: 磨损名称
"""
if rand <= FACTORY_NEW_E:
return "崭新出厂"
if MINIMAL_WEAR_S <= rand <= MINIMAL_WEAR_E:
return "略有磨损"
if FIELD_TESTED_S <= rand <= FIELD_TESTED_E:
return "久经沙场"
if WELL_WORN_S <= rand <= WELL_WORN_E:
return "破损不堪"
return "战痕累累"
def random_color_and_st(rand: float) -> tuple[str, bool]:
"""获取皮肤品质及是否暗金
参数:
rand (float): 随机rand
返回:
tuple[str, bool]: 品质是否暗金
"""
if rand <= KNIFE:
if random.random() <= KNIFE_ST:
return ("KNIFE", True)
return ("KNIFE", False)
elif KNIFE < rand <= RED:
if random.random() <= RED_ST:
return ("RED", True)
return ("RED", False)
elif RED < rand <= PINK:
if random.random() <= PINK_ST:
return ("PINK", True)
return ("PINK", False)
elif PINK < rand <= PURPLE:
if random.random() <= PURPLE_ST:
return ("PURPLE", True)
return ("PURPLE", False)
else:
if random.random() <= BLUE_ST:
return ("BLUE", True)
return ("BLUE", False)
async def random_skin(num: int, case_name: str) -> list[tuple[BuffSkin, float]]:
"""
随机抽取皮肤
"""
case_name = case_name.replace("武器箱", "").replace(" ", "")
color_map = {}
for _ in range(num):
rand = random.random()
# 尝试降低磨损
if rand > MINIMAL_WEAR_E:
for _ in range(2):
if random.random() < 0.5:
logger.debug(f"[START]开箱随机磨损触发降低磨损条件: {rand}")
if random.random() < 0.2:
rand /= 3
else:
rand /= 2
logger.debug(f"[END]开箱随机磨损触发降低磨损条件: {rand}")
break
abrasion = get_wear(rand)
logger.debug(f"开箱随机磨损: {rand} | {abrasion}")
color, is_stattrak = random_color_and_st(rand)
if not color_map.get(color):
color_map[color] = {}
if is_stattrak:
if not color_map[color].get(f"{abrasion}_st"):
color_map[color][f"{abrasion}_st"] = []
color_map[color][f"{abrasion}_st"].append(rand)
else:
if not color_map[color].get(abrasion):
color_map[color][f"{abrasion}"] = []
color_map[color][f"{abrasion}"].append(rand)
skin_list = []
for color in color_map:
for abrasion in color_map[color]:
rand_list = color_map[color][abrasion]
is_stattrak = "_st" in abrasion
abrasion = abrasion.replace("_st", "")
skin_list_ = await BuffSkin.random_skin(
len(rand_list), color, abrasion, is_stattrak, case_name
)
skin_list += [(skin, rand) for skin, rand in zip(skin_list_, rand_list)]
return skin_list
# M249StatTrak™ | 等高线

View File

@ -0,0 +1,22 @@
from tortoise import fields
from zhenxun.services.db_context import Model
# 1.狂牙武器箱
class BuffPrice(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
case_id = fields.IntField()
"""箱子id"""
skin_name = fields.CharField(255, unique=True)
"""皮肤名称"""
skin_price = fields.FloatField()
"""皮肤价格"""
update_date = fields.DatetimeField()
class Meta:
table = "buff_prices"
table_description = "Buff价格数据表"

View File

@ -0,0 +1,113 @@
from tortoise import fields
from tortoise.contrib.postgres.functions import Random
from zhenxun.services.db_context import Model
class BuffSkin(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
case_name: str = fields.CharField(255) # type: ignore
"""箱子名称"""
name: str = fields.CharField(255) # type: ignore
"""武器/手套/刀名称"""
skin_name: str = fields.CharField(255) # type: ignore
"""皮肤名称"""
is_stattrak = fields.BooleanField(default=False)
"""是否暗金(计数)"""
abrasion = fields.CharField(255)
"""磨损度"""
color = fields.CharField(255)
"""颜色(品质)"""
skin_id = fields.CharField(255, null=True, unique=True)
"""皮肤id"""
img_url = fields.CharField(255)
"""图片url"""
steam_price = fields.FloatField(default=0)
"""steam价格"""
weapon_type = fields.CharField(255)
"""枪械类型"""
buy_max_price = fields.FloatField(default=0)
"""最大求购价格"""
buy_num = fields.IntField(default=0)
"""求购数量"""
sell_min_price = fields.FloatField(default=0)
"""售卖最低价格"""
sell_num = fields.IntField(default=0)
"""出售个数"""
sell_reference_price = fields.FloatField(default=0)
"""参考价格"""
create_time = fields.DatetimeField(auto_add_now=True)
"""创建日期"""
update_time = fields.DatetimeField(auto_add=True)
"""更新日期"""
class Meta:
table = "buff_skin"
table_description = "Buff皮肤数据表"
# unique_together = ("case_name", "name", "skin_name", "abrasion", "is_stattrak")
def __eq__(self, other: "BuffSkin"):
return self.skin_id == other.skin_id
def __hash__(self):
return hash(self.case_name + self.name + self.skin_name + str(self.is_stattrak))
@classmethod
async def random_skin(
cls,
num: int,
color: str,
abrasion: str,
is_stattrak: bool = False,
case_name: str | None = None,
) -> list["BuffSkin"]: # type: ignore
"""随机皮肤
参数:
num: 数量
color: 品质
abrasion: 磨损度
is_stattrak: 是否暗金
case_name: 箱子名称
返回:
list["BuffSkin"]: 皮肤列表
"""
query = cls
if case_name:
query = query.filter(case_name__contains=case_name)
query = query.filter(abrasion=abrasion, is_stattrak=is_stattrak, color=color)
skin_list = await query.annotate(rand=Random()).limit(num) # type:ignore
num_ = num
cnt = 0
while len(skin_list) < num:
cnt += 1
num_ = num - len(skin_list)
skin_list += await query.annotate(rand=Random()).limit(num_)
if cnt > 10:
break
return skin_list # type: ignore
@classmethod
async def _run_script(cls):
return [
"ALTER TABLE buff_skin ADD img_url varchar(255);", # 新增img_url
"ALTER TABLE buff_skin ADD skin_id varchar(255);", # 新增skin_id
"ALTER TABLE buff_skin ADD steam_price float DEFAULT 0;", # 新增steam_price
"ALTER TABLE buff_skin ADD weapon_type varchar(255);", # 新增type
"ALTER TABLE buff_skin ADD buy_max_price float DEFAULT 0;", # 新增buy_max_price
"ALTER TABLE buff_skin ADD buy_num Integer DEFAULT 0;", # 新增buy_max_price
"ALTER TABLE buff_skin ADD sell_min_price float DEFAULT 0;", # 新增sell_min_price
"ALTER TABLE buff_skin ADD sell_num Integer DEFAULT 0;", # 新增sell_num
"ALTER TABLE buff_skin ADD sell_reference_price float DEFAULT 0;", # 新增sell_reference_price
"ALTER TABLE buff_skin DROP COLUMN skin_price;", # 删除skin_price
"alter table buff_skin drop constraint if EXISTS uid_buff_skin_case_na_c35c93;", # 删除唯一约束
"UPDATE buff_skin set case_name='手套' where case_name='手套武器箱'",
"UPDATE buff_skin set case_name='左轮' where case_name='左轮武器箱'",
]

View File

@ -0,0 +1,50 @@
from tortoise import fields
from zhenxun.services.db_context import Model
class BuffSkinLog(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
case_name = fields.CharField(255)
"""箱子名称"""
name = fields.CharField(255)
"""武器/手套/刀名称"""
skin_name = fields.CharField(255)
"""皮肤名称"""
is_stattrak = fields.BooleanField(default=False)
"""是否暗金(计数)"""
abrasion = fields.CharField(255)
"""磨损度"""
color = fields.CharField(255)
"""颜色(品质)"""
steam_price = fields.FloatField(default=0)
"""steam价格"""
weapon_type = fields.CharField(255)
"""枪械类型"""
buy_max_price = fields.FloatField(default=0)
"""最大求购价格"""
buy_num = fields.IntField(default=0)
"""求购数量"""
sell_min_price = fields.FloatField(default=0)
"""售卖最低价格"""
sell_num = fields.IntField(default=0)
"""出售个数"""
sell_reference_price = fields.FloatField(default=0)
"""参考价格"""
create_time = fields.DatetimeField(auto_add_now=True)
"""创建日期"""
class Meta:
table = "buff_skin_log"
table_description = "Buff皮肤更新日志表"
@classmethod
async def _run_script(cls):
return [
"UPDATE buff_skin_log set case_name='手套' where case_name='手套武器箱'",
"UPDATE buff_skin_log set case_name='左轮' where case_name='左轮武器箱'",
]

View File

@ -0,0 +1,44 @@
from tortoise import fields
from tortoise.contrib.postgres.functions import Random
from zhenxun.services.db_context import Model
class OpenCasesLog(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
user_id = fields.CharField(255)
"""用户id"""
group_id = fields.CharField(255)
"""群聊id"""
case_name = fields.CharField(255)
"""箱子名称"""
name = fields.CharField(255)
"""武器/手套/刀名称"""
skin_name = fields.CharField(255)
"""皮肤名称"""
is_stattrak = fields.BooleanField(default=False)
"""是否暗金(计数)"""
abrasion = fields.CharField(255)
"""磨损度"""
abrasion_value = fields.FloatField()
"""磨损数值"""
color = fields.CharField(255)
"""颜色(品质)"""
price = fields.FloatField(default=0)
"""价格"""
create_time = fields.DatetimeField(auto_add_now=True)
"""创建日期"""
class Meta:
table = "open_cases_log"
table_description = "开箱日志表"
@classmethod
async def _run_script(cls):
return [
"ALTER TABLE open_cases_log RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id
"ALTER TABLE open_cases_log ALTER COLUMN user_id TYPE character varying(255);",
"ALTER TABLE open_cases_log ALTER COLUMN group_id TYPE character varying(255);",
]

View File

@ -0,0 +1,60 @@
from tortoise import fields
from zhenxun.services.db_context import Model
class OpenCasesUser(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
user_id = fields.CharField(255)
"""用户id"""
group_id = fields.CharField(255)
"""群聊id"""
total_count = fields.IntField(default=0)
"""总开启次数"""
blue_count = fields.IntField(default=0)
"""蓝色"""
blue_st_count = fields.IntField(default=0)
"""蓝色暗金"""
purple_count = fields.IntField(default=0)
"""紫色"""
purple_st_count = fields.IntField(default=0)
"""紫色暗金"""
pink_count = fields.IntField(default=0)
"""粉色"""
pink_st_count = fields.IntField(default=0)
"""粉色暗金"""
red_count = fields.IntField(default=0)
"""紫色"""
red_st_count = fields.IntField(default=0)
"""紫色暗金"""
knife_count = fields.IntField(default=0)
"""金色"""
knife_st_count = fields.IntField(default=0)
"""金色暗金"""
spend_money = fields.IntField(default=0)
"""花费金币"""
make_money = fields.FloatField(default=0)
"""赚取金币"""
today_open_total = fields.IntField(default=0)
"""今日开箱数量"""
open_cases_time_last = fields.DatetimeField()
"""最后开箱日期"""
knifes_name = fields.TextField(default="")
"""已获取金色"""
class Meta:
table = "open_cases_users"
table_description = "开箱统计数据表"
unique_together = ("user_id", "group_id")
@classmethod
async def _run_script(cls):
return [
"alter table open_cases_users alter COLUMN make_money type float;", # 将make_money字段改为float
"alter table open_cases_users alter COLUMN spend_money type float;", # 将spend_money字段改为float
"ALTER TABLE open_cases_users RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id
"ALTER TABLE open_cases_users ALTER COLUMN user_id TYPE character varying(255);",
"ALTER TABLE open_cases_users ALTER COLUMN group_id TYPE character varying(255);",
]

View File

@ -0,0 +1,501 @@
import asyncio
import random
import re
from datetime import datetime
from nonebot_plugin_saa import Image, MessageFactory, Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import Config
from zhenxun.configs.path_config import IMAGE_PATH
from zhenxun.models.sign_user import SignUser
from zhenxun.services.log import logger
from zhenxun.utils.image_utils import BuildImage
from zhenxun.utils.utils import cn2py
from .build_image import draw_card
from .config import *
from .models.open_cases_log import OpenCasesLog
from .models.open_cases_user import OpenCasesUser
from .utils import CaseManager, update_skin_data
RESULT_MESSAGE = {
"BLUE": ["这样看着才舒服", "是自己人,大伙把刀收好", "非常舒适~"],
"PURPLE": ["还行吧,勉强接受一下下", "居然不是蓝色,太假了", "运气-1-1-1-1-1..."],
"PINK": ["开始不适....", "你妈妈买菜必涨价!涨三倍!", "你最近不适合出门,真的"],
"RED": [
"已经非常不适",
"好兄弟你开的什么箱子啊,一般箱子不是只有蓝色的吗",
"开始拿阳寿开箱子了?",
],
"KNIFE": [
"你的好运我收到了,你可以去喂鲨鱼了",
"最近该吃啥就迟点啥吧,哎,好好的一个人怎么就....哎",
"众所周知,欧皇寿命极短.",
],
}
COLOR2NAME = {
"BLUE": "军规",
"PURPLE": "受限",
"PINK": "保密",
"RED": "隐秘",
"KNIFE": "罕见",
}
COLOR2CN = {"BLUE": "", "PURPLE": "", "PINK": "", "RED": "", "KNIFE": ""}
def add_count(user: OpenCasesUser, skin: BuffSkin, case_price: float):
if skin.color == "BLUE":
if skin.is_stattrak:
user.blue_st_count += 1
else:
user.blue_count += 1
elif skin.color == "PURPLE":
if skin.is_stattrak:
user.purple_st_count += 1
else:
user.purple_count += 1
elif skin.color == "PINK":
if skin.is_stattrak:
user.pink_st_count += 1
else:
user.pink_count += 1
elif skin.color == "RED":
if skin.is_stattrak:
user.red_st_count += 1
else:
user.red_count += 1
elif skin.color == "KNIFE":
if skin.is_stattrak:
user.knife_st_count += 1
else:
user.knife_count += 1
user.make_money += skin.sell_min_price
user.spend_money += int(17 + case_price)
async def get_user_max_count(user_id: str) -> int:
"""获取用户每日最大开箱次数
参数:
user_id: 用户id
返回:
int: 最大开箱次数
"""
user, _ = await SignUser.get_or_create(user_id=user_id)
impression = int(user.impression)
initial_open_case_count = Config.get_config("open_cases", "INITIAL_OPEN_CASE_COUNT")
each_impression_add_count = Config.get_config(
"open_cases", "EACH_IMPRESSION_ADD_COUNT"
)
return int(initial_open_case_count + impression / each_impression_add_count) # type: ignore
async def open_case(
user_id: str, group_id: str, case_name: str | None, session: EventSession
) -> MessageFactory:
"""开箱
参数:
user_id: 用户id
group_id : 群号
case_name: 武器箱名称. Defaults to "狂牙大行动".
session: EventSession
返回:
Union[str, Message]: 回复消息
"""
user_id = str(user_id)
group_id = str(group_id)
if not CaseManager.CURRENT_CASES:
return MessageFactory([Text("未收录任何武器箱")])
if not case_name:
case_name = random.choice(CaseManager.CURRENT_CASES) # type: ignore
if case_name not in CaseManager.CURRENT_CASES:
return "武器箱未收录, 当前可用武器箱:\n" + ", ".join(CaseManager.CURRENT_CASES) # type: ignore
logger.debug(
f"尝试开启武器箱: {case_name}", "开箱", session=user_id, group_id=group_id
)
case = cn2py(case_name) # type: ignore
user = await OpenCasesUser.get_or_none(user_id=user_id, group_id=group_id)
if not user:
user = await OpenCasesUser.create(
user_id=user_id, group_id=group_id, open_cases_time_last=datetime.now()
)
max_count = await get_user_max_count(user_id)
# 一天次数上限
if user.today_open_total >= max_count:
return MessageFactory(
[
Text(
f"今天已达开箱上限了喔,明天再来吧\n(提升好感度可以增加每日开箱数 #疯狂暗示)"
)
]
)
skin_list = await random_skin(1, case_name) # type: ignore
if not skin_list:
return MessageFactory(Text("未抽取到任何皮肤"))
skin, rand = skin_list[0]
rand = str(rand)[:11]
case_price = 0
if case_skin := await BuffSkin.get_or_none(case_name=case_name, color="CASE"):
case_price = case_skin.sell_min_price
user.today_open_total += 1
user.total_count += 1
user.open_cases_time_last = datetime.now()
await user.save(
update_fields=["today_open_total", "total_count", "open_cases_time_last"]
)
add_count(user, skin, case_price)
ridicule_result = random.choice(RESULT_MESSAGE[skin.color])
price_result = skin.sell_min_price
name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
img_path = IMAGE_PATH / "csgo_cases" / case / f"{cn2py(name)}.jpg"
logger.info(
f"开启{case_name}武器箱获得 {skin.name}{'StatTrak™' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损: [{rand}] 价格: {skin.sell_min_price}",
"开箱",
session=session,
)
await user.save()
await OpenCasesLog.create(
user_id=user_id,
group_id=group_id,
case_name=case_name,
name=skin.name,
skin_name=skin.skin_name,
is_stattrak=skin.is_stattrak,
abrasion=skin.abrasion,
color=skin.color,
price=skin.sell_min_price,
abrasion_value=rand,
create_time=datetime.now(),
)
logger.debug(f"添加 1 条开箱日志", "开箱", session=session)
over_count = max_count - user.today_open_total
img = await draw_card(skin, rand)
return MessageFactory(
[
Text(f"开启{case_name}武器箱.\n剩余开箱次数:{over_count}.\n"),
Image(img.pic2bytes()),
Text(
f"\n箱子单价:{case_price}\n花费:{17 + case_price:.2f}\n:{ridicule_result}"
),
]
)
async def open_multiple_case(
user_id: str,
group_id: str,
case_name: str | None,
num: int = 10,
session: EventSession | None = None,
) -> MessageFactory:
"""多连开箱
参数:
user_id (int): 用户id
group_id (int): 群号
case_name (str): 箱子名称
num (int, optional): 数量. Defaults to 10.
session: EventSession
返回:
_type_: _description_
"""
user_id = str(user_id)
group_id = str(group_id)
if not CaseManager.CURRENT_CASES:
return MessageFactory([Text("未收录任何武器箱")])
if not case_name:
case_name = random.choice(CaseManager.CURRENT_CASES) # type: ignore
if case_name not in CaseManager.CURRENT_CASES:
return MessageFactory(
[
Text(
"武器箱未收录, 当前可用武器箱:\n"
+ ", ".join(CaseManager.CURRENT_CASES)
)
]
)
user, _ = await OpenCasesUser.get_or_create(
user_id=user_id,
group_id=group_id,
defaults={"open_cases_time_last": datetime.now()},
)
max_count = await get_user_max_count(user_id)
if user.today_open_total >= max_count:
return MessageFactory(
[
Text(
f"今天已达开箱上限了喔,明天再来吧\n(提升好感度可以增加每日开箱数 #疯狂暗示)"
)
]
)
if max_count - user.today_open_total < num:
return MessageFactory(
[
Text(
f"今天开箱次数不足{num}次噢,请单抽试试看(也许单抽运气更好?)"
f"\n剩余开箱次数:{max_count - user.today_open_total}"
)
]
)
logger.debug(f"尝试开启武器箱: {case_name}", "开箱", session=session)
case = cn2py(case_name) # type: ignore
skin_count = {}
img_list = []
skin_list = await random_skin(num, case_name) # type: ignore
if not skin_list:
return MessageFactory([Text("未抽取到任何皮肤...")])
total_price = 0
log_list = []
now = datetime.now()
user.today_open_total += num
user.total_count += num
user.open_cases_time_last = datetime.now()
await user.save(
update_fields=["today_open_total", "total_count", "open_cases_time_last"]
)
case_price = 0
if case_skin := await BuffSkin.get_or_none(case_name=case_name, color="CASE"):
case_price = case_skin.sell_min_price
img_w, img_h = 0, 0
for skin, rand in skin_list:
img = await draw_card(skin, str(rand)[:11])
img_w, img_h = img.size
total_price += skin.sell_min_price
color_name = COLOR2CN[skin.color]
if not skin_count.get(color_name):
skin_count[color_name] = 0
skin_count[color_name] += 1
add_count(user, skin, case_price)
img_list.append(img)
logger.info(
f"开启{case_name}武器箱获得 {skin.name}{'StatTrak™' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损: [{rand:.11f}] 价格: {skin.sell_min_price}",
"开箱",
session=session,
)
log_list.append(
OpenCasesLog(
user_id=user_id,
group_id=group_id,
case_name=case_name,
name=skin.name,
skin_name=skin.skin_name,
is_stattrak=skin.is_stattrak,
abrasion=skin.abrasion,
color=skin.color,
price=skin.sell_min_price,
abrasion_value=rand,
create_time=now,
)
)
await user.save()
if log_list:
await OpenCasesLog.bulk_create(log_list, 10)
logger.debug(f"添加 {len(log_list)} 条开箱日志", "开箱", session=session)
img_w += 10
img_h += 10
w = img_w * 5
if num < 5:
h = img_h - 10
w = img_w * num
elif not num % 5:
h = img_h * int(num / 5)
else:
h = img_h * int(num / 5) + img_h
mark_image = BuildImage(w - 10, h - 10, color=(255, 255, 255))
mark_image = await mark_image.auto_paste(img_list, 5, padding=20)
over_count = max_count - user.today_open_total
result = ""
for color_name in skin_count:
result += f"[{color_name}:{skin_count[color_name]}] "
return MessageFactory(
[
Text(f"开启{case_name}武器箱\n剩余开箱次数:{over_count}\n"),
Image(mark_image.pic2bytes()),
Text(
f"\nresult[:-1]\n箱子单价:{case_price}\n总获取金额:{total_price:.2f}\n总花费:{(17 + case_price) * num:.2f}"
),
]
)
async def total_open_statistics(user_id: str, group_id: str) -> str:
user, _ = await OpenCasesUser.get_or_create(user_id=user_id, group_id=group_id)
return (
f"开箱总数:{user.total_count}\n"
f"今日开箱:{user.today_open_total}\n"
f"蓝色军规:{user.blue_count}\n"
f"蓝色暗金:{user.blue_st_count}\n"
f"紫色受限:{user.purple_count}\n"
f"紫色暗金:{user.purple_st_count}\n"
f"粉色保密:{user.pink_count}\n"
f"粉色暗金:{user.pink_st_count}\n"
f"红色隐秘:{user.red_count}\n"
f"红色暗金:{user.red_st_count}\n"
f"金色罕见:{user.knife_count}\n"
f"金色暗金:{user.knife_st_count}\n"
f"花费金额:{user.spend_money}\n"
f"获取金额:{user.make_money:.2f}\n"
f"最后开箱日期:{user.open_cases_time_last.date()}"
)
async def group_statistics(group_id: str):
user_list = await OpenCasesUser.filter(group_id=str(group_id)).all()
# lan zi fen hong jin pricei
uplist = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.0, 0, 0]
for user in user_list:
uplist[0] += user.blue_count
uplist[1] += user.blue_st_count
uplist[2] += user.purple_count
uplist[3] += user.purple_st_count
uplist[4] += user.pink_count
uplist[5] += user.pink_st_count
uplist[6] += user.red_count
uplist[7] += user.red_st_count
uplist[8] += user.knife_count
uplist[9] += user.knife_st_count
uplist[10] += user.make_money
uplist[11] += user.total_count
uplist[12] += user.today_open_total
return (
f"群开箱总数:{uplist[11]}\n"
f"群今日开箱:{uplist[12]}\n"
f"蓝色军规:{uplist[0]}\n"
f"蓝色暗金:{uplist[1]}\n"
f"紫色受限:{uplist[2]}\n"
f"紫色暗金:{uplist[3]}\n"
f"粉色保密:{uplist[4]}\n"
f"粉色暗金:{uplist[5]}\n"
f"红色隐秘:{uplist[6]}\n"
f"红色暗金:{uplist[7]}\n"
f"金色罕见:{uplist[8]}\n"
f"金色暗金:{uplist[9]}\n"
f"花费金额:{uplist[11] * 17}\n"
f"获取金额:{uplist[10]:.2f}"
)
async def get_my_knifes(user_id: str, group_id: str) -> MessageFactory:
"""获取我的金色
参数:
user_id (str): 用户id
group_id (str): 群号
返回:
MessageFactory: 回复消息或图片
"""
data_list = await get_old_knife(str(user_id), str(group_id))
data_list += await OpenCasesLog.filter(
user_id=user_id, group_id=group_id, color="KNIFE"
).all()
if not data_list:
return MessageFactory([Text("您木有开出金色级别的皮肤喔...")])
length = len(data_list)
if length < 5:
h = 600
w = length * 540
elif length % 5 == 0:
h = 600 * int(length / 5)
w = 540 * 5
else:
h = 600 * int(length / 5) + 600
w = 540 * 5
A = BuildImage(w, h)
image_list = []
for skin in data_list:
name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
img_path = (
IMAGE_PATH / "csgo_cases" / cn2py(skin.case_name) / f"{cn2py(name)}.jpg"
)
knife_img = BuildImage(470, 600, font_size=20)
await knife_img.paste(
BuildImage(470, 470, background=img_path if img_path.exists() else None),
(0, 0),
)
await knife_img.text(
(5, 500), f"\t{skin.name}|{skin.skin_name}({skin.abrasion})"
)
await knife_img.text((5, 530), f"\t磨损:{skin.abrasion_value}")
await knife_img.text((5, 560), f"\t价格:{skin.price}")
image_list.append(knife_img)
A = await A.auto_paste(image_list, 5)
return MessageFactory([Image(A.pic2bytes())])
async def get_old_knife(user_id: str, group_id: str) -> list[OpenCasesLog]:
"""获取旧数据字段
参数:
user_id (str): 用户id
group_id (str): 群号
返回:
list[OpenCasesLog]: 旧数据兼容
"""
user, _ = await OpenCasesUser.get_or_create(user_id=user_id, group_id=group_id)
knifes_name = user.knifes_name
data_list = []
if knifes_name:
knifes_list = knifes_name[:-1].split(",")
for knife in knifes_list:
try:
if r := re.search(
"(.*)\|\|(.*) \| (.*)\((.*)\) 磨损:(.*) 价格:(.*)", knife
):
case_name_py = r.group(1)
name = r.group(2)
skin_name = r.group(3)
abrasion = r.group(4)
abrasion_value = r.group(5)
price = r.group(6)
name = name.replace("StatTrak™", "")
data_list.append(
OpenCasesLog(
user_id=user_id,
group_id=group_id,
name=name.strip(),
case_name=case_name_py.strip(),
skin_name=skin_name.strip(),
abrasion=abrasion.strip(),
abrasion_value=abrasion_value,
price=price,
)
)
except Exception as e:
logger.error(
f"获取兼容旧数据错误: {knife}",
"我的金色",
session=user_id,
group_id=group_id,
e=e,
)
return data_list
async def auto_update():
"""自动更新武器箱"""
if case_list := Config.get_config("open_cases", "DAILY_UPDATE"):
logger.debug("尝试自动更新武器箱", "更新武器箱")
if "ALL" in case_list:
case_list = CASE2ID.keys()
logger.debug(f"预计自动更新武器箱 {len(case_list)}", "更新武器箱")
for case_name in case_list:
logger.debug(f"开始自动更新武器箱: {case_name}", "更新武器箱")
try:
await update_skin_data(case_name)
rand = random.randint(300, 500)
logger.info(
f"成功自动更新武器箱: {case_name}, 将在 {rand} 秒后再次更新下一武器箱",
"更新武器箱",
)
await asyncio.sleep(rand)
except Exception as e:
logger.error(f"自动更新武器箱: {case_name}", e=e)

View File

@ -0,0 +1,656 @@
import asyncio
import os
import random
import re
import time
from datetime import datetime, timedelta
import nonebot
from tortoise.functions import Count
from zhenxun.configs.config import Config
from zhenxun.configs.path_config import IMAGE_PATH
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.image_utils import BuildImage, BuildMat, MatType
from .build_image import generate_skin
from .config import (
CASE2ID,
CASE_BACKGROUND,
COLOR2NAME,
KNIFE2ID,
NAME2COLOR,
UpdateType,
)
from .models.buff_skin import BuffSkin
from .models.buff_skin_log import BuffSkinLog
from .models.open_cases_user import OpenCasesUser
# from zhenxun.utils.utils import broadcast_group, cn2py
URL = "https://buff.163.com/api/market/goods"
SELL_URL = "https://buff.163.com/goods"
driver = nonebot.get_driver()
BASE_PATH = IMAGE_PATH / "csgo_cases"
class CaseManager:
CURRENT_CASES = []
@classmethod
async def reload(cls):
cls.CURRENT_CASES = []
case_list = await BuffSkin.filter(color="CASE").values_list(
"case_name", flat=True
)
for case_name in (
await BuffSkin.filter(case_name__not="未知武器箱")
.annotate()
.distinct()
.values_list("case_name", flat=True)
):
for name in case_name.split(","): # type: ignore
if name not in cls.CURRENT_CASES and name in case_list:
cls.CURRENT_CASES.append(name)
async def update_skin_data(name: str, is_update_case_name: bool = False) -> str:
"""更新箱子内皮肤数据
参数:
name (str): 箱子名称
is_update_case_name (bool): 是否必定更新所属箱子
返回:
str: 回复内容
"""
type_ = None
if name in CASE2ID:
type_ = UpdateType.CASE
if name in KNIFE2ID:
type_ = UpdateType.WEAPON_TYPE
if not type_:
return "未在指定武器箱或指定武器类型内"
session = Config.get_config("open_cases", "COOKIE")
if not session:
return "BUFF COOKIE为空捏!"
weapon2case = {}
if type_ == UpdateType.WEAPON_TYPE:
db_data = await BuffSkin.filter(name__contains=name).all()
weapon2case = {
item.name + item.skin_name: item.case_name
for item in db_data
if item.case_name != "未知武器箱"
}
data_list, total = await search_skin_page(name, 1, type_)
if isinstance(data_list, str):
return data_list
for page in range(2, total + 1):
rand_time = random.randint(20, 50)
logger.debug(f"访问随机等待时间: {rand_time}", "开箱更新")
await asyncio.sleep(rand_time)
data_list_, total = await search_skin_page(name, page, type_)
if isinstance(data_list_, list):
data_list += data_list_
create_list: list[BuffSkin] = []
update_list: list[BuffSkin] = []
log_list = []
now = datetime.now()
exists_id_list = []
new_weapon2case = {}
for skin in data_list:
if skin.skin_id in exists_id_list:
continue
if skin.case_name:
skin.case_name = (
skin.case_name.replace("", "")
.replace("", "")
.replace("武器箱", "")
.replace(" ", "")
)
skin.name = skin.name.replace("(★ StatTrak™", "").replace("(★)", "")
exists_id_list.append(skin.skin_id)
key = skin.name + skin.skin_name
name_ = skin.name + skin.skin_name + skin.abrasion
skin.create_time = now
skin.update_time = now
if UpdateType.WEAPON_TYPE and not skin.case_name:
if is_update_case_name:
case_name = new_weapon2case.get(key)
else:
case_name = weapon2case.get(key)
if not case_name:
if case_list := await get_skin_case(skin.skin_id):
case_name = ",".join(case_list)
rand = random.randint(10, 20)
logger.debug(
f"获取 {skin.name} | {skin.skin_name} 皮肤所属武器箱: {case_name}, 访问随机等待时间: {rand}",
"开箱更新",
)
await asyncio.sleep(rand)
if not case_name:
case_name = "未知武器箱"
else:
weapon2case[key] = case_name
new_weapon2case[key] = case_name
if skin.case_name == "反恐精英20周年":
skin.case_name = "CS20"
skin.case_name = case_name
if await BuffSkin.exists(skin_id=skin.skin_id):
update_list.append(skin)
else:
create_list.append(skin)
log_list.append(
BuffSkinLog(
name=skin.name,
case_name=skin.case_name,
skin_name=skin.skin_name,
is_stattrak=skin.is_stattrak,
abrasion=skin.abrasion,
color=skin.color,
steam_price=skin.steam_price,
weapon_type=skin.weapon_type,
buy_max_price=skin.buy_max_price,
buy_num=skin.buy_num,
sell_min_price=skin.sell_min_price,
sell_num=skin.sell_num,
sell_reference_price=skin.sell_reference_price,
create_time=now,
)
)
name_ = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
for c_name_ in skin.case_name.split(","):
file_path = BASE_PATH / cn2py(c_name_) / f"{cn2py(name_)}.jpg"
if not file_path.exists():
logger.debug(f"下载皮肤 {name} 图片: {skin.img_url}...", "开箱更新")
await AsyncHttpx.download_file(skin.img_url, file_path)
rand_time = random.randint(1, 10)
await asyncio.sleep(rand_time)
logger.debug(f"图片下载随机等待时间: {rand_time}", "开箱更新")
else:
logger.debug(f"皮肤 {name_} 图片已存在...", "开箱更新")
if create_list:
logger.debug(
f"更新武器箱/皮肤: [<u><e>{name}</e></u>], 创建 {len(create_list)} 个皮肤!"
)
await BuffSkin.bulk_create(set(create_list), 10)
if update_list:
abrasion_list = []
name_list = []
skin_name_list = []
for skin in update_list:
if skin.abrasion not in abrasion_list:
abrasion_list.append(skin.abrasion)
if skin.name not in name_list:
name_list.append(skin.name)
if skin.skin_name not in skin_name_list:
skin_name_list.append(skin.skin_name)
db_data = await BuffSkin.filter(
case_name__contains=name,
skin_name__in=skin_name_list,
name__in=name_list,
abrasion__in=abrasion_list,
).all()
_update_list = []
for data in db_data:
for skin in update_list:
if (
data.name == skin.name
and data.skin_name == skin.skin_name
and data.abrasion == skin.abrasion
):
data.steam_price = skin.steam_price
data.buy_max_price = skin.buy_max_price
data.buy_num = skin.buy_num
data.sell_min_price = skin.sell_min_price
data.sell_num = skin.sell_num
data.sell_reference_price = skin.sell_reference_price
data.update_time = skin.update_time
_update_list.append(data)
logger.debug(
f"更新武器箱/皮肤: [<u><c>{name}</c></u>], 更新 {len(create_list)} 个皮肤!"
)
await BuffSkin.bulk_update(
_update_list,
[
"steam_price",
"buy_max_price",
"buy_num",
"sell_min_price",
"sell_num",
"sell_reference_price",
"update_time",
],
10,
)
if log_list:
logger.debug(
f"更新武器箱/皮肤: [<u><e>{name}</e></u>], 新增 {len(log_list)} 条皮肤日志!"
)
await BuffSkinLog.bulk_create(log_list)
if name not in CaseManager.CURRENT_CASES:
CaseManager.CURRENT_CASES.append(name) # type: ignore
return f"更新武器箱/皮肤: [{name}] 成功, 共更新 {len(update_list)} 个皮肤, 新创建 {len(create_list)} 个皮肤!"
async def search_skin_page(
s_name: str, page_index: int, type_: UpdateType
) -> tuple[list[BuffSkin] | str, int]:
"""查询箱子皮肤
参数:
s_name (str): 箱子/皮肤名称
page_index (int): 页数
返回:
tuple[list[BuffSkin] | str, int]: BuffSkin
"""
logger.debug(
f"尝试访问武器箱/皮肤: [<u><e>{s_name}</e></u>] 页数: [<u><y>{page_index}</y></u>]",
"开箱更新",
)
cookie = {"session": Config.get_config("open_cases", "COOKIE")}
params = {
"game": "csgo",
"page_num": page_index,
"page_size": 80,
"_": time.time(),
"use_suggestio": 0,
}
if type_ == UpdateType.CASE:
params["itemset"] = CASE2ID[s_name]
elif type_ == UpdateType.WEAPON_TYPE:
params["category"] = KNIFE2ID[s_name]
proxy = None
if ip := Config.get_config("open_cases", "BUFF_PROXY"):
proxy = {"http://": ip, "https://": ip}
response = None
error = ""
for i in range(3):
try:
response = await AsyncHttpx.get(
URL,
proxy=proxy,
params=params,
cookies=cookie, # type: ignore
)
if response.status_code == 200:
break
rand = random.randint(3, 7)
logger.debug(
f"尝试访问武器箱/皮肤第 {i+1} 次访问异常, code: {response.status_code}",
"开箱更新",
)
await asyncio.sleep(rand)
except Exception as e:
logger.debug(
f"尝试访问武器箱/皮肤第 {i+1} 次访问发生错误 {type(e)}: {e}", "开箱更新"
)
error = f"{type(e)}: {e}"
if not response:
return f"访问发生异常: {error}", -1
if response.status_code == 200:
# logger.debug(f"访问BUFF API: {response.text}", "开箱更新")
json_data = response.json()
update_data = []
if json_data["code"] == "OK":
data_list = json_data["data"]["items"]
for data in data_list:
obj = {}
if type_ == UpdateType.CASE:
obj["case_name"] = s_name
name = data["name"]
try:
logger.debug(
f"武器箱: [<u><e>{s_name}</e></u>] 页数: [<u><y>{page_index}</y></u>] 正在收录皮肤: [<u><c>{name}</c></u>]...",
"开箱更新",
)
obj["skin_id"] = str(data["id"])
obj["buy_max_price"] = data["buy_max_price"] # 求购最大金额
obj["buy_num"] = data["buy_num"] # 当前求购
goods_info = data["goods_info"]
info = goods_info["info"]
tags = info["tags"]
obj["weapon_type"] = tags["type"]["localized_name"] # 枪械类型
if obj["weapon_type"] in ["音乐盒", "印花", "探员"]:
continue
elif obj["weapon_type"] in ["匕首", "手套"]:
obj["color"] = "KNIFE"
obj["name"] = data["short_name"].split("")[0].strip() # 名称
elif obj["weapon_type"] in ["武器箱"]:
obj["color"] = "CASE"
obj["name"] = data["short_name"]
else:
obj["color"] = NAME2COLOR[tags["rarity"]["localized_name"]]
obj["name"] = tags["weapon"]["localized_name"] # 名称
if obj["weapon_type"] not in ["武器箱"]:
obj["abrasion"] = tags["exterior"]["localized_name"] # 磨损
obj["is_stattrak"] = "StatTrak" in tags["quality"]["localized_name"] # type: ignore # 是否暗金
if not obj["color"]:
obj["color"] = NAME2COLOR[
tags["rarity"]["localized_name"]
] # 品质颜色
else:
obj["abrasion"] = "CASE"
obj["skin_name"] = (
data["short_name"].split("|")[-1].strip()
) # 皮肤名称
obj["img_url"] = goods_info["original_icon_url"] # 图片url
obj["steam_price"] = goods_info["steam_price_cny"] # steam价格
obj["sell_min_price"] = data["sell_min_price"] # 售卖最低价格
obj["sell_num"] = data["sell_num"] # 售卖数量
obj["sell_reference_price"] = data[
"sell_reference_price"
] # 参考价格
update_data.append(BuffSkin(**obj))
except Exception as e:
logger.error(
f"更新武器箱: [<u><e>{s_name}</e></u>] 皮肤: [<u><c>{s_name}</c></u>] 错误",
e=e,
)
logger.debug(
f"访问武器箱: [<u><e>{s_name}</e></u>] 页数: [<u><y>{page_index}</y></u>] 成功并收录完成",
"开箱更新",
)
return update_data, json_data["data"]["total_page"]
else:
logger.warning(f'访问BUFF失败: {json_data["error"]}')
return f'访问失败: {json_data["error"]}', -1
return f"访问失败, 状态码: {response.status_code}", -1
async def build_case_image(case_name: str | None) -> BuildImage | str:
"""构造武器箱图片
参数:
case_name (str): 名称
返回:
BuildImage | str: 图片
"""
background = random.choice(os.listdir(CASE_BACKGROUND))
background_img = BuildImage(0, 0, background=CASE_BACKGROUND / background)
if case_name:
log_list = (
await BuffSkinLog.filter(case_name__contains=case_name)
.annotate(count=Count("id"))
.group_by("skin_name")
.values_list("skin_name", "count")
)
skin_list_ = await BuffSkin.filter(case_name__contains=case_name).all()
skin2count = {item[0]: item[1] for item in log_list}
case = None
skin_list: list[BuffSkin] = []
exists_name = []
for skin in skin_list_:
if skin.color == "CASE":
case = skin
else:
name = skin.name + skin.skin_name
if name not in exists_name:
skin_list.append(skin)
exists_name.append(name)
generate_img = {}
for skin in skin_list:
skin_img = await generate_skin(skin, skin2count.get(skin.skin_name, 0))
if skin_img:
if not generate_img.get(skin.color):
generate_img[skin.color] = []
generate_img[skin.color].append(skin_img)
skin_image_list = []
for color in COLOR2NAME:
if generate_img.get(color):
skin_image_list = skin_image_list + generate_img[color]
img = skin_image_list[0]
img_w, img_h = img.size
total_size = (img_w + 25) * (img_h + 10) * len(skin_image_list) # 总面积
new_size = get_bk_image_size(total_size, background_img.size, img.size, 250)
A = BuildImage(
new_size[0] + 50, new_size[1], background=CASE_BACKGROUND / background
)
await A.filter("GaussianBlur", 2)
if case:
case_img = await generate_skin(
case, skin2count.get(f"{case_name}武器箱", 0)
)
if case_img:
await A.paste(case_img, (25, 25))
w = 25
h = 230
skin_image_list.reverse()
for image in skin_image_list:
await A.paste(image, (w, h))
w += image.width + 20
if w + image.width - 25 > A.width:
h += image.height + 10
w = 25
if h + img_h + 100 < A.height:
await A.crop((0, 0, A.width, h + img_h + 100))
return A
else:
log_list = (
await BuffSkinLog.filter(color="CASE")
.annotate(count=Count("id"))
.group_by("case_name")
.values_list("case_name", "count")
)
name2count = {item[0]: item[1] for item in log_list}
skin_list = await BuffSkin.filter(color="CASE").all()
image_list: list[BuildImage] = []
for skin in skin_list:
if img := await generate_skin(skin, name2count[skin.case_name]):
image_list.append(img)
if not image_list:
return "未收录武器箱"
w = 25
h = 150
img = image_list[0]
img_w, img_h = img.size
total_size = (img_w + 25) * (img_h + 10) * len(image_list) # 总面积
new_size = get_bk_image_size(total_size, background_img.size, img.size, 155)
A = BuildImage(
new_size[0] + 50, new_size[1], background=CASE_BACKGROUND / background
)
await A.filter("GaussianBlur", 2)
bk_img = BuildImage(
img_w, 120, color=(25, 25, 25, 100), font_size=60, font="CJGaoDeGuo.otf"
)
await bk_img.text(
(0, 0),
f"已收录 {len(image_list)} 个武器箱",
(255, 255, 255),
center_type="center",
)
await A.paste(bk_img, (10, 10), "width")
for image in image_list:
await A.paste(image, (w, h))
w += image.width + 20
if w + image.width - 25 > A.width:
h += image.height + 10
w = 25
if h + img_h + 100 < A.height:
await A.crop((0, 0, A.width, h + img_h + 100))
return A
def get_bk_image_size(
total_size: int,
base_size: tuple[int, int],
img_size: tuple[int, int],
extra_height: int = 0,
) -> tuple[int, int]:
"""获取所需背景大小且不改变图片长宽比
参数:
total_size (int): 总面积
base_size (Tuple[int, int]): 初始背景大小
img_size (Tuple[int, int]): 贴图大小
返回:
tuple[int, int]: 满足所有贴图大小
"""
bk_w, bk_h = base_size
img_w, img_h = img_size
is_add_title_size = False
left_dis = 0
right_dis = 0
old_size = (0, 0)
new_size = (0, 0)
ratio = 1.1
while 1:
w_ = int(ratio * bk_w)
h_ = int(ratio * bk_h)
size = w_ * h_
if size < total_size:
left_dis = size
else:
right_dis = size
r = w_ / (img_w + 25)
if right_dis and r - int(r) < 0.1:
if not is_add_title_size and extra_height:
total_size = int(total_size + w_ * extra_height)
is_add_title_size = True
right_dis = 0
continue
if total_size - left_dis > right_dis - total_size:
new_size = (w_, h_)
else:
new_size = old_size
break
old_size = (w_, h_)
ratio += 0.1
return new_size
async def get_skin_case(id_: str) -> list[str] | None:
"""获取皮肤所在箱子
参数:
id_ (str): 皮肤id
返回:
list[str] | None: 武器箱名称
"""
url = f"{SELL_URL}/{id_}"
proxy = None
if ip := Config.get_config("open_cases", "BUFF_PROXY"):
proxy = {"http://": ip, "https://": ip}
response = await AsyncHttpx.get(
url,
proxy=proxy,
)
if response.status_code == 200:
text = response.text
if r := re.search('<meta name="description"(.*?)>', text):
case_list = []
for s in r.group(1).split(","):
if "武器箱" in s:
case_list.append(
s.replace("", "")
.replace("", "")
.replace('"', "")
.replace("'", "")
.replace("武器箱", "")
.replace(" ", "")
)
return case_list
else:
logger.debug(f"访问皮肤所属武器箱异常 url: {url} code: {response.status_code}")
return None
async def init_skin_trends(
name: str, skin: str, abrasion: str, day: int = 7
) -> BuildImage | None:
date = datetime.now() - timedelta(days=day)
log_list = (
await BuffSkinLog.filter(
name__contains=name.upper(),
skin_name=skin,
abrasion__contains=abrasion,
create_time__gt=date,
is_stattrak=False,
)
.order_by("create_time")
.limit(day * 5)
.all()
)
if not log_list:
return None
date_list = []
price_list = []
for log in log_list:
date = str(log.create_time.date())
if date not in date_list:
date_list.append(date)
price_list.append(log.sell_min_price)
graph = BuildMat(MatType.LINE)
graph.data = price_list
graph.title = f"{name}({skin})价格趋势({day})"
graph.x_index = date_list
return await graph.build()
async def reset_count_daily():
"""
重置每日开箱
"""
try:
await OpenCasesUser.all().update(today_open_total=0)
# await broadcast_group(
# "[[_task|open_case_reset_remind]]今日开箱次数重置成功",
# log_cmd="开箱重置提醒",
# )
except Exception as e:
logger.error(f"开箱重置错误", e=e)
async def download_image(case_name: str | None = None):
"""下载皮肤图片
参数:
case_name: 箱子名称.
"""
skin_list = (
await BuffSkin.filter(case_name=case_name).all()
if case_name
else await BuffSkin.all()
)
for skin in skin_list:
name_ = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
for c_name_ in skin.case_name.split(","):
try:
pass
# file_path = BASE_PATH / cn2py(c_name_) / f"{cn2py(name_)}.jpg"
# if not file_path.exists():
# logger.debug(
# f"下载皮肤 {c_name_}/{skin.name} 图片: {skin.img_url}...",
# "开箱图片更新",
# )
# await AsyncHttpx.download_file(skin.img_url, file_path)
# rand_time = random.randint(1, 5)
# await asyncio.sleep(rand_time)
# logger.debug(f"图片下载随机等待时间: {rand_time}", "开箱图片更新")
# else:
# logger.debug(
# f"皮肤 {c_name_}/{skin.name} 图片已存在...", "开箱图片更新"
# )
except Exception as e:
logger.error(
f"下载皮肤 {c_name_}/{skin.name} 图片: {skin.img_url}",
"开箱图片更新",
e=e,
)
@driver.on_startup
async def _():
await CaseManager.reload()

View File

@ -1,3 +1,469 @@
import random
from io import BytesIO
from pathlib import Path
from re import S
from pydantic import BaseModel
from strenum import StrEnum
from ._build_image import BuildImage
class MatType(StrEnum):
LINE = "LINE"
"""折线图"""
BAR = "BAR"
"""柱状图"""
BARH = "BARH"
"""横向柱状图"""
class BuildMatData(BaseModel):
mat_type: MatType
"""类型"""
data: list[int | float] = []
"""数据"""
x_name: str | None = None
"""X轴坐标名称"""
y_name: str | None = None
"""Y轴坐标名称"""
x_index: list[str] = []
"""显示轴坐标值"""
y_index: list[int | float] = []
"""数据轴坐标值"""
space: tuple[int, int] = (15, 15)
"""坐标值间隔(X, Y)"""
rotate: tuple[int, int] = (0, 0)
"""坐标值旋转(X, Y)"""
title: str | None = None
"""标题"""
font: str = "msyh.ttf"
"""字体"""
font_size: int = 15
"""字体大小"""
display_num: bool = True
"""是否在点与柱状图顶部显示数值"""
is_grid: bool = False
"""是否添加栅格"""
background_color: tuple[int, int, int] | str = (255, 255, 255)
"""背景颜色"""
background: Path | bytes | None = None
"""背景图片"""
bar_color: list[str] = ["*"]
"""柱状图柱子颜色, 多个时随机, 使用 * 时七色随机"""
padding: tuple[int, int] = (50, 50)
"""图表上下左右边距"""
class BuildMat:
def pic2bs4(self):
return ""
"""
针对 折线图/柱状图基于 BuildImage 编写的 非常难用的 自定义画图工具
目前仅支持 正整数
"""
class InitGraph(BaseModel):
mark_image: BuildImage
"""BuildImage"""
x_height: int
"""横坐标高度"""
x_point: list[int]
"""横坐标坐标"""
graph_height: int
"""坐标轴高度"""
class Config:
arbitrary_types_allowed = True
def __init__(self, mat_type: MatType) -> None:
self.line_length = 760
self._x_padding = 0
self._y_padding = 0
self.build_data = BuildMatData(mat_type=mat_type)
@property
def x_name(self) -> str | None:
return self.build_data.x_name
@x_name.setter
def x_name(self, data: str) -> str | None:
self.build_data.x_name = data
@property
def y_name(self) -> str | None:
return self.build_data.y_name
@y_name.setter
def y_name(self, data: str) -> str | None:
self.build_data.y_name = data
@property
def data(self) -> list[int | float]:
return self.build_data.data
@data.setter
def data(self, data: list[int | float]):
self._check_value(data, self.build_data.y_index)
self.build_data.data = data
@property
def x_index(self) -> list[str]:
return self.build_data.x_index
@x_index.setter
def x_index(self, data: list[str]):
self.build_data.x_index = data
@property
def y_index(self) -> list[int | float]:
return self.build_data.y_index
@y_index.setter
def y_index(self, data: list[int | float]):
# self._check_value(self.build_data.data, data)
data.sort()
self.build_data.y_index = data
@property
def space(self) -> tuple[int, int]:
return self.build_data.space
@space.setter
def space(self, data: tuple[int, int]):
self.build_data.space = data
@property
def rotate(self) -> tuple[int, int]:
return self.build_data.rotate
@rotate.setter
def rotate(self, data: tuple[int, int]):
self.build_data.rotate = data
@property
def title(self) -> str | None:
return self.build_data.title
@title.setter
def title(self, data: str):
self.build_data.title = data
@property
def font(self) -> str:
return self.build_data.font
@font.setter
def font(self, data: str):
self.build_data.font = data
# @property
# def font_size(self) -> int:
# return self.build_data.font_size
# @font_size.setter
# def font_size(self, data: int):
# self.build_data.font_size = data
@property
def display_num(self) -> bool:
return self.build_data.display_num
@display_num.setter
def display_num(self, data: bool):
self.build_data.display_num = data
@property
def is_grid(self) -> bool:
return self.build_data.is_grid
@is_grid.setter
def is_grid(self, data: bool):
self.build_data.is_grid = data
@property
def background_color(self) -> tuple[int, int, int] | str:
return self.build_data.background_color
@background_color.setter
def background_color(self, data: tuple[int, int, int] | str):
self.build_data.background_color = data
@property
def background(self) -> Path | bytes | None:
return self.build_data.background
@background.setter
def background(self, data: Path | bytes):
self.build_data.background = data
@property
def bar_color(self) -> list[str]:
return self.build_data.bar_color
@bar_color.setter
def bar_color(self, data: list[str]):
self.build_data.bar_color = data
def _check_value(
self,
y: list[int | float],
y_index: list[int | float] | None = None,
x_index: list[int | float] | None = None,
):
"""检查值合法性
参数:
y: 坐标值
y_index: y轴坐标值
x_index: x轴坐标值
"""
if y_index:
_value = x_index if self.build_data.mat_type == "barh" else y_index
if not isinstance(y[0], str):
__y = [float(t_y) for t_y in y]
_y_index = [float(t_y) for t_y in y_index]
if max(__y) > max(_y_index):
raise ValueError("坐标点的值必须小于y轴坐标的最大值...")
i = -9999999999
for _y in _y_index:
if _y > i:
i = _y
else:
raise ValueError("y轴坐标值必须有序...")
async def build(self):
"""构造图片"""
A = None
bar_color = self.build_data.bar_color
if "*" in bar_color:
bar_color = [
"#FF0000",
"#FF7F00",
"#FFFF00",
"#00FF00",
"#00FFFF",
"#0000FF",
"#8B00FF",
]
init_graph = await self._init_graph()
mark_image = None
if self.build_data.mat_type == MatType.LINE:
mark_image = await self._build_line_graph(init_graph, bar_color)
if self.build_data.mat_type == MatType.BAR:
pass
if self.build_data.mat_type == MatType.BARH:
pass
if mark_image:
padding_width, padding_height = self.build_data.padding
width = mark_image.width + padding_width * 2
height = mark_image.height + padding_height * 2
if self.build_data.background:
if isinstance(self.build_data.background, bytes):
A = BuildImage(
width, height, background=BytesIO(self.build_data.background)
)
elif isinstance(self.build_data.background, Path):
A = BuildImage(width, height, background=self.build_data.background)
else:
A = BuildImage(width, height, self.build_data.background_color)
if A:
await A.paste(mark_image, (padding_width, padding_height))
if self.build_data.title:
font = BuildImage.load_font(
self.build_data.font, self.build_data.font_size + 7
)
title_width, title_height = BuildImage.get_text_size(
self.build_data.title, font
)
pos = (
int(A.width / 2 - title_width / 2),
int(padding_height / 2 - title_height / 2),
)
await A.text(pos, self.build_data.title)
if self.build_data.x_name:
font = BuildImage.load_font(
self.build_data.font, self.build_data.font_size + 4
)
title_width, title_height = BuildImage.get_text_size(
self.build_data.x_name, font # type: ignore
)
pos = (
A.width - title_width - 20,
A.height - int(padding_height / 2 + title_height),
)
await A.text(pos, self.build_data.x_name)
return A
async def _init_graph(self) -> InitGraph:
"""构造初始化图表
返回:
InitGraph: InitGraph
"""
padding_width = 0
padding_height = 0
font = BuildImage.load_font(self.build_data.font, self.build_data.font_size)
width_list = []
height_list = []
for x in self.build_data.x_index:
text_size = BuildImage.get_text_size(x, font)
if text_size[1] > padding_height:
padding_height = text_size[1]
width_list.append(text_size[0])
if not self.build_data.y_index:
"""没有指定y_index时使用data自动生成"""
max_num = max(self.build_data.data)
s = int(max_num / 5)
_y_index = [max_num]
for _n in range(4):
max_num -= s
_y_index.append(max_num)
_y_index.sort()
self.build_data.y_index = _y_index
for item in self.build_data.y_index:
text_size = BuildImage.get_text_size(str(item), font)
if text_size[0] > padding_width:
padding_width = text_size[0]
height_list.append(text_size[1])
width = (
sum([w + self.build_data.space[0] for w in width_list])
+ height_list[0]
+ self.build_data.space[0] * 2
+ 20
)
height = (
sum([h + self.build_data.space[1] for h in height_list])
+ self.build_data.space[1] * 2
+ 30
)
if self.build_data.mat_type == MatType.BARH:
"""横向柱状图时xy轴长度调换"""
_tmp = height
height = width
width = _tmp
A = BuildImage(
width,
(height + 10),
color=(255, 255, 255, 0),
)
padding_height += 5
await A.line(
(
padding_width + 5,
padding_height,
padding_width + 5,
height - padding_height,
),
width=2,
)
await A.line(
(
padding_width + 5,
height - padding_height,
width - padding_width + 5,
height - padding_height,
),
width=2,
)
_x_index = self.build_data.x_index
_y_index = self.build_data.y_index
if self.build_data.mat_type == MatType.BARH:
_tmp = _y_index
_y_index = _x_index
_x_index = _tmp
cur_width = padding_width + self.build_data.space[0] * 2
cur_height = height - height_list[0] - 5
x_point = []
for i, _x in enumerate(_x_index):
"""X轴数值"""
grid_height = cur_height
if self.build_data.is_grid:
grid_height = padding_height
await A.line((cur_width, cur_height - 1, cur_width, grid_height - 5))
x_point.append(cur_width - 1)
mid_point = cur_width - int(width_list[i] / 2)
await A.text((mid_point, cur_height), str(_x), font=font)
cur_width += width_list[i] + self.build_data.space[0]
cur_width = padding_width
cur_height = height - self.build_data.padding[1]
for i, _y in enumerate(_y_index):
"""Y轴数值"""
grid_width = cur_width
if self.build_data.is_grid:
grid_width = width - padding_width + 5
await A.line((cur_width + 5, cur_height, grid_width + 11, cur_height))
text_width = BuildImage.get_text_size(str(_y), font)[0]
await A.text(
(cur_width - text_width, cur_height - int(height_list[i] / 2) - 3),
str(_y),
font=font,
)
cur_height -= height_list[i] + self.build_data.space[1]
graph_height = height - self.build_data.padding[1] - cur_height + 5
return self.InitGraph(
mark_image=A,
x_height=height - height_list[0] - 5,
graph_height=graph_height,
x_point=x_point,
)
async def _build_line_graph(
self, init_graph: InitGraph, bar_color: list[str]
) -> BuildImage:
"""构建折线图
参数:
init_graph: InitGraph
bar_color: 颜色列表
返回:
BuildImage: 折线图
"""
font = BuildImage.load_font(self.build_data.font, self.build_data.font_size)
mark_image = init_graph.mark_image
x_height = init_graph.x_height
graph_height = init_graph.graph_height
random_color = random.choice(bar_color)
_black_point = BuildImage(11, 11, color=random_color)
await _black_point.circle()
max_num = max(self.y_index)
point_list = []
for x_p, y in zip(init_graph.x_point, self.build_data.data):
"""折线图标点"""
y_height = int(y / max_num * init_graph.graph_height)
await mark_image.paste(_black_point, (x_p, x_height - y_height))
point_list.append((x_p + 4, x_height - y_height + 4))
for i in range(len(point_list) - 1):
"""画线"""
a_x, a_y = point_list[i]
b_x, b_y = point_list[i + 1]
await mark_image.line((a_x, a_y, b_x, b_y), random_color)
if self.build_data.display_num:
"""显示数值"""
value = self.build_data.data[i]
text_size = BuildImage.get_text_size(str(value), font)
await mark_image.text(
(a_x - int(text_size[0] / 2), a_y - text_size[1] - 5),
str(value),
font=font,
)
"""最后一个数值显示"""
value = self.build_data.data[-1]
text_size = BuildImage.get_text_size(str(value), font)
await mark_image.text(
(
point_list[-1][0] - int(text_size[0] / 2),
point_list[-1][1] - text_size[1] - 5,
),
str(value),
font=font,
)
return mark_image
async def _build_bar_graph(self):
pass
async def _build_barh_graph(self):
pass

View File

@ -7,7 +7,7 @@ from typing import Awaitable, Callable
from nonebot.utils import is_coroutine_callable
from ._build_image import BuildImage, ColorAlias
from ._build_mat import BuildMat
from ._build_mat import BuildMat, MatType
from ._image_template import ImageTemplate, RowStyle
# TODO: text2image 长度错误