zhenxun_bot/plugins/open_cases/utils.py
2023-03-26 21:29:44 +08:00

535 lines
20 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import os
import random
import re
import time
from datetime import datetime
from typing import List, Optional, Tuple, Union
import nonebot
from tortoise.functions import Count
from configs.config import Config
from configs.path_config import IMAGE_PATH
from services.log import logger
from utils.http_utils import AsyncHttpx
from utils.image_utils import BuildImage
from utils.utils import broadcast_group, cn2py
from .build_image import generate_skin
from .config import (
CASE2ID,
CASE_BACKGROUND,
COLOR2NAME,
KNIFE2ID,
NAME2COLOR,
UpdateType,
)
from .models.buff_skin import BuffSkin
from .models.buff_skin_log import BuffSkinLog
from .models.open_cases_user import OpenCasesUser
URL = "https://buff.163.com/api/market/goods"
SELL_URL = "https://buff.163.com/goods"
driver = nonebot.get_driver()
BASE_PATH = IMAGE_PATH / "csgo_cases"
class CaseManager:
CURRENT_CASES = []
@classmethod
async def reload(cls):
cls.CURRENT_CASES = (
await BuffSkin.annotate().distinct().values_list("case_name", flat=True) # type: ignore
)
async def update_skin_data(name: str) -> str:
"""更新箱子内皮肤数据
Args:
name (str): 箱子名称
Returns:
_type_: _description_
"""
type_ = None
if name in CASE2ID:
type_ = UpdateType.CASE
if name in KNIFE2ID:
type_ = UpdateType.WEAPON_TYPE
if not type_:
return "未在指定武器箱或指定武器类型内"
session = Config.get_config("open_cases", "COOKIE")
if not session:
return "BUFF COOKIE为空捏!"
weapon2case = {}
if type_ == UpdateType.CASE:
db_skin_id_list = [
skin.skin_id for skin in await BuffSkin.filter(case_name=name).all()
]
else:
db_data = await BuffSkin.filter(name__contains=name).all()
db_skin_id_list = [
skin.skin_id for skin in await BuffSkin.filter(name__contains=name).all()
]
weapon2case = {
item.name + item.skin_name: item.case_name
for item in db_data
if item.case_name != "未知武器箱"
}
data_list, total = await search_skin_page(name, 1, type_)
if isinstance(data_list, str):
return data_list
for page in range(2, total + 1):
rand_time = random.randint(20, 50)
logger.debug(f"访问随机等待时间: {rand_time}", "开箱更新")
await asyncio.sleep(rand_time)
data_list_, total = await search_skin_page(name, page, type_)
if isinstance(data_list_, list):
data_list += data_list_
create_list: List[BuffSkin] = []
update_list: List[BuffSkin] = []
log_list = []
now = datetime.now()
exists_id_list = []
for skin in data_list:
if skin.skin_id in exists_id_list:
continue
exists_id_list.append(skin.skin_id)
key = skin.name + skin.skin_name
name_ = skin.name + skin.skin_name + skin.abrasion
skin.create_time = now
skin.update_time = now
if not skin.case_name:
case_name = weapon2case.get(key)
if not case_name:
case_name = await get_skin_case(skin.skin_id)
rand = random.randint(10, 20)
logger.debug(
f"获取 {skin.name} | {skin.skin_name} 皮肤所属武器箱: {case_name}, 访问随机等待时间: {rand}",
"开箱更新",
)
await asyncio.sleep(rand)
if not case_name:
case_name = "未知武器箱"
else:
weapon2case[key] = case_name
case_name = case_name.replace("", "").replace("", "")
skin.case_name = case_name
if skin.skin_id in db_skin_id_list:
update_list.append(skin)
else:
create_list.append(skin)
log_list.append(
BuffSkinLog(
name=skin.name,
case_name=skin.case_name,
skin_name=skin.skin_name,
is_stattrak=skin.is_stattrak,
abrasion=skin.abrasion,
color=skin.color,
steam_price=skin.steam_price,
weapon_type=skin.weapon_type,
buy_max_price=skin.buy_max_price,
buy_num=skin.buy_num,
sell_min_price=skin.sell_min_price,
sell_num=skin.sell_num,
sell_reference_price=skin.sell_reference_price,
create_time=now,
)
)
name_ = skin.name + "-" + skin.skin_name + "-" + skin.abrasion
file_path = BASE_PATH / cn2py(skin.case_name) / f"{cn2py(name_)}.jpg"
if not file_path.exists():
logger.debug(f"下载皮肤 {name} 图片: {skin.img_url}...", "开箱更新")
await AsyncHttpx.download_file(skin.img_url, file_path)
rand_time = random.randint(1, 10)
await asyncio.sleep(rand_time)
logger.debug(f"图片下载随机等待时间: {rand_time}", "开箱更新")
else:
logger.debug(f"皮肤 {name_} 图片已存在...", "开箱更新")
if create_list:
logger.debug(f"更新武器箱/皮肤: [<u><e>{name}</e></u>], 创建 {len(create_list)} 个皮肤!")
await BuffSkin.bulk_create(create_list, 10)
if update_list:
abrasion_list = []
name_list = []
skin_name_list = []
for skin in update_list:
if skin.abrasion not in abrasion_list:
abrasion_list.append(skin.abrasion)
if skin.name not in name_list:
name_list.append(skin.name)
if skin.skin_name not in skin_name_list:
skin_name_list.append(skin.skin_name)
db_data = await BuffSkin.filter(
case_name=name,
skin_name__in=skin_name_list,
name__in=name_list,
abrasion__in=abrasion_list,
).all()
_update_list = []
for data in db_data:
for skin in update_list:
if (
data.name == skin.name
and data.skin_name == skin.skin_name
and data.abrasion == skin.abrasion
):
data.steam_price = skin.steam_price
data.buy_max_price = skin.buy_max_price
data.buy_num = skin.buy_num
data.sell_min_price = skin.sell_min_price
data.sell_num = skin.sell_num
data.sell_reference_price = skin.sell_reference_price
data.update_time = skin.update_time
_update_list.append(data)
logger.debug(f"更新武器箱/皮肤: [<u><c>{name}</c></u>], 更新 {len(create_list)} 个皮肤!")
await BuffSkin.bulk_update(
_update_list,
[
"steam_price",
"buy_max_price",
"buy_num",
"sell_min_price",
"sell_num",
"sell_reference_price",
"update_time",
],
10,
)
if log_list:
logger.debug(f"更新武器箱/皮肤: [<u><e>{name}</e></u>], 新增 {len(log_list)} 条皮肤日志!")
await BuffSkinLog.bulk_create(log_list)
if name not in CaseManager.CURRENT_CASES:
CaseManager.CURRENT_CASES.append(case_name) # type: ignore
return f"更新武器箱/皮肤: [{name}] 成功, 共更新 {len(update_list)} 个皮肤, 新创建 {len(create_list)} 个皮肤!"
async def search_skin_page(
s_name: str, page_index: int, type_: UpdateType
) -> Tuple[Union[List[BuffSkin], str], int]:
"""查询箱子皮肤
Args:
s_name (str): 箱子/皮肤名称
page_index (int): 页数
Returns:
Union[List[BuffSkin], str]: BuffSkin
"""
logger.debug(
f"尝试访问武器箱/皮肤: [<u><e>{s_name}</e></u>] 页数: [<u><y>{page_index}</y></u>]", "开箱更新"
)
cookie = {"session": Config.get_config("open_cases", "COOKIE")}
params = {
"game": "csgo",
"page_num": page_index,
"page_size": 80,
"_": time.time(),
"use_suggestio": 0,
}
if type_ == UpdateType.CASE:
params["itemset"] = CASE2ID[s_name]
elif type_ == UpdateType.WEAPON_TYPE:
params["category"] = KNIFE2ID[s_name]
proxy = None
if ip := Config.get_config("open_cases", "BUFF_PROXY"):
proxy = {"http://": ip, "https://": ip}
response = None
error = ""
for i in range(3):
try:
response = await AsyncHttpx.get(
URL,
proxy=proxy,
params=params,
cookies=cookie, # type: ignore
)
if response.status_code == 200:
break
rand = random.randint(3, 7)
logger.debug(
f"尝试访问武器箱/皮肤第 {i+1} 次访问异常, code: {response.status_code}", "开箱更新"
)
await asyncio.sleep(rand)
except Exception as e:
logger.debug(f"尝试访问武器箱/皮肤第 {i+1} 次访问发生错误 {type(e)}: {e}", "开箱更新")
error = f"{type(e)}: {e}"
if not response:
return f"访问发生异常: {error}", -1
if response.status_code == 200:
logger.debug(f"访问BUFF API: {response.text}", "开箱更新")
json_data = response.json()
update_data = []
if json_data["code"] == "OK":
data_list = json_data["data"]["items"]
for data in data_list:
obj = {}
if type_ == UpdateType.CASE:
obj["case_name"] = s_name
name = data["name"]
try:
logger.debug(
f"武器箱: [<u><e>{s_name}</e></u>] 页数: [<u><y>{page_index}</y></u>] 正在收录皮肤: [<u><c>{name}</c></u>]...",
"开箱更新",
)
obj["skin_id"] = str(data["id"])
obj["buy_max_price"] = data["buy_max_price"] # 求购最大金额
obj["buy_num"] = data["buy_num"] # 当前求购
goods_info = data["goods_info"]
info = goods_info["info"]
tags = info["tags"]
obj["weapon_type"] = tags["type"]["localized_name"] # 枪械类型
if obj["weapon_type"] in ["音乐盒", "印花", "探员"]:
continue
elif obj["weapon_type"] in ["匕首", "手套"]:
obj["color"] = "KNIFE"
obj["name"] = data["short_name"].split("")[0].strip() # 名称
elif obj["weapon_type"] in ["武器箱"]:
obj["color"] = "CASE"
obj["name"] = data["short_name"]
else:
obj["color"] = NAME2COLOR[tags["rarity"]["localized_name"]]
obj["name"] = tags["weapon"]["localized_name"] # 名称
if obj["weapon_type"] not in ["武器箱"]:
obj["abrasion"] = tags["exterior"]["localized_name"] # 磨损
obj["is_stattrak"] = "StatTrak" in tags["quality"]["localized_name"] # type: ignore # 是否暗金
if not obj["color"]:
obj["color"] = NAME2COLOR[
tags["rarity"]["localized_name"]
] # 品质颜色
else:
obj["abrasion"] = "CASE"
obj["skin_name"] = data["short_name"].split("|")[-1].strip() # 皮肤名称
obj["img_url"] = goods_info["original_icon_url"] # 图片url
obj["steam_price"] = goods_info["steam_price_cny"] # steam价格
obj["sell_min_price"] = data["sell_min_price"] # 售卖最低价格
obj["sell_num"] = data["sell_num"] # 售卖数量
obj["sell_reference_price"] = data["sell_reference_price"] # 参考价格
update_data.append(BuffSkin(**obj))
except Exception as e:
logger.error(
f"更新武器箱: [<u><e>{s_name}</e></u>] 皮肤: [<u><c>{s_name}</c></u>] 错误",
e=e,
)
logger.debug(
f"访问武器箱: [<u><e>{s_name}</e></u>] 页数: [<u><y>{page_index}</y></u>] 成功并收录完成",
"开箱更新",
)
return update_data, json_data["data"]["total_page"]
else:
logger.warning(f'访问BUFF失败: {json_data["error"]}')
return f'访问失败: {json_data["error"]}', -1
return f"访问失败, 状态码: {response.status_code}", -1
async def build_case_image(case_name: str) -> Union[BuildImage, str]:
"""构造武器箱图片
Args:
case_name (str): 名称
Returns:
Union[BuildImage, str]: 图片
"""
background = random.choice(os.listdir(CASE_BACKGROUND))
background_img = BuildImage(0, 0, background=CASE_BACKGROUND / background)
if case_name:
log_list = (
await BuffSkinLog.filter(case_name=case_name)
.annotate(count=Count("id"))
.group_by("skin_name")
.values_list("skin_name", "count")
)
skin_list_ = await BuffSkin.filter(case_name=case_name).all()
skin2count = {item[0]: item[1] for item in log_list}
case = None
skin_list: List[BuffSkin] = []
exists_name = []
for skin in skin_list_:
if skin.color == "CASE":
case = skin
else:
name = skin.name + skin.skin_name
if name not in exists_name:
skin_list.append(skin)
exists_name.append(name)
generate_img = {}
for skin in skin_list:
skin_img = await generate_skin(skin, skin2count[skin.skin_name])
if skin_img:
if not generate_img.get(skin.color):
generate_img[skin.color] = []
generate_img[skin.color].append(skin_img)
skin_image_list = []
for color in COLOR2NAME:
if generate_img.get(color):
skin_image_list = skin_image_list + generate_img[color]
img = skin_image_list[0]
img_w, img_h = img.size
total_size = (img_w + 25) * (img_h + 10) * len(skin_image_list) # 总面积
new_size = get_bk_image_size(total_size, background_img.size, img.size, 250)
A = BuildImage(
new_size[0] + 50, new_size[1], background=CASE_BACKGROUND / background
)
await A.afilter("GaussianBlur", 2)
if case:
case_img = await generate_skin(case, skin2count[f"{case_name}武器箱"])
if case_img:
A.paste(case_img, (25, 25), True)
w = 25
h = 230
skin_image_list.reverse()
for image in skin_image_list:
A.paste(image, (w, h), True)
w += image.w + 20
if w + image.w - 25 > A.w:
h += image.h + 10
w = 25
if h + img_h + 100 < A.h:
await A.acrop((0, 0, A.w, h + img_h + 100))
return A
else:
log_list = (
await BuffSkinLog.filter(color="CASE")
.annotate(count=Count("id"))
.group_by("case_name")
.values_list("case_name", "count")
)
name2count = {item[0]: item[1] for item in log_list}
skin_list = await BuffSkin.filter(color="CASE").all()
image_list: List[BuildImage] = []
for skin in skin_list:
if img := await generate_skin(skin, name2count[skin.case_name]):
image_list.append(img)
if not image_list:
return "未收录武器箱"
w = 25
h = 150
img = image_list[0]
img_w, img_h = img.size
total_size = (img_w + 25) * (img_h + 10) * len(image_list) # 总面积
new_size = get_bk_image_size(total_size, background_img.size, img.size, 155)
A = BuildImage(
new_size[0] + 50, new_size[1], background=CASE_BACKGROUND / background
)
await A.afilter("GaussianBlur", 2)
bk_img = BuildImage(
img_w, 120, color=(25, 25, 25, 100), font_size=60, font="CJGaoDeGuo.otf"
)
await bk_img.atext(
(0, 0), f"已收录 {len(image_list)} 个武器箱", (255, 255, 255), center_type="center"
)
await A.apaste(bk_img, (10, 10), True, "by_width")
for image in image_list:
A.paste(image, (w, h), True)
w += image.w + 20
if w + image.w - 25 > A.w:
h += image.h + 10
w = 25
if h + img_h + 100 < A.h:
await A.acrop((0, 0, A.w, h + img_h + 100))
return A
def get_bk_image_size(
total_size: int,
base_size: Tuple[int, int],
img_size: Tuple[int, int],
extra_height: int = 0,
):
"""获取所需背景大小且不改变图片长宽比
Args:
total_size (int): 总面积
base_size (Tuple[int, int]): 初始背景大小
img_size (Tuple[int, int]): 贴图大小
Returns:
_type_: 满足所有贴图大小
"""
bk_w, bk_h = base_size
img_w, img_h = img_size
is_add_title_size = False
left_dis = 0
right_dis = 0
old_size = (0, 0)
new_size = (0, 0)
ratio = 1.1
while 1:
w_ = int(ratio * bk_w)
h_ = int(ratio * bk_h)
size = w_ * h_
if size < total_size:
left_dis = size
else:
right_dis = size
r = w_ / (img_w + 25)
if right_dis and r - int(r) < 0.1:
if not is_add_title_size and extra_height:
total_size = int(total_size + w_ * extra_height)
is_add_title_size = True
right_dis = 0
continue
if total_size - left_dis > right_dis - total_size:
new_size = (w_, h_)
else:
new_size = old_size
break
old_size = (w_, h_)
ratio += 0.1
return new_size
async def get_skin_case(id_: str) -> Optional[str]:
"""获取皮肤所在箱子
Args:
id_ (str): 皮肤id
Returns:
Optional[str]: 武器箱名称
"""
url = f"{SELL_URL}/{id_}"
proxy = None
if ip := Config.get_config("open_cases", "BUFF_PROXY"):
proxy = {"http://": ip, "https://": ip}
response = await AsyncHttpx.get(
url,
proxy=proxy,
)
if response.status_code == 200:
text = response.text
if r := re.search('<meta name="description".*,(.*)武器箱.*?>', text):
return r.group(1)
else:
logger.debug(f"访问皮肤所属武器箱异常 url: {url} code: {response.status_code}")
return None
async def reset_count_daily():
"""
重置每日开箱
"""
try:
await OpenCasesUser.all().update(today_open_total=0)
await broadcast_group(
"[[_task|open_case_reset_remind]]今日开箱次数重置成功", log_cmd="开箱重置提醒"
)
except Exception as e:
logger.error(f"开箱重置错误", e=e)
@driver.on_startup
async def _():
await CaseManager.reload()