重写开箱更新箱子,允许更新目前所有箱子的皮肤

This commit is contained in:
HibiKier 2023-03-01 22:54:03 +08:00
parent 6d7e79e9de
commit 83c3f44ae7
9 changed files with 420 additions and 628 deletions

View File

@ -14,6 +14,7 @@ from configs.config import Config
from configs.path_config import IMAGE_PATH
from utils.utils import is_number, scheduler
from .config import CASE2ID
from .open_cases_c import (
group_statistics,
my_knifes_name,
@ -21,7 +22,7 @@ from .open_cases_c import (
open_multiple_case,
total_open_statistics,
)
from .utils import reset_count_daily, util_get_buff_img, util_get_buff_price
from .utils import CaseManager, reset_count_daily, update_case_data
__zx_plugin_name__ = "开箱"
__plugin_usage__ = """
@ -104,7 +105,6 @@ Config.add_plugin_config(
type=int,
)
cases_name = ["狂牙大行动", "突围大行动", "命悬一线", "裂空", "光谱"]
cases_matcher_group = MatcherGroup(priority=5, permission=GROUP, block=True)
@ -123,12 +123,7 @@ async def _(event: GroupMessageEvent):
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
case_name = arg.extract_plain_text().strip()
case_name = case_name.replace("武器箱", "").strip()
if case_name:
result = await open_case(event.user_id, event.group_id, case_name)
else:
result = await open_case(
event.user_id, event.group_id, random.choice(cases_name)
)
result = await open_case(event.user_id, event.group_id, case_name)
await k_open_case.finish(result, at_sender=True)
@ -183,10 +178,6 @@ async def _(
else:
await open_multiple.finish("必须要是数字切不要超过30啊笨蛋中文也可", at_sender=True)
case_name = case_name.replace("武器箱", "").strip()
if not case_name:
case_name = random.choice(cases_name)
elif case_name not in cases_name:
await open_multiple.finish("武器箱未收录!", at_sender=True)
await open_multiple.finish(
await open_multiple_case(event.user_id, event.group_id, case_name, num),
at_sender=True,
@ -227,24 +218,21 @@ num_dict = {
}
update_price = on_command("更新开箱价格", priority=1, permission=SUPERUSER, block=True)
update_data = on_command("更新武器箱", priority=1, permission=SUPERUSER, block=True)
@update_price.handle()
@update_data.handle()
async def _(event: MessageEvent, arg: Message = CommandArg()):
await update_price.send(
await util_get_buff_price(arg.extract_plain_text().strip() or "狂牙大行动")
)
update_img = on_command("更新开箱图片", priority=1, permission=SUPERUSER, block=True)
@update_img.handle()
async def _(event: MessageEvent, arg: Message = CommandArg()):
await update_img.send(
await util_get_buff_img(arg.extract_plain_text().strip() or "狂牙大行动")
)
msg = arg.extract_plain_text().strip()
if not msg:
case_list = []
for i, case_name in enumerate(CASE2ID):
if case_name in CaseManager.CURRENT_CASES:
case_list.append(f"{i+1}.{case_name} [已更新]")
else:
case_list.append(f"{i+1}.{case_name}")
await update_data.finish("未指定武器箱, 当前已包含武器箱\n" + "\n".join(case_list))
await update_data.send(await update_case_data(msg))
# 重置开箱

View File

@ -32,266 +32,48 @@ WELL_WORN_E = 0.44999
BATTLE_SCARED_S = 0.45
BATTLE_SCARED_E = 0.99999
# 狂牙大行动
KUANGYADAXINGDONG_CASE_KNIFE = [
"摩托手套 | 第三特种兵连",
"狂牙手套 | 翡翠",
"驾驶手套 | 美洲豹女王",
"运动手套 | 弹弓",
"专业手套 | 老虎精英",
"专业手套 | 渐变大理石",
"运动手套 | 夜行衣",
"驾驶手套 | 西装革履",
"摩托手套 | 终点线",
"摩托手套 | 血压",
"运动手套 | 猩红头巾",
"驾驶手套 | 雪豹",
"裹手 | 长颈鹿",
"驾驶手套 | 绯红列赞",
"裹手 | 沙漠头巾",
"专业手套 | 一线特工",
"狂牙手套 | 黄色斑纹",
"摩托手套 | 小心烟幕弹",
"裹手 | 蟒蛇",
"裹手 | 警告!",
"狂牙手套 | 精神错乱",
"运动手套 | 大型猎物",
"狂牙手套 | 针尖",
"专业手套 | 陆军少尉长官",
]
KUANGYADAXINGDONG_CASE_RED = ["M4A1 | 印花集", "格洛克 | 黑色魅影"]
KUANGYADAXINGDONG_CASE_PINK = ["FN57 | 童话城堡", "M4A4 | 赛博", "USP | 小绿怪"]
KUANGYADAXINGDONG_CASE_PURPLE = [
"AWP | 亡灵之主",
"双持贝瑞塔 | 灾难",
"新星 | 一见青心",
"SSG 08 | 抖枪",
"UMP-45 | 金铋辉煌",
]
KUANGYADAXINGDONG_CASE_BLUE = [
"CZ75 | 世仇",
"P90 | 大怪兽RUSH",
"G3SG1 | 血腥迷彩",
"加利尔 AR | 破坏者",
"P250 | 污染物",
"M249 | 等高线",
"MP5-SD | 零点行动",
]
# 突围大行动
TUWEIDAXINGDONG_CASE_KNIFE = [
"蝴蝶刀 | 无涂装",
"蝴蝶刀 | 蓝钢",
"蝴蝶刀 | 屠夫",
"蝴蝶刀 | 森林 DDPAT",
"蝴蝶刀 | 北方森林",
"蝴蝶刀 | 狩猎网格",
"蝴蝶刀 | 枯焦之色",
"蝴蝶刀 | 人工染色",
"蝴蝶刀 | 都市伪装",
"蝴蝶刀 | 表面淬火",
"蝴蝶刀 | 深红之网",
"蝴蝶刀 | 渐变之色",
"蝴蝶刀 | 噩梦之夜",
]
TUWEIDAXINGDONG_CASE_RED = ["P90 | 二西莫夫", "M4A1 | 次时代"]
TUWEIDAXINGDONG_CASE_PINK = ["沙漠之鹰 | 阴谋者", "FN57 | 狩猎利器", "格洛克 | 水灵"]
TUWEIDAXINGDONG_CASE_PURPLE = ["PP-野牛 | 死亡主宰者", "CZ75 | 猛虎", "新星 | 锦鲤", "P250 | 超新星"]
TUWEIDAXINGDONG_CASE_BLUE = [
"MP7 | 都市危机",
"内格夫 | 沙漠精英",
"P2000 | 乳白象牙",
"SSG 08 | 无尽深海",
"UMP-45 | 迷之宫",
]
# 命悬一线
MINGXUANYIXIAN_CASE_KNIFE = [
"专业手套 | 大腕",
"专业手套 | 深红之网",
"专业手套 | 渐变之色",
"专业手套 | 狩鹿",
"九头蛇手套 | 响尾蛇",
"九头蛇手套 | 红树林",
"九头蛇手套 | 翡翠色调",
"九头蛇手套 | 表面淬火",
"摩托手套 | 交运",
"摩托手套 | 嘭!",
"摩托手套 | 多边形",
"摩托手套 | 玳瑁",
"裹手 | 套印",
"裹手 | 森林色调",
"裹手 | 钴蓝骷髅",
"裹手 | 防水布胶带",
"运动手套 | 双栖",
"运动手套 | 欧米伽",
"运动手套 | 迈阿密风云",
"运动手套 | 青铜形态",
"驾驶手套 | 墨绿色调",
"驾驶手套 | 王蛇",
"驾驶手套 | 蓝紫格子",
"驾驶手套 | 超越",
]
MINGXUANYIXIAN_CASE_RED = ["M4A4 | 黑色魅影", "MP7 | 血腥运动"]
MINGXUANYIXIAN_CASE_PINK = ["AUG | 湖怪鸟", "AWP | 死神", "USP | 脑洞大开"]
MINGXUANYIXIAN_CASE_PURPLE = [
"MAG-7 | SWAG-7",
"UMP-45 | 白狼",
"内格夫 | 狮子鱼",
"新星 | 狂野六号",
"格洛克 | 城里的月光",
]
MINGXUANYIXIAN_CASE_BLUE = [
"FN57 | 焰色反应",
"MP9 | 黑砂",
"P2000 | 都市危机",
"PP-野牛 | 黑夜暴乱",
"R8 左轮手枪 | 稳",
"SG 553 | 阿罗哈",
"XM1014 | 锈蚀烈焰",
]
LIEKONG_CASE_KNIFE = [
"求生匕首 | 无涂装",
"求生匕首 | 人工染色",
"求生匕首 | 北方森林",
"求生匕首 | 夜色",
"求生匕首 | 屠夫",
"求生匕首 | 枯焦之色",
"求生匕首 | 森林 DDPAT",
"求生匕首 | 深红之网",
"求生匕首 | 渐变之色",
"求生匕首 | 狩猎网格",
"求生匕首 | 蓝钢",
"求生匕首 | 表面淬火",
"求生匕首 | 都市伪装",
"流浪者匕首 | 无涂装",
"流浪者匕首 | 人工染色",
"流浪者匕首 | 北方森林",
"流浪者匕首 | 夜色",
"流浪者匕首 | 屠夫",
"流浪者匕首 | 枯焦之色",
"流浪者匕首 | 森林 DDPAT",
"流浪者匕首 | 深红之网",
"流浪者匕首 | 渐变之色",
"流浪者匕首 | 狩猎网格",
"流浪者匕首 | 蓝钢",
"流浪者匕首 | 表面淬火",
"流浪者匕首 | 都市伪装",
"系绳匕首 | 无涂装",
"系绳匕首 | 人工染色",
"系绳匕首 | 北方森林",
"系绳匕首 | 夜色",
"系绳匕首 | 屠夫",
"系绳匕首 | 枯焦之色",
"系绳匕首 | 森林 DDPAT",
"系绳匕首 | 深红之网",
"系绳匕首 | 渐变之色",
"系绳匕首 | 狩猎网格",
"系绳匕首 | 蓝钢",
"系绳匕首 | 表面淬火",
"系绳匕首 | 都市伪装",
"骷髅匕首 | 无涂装",
"骷髅匕首 | 人工染色",
"骷髅匕首 | 北方森林",
"骷髅匕首 | 夜色",
"骷髅匕首 | 屠夫",
"骷髅匕首 | 枯焦之色",
"骷髅匕首 | 森林 DDPAT",
"骷髅匕首 | 深红之网",
"骷髅匕首 | 渐变之色",
"骷髅匕首 | 狩猎网格",
"骷髅匕首 | 蓝钢",
"骷髅匕首 | 表面淬火",
"骷髅匕首 | 都市伪装",
]
LIEKONG_CASE_RED = ["AK-47 | 阿努比斯军团", "沙漠之鹰 | 印花集"]
LIEKONG_CASE_PINK = ["M4A4 | 齿仙", "XM1014 | 埋葬之影", "格洛克 | 摩登时代"]
LIEKONG_CASE_PURPLE = [
"加利尔 AR | 凤凰商号",
"Tec-9 | 兄弟连",
"MP5-SD | 猛烈冲锋",
"MAG-7 | 北冥有鱼",
"MAC-10 | 魅惑",
]
LIEKONG_CASE_BLUE = [
"内格夫 | 飞羽",
"SSG 08 | 主机001",
"SG 553 | 锈蚀之刃",
"PP-野牛 | 神秘碑文",
"P90 | 集装箱",
"P250 | 卡带",
"P2000 | 盘根错节",
]
GUANGPU_CASE_KNIFE = [
"弯刀 | 外表生锈",
"弯刀 | 多普勒",
"弯刀 | 大马士革钢",
"弯刀 | 渐变大理石",
"弯刀 | 致命紫罗兰",
"弯刀 | 虎牙",
"暗影双匕 | 外表生锈",
"暗影双匕 | 多普勒",
"暗影双匕 | 大马士革钢",
"暗影双匕 | 渐变大理石",
"暗影双匕 | 致命紫罗兰",
"暗影双匕 | 虎牙",
"猎杀者匕首 | 外表生锈",
"猎杀者匕首 | 多普勒",
"猎杀者匕首 | 大马士革钢",
"猎杀者匕首 | 渐变大理石",
"猎杀者匕首 | 致命紫罗兰",
"猎杀者匕首 | 虎牙",
"蝴蝶刀 | 外表生锈",
"蝴蝶刀 | 多普勒",
"蝴蝶刀 | 大马士革钢",
"蝴蝶刀 | 渐变大理石",
"蝴蝶刀 | 致命紫罗兰",
"蝴蝶刀 | 虎牙",
"鲍伊猎刀 | 外表生锈",
"鲍伊猎刀 | 多普勒",
"鲍伊猎刀 | 大马士革钢",
"鲍伊猎刀 | 渐变大理石",
"鲍伊猎刀 | 致命紫罗兰",
"鲍伊猎刀 | 虎牙",
]
GUANGPU_CASE_RED = ["USP | 黑色魅影", "AK-47 | 血腥运动"]
GUANGPU_CASE_PINK = ["M4A1 | 毁灭者 2000", "CZ75 | 相柳", "AWP | 浮生如梦"]
GUANGPU_CASE_PURPLE = [
"加利尔 AR | 深红海啸",
"XM1014 | 四季",
"UMP-45 | 支架",
"MAC-10 | 绝界之行",
"M249 | 翠绿箭毒蛙",
]
GUANGPU_CASE_BLUE = [
"沙漠之鹰 | 锈蚀烈焰",
"截短霰弹枪 | 梭鲈",
"SCAR-20 | 蓝图",
"PP-野牛 | 丛林滑流",
"P250 | 涟漪",
"MP7 | 非洲部落",
"FN57 | 毛细血管",
]
NO_STA_KNIFE = [
"求生匕首 | 北方森林",
"求生匕首 | 夜色",
"求生匕首 | 枯焦之色",
"流浪者匕首 | 夜色",
"流浪者匕首 | 枯焦之色",
"流浪者匕首 | 森林 DDPAT",
"系绳匕首 | 夜色",
"系绳匕首 | 狩猎网格",
"骷髅匕首 | 夜色",
"骷髅匕首 | 森林 DDPAT",
"骷髅匕首 | 狩猎网格",
]
CASE2ID = {
"变革": "set_community_32",
"反冲": "set_community_31",
"梦魇": "set_community_30",
"激流": "set_community_29",
"蛇噬": "set_community_28",
"狂牙大行动": "set_community_27",
"裂空": "set_community_26",
"棱彩2号": "set_community_25",
"CS20": "set_community_24",
"裂网大行动": "set_community_23",
"棱彩": "set_community_22",
"头号特训": "set_community_21",
"地平线": "set_community_20",
"命悬一线": "set_community_19",
"光谱2号": "set_community_18",
"九头蛇大行动": "set_community_17",
"光谱": "set_community_16",
"手套武器箱": "set_community_15",
"伽玛2号": "set_gamma_2",
"伽玛": "set_gamma_1",
"幻彩3号": "set_community_12",
"野火大行动": "set_community_11",
"左轮武器箱": "set_community_10",
"暗影": "set_community_9",
"弯曲猎手": "set_community_8",
"幻彩2号": "set_community_7",
"幻彩": "set_community_6",
"先锋": "set_community_5",
"电竞2014夏季": "set_esports_iii",
"突围大行动": "set_community_4",
"猎杀者": "set_community_3",
"凤凰": "set_community_2",
"电竞2013冬季": "set_esports_ii",
"冬季攻势": "set_community_1",
"军火交易3号": "set_weapons_iii",
"英勇": "set_bravo_i",
"电竞2013": "set_esports",
"军火交易2号": "set_weapons_ii",
"军火交易": "set_weapons_i",
}
def get_wear(rand: float) -> str:

View File

@ -1,4 +1,4 @@
import random
from datetime import datetime
from typing import List, Optional
from tortoise import fields
@ -23,11 +23,27 @@ class BuffSkin(Model):
"""磨损度"""
color = fields.CharField(255)
"""颜色(品质)"""
skin_price = fields.FloatField(default=0)
"""皮肤价格"""
create_time = fields.DatetimeField(auto_add_now=True)
img_url = fields.CharField(255)
"""图片url"""
steam_price = fields.FloatField(default=0)
"""steam价格"""
weapon_type = fields.CharField(255)
"""枪械类型"""
buy_max_price = fields.FloatField(default=0)
"""最大求购价格"""
buy_num = fields.IntField(default=0)
"""求购数量"""
sell_min_price = fields.FloatField(default=0)
"""售卖最低价格"""
sell_num = fields.IntField(default=0)
"""出售个数"""
sell_reference_price = fields.FloatField(default=0)
"""参考价格"""
create_time: datetime = fields.DatetimeField(auto_add_now=True)
"""创建日期"""
update_time = fields.DatetimeField(auto_add=True)
update_time: datetime = fields.DatetimeField(auto_add=True)
"""更新日期"""
class Meta:
@ -48,3 +64,17 @@ class BuffSkin(Model):
query = query.filter(case_name=case_name)
query = query.filter(abrasion=abrasion, is_stattrak=is_stattrak, color=color)
return await query.annotate(rand=Random()).limit(num) # type:ignore
@classmethod
async def _run_script(cls):
return [
"ALTER TABLE buff_skin ADD img_url varchar(255);", # 新增img_url
"ALTER TABLE buff_skin ADD steam_price float;", # 新增steam_price
"ALTER TABLE buff_skin ADD weapon_type varchar(255);", # 新增type
"ALTER TABLE buff_skin ADD buy_max_price float;" # 新增buy_max_price
"ALTER TABLE buff_skin ADD buy_num Integer;", # 新增buy_max_price
"ALTER TABLE buff_skin ADD sell_min_price float;", # 新增sell_min_price
"ALTER TABLE buff_skin ADD sell_num Integer;", # 新增sell_num
"ALTER TABLE buff_skin ADD sell_reference_price float;", # 新增sell_reference_price
"ALTER TABLE buff_skin DROP COLUMN skin_price;", # 删除skin_price
]

View File

@ -0,0 +1,46 @@
from typing import List, Optional
from tortoise import fields
from tortoise.contrib.postgres.functions import Random
from services.db_context import Model
class BuffSkinLog(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
case_name = fields.CharField(255)
"""箱子名称"""
name = fields.CharField(255)
"""武器/手套/刀名称"""
skin_name = fields.CharField(255)
"""皮肤名称"""
is_stattrak = fields.BooleanField(default=False)
"""是否暗金(计数)"""
abrasion = fields.CharField(255)
"""磨损度"""
color = fields.CharField(255)
"""颜色(品质)"""
steam_price = fields.FloatField(default=0)
"""steam价格"""
weapon_type = fields.CharField(255)
"""枪械类型"""
buy_max_price = fields.FloatField(default=0)
"""最大求购价格"""
buy_num = fields.IntField(default=0)
"""求购数量"""
sell_min_price = fields.FloatField(default=0)
"""售卖最低价格"""
sell_num = fields.IntField(default=0)
"""出售个数"""
sell_reference_price = fields.FloatField(default=0)
"""参考价格"""
create_time = fields.DatetimeField(auto_add_now=True)
"""创建日期"""
class Meta:
table = "buff_skin_log"
table_description = "Buff皮肤更新日志表"

View File

@ -51,7 +51,6 @@ class OpenCasesUser(Model):
@classmethod
async def _run_script(cls):
await cls.raw(
"alter table open_cases_users alter COLUMN make_money type float;"
)
"""将make_money字段改为float"""
return [
"alter table open_cases_users alter COLUMN make_money type float;" # 将make_money字段改为float
]

View File

@ -16,8 +16,8 @@ from utils.message_builder import image
from utils.utils import cn2py
from .config import *
from .models.buff_prices import BuffPrice
from .models.open_cases_user import OpenCasesUser
from .utils import CaseManager
RESULT_MESSAGE = {
"BLUE": ["这样看着才舒服", "是自己人,大伙把刀收好", "非常舒适~"],
@ -60,7 +60,7 @@ def add_count(user: OpenCasesUser, skin: BuffSkin):
user.knife_count += 1
user.today_open_total += 1
user.total_count += 1
user.make_money += skin.skin_price
user.make_money += skin.sell_min_price
user.spend_money += 17
@ -83,9 +83,7 @@ async def get_user_max_count(user_qq: int, group_id: int) -> int:
return int(initial_open_case_count + impression / each_impression_add_count) # type: ignore
async def open_case(
user_qq: int, group_id: int, case_name: str = "狂牙大行动"
) -> Union[str, Message]:
async def open_case(user_qq: int, group_id: int, case_name: str) -> Union[str, Message]:
"""开箱
Args:
@ -96,8 +94,12 @@ async def open_case(
Returns:
Union[str, Message]: 回复消息
"""
if case_name not in ["狂牙大行动", "突围大行动", "命悬一线", "裂空", "光谱"]:
return "武器箱未收录"
if not CaseManager.CURRENT_CASES:
return "未收录任何武器箱"
if not case_name:
case_name = random.choice(CaseManager.CURRENT_CASES) # type: ignore
if case_name not in CaseManager.CURRENT_CASES:
return "武器箱未收录, 当前可用武器箱:\n" + "\n".join(CaseManager.CURRENT_CASES) # type: ignore
logger.debug(f"尝试开启武器箱: {case_name}", "开箱", user_qq, group_id)
case = cn2py(case_name)
user = await OpenCasesUser.get_or_none(user_qq=user_qq, group_id=group_id)
@ -116,20 +118,16 @@ async def open_case(
rand = str(rand)[:11]
add_count(user, skin)
ridicule_result = random.choice(RESULT_MESSAGE[skin.color])
price_result = skin.skin_price
price_result = skin.sell_min_price
if skin.color == "KNIFE":
user.knifes_name = (
user.knifes_name
+ f"{case}||{skin.name}{'StatTrak™' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损:{rand} 价格:{skin.skin_price},"
+ f"{case}||{skin.name}{'StatTrak™' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损:{rand} 价格:{skin.sell_min_price},"
)
img_path = (
IMAGE_PATH
/ "cases"
/ case
/ f"{cn2py(skin.name)} - {cn2py(skin.skin_name)}.png"
)
name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
img_path = IMAGE_PATH / "csgo_cases" / case / f"{cn2py(name)}.jpg"
logger.info(
f"开启{case_name}武器箱获得 {skin.name}{'StatTrak™' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损: [{rand}] 价格: {skin.skin_price}",
f"开启{case_name}武器箱获得 {skin.name}{'StatTrak™' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损: [{rand}] 价格: {skin.sell_min_price}",
"开箱",
user_qq,
group_id,
@ -150,6 +148,12 @@ async def open_case(
async def open_multiple_case(
user_qq: int, group_id: int, case_name: str, num: int = 10
):
if not CaseManager.CURRENT_CASES:
return "未收录任何武器箱"
if not case_name:
case_name = random.choice(CaseManager.CURRENT_CASES) # type: ignore
if case_name not in CaseManager.CURRENT_CASES:
return "武器箱未收录, 当前可用武器箱:\n" + "\n".join(CaseManager.CURRENT_CASES) # type: ignore
user, _ = await OpenCasesUser.get_or_create(user_qq=user_qq, group_id=group_id)
max_count = await get_user_max_count(user_qq, group_id)
if user.today_open_total >= max_count:
@ -173,7 +177,7 @@ async def open_multiple_case(
return "未抽取到任何皮肤..."
total_price = 0
for skin, rand in skin_list:
total_price += skin.skin_price
total_price += skin.sell_min_price
rand = str(rand)[:11]
add_count(user, skin)
color_name = COLOR2CN[skin.color]
@ -185,29 +189,22 @@ async def open_multiple_case(
if skin.color == "KNIFE":
user.knifes_name = (
user.knifes_name
+ f"{case}||{skin.name}{'StatTrak™' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损:{rand} 价格:{skin.skin_price},"
+ f"{case}||{skin.name}{'StatTrak™' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损:{rand} 价格:{skin.sell_min_price},"
)
name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
img_path = IMAGE_PATH / "csgo_cases" / case / f"{cn2py(name)}.jpg"
wImg = BuildImage(200, 270, 200, 200)
await wImg.apaste(
alpha2white_pil(
Image.open(
IMAGE_PATH
/ "cases"
/ case
/ f"{cn2py(skin.name)} - {cn2py(skin.skin_name)}.png"
).resize((200, 200), Image.ANTIALIAS)
),
(0, 0),
)
img = BuildImage(200, 200, background=img_path)
await wImg.apaste(img, (0, 0), True)
await wImg.atext(
(5, 200),
f"{skin.name}{'StatTrak™' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion})",
)
await wImg.atext((5, 220), f"磨损:{rand}")
await wImg.atext((5, 240), f"价格:{skin.skin_price}")
await wImg.atext((5, 240), f"价格:{skin.sell_min_price}")
img_list.append(wImg)
logger.info(
f"开启{case_name}武器箱获得 {skin.name}{'StatTrak™' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损: [{rand}] 价格: {skin.skin_price}",
f"开启{case_name}武器箱获得 {skin.name}{'StatTrak™' if skin.is_stattrak else ''} | {skin.skin_name} ({skin.abrasion}) 磨损: [{rand}] 价格: {skin.sell_min_price}",
"开箱",
user_qq,
group_id,

View File

@ -1,6 +1,8 @@
from asyncio.exceptions import TimeoutError
import asyncio
import random
import time
from datetime import datetime
from typing import Optional
from typing import List, Tuple, Union
import nonebot
@ -10,251 +12,203 @@ from services.log import logger
from utils.http_utils import AsyncHttpx
from utils.utils import broadcast_group, cn2py
from .config import *
from .models.buff_prices import BuffPrice
from .config import CASE2ID
from .models.buff_skin import BuffSkin
from .models.buff_skin_log import BuffSkinLog
from .models.open_cases_user import OpenCasesUser
url = "https://buff.163.com/api/market/goods"
URL = "https://buff.163.com/api/market/goods"
# proxies = 'http://49.75.59.242:3128'
NAME2COLOR = {"军规级": "BLUE", "受限": "PURPLE", "保密": "PINK", "隐秘": "RED", "非凡": "KNIFE"}
CURRENT_CASES = []
driver = nonebot.get_driver()
async def util_get_buff_price(case_name: str = "狂牙大行动") -> str:
cookie = {"session": Config.get_config("open_cases", "COOKIE")}
failed_list = []
case = cn2py(case_name)
if case_name == "狂牙大行动":
case_id = 1
elif case_name == "突围大行动":
case_id = 2
elif case_name == "命悬一线":
case_id = 3
elif case_name == "裂空":
case_id = 4
elif case_name == "光谱":
case_id = 5
else:
return "未查询到武器箱"
case = case.upper()
CASE_KNIFE = eval(case + "_CASE_KNIFE")
CASE_RED = eval(case + "_CASE_RED")
CASE_PINK = eval(case + "_CASE_PINK")
CASE_PURPLE = eval(case + "_CASE_PURPLE")
CASE_BLUE = eval(case + "_CASE_BLUE")
for total_list in [CASE_KNIFE, CASE_RED, CASE_PINK, CASE_PURPLE, CASE_BLUE]:
for skin in total_list:
if skin in [
"蝴蝶刀 | 无涂装",
"求生匕首 | 无涂装",
"流浪者匕首 | 无涂装",
"系绳匕首 | 无涂装",
"骷髅匕首 | 无涂装",
]:
skin = skin.split("|")[0].strip()
name_list = []
price_list = []
parameter = {"game": "csgo", "page_num": "1", "search": skin}
try:
response = await AsyncHttpx.get(
url,
proxy=Config.get_config("open_cases", "BUFF_PROXY"),
params=parameter,
cookies=cookie,
)
if response.status_code == 200:
data = response.json()["data"]
total_page = data["total_page"]
data = data["items"]
flag = False
if (
skin.find("|") == -1
): # in ['蝴蝶刀', '求生匕首', '流浪者匕首', '系绳匕首', '骷髅匕首']:
for i in range(1, total_page + 1):
name_list = []
price_list = []
parameter = {
"game": "csgo",
"page_num": f"{i}",
"search": skin,
}
res = await AsyncHttpx.get(
url, params=parameter, cookies=cookie
)
data = res.json()["data"]["items"]
for j in range(len(data)):
if data[j]["name"] in [f"{skin}(★)"]:
name = data[j]["name"]
price = data[j]["sell_reference_price"]
name_list.append(
name.split("")[0].strip() + " | 无涂装"
)
price_list.append(price)
flag = True
break
if flag:
break
else:
try:
for _ in range(total_page):
for i in range(len(data)):
name = data[i]["name"]
price = data[i]["sell_reference_price"]
name_list.append(name)
price_list.append(price)
except Exception as e:
failed_list.append(skin)
logger.warning(f"{skin}更新失败")
else:
failed_list.append(skin)
logger.warning(f"{skin}更新失败")
except Exception:
failed_list.append(skin)
logger.warning(f"{skin}更新失败")
continue
for i in range(len(name_list)):
name = name_list[i].strip()
price = float(price_list[i])
if name.find("(★)") != -1:
name = name[: name.find("")] + name[name.find("") + 1 :]
if name.find("消音") != -1 and name.find("S") != -1:
name = name.split("")[0][:-4] + "" + name.split("")[1]
name = (
name.split("|")[0].strip() + " | " + name.split("|")[1].strip()
)
elif name.find("消音") != -1:
name = (
name.split("|")[0][:-5].strip()
+ " | "
+ name.split("|")[1].strip()
)
if name.find(" 18 ") != -1 and name.find("S") != -1:
name = name.split("")[0][:-5] + "" + name.split("")[1]
name = (
name.split("|")[0].strip() + " | " + name.split("|")[1].strip()
)
elif name.find(" 18 ") != -1:
name = (
name.split("|")[0][:-6].strip()
+ " | "
+ name.split("|")[1].strip()
)
if dbskin := await BuffPrice.get_or_none(skin_name=name):
if dbskin.update_date.date() == datetime.now().date():
continue
dbskin.case_id = case_id
dbskin.skin_price = price
dbskin.update_date = datetime.now()
await dbskin.save(
update_fields=["case_id", "skin_price", "update_date"]
)
logger.info(f"{name_list[i]}---------->成功更新")
result = None
if failed_list:
result = ""
for fail_skin in failed_list:
result += fail_skin + "\n"
return result[:-1] if result else "更新价格成功"
BASE_PATH = IMAGE_PATH / "csgo_cases"
async def util_get_buff_img(case_name: str = "狂牙大行动") -> str:
cookie = {"session": Config.get_config("open_cases", "COOKIE")}
error_list = []
case = cn2py(case_name)
path = IMAGE_PATH / "cases/" / case
path.mkdir(exist_ok=True, parents=True)
case = case.upper()
CASE_KNIFE = eval(case + "_CASE_KNIFE")
CASE_RED = eval(case + "_CASE_RED")
CASE_PINK = eval(case + "_CASE_PINK")
CASE_PURPLE = eval(case + "_CASE_PURPLE")
CASE_BLUE = eval(case + "_CASE_BLUE")
for total_list in [CASE_KNIFE, CASE_RED, CASE_PINK, CASE_PURPLE, CASE_BLUE]:
for skin in total_list:
parameter = {"game": "csgo", "page_num": "1", "search": skin}
if skin in [
"蝴蝶刀 | 无涂装",
"求生匕首 | 无涂装",
"流浪者匕首 | 无涂装",
"系绳匕首 | 无涂装",
"骷髅匕首 | 无涂装",
]:
skin = skin.split("|")[0].strip()
logger.info(f"开始更新----->{skin}")
skin_name = ""
# try:
response = await AsyncHttpx.get(
url,
proxy=Config.get_config("open_cases", "BUFF_PROXY"),
params=parameter,
)
if response.status_code == 200:
data = response.json()["data"]
total_page = data["total_page"]
flag = False
if skin.find("|") == -1: # in ['蝴蝶刀', '求生匕首', '流浪者匕首', '系绳匕首', '骷髅匕首']:
for i in range(1, total_page + 1):
res = await AsyncHttpx.get(url, params=parameter)
data = res.json()["data"]["items"]
for j in range(len(data)):
if data[j]["name"] in [f"{skin}(★)"]:
img_url = data[j]["goods_info"]["icon_url"]
skin_name = cn2py(skin + "无涂装")
await AsyncHttpx.download_file(
img_url, path / f"{skin_name}.png"
)
flag = True
break
if flag:
break
else:
img_url = (await response.json())["data"]["items"][0]["goods_info"][
"icon_url"
]
skin_name += cn2py(skin.replace("|", "-").strip())
if await AsyncHttpx.download_file(
img_url, path / f"{skin_name}.png"
):
logger.info(f"------->写入 {skin} 成功")
else:
logger.info(f"------->写入 {skin} 失败")
result = None
if error_list:
result = ""
for err_skin in error_list:
result += err_skin + "\n"
return result[:-1] if result else "更新图片成功"
class CaseManager:
CURRENT_CASES = []
@classmethod
async def reload(cls):
cls.CURRENT_CASES = (
await BuffSkin.annotate().distinct().values_list("case_name", flat=True)
)
async def get_price(d_name):
cookie = {"session": Config.get_config("open_cases", "COOKIE")}
name_list = []
price_list = []
parameter = {"game": "csgo", "page_num": "1", "search": d_name}
try:
response = await AsyncHttpx.get(url, cookies=cookie, params=parameter)
if response.status_code == 200:
try:
data = response.json()["data"]
total_page = data["total_page"]
data = data["items"]
for _ in range(total_page):
for i in range(len(data)):
name = data[i]["name"]
price = data[i]["sell_reference_price"]
name_list.append(name)
price_list.append(price)
except Exception as e:
return "没有查询到...", 998
async def update_case_data(case_name: str) -> str:
"""更新皮肤数据
Args:
case_name (str): 箱子名称
Returns:
_type_: _description_
"""
if case_name not in CASE2ID:
return "未在当前指定武器箱捏"
session = Config.get_config("open_cases", "COOKIE")
if not session:
return "BUFF COOKIE为空捏!"
db_skin_list = await BuffSkin.filter(case_name=case_name).all()
db_skin_name_list = [
skin.name + skin.skin_name + skin.abrasion for skin in db_skin_list
]
data_list, total = await search_skin_page(case_name, 1)
if isinstance(data_list, str):
return data_list
for page in range(2, total + 1):
rand_time = random.randint(10, 50)
logger.debug(f"访问随机等待时间: {rand_time}", "开箱更新")
await asyncio.sleep(rand_time)
data_list_, total = await search_skin_page(case_name, page)
if isinstance(data_list_, list):
data_list += data_list_
create_list = []
update_list = []
log_list = []
case_name_py = cn2py(case_name)
now = datetime.now()
for skin in data_list:
name = skin.name + skin.skin_name + skin.abrasion
skin.create_time = now
skin.update_time = now
if name in db_skin_name_list:
update_list.append(skin)
else:
return "访问失败!", response.status_code
except TimeoutError as e:
return "访问超时! 请重试或稍后再试!", 997
result = f"皮肤: {d_name}({len(name_list)})\n"
for i in range(len(name_list)):
result += name_list[i] + ": " + price_list[i] + "\n"
return result[:-1], 999
create_list.append(skin)
log_list.append(
BuffSkinLog(
name=skin.name,
case_name=skin.case_name,
skin_name=skin.skin_name,
is_stattrak=skin.is_stattrak,
abrasion=skin.abrasion,
color=skin.color,
steam_price=skin.steam_price,
weapon_type=skin.weapon_type,
buy_max_price=skin.buy_max_price,
buy_num=skin.buy_num,
sell_min_price=skin.sell_min_price,
sell_num=skin.sell_num,
sell_reference_price=skin.sell_reference_price,
create_time=now,
)
)
name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
file_path = BASE_PATH / case_name_py / f"{cn2py(name)}.jpg"
if not file_path.exists():
logger.debug(f"下载皮肤 {name} 图片: {skin.img_url}...", "开箱更新")
await AsyncHttpx.download_file(skin.img_url, file_path)
rand_time = random.randint(1, 10)
await asyncio.sleep(rand_time)
logger.debug(f"图片下载随机等待时间: {rand_time}", "开箱更新")
else:
logger.debug(f"皮肤 {name} 图片已存在...", "开箱更新")
if create_list:
logger.debug(f"更新武器箱: [<u><e>{case_name}</e></u>], 创建 {len(create_list)} 个皮肤!")
await BuffSkin.bulk_create(create_list, 10)
if update_list:
logger.debug(f"更新武器箱: [<u><c>{case_name}</c></u>], 更新 {len(create_list)} 个皮肤!")
await BuffSkin.bulk_update(
update_list,
[
"skin_price",
"steam_price",
"buy_max_price",
"buy_num",
"sell_min_price",
"sell_num",
"sell_reference_price",
"update_time",
],
10,
)
if log_list:
logger.debug(f"更新武器箱: [<u><e>{case_name}</e></u>], 新增 {len(log_list)} 条皮肤日志!")
await BuffSkinLog.bulk_create(log_list)
if case_name not in CaseManager.CURRENT_CASES:
CaseManager.CURRENT_CASES.append(case_name) # type: ignore
return f"更新武器箱: [{case_name}] 成功, 共更新 {len(update_list)} 个皮肤, 新创建 {len(create_list)} 个皮肤!"
async def search_skin_page(
case_name: str, page_index: int
) -> Tuple[Union[List[BuffSkin], str], int]:
"""查询箱子皮肤
Args:
case_name (str): 箱子名称
page_index (int): 页数
Returns:
Union[List[BuffSkin], str]: BuffSkin
"""
logger.debug(
f"尝试访问武器箱: [<u><e>{case_name}</e></u>] 页数: [<u><y>{page_index}</y></u>]", "开箱更新"
)
cookie = {"session": Config.get_config("open_cases", "COOKIE")}
params = {
"game": "csgo",
"page_num": page_index,
"page_size": 80,
"itemset": CASE2ID[case_name],
"_": time.time(),
"use_suggestio": 0,
}
response = await AsyncHttpx.get(
URL,
proxy=Config.get_config("open_cases", "BUFF_PROXY"),
params=params,
cookies=cookie, # type: ignore
)
json_data = response.json()
update_data = []
if json_data["code"] == "OK":
data_list = json_data["data"]["items"]
for data in data_list:
obj = {"case_name": case_name}
name = data["name"]
logger.debug(
f"武器箱: [<u><e>{case_name}</e></u>] 页数: [<u><y>{page_index}</y></u>] 正在收录皮肤: [<u><c>{name}</c></u>]...",
"开箱更新",
)
obj["buy_max_price"] = data["buy_max_price"] # 求购最大金额
obj["buy_num"] = data["buy_num"] # 当前求购
goods_info = data["goods_info"]
info = goods_info["info"]
tags = info["tags"]
obj["weapon_type"] = tags["type"]["localized_name"] # 枪械类型
if obj["weapon_type"] in ["音乐盒", "印花", "武器箱"]:
continue
if obj["weapon_type"] in ["匕首", "手套"]:
obj["color"] = "KNIFE"
obj["name"] = data["short_name"].split("")[0].strip() # 名称
else:
obj["color"] = NAME2COLOR[tags["rarity"]["localized_name"]]
obj["name"] = tags["weapon"]["localized_name"] # 名称
obj["skin_name"] = data["short_name"].split("|")[-1].strip() # 皮肤名称
obj["img_url"] = goods_info["original_icon_url"] # 图片url
obj["steam_price"] = goods_info["steam_price_cny"] # steam价格
obj["abrasion"] = tags["exterior"]["localized_name"] # 磨损
obj["color"] = NAME2COLOR[tags["rarity"]["localized_name"]] # 品质颜色
obj["is_stattrak"] = "StatTrak" in tags["quality"]["localized_name"] # type: ignore # 是否暗金
obj["sell_min_price"] = data["sell_min_price"] # 售卖最低价格
obj["sell_num"] = data["sell_num"] # 售卖数量
obj["sell_reference_price"] = data["sell_reference_price"] # 参考价格
update_data.append(BuffSkin(**obj))
logger.debug(
f"访问武器箱: [<u><e>{case_name}</e></u>] 页数: [<u><y>{page_index}</y></u>] 成功并收录完成",
"开箱更新",
)
return update_data, json_data["data"]["total_page"]
else:
logger.warning(f'访问BUFF失败: {json_data["msg"]}')
return f'访问失败: {json_data["msg"]}', -1
async def reset_count_daily():
@ -270,19 +224,9 @@ async def reset_count_daily():
logger.error(f"开箱重置错误", e=e)
def get_color(case_name: str, name: str, skin_name: str) -> Optional[str]:
case_py = cn2py(case_name).upper()
color_map = {}
color_map["KNIFE"] = eval(case_py + "_CASE_KNIFE")
color_map["RED"] = eval(case_py + "_CASE_RED")
color_map["PINK"] = eval(case_py + "_CASE_PINK")
color_map["PURPLE"] = eval(case_py + "_CASE_PURPLE")
color_map["BLUE"] = eval(case_py + "_CASE_BLUE")
for key in color_map:
for skin in color_map[key]:
if name in skin and skin_name in skin:
return key
return None
@driver.on_startup
async def _():
await CaseManager.reload()
@driver.on_startup
@ -290,55 +234,50 @@ async def _():
"""
将旧表数据移动到新表
"""
if not await BuffSkin.first() and await BuffPrice.first():
logger.debug("开始移动旧表数据 BuffPrice -> BuffSkin")
id2name = {1: "狂牙大行动", 2: "突围大行动", 3: "命悬一线", 4: "裂空", 5: "光谱"}
data_list: List[BuffSkin] = []
for data in await BuffPrice.all():
logger.debug(f"移动旧表数据: {data.skin_name}")
case_name = id2name[data.case_id]
name = data.skin_name
is_stattrak = "StatTrak" in name
name = name.replace("(★ StatTrak™", "").replace("StatTrak™", "").strip()
name, skin_name = name.split("|")
abrasion = "无涂装"
if "(" in skin_name:
skin_name, abrasion = skin_name.split("(")
if abrasion.endswith(")"):
abrasion = abrasion[:-1]
color = get_color(case_name, name.strip(), skin_name.strip())
if not color:
search_list = [
x
for x in data_list
if x.skin_name == skin_name.strip() and x.name == name.strip()
]
if search_list:
color = get_color(
case_name, search_list[0].name, search_list[0].skin_name
)
if not color:
logger.debug(
f"箱子: [{case_name}] 皮肤: [{name}|{skin_name}] 未获取到皮肤品质,跳过..."
)
continue
data_list.append(
BuffSkin(
case_name=case_name,
name=name.strip(),
skin_name=skin_name.strip(),
is_stattrak=is_stattrak,
abrasion=abrasion.strip(),
skin_price=data.skin_price,
color=color,
create_time=datetime.now(),
update_time=datetime.now(),
)
)
await BuffSkin.bulk_create(data_list, batch_size=10)
logger.debug("完成移动旧表数据 BuffPrice -> BuffSkin")
# 蝴蝶刀(★) | 噩梦之夜 (久经沙场)
if __name__ == "__main__":
print(util_get_buff_img("xxxx/"))
# if not await BuffSkin.first() and await BuffPrice.first():
# logger.debug("开始移动旧表数据 BuffPrice -> BuffSkin")
# id2name = {1: "狂牙大行动", 2: "突围大行动", 3: "命悬一线", 4: "裂空", 5: "光谱"}
# data_list: List[BuffSkin] = []
# for data in await BuffPrice.all():
# logger.debug(f"移动旧表数据: {data.skin_name}")
# case_name = id2name[data.case_id]
# name = data.skin_name
# is_stattrak = "StatTrak" in name
# name = name.replace("(★ StatTrak™", "").replace("StatTrak™", "").strip()
# name, skin_name = name.split("|")
# abrasion = "无涂装"
# if "(" in skin_name:
# skin_name, abrasion = skin_name.split("(")
# if abrasion.endswith(")"):
# abrasion = abrasion[:-1]
# color = get_color(case_name, name.strip(), skin_name.strip())
# if not color:
# search_list = [
# x
# for x in data_list
# if x.skin_name == skin_name.strip() and x.name == name.strip()
# ]
# if search_list:
# color = get_color(
# case_name, search_list[0].name, search_list[0].skin_name
# )
# if not color:
# logger.debug(
# f"箱子: [{case_name}] 皮肤: [{name}|{skin_name}] 未获取到皮肤品质,跳过..."
# )
# continue
# data_list.append(
# BuffSkin(
# case_name=case_name,
# name=name.strip(),
# skin_name=skin_name.strip(),
# is_stattrak=is_stattrak,
# abrasion=abrasion.strip(),
# skin_price=data.skin_price,
# color=color,
# create_time=datetime.now(),
# update_time=datetime.now(),
# )
# )
# await BuffSkin.bulk_create(data_list, batch_size=10)
# logger.debug("完成移动旧表数据 BuffPrice -> BuffSkin")

View File

@ -53,11 +53,20 @@ async def init():
except Exception as e:
raise Exception(f"数据库连接错误.... {type(e)}: {e}")
if SCRIPT_METHOD:
logger.debug(f"即将运行SCRIPT_METHOD方法, 合计 <u><y>{len(SCRIPT_METHOD)}</y></u> 个...")
sql_list = []
for module, func in SCRIPT_METHOD:
try:
await func()
if sql := await func():
sql_list += sql
except Exception as e:
logger.debug(f"{module} 执行SQL", e=e)
logger.debug(f"{module} 执行SCRIPT_METHOD方法出错...", e=e)
for sql in sql_list:
logger.debug(f"执行SQL: {sql}")
try:
await TestSQL.raw(sql)
except Exception as e:
logger.debug(f"执行SQL: {sql} 错误...", e=e)
async def disconnect():

View File

@ -112,6 +112,8 @@ class logger:
e: Optional[Exception] = None,
):
template = cls.__parser_template(info, command, user_id, group_id, target)
if e:
template += f" || 错误 <r>{type(e)}: {e}</r>"
logger_.opt(colors=True).debug(template)
@classmethod