开箱新增更新指定刀具皮肤命令

This commit is contained in:
HibiKier 2023-03-25 15:38:12 +08:00
parent a48661745e
commit f72988a71a
6 changed files with 288 additions and 181 deletions

View File

@ -331,6 +331,11 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
## 更新
### 2023/3/25
* 删除BUFF_SKIN表约束新增`skin_id`字段
* 开箱新增更新指定刀具皮肤命令(某些箱子金色无法通过api获取)
### 2023/3/20
* 修复BuildImage类text居中类型bug [@pull/1301](https://github.com/HibiKier/zhenxun_bot/pull/1317)

View File

@ -6,7 +6,6 @@ from typing import Any, List, Tuple
from nonebot import on_command
from nonebot.adapters.onebot.v11 import GroupMessageEvent, Message, MessageEvent
from nonebot.adapters.onebot.v11.permission import GROUP
from nonebot.matcher import Matcher
from nonebot.params import CommandArg, RegexGroup
from nonebot.permission import SUPERUSER
from nonebot.plugin import MatcherGroup
@ -15,6 +14,8 @@ from nonebot.typing import T_State
from configs.config import Config
from configs.path_config import IMAGE_PATH
from services.log import logger
from utils.depends import OneCommand
from utils.image_utils import text2image
from utils.message_builder import image
from utils.utils import CN2NUM, is_number, scheduler
@ -28,10 +29,12 @@ from .open_cases_c import (
)
from .utils import (
CASE2ID,
KNIFE2ID,
CaseManager,
build_case_image,
get_skin_case,
reset_count_daily,
update_case_data,
update_skin_data,
)
__zx_plugin_name__ = "开箱"
@ -55,8 +58,8 @@ usage
更新皮肤指令
重置开箱 重置今日开箱所有次数
指令
更新开箱图片 ?[武器箱]
更新开箱价格 ?[武器箱]
更新武器箱 ?[武器箱]
更新皮肤 ?[刀具名称]
* 不指定武器箱时则全部更新 *
* 过多的爬取会导致账号API被封 *
""".strip()
@ -68,8 +71,8 @@ __plugin_cmd__ = [
"我的金色",
"群开箱统计",
"查看武器箱?[武器箱]",
"更新开箱图片 ?[武器箱] [_superuser]",
"更新开箱价格 ?[武器箱] [_superuser]",
"更新武器箱 ?[武器箱] [_superuser]",
"更新皮肤 ?[刀具名称] [_superuser]",
]
__plugin_type__ = ("抽卡相关", 1)
__plugin_version__ = 0.1
@ -125,11 +128,19 @@ total_case_data = cases_matcher_group.on_command(
)
group_open_case_statistics = cases_matcher_group.on_command("群开箱统计")
open_multiple = cases_matcher_group.on_regex("(.*)连开箱(.*)?")
update_case = on_command("更新武器箱", priority=1, permission=SUPERUSER, block=True)
update_case = on_command(
"更新武器箱", aliases={"更新皮肤"}, priority=1, permission=SUPERUSER, block=True
)
show_case = on_command("查看武器箱", priority=5, block=True)
my_knifes = on_command("我的金色", priority=1, permission=GROUP, block=True)
show_skin = on_command("查看皮肤", priority=5, block=True)
# show_case = on_command("test", priority=1, permission=GROUP, block=True)
test = on_command("test", priority=1, permission=GROUP, block=True)
@test.handle()
async def _(event: GroupMessageEvent):
a = await get_skin_case("872472")
print(a)
@reload_count.handle()
@ -189,41 +200,53 @@ async def _(
@update_case.handle()
async def _(event: MessageEvent, arg: Message = CommandArg()):
async def _(event: MessageEvent, arg: Message = CommandArg(), cmd: str = OneCommand()):
msg = arg.extract_plain_text().strip()
if not msg:
case_list = []
skin_list = []
for i, case_name in enumerate(CASE2ID):
if case_name in CaseManager.CURRENT_CASES:
case_list.append(f"{i+1}.{case_name} [已更新]")
else:
case_list.append(f"{i+1}.{case_name}")
await update_case.finish("未指定武器箱, 当前已包含武器箱\n" + "\n".join(case_list))
if msg == "ALL":
await update_case.send(f"即将更新所有武器箱, 请稍等")
case_list = list(CASE2ID.keys())
for skin_name in KNIFE2ID:
skin_list.append(f"{skin_name}")
text = "武器箱:\n" + "\n".join(case_list) + "\n皮肤:\n" + ", ".join(skin_list)
await update_case.finish(
"未指定武器箱, 当前已包含武器箱/皮肤\n"
+ image(await text2image(text, padding=20, color="#f9f6f2"))
)
if msg in ["ALL", "ALL1"]:
if msg == "ALL":
case_list = list(CASE2ID.keys())
result = "武器箱"
else:
case_list = list(KNIFE2ID.keys())
result = "罕见皮肤"
await update_case.send(f"即将更新所有{result}, 请稍等")
for i, case_name in enumerate(case_list):
try:
await update_case_data(case_name)
await update_skin_data(case_name)
rand = random.randint(300, 500)
result = "更新全部武器箱完成"
result = f"更新全部{result}完成"
if i < len(case_list):
next_case = case_list[i + 1]
result = f"将在 {rand} 秒后更新下一武器箱: {next_case}"
await update_case.send(f"成功更新武器箱: {case_name}, {result}")
logger.info(f"成功更新武器箱: {case_name}, {result}", "更新武器箱")
result = f"将在 {rand} 秒后更新下一{result}: {next_case}"
await update_case.send(f"成功更新{result}: {case_name}, {result}")
logger.info(f"成功更新{result}: {case_name}, {result}", "更新武器箱")
await asyncio.sleep(rand)
except Exception as e:
logger.error(f"更新武器箱: {case_name}", e=e)
await update_case.send(f"成功更新武器箱: {case_name} 发生错误: {type(e)}: {e}")
await update_case.send(f"更新全部武器箱完成")
logger.error(f"更新{result}: {case_name}", e=e)
await update_case.send(f"更新{result}: {case_name} 发生错误: {type(e)}: {e}")
await update_case.send(f"更新全部{result}完成")
else:
await update_case.send(f"开始更新武器箱: {msg}, 请稍等")
await update_case.send(f"开始{cmd}: {msg}, 请稍等")
try:
await update_case.send(await update_case_data(msg), at_sender=True)
await update_case.send(await update_skin_data(msg), at_sender=True)
except Exception as e:
logger.error(f"更新武器箱: {msg}", e=e)
await update_case.send(f"成功自动更新武器箱: {msg} 发生错误: {type(e)}: {e}")
logger.error(f"{cmd}: {msg}", e=e)
await update_case.send(f"成功{cmd}: {msg} 发生错误: {type(e)}: {e}")
@show_case.handle()

View File

@ -1,4 +1,5 @@
import random
from enum import Enum
from typing import List, Tuple
from configs.path_config import IMAGE_PATH
@ -34,6 +35,16 @@ BATTLE_SCARED_S = 0.45
BATTLE_SCARED_E = 0.99999
class UpdateType(Enum):
"""
更新类型
"""
CASE = "case"
WEAPON_TYPE = "weapon_type"
NAME2COLOR = {
"消费级": "WHITE",
"工业级": "LIGHTBLUE",
@ -68,7 +79,30 @@ ABRASION_SORT = ["崭新出厂", "略有磨损", "久经沙场", "破损不堪",
CASE_BACKGROUND = IMAGE_PATH / "csgo_cases" / "_background" / "shu"
# 刀
KNIFE2ID = {
"鲍伊猎刀": "weapon_knife_survival_bowie",
"蝴蝶刀": "weapon_knife_butterfly",
"弯刀": "weapon_knife_falchion",
"折叠刀": "weapon_knife_flip",
"穿肠刀": "weapon_knife_gut",
"猎杀者匕首": "weapon_knife_tactical",
"M9刺刀": "weapon_knife_m9_bayonet",
"刺刀": "weapon_bayonet",
"爪子刀": "weapon_knife_karambit",
"暗影双匕": "weapon_knife_push",
"短剑": "weapon_knife_stiletto",
"熊刀": "weapon_knife_ursus",
"折刀": "weapon_knife_gypsy_jackknife",
"锯齿爪刀": "weapon_knife_widowmaker",
"海豹短刀": "weapon_knife_css",
"系绳匕首": "weapon_knife_cord",
"求生匕首": "weapon_knife_canis",
"流浪者匕首": "weapon_knife_outdoor",
"骷髅匕首": "weapon_knife_skeleton",
}
# 武器箱
CASE2ID = {
"变革": "set_community_32",
"反冲": "set_community_31",

View File

@ -11,11 +11,11 @@ class BuffSkin(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
case_name = fields.CharField(255)
case_name: str = fields.CharField(255) # type: ignore
"""箱子名称"""
name = fields.CharField(255)
name: str = fields.CharField(255) # type: ignore
"""武器/手套/刀名称"""
skin_name = fields.CharField(255)
skin_name: str = fields.CharField(255) # type: ignore
"""皮肤名称"""
is_stattrak = fields.BooleanField(default=False)
"""是否暗金(计数)"""
@ -23,6 +23,8 @@ class BuffSkin(Model):
"""磨损度"""
color = fields.CharField(255)
"""颜色(品质)"""
skin_id = fields.CharField(255, null=True, unique=True)
"""皮肤id"""
img_url = fields.CharField(255)
"""图片url"""
@ -49,7 +51,7 @@ class BuffSkin(Model):
class Meta:
table = "buff_skin"
table_description = "Buff皮肤数据表"
unique_together = ("case_name", "name", "skin_name", "abrasion")
# unique_together = ("case_name", "name", "skin_name", "abrasion", "is_stattrak")
@classmethod
async def random_skin(
@ -79,6 +81,7 @@ class BuffSkin(Model):
async def _run_script(cls):
return [
"ALTER TABLE buff_skin ADD img_url varchar(255);", # 新增img_url
"ALTER TABLE buff_skin ADD skin_id varchar(255);", # 新增skin_id
"ALTER TABLE buff_skin ADD steam_price float DEFAULT 0;", # 新增steam_price
"ALTER TABLE buff_skin ADD weapon_type varchar(255);", # 新增type
"ALTER TABLE buff_skin ADD buy_max_price float DEFAULT 0;", # 新增buy_max_price
@ -87,4 +90,5 @@ class BuffSkin(Model):
"ALTER TABLE buff_skin ADD sell_num Integer DEFAULT 0;", # 新增sell_num
"ALTER TABLE buff_skin ADD sell_reference_price float DEFAULT 0;", # 新增sell_reference_price
"ALTER TABLE buff_skin DROP COLUMN skin_price;", # 删除skin_price
"alter table buff_skin drop constraint if EXISTS uid_buff_skin_case_na_c35c93;", # 删除唯一约束
]

View File

@ -17,7 +17,7 @@ from utils.utils import cn2py
from .config import *
from .models.open_cases_log import OpenCasesLog
from .models.open_cases_user import OpenCasesUser
from .utils import CaseManager, update_case_data
from .utils import CaseManager, update_skin_data
RESULT_MESSAGE = {
"BLUE": ["这样看着才舒服", "是自己人,大伙把刀收好", "非常舒适~"],
@ -435,7 +435,7 @@ async def auto_update():
for case_name in case_list:
logger.debug(f"开始自动更新武器箱: {case_name}", "更新武器箱")
try:
await update_case_data(case_name)
await update_skin_data(case_name)
rand = random.randint(300, 500)
logger.info(f"成功自动更新武器箱: {case_name}, 将在 {rand} 秒后再次更新下一武器箱", "更新武器箱")
await asyncio.sleep(rand)

View File

@ -1,9 +1,10 @@
import asyncio
import os
import random
import re
import time
from datetime import datetime
from typing import List, Tuple, Union
from typing import List, Optional, Tuple, Union
import nonebot
from tortoise.functions import Count
@ -16,13 +17,21 @@ from utils.image_utils import BuildImage
from utils.utils import broadcast_group, cn2py
from .build_image import generate_skin
from .config import CASE2ID, CASE_BACKGROUND, COLOR2NAME, NAME2COLOR
from .config import (
CASE2ID,
CASE_BACKGROUND,
COLOR2NAME,
KNIFE2ID,
NAME2COLOR,
UpdateType,
)
from .models.buff_skin import BuffSkin
from .models.buff_skin_log import BuffSkinLog
from .models.open_cases_user import OpenCasesUser
URL = "https://buff.163.com/api/market/goods"
# proxies = 'http://49.75.59.242:3128'
SELL_URL = "https://buff.163.com/goods"
driver = nonebot.get_driver()
@ -41,44 +50,80 @@ class CaseManager:
)
async def update_case_data(case_name: str) -> str:
"""更新皮肤数据
async def update_skin_data(name: str) -> str:
"""更新箱子内皮肤数据
Args:
case_name (str): 箱子名称
name (str): 箱子名称
Returns:
_type_: _description_
"""
if case_name not in CASE2ID:
return "未在当前指定武器箱捏"
type_ = None
if name in CASE2ID:
type_ = UpdateType.CASE
if name in KNIFE2ID:
type_ = UpdateType.WEAPON_TYPE
if not type_:
return "未在指定武器箱或指定武器类型内"
session = Config.get_config("open_cases", "COOKIE")
if not session:
return "BUFF COOKIE为空捏!"
db_skin_list = await BuffSkin.filter(case_name=case_name).all()
db_skin_name_list = [
skin.name + skin.skin_name + skin.abrasion for skin in db_skin_list
]
data_list, total = await search_skin_page(case_name, 1)
weapon2case = {}
if type_ == UpdateType.CASE:
db_skin_id_list = [
skin.skin_id for skin in await BuffSkin.filter(case_name=name).all()
]
else:
db_data = await BuffSkin.filter(name__contains=name).all()
db_skin_id_list = [
skin.skin_id for skin in await BuffSkin.filter(name__contains=name).all()
]
weapon2case = {
item.name + item.skin_name: item.case_name
for item in db_data
if item.case_name != "未知武器箱"
}
data_list, total = await search_skin_page(name, 1, type_)
if isinstance(data_list, str):
return data_list
for page in range(2, total + 1):
rand_time = random.randint(10, 50)
rand_time = random.randint(20, 50)
logger.debug(f"访问随机等待时间: {rand_time}", "开箱更新")
await asyncio.sleep(rand_time)
data_list_, total = await search_skin_page(case_name, page)
data_list_, total = await search_skin_page(name, page, type_)
if isinstance(data_list_, list):
data_list += data_list_
create_list: List[BuffSkin] = []
update_list: List[BuffSkin] = []
log_list = []
case_name_py = cn2py(case_name)
now = datetime.now()
exists_id_list = []
for skin in data_list:
name = skin.name + skin.skin_name + skin.abrasion
if skin.skin_id in exists_id_list:
continue
exists_id_list.append(skin.skin_id)
key = skin.name + skin.skin_name
name_ = skin.name + skin.skin_name + skin.abrasion
skin.create_time = now
skin.update_time = now
if name in db_skin_name_list:
if not skin.case_name:
case_name = weapon2case.get(key)
if not case_name:
case_name = await get_skin_case(skin.skin_id)
rand = random.randint(10, 20)
logger.debug(
f"获取 {skin.name} | {skin.skin_name} 皮肤所属武器箱: {case_name}, 访问随机等待时间: {rand}",
"开箱更新",
)
await asyncio.sleep(rand)
if not case_name:
case_name = "未知武器箱"
else:
weapon2case[key] = case_name
case_name = case_name.replace("", "").replace("", "")
skin.case_name = case_name
if skin.skin_id in db_skin_id_list:
update_list.append(skin)
else:
create_list.append(skin)
@ -100,8 +145,8 @@ async def update_case_data(case_name: str) -> str:
create_time=now,
)
)
name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
file_path = BASE_PATH / case_name_py / f"{cn2py(name)}.jpg"
name_ = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
file_path = BASE_PATH / cn2py(skin.case_name) / f"{cn2py(name_)}.jpg"
if not file_path.exists():
logger.debug(f"下载皮肤 {name} 图片: {skin.img_url}...", "开箱更新")
await AsyncHttpx.download_file(skin.img_url, file_path)
@ -109,9 +154,9 @@ async def update_case_data(case_name: str) -> str:
await asyncio.sleep(rand_time)
logger.debug(f"图片下载随机等待时间: {rand_time}", "开箱更新")
else:
logger.debug(f"皮肤 {name} 图片已存在...", "开箱更新")
logger.debug(f"皮肤 {name_} 图片已存在...", "开箱更新")
if create_list:
logger.debug(f"更新武器箱: [<u><e>{case_name}</e></u>], 创建 {len(create_list)} 个皮肤!")
logger.debug(f"更新武器箱/皮肤: [<u><e>{name}</e></u>], 创建 {len(create_list)} 个皮肤!")
await BuffSkin.bulk_create(create_list, 10)
if update_list:
abrasion_list = []
@ -125,7 +170,7 @@ async def update_case_data(case_name: str) -> str:
if skin.skin_name not in skin_name_list:
skin_name_list.append(skin.skin_name)
db_data = await BuffSkin.filter(
case_name=case_name,
case_name=name,
skin_name__in=skin_name_list,
name__in=name_list,
abrasion__in=abrasion_list,
@ -146,7 +191,7 @@ async def update_case_data(case_name: str) -> str:
data.sell_reference_price = skin.sell_reference_price
data.update_time = skin.update_time
_update_list.append(data)
logger.debug(f"更新武器箱: [<u><c>{case_name}</c></u>], 更新 {len(create_list)} 个皮肤!")
logger.debug(f"更新武器箱/皮肤: [<u><c>{name}</c></u>], 更新 {len(create_list)} 个皮肤!")
await BuffSkin.bulk_update(
_update_list,
[
@ -161,105 +206,129 @@ async def update_case_data(case_name: str) -> str:
10,
)
if log_list:
logger.debug(f"更新武器箱: [<u><e>{case_name}</e></u>], 新增 {len(log_list)} 条皮肤日志!")
logger.debug(f"更新武器箱/皮肤: [<u><e>{name}</e></u>], 新增 {len(log_list)} 条皮肤日志!")
await BuffSkinLog.bulk_create(log_list)
if case_name not in CaseManager.CURRENT_CASES:
if name not in CaseManager.CURRENT_CASES:
CaseManager.CURRENT_CASES.append(case_name) # type: ignore
return f"更新武器箱: [{case_name}] 成功, 共更新 {len(update_list)} 个皮肤, 新创建 {len(create_list)} 个皮肤!"
return f"更新武器箱/皮肤: [{name}] 成功, 共更新 {len(update_list)} 个皮肤, 新创建 {len(create_list)} 个皮肤!"
async def search_skin_page(
case_name: str, page_index: int
name: str, page_index: int, type_: UpdateType
) -> Tuple[Union[List[BuffSkin], str], int]:
"""查询箱子皮肤
Args:
case_name (str): 箱子名称
name (str): 箱子名称
page_index (int): 页数
Returns:
Union[List[BuffSkin], str]: BuffSkin
"""
logger.debug(
f"尝试访问武器箱: [<u><e>{case_name}</e></u>] 页数: [<u><y>{page_index}</y></u>]", "开箱更新"
f"尝试访问武器箱/皮肤: [<u><e>{name}</e></u>] 页数: [<u><y>{page_index}</y></u>]", "开箱更新"
)
cookie = {"session": Config.get_config("open_cases", "COOKIE")}
params = {
"game": "csgo",
"page_num": page_index,
"page_size": 80,
"itemset": CASE2ID[case_name],
"_": time.time(),
"use_suggestio": 0,
}
if type_ == UpdateType.CASE:
params["itemset"] = CASE2ID[name]
elif type_ == UpdateType.WEAPON_TYPE:
params["category"] = KNIFE2ID[name]
proxy = None
if ip := Config.get_config("open_cases", "BUFF_PROXY"):
proxy = {"http://": ip, "https://": ip}
response = await AsyncHttpx.get(
URL,
proxy=proxy,
params=params,
cookies=cookie, # type: ignore
)
logger.debug(f"访问BUFF API: {response.text}", "更新武器箱")
json_data = response.json()
update_data = []
if json_data["code"] == "OK":
data_list = json_data["data"]["items"]
for data in data_list:
obj = {"case_name": case_name}
name = data["name"]
try:
logger.debug(
f"武器箱: [<u><e>{case_name}</e></u>] 页数: [<u><y>{page_index}</y></u>] 正在收录皮肤: [<u><c>{name}</c></u>]...",
"开箱更新",
)
obj["buy_max_price"] = data["buy_max_price"] # 求购最大金额
obj["buy_num"] = data["buy_num"] # 当前求购
goods_info = data["goods_info"]
info = goods_info["info"]
tags = info["tags"]
obj["weapon_type"] = tags["type"]["localized_name"] # 枪械类型
if obj["weapon_type"] in ["音乐盒", "印花", "探员"]:
continue
elif obj["weapon_type"] in ["匕首", "手套"]:
obj["color"] = "KNIFE"
obj["name"] = data["short_name"].split("")[0].strip() # 名称
elif obj["weapon_type"] in ["武器箱"]:
obj["color"] = "CASE"
obj["name"] = data["short_name"]
else:
obj["color"] = NAME2COLOR[tags["rarity"]["localized_name"]]
obj["name"] = tags["weapon"]["localized_name"] # 名称
if obj["weapon_type"] not in ["武器箱"]:
obj["abrasion"] = tags["exterior"]["localized_name"] # 磨损
obj["is_stattrak"] = "StatTrak" in tags["quality"]["localized_name"] # type: ignore # 是否暗金
if not obj["color"]:
obj["color"] = NAME2COLOR[
tags["rarity"]["localized_name"]
] # 品质颜色
else:
obj["abrasion"] = "CASE"
obj["skin_name"] = data["short_name"].split("|")[-1].strip() # 皮肤名称
obj["img_url"] = goods_info["original_icon_url"] # 图片url
obj["steam_price"] = goods_info["steam_price_cny"] # steam价格
obj["sell_min_price"] = data["sell_min_price"] # 售卖最低价格
obj["sell_num"] = data["sell_num"] # 售卖数量
obj["sell_reference_price"] = data["sell_reference_price"] # 参考价格
update_data.append(BuffSkin(**obj))
except Exception as e:
logger.error(
f"更新武器箱: [<u><e>{case_name}</e></u>] 皮肤: [<u><c>{name}</c></u>] 错误",
e=e,
)
logger.debug(
f"访问武器箱: [<u><e>{case_name}</e></u>] 页数: [<u><y>{page_index}</y></u>] 成功并收录完成",
"开箱更新",
)
return update_data, json_data["data"]["total_page"]
else:
logger.warning(f'访问BUFF失败: {json_data["error"]}')
return f'访问失败: {json_data["error"]}', -1
response = None
error = ""
for i in range(3):
try:
response = await AsyncHttpx.get(
URL,
proxy=proxy,
params=params,
cookies=cookie, # type: ignore
)
if response.status_code == 200:
break
rand = random.randint(3, 7)
logger.debug(
f"尝试访问武器箱/皮肤第 {i+1} 次访问异常, code: {response.status_code}", "开箱更新"
)
await asyncio.sleep(rand)
except Exception as e:
logger.debug(f"尝试访问武器箱/皮肤第 {i+1} 次访问发生错误 {type(e)}: {e}", "开箱更新")
error = f"{type(e)}: {e}"
if not response:
return f"访问发生异常: {error}", -1
if response.status_code == 200:
logger.debug(f"访问BUFF API: {response.text}", "开箱更新")
json_data = response.json()
update_data = []
if json_data["code"] == "OK":
data_list = json_data["data"]["items"]
for data in data_list:
obj = {}
if type_ == UpdateType.CASE:
obj["case_name"] = name
name = data["name"]
try:
logger.debug(
f"武器箱: [<u><e>{name}</e></u>] 页数: [<u><y>{page_index}</y></u>] 正在收录皮肤: [<u><c>{name}</c></u>]...",
"开箱更新",
)
obj["skin_id"] = str(data["id"])
obj["buy_max_price"] = data["buy_max_price"] # 求购最大金额
obj["buy_num"] = data["buy_num"] # 当前求购
goods_info = data["goods_info"]
info = goods_info["info"]
tags = info["tags"]
obj["weapon_type"] = tags["type"]["localized_name"] # 枪械类型
if obj["weapon_type"] in ["音乐盒", "印花", "探员"]:
continue
elif obj["weapon_type"] in ["匕首", "手套"]:
obj["color"] = "KNIFE"
obj["name"] = data["short_name"].split("")[0].strip() # 名称
elif obj["weapon_type"] in ["武器箱"]:
obj["color"] = "CASE"
obj["name"] = data["short_name"]
else:
obj["color"] = NAME2COLOR[tags["rarity"]["localized_name"]]
obj["name"] = tags["weapon"]["localized_name"] # 名称
if obj["weapon_type"] not in ["武器箱"]:
obj["abrasion"] = tags["exterior"]["localized_name"] # 磨损
obj["is_stattrak"] = "StatTrak" in tags["quality"]["localized_name"] # type: ignore # 是否暗金
if not obj["color"]:
obj["color"] = NAME2COLOR[
tags["rarity"]["localized_name"]
] # 品质颜色
else:
obj["abrasion"] = "CASE"
obj["skin_name"] = data["short_name"].split("|")[-1].strip() # 皮肤名称
obj["img_url"] = goods_info["original_icon_url"] # 图片url
obj["steam_price"] = goods_info["steam_price_cny"] # steam价格
obj["sell_min_price"] = data["sell_min_price"] # 售卖最低价格
obj["sell_num"] = data["sell_num"] # 售卖数量
obj["sell_reference_price"] = data["sell_reference_price"] # 参考价格
update_data.append(BuffSkin(**obj))
except Exception as e:
logger.error(
f"更新武器箱: [<u><e>{name}</e></u>] 皮肤: [<u><c>{name}</c></u>] 错误",
e=e,
)
logger.debug(
f"访问武器箱: [<u><e>{name}</e></u>] 页数: [<u><y>{page_index}</y></u>] 成功并收录完成",
"开箱更新",
)
return update_data, json_data["data"]["total_page"]
else:
logger.warning(f'访问BUFF失败: {json_data["error"]}')
return f'访问失败: {json_data["error"]}', -1
return f"访问失败, 状态码: {response.status_code}", -1
async def build_case_image(case_name: str) -> Union[BuildImage, str]:
@ -421,6 +490,32 @@ def get_bk_image_size(
return new_size
async def get_skin_case(id_: str) -> Optional[str]:
"""获取皮肤所在箱子
Args:
id_ (str): 皮肤id
Returns:
Optional[str]: 武器箱名称
"""
url = f"{SELL_URL}/{id_}"
proxy = None
if ip := Config.get_config("open_cases", "BUFF_PROXY"):
proxy = {"http://": ip, "https://": ip}
response = await AsyncHttpx.get(
url,
proxy=proxy,
)
if response.status_code == 200:
text = response.text
if r := re.search('<meta name="description".*,(.*)武器箱.*?>', text):
return r.group(1)
else:
logger.debug(f"访问皮肤所属武器箱异常 url: {url} code: {response.status_code}")
return None
async def reset_count_daily():
"""
重置每日开箱
@ -437,57 +532,3 @@ async def reset_count_daily():
@driver.on_startup
async def _():
await CaseManager.reload()
@driver.on_startup
async def _():
"""
将旧表数据移动到新表
"""
# if not await BuffSkin.first() and await BuffPrice.first():
# logger.debug("开始移动旧表数据 BuffPrice -> BuffSkin")
# id2name = {1: "狂牙大行动", 2: "突围大行动", 3: "命悬一线", 4: "裂空", 5: "光谱"}
# data_list: List[BuffSkin] = []
# for data in await BuffPrice.all():
# logger.debug(f"移动旧表数据: {data.skin_name}")
# case_name = id2name[data.case_id]
# name = data.skin_name
# is_stattrak = "StatTrak" in name
# name = name.replace("(★ StatTrak™", "").replace("StatTrak™", "").strip()
# name, skin_name = name.split("|")
# abrasion = "无涂装"
# if "(" in skin_name:
# skin_name, abrasion = skin_name.split("(")
# if abrasion.endswith(")"):
# abrasion = abrasion[:-1]
# color = get_color(case_name, name.strip(), skin_name.strip())
# if not color:
# search_list = [
# x
# for x in data_list
# if x.skin_name == skin_name.strip() and x.name == name.strip()
# ]
# if search_list:
# color = get_color(
# case_name, search_list[0].name, search_list[0].skin_name
# )
# if not color:
# logger.debug(
# f"箱子: [{case_name}] 皮肤: [{name}|{skin_name}] 未获取到皮肤品质,跳过..."
# )
# continue
# data_list.append(
# BuffSkin(
# case_name=case_name,
# name=name.strip(),
# skin_name=skin_name.strip(),
# is_stattrak=is_stattrak,
# abrasion=abrasion.strip(),
# skin_price=data.skin_price,
# color=color,
# create_time=datetime.now(),
# update_time=datetime.now(),
# )
# )
# await BuffSkin.bulk_create(data_list, batch_size=10)
# logger.debug("完成移动旧表数据 BuffPrice -> BuffSkin")