diff --git a/README.md b/README.md
index 28d7b61d..c1f0eaac 100644
--- a/README.md
+++ b/README.md
@@ -331,6 +331,11 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
## 更新
+### 2023/3/25
+
+* 删除BUFF_SKIN表约束,新增`skin_id`字段
+* 开箱新增更新指定刀具皮肤命令(某些箱子金色无法通过api获取)
+
### 2023/3/20
* 修复BuildImage类text居中类型bug [@pull/1301](https://github.com/HibiKier/zhenxun_bot/pull/1317)
diff --git a/plugins/open_cases/__init__.py b/plugins/open_cases/__init__.py
index 63d83b5e..4bb2462b 100755
--- a/plugins/open_cases/__init__.py
+++ b/plugins/open_cases/__init__.py
@@ -6,7 +6,6 @@ from typing import Any, List, Tuple
from nonebot import on_command
from nonebot.adapters.onebot.v11 import GroupMessageEvent, Message, MessageEvent
from nonebot.adapters.onebot.v11.permission import GROUP
-from nonebot.matcher import Matcher
from nonebot.params import CommandArg, RegexGroup
from nonebot.permission import SUPERUSER
from nonebot.plugin import MatcherGroup
@@ -15,6 +14,8 @@ from nonebot.typing import T_State
from configs.config import Config
from configs.path_config import IMAGE_PATH
from services.log import logger
+from utils.depends import OneCommand
+from utils.image_utils import text2image
from utils.message_builder import image
from utils.utils import CN2NUM, is_number, scheduler
@@ -28,10 +29,12 @@ from .open_cases_c import (
)
from .utils import (
CASE2ID,
+ KNIFE2ID,
CaseManager,
build_case_image,
+ get_skin_case,
reset_count_daily,
- update_case_data,
+ update_skin_data,
)
__zx_plugin_name__ = "开箱"
@@ -55,8 +58,8 @@ usage:
更新皮肤指令
重置开箱: 重置今日开箱所有次数
指令:
- 更新开箱图片 ?[武器箱]
- 更新开箱价格 ?[武器箱]
+ 更新武器箱 ?[武器箱]
+ 更新皮肤 ?[刀具名称]
* 不指定武器箱时则全部更新 *
* 过多的爬取会导致账号API被封 *
""".strip()
@@ -68,8 +71,8 @@ __plugin_cmd__ = [
"我的金色",
"群开箱统计",
"查看武器箱?[武器箱]",
- "更新开箱图片 ?[武器箱] [_superuser]",
- "更新开箱价格 ?[武器箱] [_superuser]",
+ "更新武器箱 ?[武器箱] [_superuser]",
+ "更新皮肤 ?[刀具名称] [_superuser]",
]
__plugin_type__ = ("抽卡相关", 1)
__plugin_version__ = 0.1
@@ -125,11 +128,19 @@ total_case_data = cases_matcher_group.on_command(
)
group_open_case_statistics = cases_matcher_group.on_command("群开箱统计")
open_multiple = cases_matcher_group.on_regex("(.*)连开箱(.*)?")
-update_case = on_command("更新武器箱", priority=1, permission=SUPERUSER, block=True)
+update_case = on_command(
+ "更新武器箱", aliases={"更新皮肤"}, priority=1, permission=SUPERUSER, block=True
+)
show_case = on_command("查看武器箱", priority=5, block=True)
my_knifes = on_command("我的金色", priority=1, permission=GROUP, block=True)
show_skin = on_command("查看皮肤", priority=5, block=True)
-# show_case = on_command("test", priority=1, permission=GROUP, block=True)
+test = on_command("test", priority=1, permission=GROUP, block=True)
+
+
+@test.handle()
+async def _(event: GroupMessageEvent):
+ a = await get_skin_case("872472")
+ print(a)
@reload_count.handle()
@@ -189,41 +200,53 @@ async def _(
@update_case.handle()
-async def _(event: MessageEvent, arg: Message = CommandArg()):
+async def _(event: MessageEvent, arg: Message = CommandArg(), cmd: str = OneCommand()):
msg = arg.extract_plain_text().strip()
if not msg:
case_list = []
+ skin_list = []
for i, case_name in enumerate(CASE2ID):
if case_name in CaseManager.CURRENT_CASES:
case_list.append(f"{i+1}.{case_name} [已更新]")
else:
case_list.append(f"{i+1}.{case_name}")
- await update_case.finish("未指定武器箱, 当前已包含武器箱\n" + "\n".join(case_list))
- if msg == "ALL":
- await update_case.send(f"即将更新所有武器箱, 请稍等")
- case_list = list(CASE2ID.keys())
+ for skin_name in KNIFE2ID:
+ skin_list.append(f"{skin_name}")
+ text = "武器箱:\n" + "\n".join(case_list) + "\n皮肤:\n" + ", ".join(skin_list)
+ await update_case.finish(
+ "未指定武器箱, 当前已包含武器箱/皮肤\n"
+ + image(await text2image(text, padding=20, color="#f9f6f2"))
+ )
+ if msg in ["ALL", "ALL1"]:
+ if msg == "ALL":
+ case_list = list(CASE2ID.keys())
+ result = "武器箱"
+ else:
+ case_list = list(KNIFE2ID.keys())
+ result = "罕见皮肤"
+ await update_case.send(f"即将更新所有{result}, 请稍等")
for i, case_name in enumerate(case_list):
try:
- await update_case_data(case_name)
+ await update_skin_data(case_name)
rand = random.randint(300, 500)
- result = "更新全部武器箱完成"
+ result = f"更新全部{result}完成"
if i < len(case_list):
next_case = case_list[i + 1]
- result = f"将在 {rand} 秒后更新下一武器箱: {next_case}"
- await update_case.send(f"成功更新武器箱: {case_name}, {result}")
- logger.info(f"成功更新武器箱: {case_name}, {result}", "更新武器箱")
+ result = f"将在 {rand} 秒后更新下一{result}: {next_case}"
+ await update_case.send(f"成功更新{result}: {case_name}, {result}")
+ logger.info(f"成功更新{result}: {case_name}, {result}", "更新武器箱")
await asyncio.sleep(rand)
except Exception as e:
- logger.error(f"更新武器箱: {case_name}", e=e)
- await update_case.send(f"成功更新武器箱: {case_name} 发生错误: {type(e)}: {e}")
- await update_case.send(f"更新全部武器箱完成")
+ logger.error(f"更新{result}: {case_name}", e=e)
+ await update_case.send(f"更新{result}: {case_name} 发生错误: {type(e)}: {e}")
+ await update_case.send(f"更新全部{result}完成")
else:
- await update_case.send(f"开始更新武器箱: {msg}, 请稍等")
+ await update_case.send(f"开始{cmd}: {msg}, 请稍等")
try:
- await update_case.send(await update_case_data(msg), at_sender=True)
+ await update_case.send(await update_skin_data(msg), at_sender=True)
except Exception as e:
- logger.error(f"更新武器箱: {msg}", e=e)
- await update_case.send(f"成功自动更新武器箱: {msg} 发生错误: {type(e)}: {e}")
+ logger.error(f"{cmd}: {msg}", e=e)
+ await update_case.send(f"成功{cmd}: {msg} 发生错误: {type(e)}: {e}")
@show_case.handle()
diff --git a/plugins/open_cases/config.py b/plugins/open_cases/config.py
index 469278e6..c3496918 100755
--- a/plugins/open_cases/config.py
+++ b/plugins/open_cases/config.py
@@ -1,4 +1,5 @@
import random
+from enum import Enum
from typing import List, Tuple
from configs.path_config import IMAGE_PATH
@@ -34,6 +35,16 @@ BATTLE_SCARED_S = 0.45
BATTLE_SCARED_E = 0.99999
+class UpdateType(Enum):
+
+ """
+ 更新类型
+ """
+
+ CASE = "case"
+ WEAPON_TYPE = "weapon_type"
+
+
NAME2COLOR = {
"消费级": "WHITE",
"工业级": "LIGHTBLUE",
@@ -68,7 +79,30 @@ ABRASION_SORT = ["崭新出厂", "略有磨损", "久经沙场", "破损不堪",
CASE_BACKGROUND = IMAGE_PATH / "csgo_cases" / "_background" / "shu"
+# 刀
+KNIFE2ID = {
+ "鲍伊猎刀": "weapon_knife_survival_bowie",
+ "蝴蝶刀": "weapon_knife_butterfly",
+ "弯刀": "weapon_knife_falchion",
+ "折叠刀": "weapon_knife_flip",
+ "穿肠刀": "weapon_knife_gut",
+ "猎杀者匕首": "weapon_knife_tactical",
+ "M9刺刀": "weapon_knife_m9_bayonet",
+ "刺刀": "weapon_bayonet",
+ "爪子刀": "weapon_knife_karambit",
+ "暗影双匕": "weapon_knife_push",
+ "短剑": "weapon_knife_stiletto",
+ "熊刀": "weapon_knife_ursus",
+ "折刀": "weapon_knife_gypsy_jackknife",
+ "锯齿爪刀": "weapon_knife_widowmaker",
+ "海豹短刀": "weapon_knife_css",
+ "系绳匕首": "weapon_knife_cord",
+ "求生匕首": "weapon_knife_canis",
+ "流浪者匕首": "weapon_knife_outdoor",
+ "骷髅匕首": "weapon_knife_skeleton",
+}
+# 武器箱
CASE2ID = {
"变革": "set_community_32",
"反冲": "set_community_31",
diff --git a/plugins/open_cases/models/buff_skin.py b/plugins/open_cases/models/buff_skin.py
index 27ab3eba..70997eda 100644
--- a/plugins/open_cases/models/buff_skin.py
+++ b/plugins/open_cases/models/buff_skin.py
@@ -11,11 +11,11 @@ class BuffSkin(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
- case_name = fields.CharField(255)
+ case_name: str = fields.CharField(255) # type: ignore
"""箱子名称"""
- name = fields.CharField(255)
+ name: str = fields.CharField(255) # type: ignore
"""武器/手套/刀名称"""
- skin_name = fields.CharField(255)
+ skin_name: str = fields.CharField(255) # type: ignore
"""皮肤名称"""
is_stattrak = fields.BooleanField(default=False)
"""是否暗金(计数)"""
@@ -23,6 +23,8 @@ class BuffSkin(Model):
"""磨损度"""
color = fields.CharField(255)
"""颜色(品质)"""
+ skin_id = fields.CharField(255, null=True, unique=True)
+ """皮肤id"""
img_url = fields.CharField(255)
"""图片url"""
@@ -49,7 +51,7 @@ class BuffSkin(Model):
class Meta:
table = "buff_skin"
table_description = "Buff皮肤数据表"
- unique_together = ("case_name", "name", "skin_name", "abrasion")
+ # unique_together = ("case_name", "name", "skin_name", "abrasion", "is_stattrak")
@classmethod
async def random_skin(
@@ -79,6 +81,7 @@ class BuffSkin(Model):
async def _run_script(cls):
return [
"ALTER TABLE buff_skin ADD img_url varchar(255);", # 新增img_url
+ "ALTER TABLE buff_skin ADD skin_id varchar(255);", # 新增skin_id
"ALTER TABLE buff_skin ADD steam_price float DEFAULT 0;", # 新增steam_price
"ALTER TABLE buff_skin ADD weapon_type varchar(255);", # 新增type
"ALTER TABLE buff_skin ADD buy_max_price float DEFAULT 0;", # 新增buy_max_price
@@ -87,4 +90,5 @@ class BuffSkin(Model):
"ALTER TABLE buff_skin ADD sell_num Integer DEFAULT 0;", # 新增sell_num
"ALTER TABLE buff_skin ADD sell_reference_price float DEFAULT 0;", # 新增sell_reference_price
"ALTER TABLE buff_skin DROP COLUMN skin_price;", # 删除skin_price
+ "alter table buff_skin drop constraint if EXISTS uid_buff_skin_case_na_c35c93;", # 删除唯一约束
]
diff --git a/plugins/open_cases/open_cases_c.py b/plugins/open_cases/open_cases_c.py
index 70294c4b..d6caf162 100755
--- a/plugins/open_cases/open_cases_c.py
+++ b/plugins/open_cases/open_cases_c.py
@@ -17,7 +17,7 @@ from utils.utils import cn2py
from .config import *
from .models.open_cases_log import OpenCasesLog
from .models.open_cases_user import OpenCasesUser
-from .utils import CaseManager, update_case_data
+from .utils import CaseManager, update_skin_data
RESULT_MESSAGE = {
"BLUE": ["这样看着才舒服", "是自己人,大伙把刀收好", "非常舒适~"],
@@ -435,7 +435,7 @@ async def auto_update():
for case_name in case_list:
logger.debug(f"开始自动更新武器箱: {case_name}", "更新武器箱")
try:
- await update_case_data(case_name)
+ await update_skin_data(case_name)
rand = random.randint(300, 500)
logger.info(f"成功自动更新武器箱: {case_name}, 将在 {rand} 秒后再次更新下一武器箱", "更新武器箱")
await asyncio.sleep(rand)
diff --git a/plugins/open_cases/utils.py b/plugins/open_cases/utils.py
index aeefee5f..d204f08a 100755
--- a/plugins/open_cases/utils.py
+++ b/plugins/open_cases/utils.py
@@ -1,9 +1,10 @@
import asyncio
import os
import random
+import re
import time
from datetime import datetime
-from typing import List, Tuple, Union
+from typing import List, Optional, Tuple, Union
import nonebot
from tortoise.functions import Count
@@ -16,13 +17,21 @@ from utils.image_utils import BuildImage
from utils.utils import broadcast_group, cn2py
from .build_image import generate_skin
-from .config import CASE2ID, CASE_BACKGROUND, COLOR2NAME, NAME2COLOR
+from .config import (
+ CASE2ID,
+ CASE_BACKGROUND,
+ COLOR2NAME,
+ KNIFE2ID,
+ NAME2COLOR,
+ UpdateType,
+)
from .models.buff_skin import BuffSkin
from .models.buff_skin_log import BuffSkinLog
from .models.open_cases_user import OpenCasesUser
URL = "https://buff.163.com/api/market/goods"
-# proxies = 'http://49.75.59.242:3128'
+
+SELL_URL = "https://buff.163.com/goods"
driver = nonebot.get_driver()
@@ -41,44 +50,80 @@ class CaseManager:
)
-async def update_case_data(case_name: str) -> str:
- """更新皮肤数据
+async def update_skin_data(name: str) -> str:
+ """更新箱子内皮肤数据
Args:
- case_name (str): 箱子名称
+ name (str): 箱子名称
Returns:
_type_: _description_
"""
- if case_name not in CASE2ID:
- return "未在当前指定武器箱捏"
+ type_ = None
+ if name in CASE2ID:
+ type_ = UpdateType.CASE
+ if name in KNIFE2ID:
+ type_ = UpdateType.WEAPON_TYPE
+ if not type_:
+ return "未在指定武器箱或指定武器类型内"
session = Config.get_config("open_cases", "COOKIE")
if not session:
return "BUFF COOKIE为空捏!"
- db_skin_list = await BuffSkin.filter(case_name=case_name).all()
- db_skin_name_list = [
- skin.name + skin.skin_name + skin.abrasion for skin in db_skin_list
- ]
- data_list, total = await search_skin_page(case_name, 1)
+ weapon2case = {}
+ if type_ == UpdateType.CASE:
+ db_skin_id_list = [
+ skin.skin_id for skin in await BuffSkin.filter(case_name=name).all()
+ ]
+ else:
+ db_data = await BuffSkin.filter(name__contains=name).all()
+ db_skin_id_list = [
+ skin.skin_id for skin in await BuffSkin.filter(name__contains=name).all()
+ ]
+ weapon2case = {
+ item.name + item.skin_name: item.case_name
+ for item in db_data
+ if item.case_name != "未知武器箱"
+ }
+ data_list, total = await search_skin_page(name, 1, type_)
if isinstance(data_list, str):
return data_list
for page in range(2, total + 1):
- rand_time = random.randint(10, 50)
+ rand_time = random.randint(20, 50)
logger.debug(f"访问随机等待时间: {rand_time}", "开箱更新")
await asyncio.sleep(rand_time)
- data_list_, total = await search_skin_page(case_name, page)
+ data_list_, total = await search_skin_page(name, page, type_)
if isinstance(data_list_, list):
data_list += data_list_
create_list: List[BuffSkin] = []
update_list: List[BuffSkin] = []
log_list = []
- case_name_py = cn2py(case_name)
now = datetime.now()
+ exists_id_list = []
for skin in data_list:
- name = skin.name + skin.skin_name + skin.abrasion
+ if skin.skin_id in exists_id_list:
+ continue
+ exists_id_list.append(skin.skin_id)
+ key = skin.name + skin.skin_name
+ name_ = skin.name + skin.skin_name + skin.abrasion
skin.create_time = now
skin.update_time = now
- if name in db_skin_name_list:
+ if not skin.case_name:
+ case_name = weapon2case.get(key)
+ if not case_name:
+ case_name = await get_skin_case(skin.skin_id)
+ rand = random.randint(10, 20)
+ logger.debug(
+ f"获取 {skin.name} | {skin.skin_name} 皮肤所属武器箱: {case_name}, 访问随机等待时间: {rand}",
+ "开箱更新",
+ )
+ await asyncio.sleep(rand)
+ if not case_name:
+ case_name = "未知武器箱"
+ else:
+ weapon2case[key] = case_name
+ case_name = case_name.replace("”", "").replace("“", "")
+ skin.case_name = case_name
+ if skin.skin_id in db_skin_id_list:
update_list.append(skin)
else:
create_list.append(skin)
@@ -100,8 +145,8 @@ async def update_case_data(case_name: str) -> str:
create_time=now,
)
)
- name = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
- file_path = BASE_PATH / case_name_py / f"{cn2py(name)}.jpg"
+ name_ = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
+ file_path = BASE_PATH / cn2py(skin.case_name) / f"{cn2py(name_)}.jpg"
if not file_path.exists():
logger.debug(f"下载皮肤 {name} 图片: {skin.img_url}...", "开箱更新")
await AsyncHttpx.download_file(skin.img_url, file_path)
@@ -109,9 +154,9 @@ async def update_case_data(case_name: str) -> str:
await asyncio.sleep(rand_time)
logger.debug(f"图片下载随机等待时间: {rand_time}", "开箱更新")
else:
- logger.debug(f"皮肤 {name} 图片已存在...", "开箱更新")
+ logger.debug(f"皮肤 {name_} 图片已存在...", "开箱更新")
if create_list:
- logger.debug(f"更新武器箱: [{case_name}], 创建 {len(create_list)} 个皮肤!")
+ logger.debug(f"更新武器箱/皮肤: [{name}], 创建 {len(create_list)} 个皮肤!")
await BuffSkin.bulk_create(create_list, 10)
if update_list:
abrasion_list = []
@@ -125,7 +170,7 @@ async def update_case_data(case_name: str) -> str:
if skin.skin_name not in skin_name_list:
skin_name_list.append(skin.skin_name)
db_data = await BuffSkin.filter(
- case_name=case_name,
+ case_name=name,
skin_name__in=skin_name_list,
name__in=name_list,
abrasion__in=abrasion_list,
@@ -146,7 +191,7 @@ async def update_case_data(case_name: str) -> str:
data.sell_reference_price = skin.sell_reference_price
data.update_time = skin.update_time
_update_list.append(data)
- logger.debug(f"更新武器箱: [{case_name}], 更新 {len(create_list)} 个皮肤!")
+ logger.debug(f"更新武器箱/皮肤: [{name}], 更新 {len(create_list)} 个皮肤!")
await BuffSkin.bulk_update(
_update_list,
[
@@ -161,105 +206,129 @@ async def update_case_data(case_name: str) -> str:
10,
)
if log_list:
- logger.debug(f"更新武器箱: [{case_name}], 新增 {len(log_list)} 条皮肤日志!")
+ logger.debug(f"更新武器箱/皮肤: [{name}], 新增 {len(log_list)} 条皮肤日志!")
await BuffSkinLog.bulk_create(log_list)
- if case_name not in CaseManager.CURRENT_CASES:
+ if name not in CaseManager.CURRENT_CASES:
CaseManager.CURRENT_CASES.append(case_name) # type: ignore
- return f"更新武器箱: [{case_name}] 成功, 共更新 {len(update_list)} 个皮肤, 新创建 {len(create_list)} 个皮肤!"
+ return f"更新武器箱/皮肤: [{name}] 成功, 共更新 {len(update_list)} 个皮肤, 新创建 {len(create_list)} 个皮肤!"
async def search_skin_page(
- case_name: str, page_index: int
+ name: str, page_index: int, type_: UpdateType
) -> Tuple[Union[List[BuffSkin], str], int]:
"""查询箱子皮肤
Args:
- case_name (str): 箱子名称
+ name (str): 箱子名称
page_index (int): 页数
Returns:
Union[List[BuffSkin], str]: BuffSkin
"""
logger.debug(
- f"尝试访问武器箱: [{case_name}] 页数: [{page_index}]", "开箱更新"
+ f"尝试访问武器箱/皮肤: [{name}] 页数: [{page_index}]", "开箱更新"
)
cookie = {"session": Config.get_config("open_cases", "COOKIE")}
params = {
"game": "csgo",
"page_num": page_index,
"page_size": 80,
- "itemset": CASE2ID[case_name],
"_": time.time(),
"use_suggestio": 0,
}
+ if type_ == UpdateType.CASE:
+ params["itemset"] = CASE2ID[name]
+ elif type_ == UpdateType.WEAPON_TYPE:
+ params["category"] = KNIFE2ID[name]
proxy = None
if ip := Config.get_config("open_cases", "BUFF_PROXY"):
proxy = {"http://": ip, "https://": ip}
- response = await AsyncHttpx.get(
- URL,
- proxy=proxy,
- params=params,
- cookies=cookie, # type: ignore
- )
- logger.debug(f"访问BUFF API: {response.text}", "更新武器箱")
- json_data = response.json()
- update_data = []
- if json_data["code"] == "OK":
- data_list = json_data["data"]["items"]
- for data in data_list:
- obj = {"case_name": case_name}
- name = data["name"]
- try:
- logger.debug(
- f"武器箱: [{case_name}] 页数: [{page_index}] 正在收录皮肤: [{name}]...",
- "开箱更新",
- )
- obj["buy_max_price"] = data["buy_max_price"] # 求购最大金额
- obj["buy_num"] = data["buy_num"] # 当前求购
- goods_info = data["goods_info"]
- info = goods_info["info"]
- tags = info["tags"]
- obj["weapon_type"] = tags["type"]["localized_name"] # 枪械类型
- if obj["weapon_type"] in ["音乐盒", "印花", "探员"]:
- continue
- elif obj["weapon_type"] in ["匕首", "手套"]:
- obj["color"] = "KNIFE"
- obj["name"] = data["short_name"].split("(")[0].strip() # 名称
- elif obj["weapon_type"] in ["武器箱"]:
- obj["color"] = "CASE"
- obj["name"] = data["short_name"]
- else:
- obj["color"] = NAME2COLOR[tags["rarity"]["localized_name"]]
- obj["name"] = tags["weapon"]["localized_name"] # 名称
- if obj["weapon_type"] not in ["武器箱"]:
- obj["abrasion"] = tags["exterior"]["localized_name"] # 磨损
- obj["is_stattrak"] = "StatTrak" in tags["quality"]["localized_name"] # type: ignore # 是否暗金
- if not obj["color"]:
- obj["color"] = NAME2COLOR[
- tags["rarity"]["localized_name"]
- ] # 品质颜色
- else:
- obj["abrasion"] = "CASE"
- obj["skin_name"] = data["short_name"].split("|")[-1].strip() # 皮肤名称
- obj["img_url"] = goods_info["original_icon_url"] # 图片url
- obj["steam_price"] = goods_info["steam_price_cny"] # steam价格
- obj["sell_min_price"] = data["sell_min_price"] # 售卖最低价格
- obj["sell_num"] = data["sell_num"] # 售卖数量
- obj["sell_reference_price"] = data["sell_reference_price"] # 参考价格
- update_data.append(BuffSkin(**obj))
- except Exception as e:
- logger.error(
- f"更新武器箱: [{case_name}] 皮肤: [{name}] 错误",
- e=e,
- )
- logger.debug(
- f"访问武器箱: [{case_name}] 页数: [{page_index}] 成功并收录完成",
- "开箱更新",
- )
- return update_data, json_data["data"]["total_page"]
- else:
- logger.warning(f'访问BUFF失败: {json_data["error"]}')
- return f'访问失败: {json_data["error"]}', -1
+ response = None
+ error = ""
+ for i in range(3):
+ try:
+ response = await AsyncHttpx.get(
+ URL,
+ proxy=proxy,
+ params=params,
+ cookies=cookie, # type: ignore
+ )
+ if response.status_code == 200:
+ break
+ rand = random.randint(3, 7)
+ logger.debug(
+ f"尝试访问武器箱/皮肤第 {i+1} 次访问异常, code: {response.status_code}", "开箱更新"
+ )
+ await asyncio.sleep(rand)
+ except Exception as e:
+ logger.debug(f"尝试访问武器箱/皮肤第 {i+1} 次访问发生错误 {type(e)}: {e}", "开箱更新")
+ error = f"{type(e)}: {e}"
+ if not response:
+ return f"访问发生异常: {error}", -1
+ if response.status_code == 200:
+ logger.debug(f"访问BUFF API: {response.text}", "开箱更新")
+ json_data = response.json()
+ update_data = []
+ if json_data["code"] == "OK":
+ data_list = json_data["data"]["items"]
+ for data in data_list:
+ obj = {}
+ if type_ == UpdateType.CASE:
+ obj["case_name"] = name
+ name = data["name"]
+ try:
+ logger.debug(
+ f"武器箱: [{name}] 页数: [{page_index}] 正在收录皮肤: [{name}]...",
+ "开箱更新",
+ )
+ obj["skin_id"] = str(data["id"])
+ obj["buy_max_price"] = data["buy_max_price"] # 求购最大金额
+ obj["buy_num"] = data["buy_num"] # 当前求购
+ goods_info = data["goods_info"]
+ info = goods_info["info"]
+ tags = info["tags"]
+ obj["weapon_type"] = tags["type"]["localized_name"] # 枪械类型
+ if obj["weapon_type"] in ["音乐盒", "印花", "探员"]:
+ continue
+ elif obj["weapon_type"] in ["匕首", "手套"]:
+ obj["color"] = "KNIFE"
+ obj["name"] = data["short_name"].split("(")[0].strip() # 名称
+ elif obj["weapon_type"] in ["武器箱"]:
+ obj["color"] = "CASE"
+ obj["name"] = data["short_name"]
+ else:
+ obj["color"] = NAME2COLOR[tags["rarity"]["localized_name"]]
+ obj["name"] = tags["weapon"]["localized_name"] # 名称
+ if obj["weapon_type"] not in ["武器箱"]:
+ obj["abrasion"] = tags["exterior"]["localized_name"] # 磨损
+ obj["is_stattrak"] = "StatTrak" in tags["quality"]["localized_name"] # type: ignore # 是否暗金
+ if not obj["color"]:
+ obj["color"] = NAME2COLOR[
+ tags["rarity"]["localized_name"]
+ ] # 品质颜色
+ else:
+ obj["abrasion"] = "CASE"
+ obj["skin_name"] = data["short_name"].split("|")[-1].strip() # 皮肤名称
+ obj["img_url"] = goods_info["original_icon_url"] # 图片url
+ obj["steam_price"] = goods_info["steam_price_cny"] # steam价格
+ obj["sell_min_price"] = data["sell_min_price"] # 售卖最低价格
+ obj["sell_num"] = data["sell_num"] # 售卖数量
+ obj["sell_reference_price"] = data["sell_reference_price"] # 参考价格
+ update_data.append(BuffSkin(**obj))
+ except Exception as e:
+ logger.error(
+ f"更新武器箱: [{name}] 皮肤: [{name}] 错误",
+ e=e,
+ )
+ logger.debug(
+ f"访问武器箱: [{name}] 页数: [{page_index}] 成功并收录完成",
+ "开箱更新",
+ )
+ return update_data, json_data["data"]["total_page"]
+ else:
+ logger.warning(f'访问BUFF失败: {json_data["error"]}')
+ return f'访问失败: {json_data["error"]}', -1
+ return f"访问失败, 状态码: {response.status_code}", -1
async def build_case_image(case_name: str) -> Union[BuildImage, str]:
@@ -421,6 +490,32 @@ def get_bk_image_size(
return new_size
+async def get_skin_case(id_: str) -> Optional[str]:
+ """获取皮肤所在箱子
+
+ Args:
+ id_ (str): 皮肤id
+
+ Returns:
+ Optional[str]: 武器箱名称
+ """
+ url = f"{SELL_URL}/{id_}"
+ proxy = None
+ if ip := Config.get_config("open_cases", "BUFF_PROXY"):
+ proxy = {"http://": ip, "https://": ip}
+ response = await AsyncHttpx.get(
+ url,
+ proxy=proxy,
+ )
+ if response.status_code == 200:
+ text = response.text
+ if r := re.search('', text):
+ return r.group(1)
+ else:
+ logger.debug(f"访问皮肤所属武器箱异常 url: {url} code: {response.status_code}")
+ return None
+
+
async def reset_count_daily():
"""
重置每日开箱
@@ -437,57 +532,3 @@ async def reset_count_daily():
@driver.on_startup
async def _():
await CaseManager.reload()
-
-
-@driver.on_startup
-async def _():
- """
- 将旧表数据移动到新表
- """
- # if not await BuffSkin.first() and await BuffPrice.first():
- # logger.debug("开始移动旧表数据 BuffPrice -> BuffSkin")
- # id2name = {1: "狂牙大行动", 2: "突围大行动", 3: "命悬一线", 4: "裂空", 5: "光谱"}
- # data_list: List[BuffSkin] = []
- # for data in await BuffPrice.all():
- # logger.debug(f"移动旧表数据: {data.skin_name}")
- # case_name = id2name[data.case_id]
- # name = data.skin_name
- # is_stattrak = "StatTrak" in name
- # name = name.replace("(★ StatTrak™)", "").replace("(StatTrak™)", "").strip()
- # name, skin_name = name.split("|")
- # abrasion = "无涂装"
- # if "(" in skin_name:
- # skin_name, abrasion = skin_name.split("(")
- # if abrasion.endswith(")"):
- # abrasion = abrasion[:-1]
- # color = get_color(case_name, name.strip(), skin_name.strip())
- # if not color:
- # search_list = [
- # x
- # for x in data_list
- # if x.skin_name == skin_name.strip() and x.name == name.strip()
- # ]
- # if search_list:
- # color = get_color(
- # case_name, search_list[0].name, search_list[0].skin_name
- # )
- # if not color:
- # logger.debug(
- # f"箱子: [{case_name}] 皮肤: [{name}|{skin_name}] 未获取到皮肤品质,跳过..."
- # )
- # continue
- # data_list.append(
- # BuffSkin(
- # case_name=case_name,
- # name=name.strip(),
- # skin_name=skin_name.strip(),
- # is_stattrak=is_stattrak,
- # abrasion=abrasion.strip(),
- # skin_price=data.skin_price,
- # color=color,
- # create_time=datetime.now(),
- # update_time=datetime.now(),
- # )
- # )
- # await BuffSkin.bulk_create(data_list, batch_size=10)
- # logger.debug("完成移动旧表数据 BuffPrice -> BuffSkin")