zhenxun_bot/plugins/draw_card/handles/prts_handle.py
2022-05-25 07:02:42 +08:00

300 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import random
import dateparser
from lxml import etree
from PIL import ImageDraw
from bs4 import BeautifulSoup
from datetime import datetime
from urllib.parse import unquote
from typing import List, Optional, Tuple
from pydantic import ValidationError
from nonebot.adapters.onebot.v11 import Message
from utils.message_builder import image
from nonebot.log import logger
import asyncio
try:
import ujson as json
except ModuleNotFoundError:
import json
from .base_handle import BaseHandle, BaseData, UpChar, UpEvent
from ..config import draw_config, DRAW_PATH
from ..util import remove_prohibited_str, cn2py, load_font
from utils.image_utils import BuildImage
class Operator(BaseData):
recruit_only: bool # 公招限定
event_only: bool # 活动获得干员
# special_only: bool # 升变/异格干员
class PrtsHandle(BaseHandle[Operator]):
def __init__(self):
super().__init__("prts", "明日方舟", "#eff2f5")
self.max_star = 6
self.config = draw_config.prts
self.ALL_OPERATOR: List[Operator] = []
self.UP_EVENT: Optional[UpEvent] = None
def get_card(self, add: float) -> Operator:
star = self.get_star(
[6, 5, 4, 3],
[
self.config.PRTS_SIX_P + add,
self.config.PRTS_FIVE_P,
self.config.PRTS_FOUR_P,
self.config.PRTS_THREE_P,
],
)
all_operators = [
x
for x in self.ALL_OPERATOR
if x.star == star and not any([x.limited, x.recruit_only, x.event_only])
]
acquire_operator = None
if self.UP_EVENT:
up_operators = [x for x in self.UP_EVENT.up_char if x.star == star]
# UPs
try:
zooms = [x.zoom for x in up_operators]
zoom_sum = sum(zooms)
if random.random() < zoom_sum:
up_name = random.choices(up_operators, weights=zooms, k=1)[0].name
acquire_operator = [
x for x in self.ALL_OPERATOR if x.name == up_name
][0]
except IndexError:
pass
if not acquire_operator:
acquire_operator = random.choice(all_operators)
return acquire_operator
def get_cards(self, count: int) -> List[Tuple[Operator, int]]:
card_list = [] # 获取所有角色
add = 0.0
count_idx = 0
for i in range(count):
count_idx += 1
card = self.get_card(add)
if card.star == self.max_star:
add = 0.0
count_idx = 0
elif count_idx > 50:
add += 0.02
card_list.append((card, i + 1))
return card_list
def format_pool_info(self) -> str:
info = ""
if self.UP_EVENT:
star6_list = [x.name for x in self.UP_EVENT.up_char if x.star == 6]
star5_list = [x.name for x in self.UP_EVENT.up_char if x.star == 5]
star4_list = [x.name for x in self.UP_EVENT.up_char if x.star == 4]
if star6_list:
info += f"六星UP{' '.join(star6_list)}\n"
if star5_list:
info += f"五星UP{' '.join(star5_list)}\n"
if star4_list:
info += f"四星UP{' '.join(star4_list)}\n"
info = f"当前up池: {self.UP_EVENT.title}\n{info}"
return info.strip()
async def draw(self, count: int, **kwargs) -> Message:
return await asyncio.get_event_loop().run_in_executor(None, self._draw, count)
def _draw(self, count: int, **kwargs) -> Message:
index2card = self.get_cards(count)
"""这里cards修复了抽卡图文不符的bug"""
cards = [card[0] for card in index2card]
up_list = [x.name for x in self.UP_EVENT.up_char] if self.UP_EVENT else []
result = self.format_result(index2card, up_list=up_list)
pool_info = self.format_pool_info()
return pool_info + image(b64=self.generate_img(cards).pic2bs4()) + result
def generate_card_img(self, card: Operator) -> BuildImage:
sep_w = 5
sep_h = 5
star_h = 15
img_w = 120
img_h = 120
font_h = 20
bg = BuildImage(img_w + sep_w * 2, img_h + font_h + sep_h * 2, color="#EFF2F5")
star_path = str(self.img_path / "star.png")
star = BuildImage(star_h, star_h, background=star_path)
img_path = str(self.img_path / f"{cn2py(card.name)}.png")
img = BuildImage(img_w, img_h, background=img_path)
bg.paste(img, (sep_w, sep_h), alpha=True)
for i in range(card.star):
bg.paste(star, (sep_w + img_w - 5 - star_h * (i + 1), sep_h), alpha=True)
# 加名字
text = card.name[:7] + "..." if len(card.name) > 8 else card.name
font = load_font(fontsize=16)
text_w, text_h = font.getsize(text)
draw = ImageDraw.Draw(bg.markImg)
draw.text(
(sep_w + (img_w - text_w) / 2, sep_h + img_h + (font_h - text_h) / 2),
text,
font=font,
fill="gray",
)
return bg
def _init_data(self):
self.ALL_OPERATOR = [
Operator(
name=value["名称"],
star=int(value["星级"]),
limited="干员寻访" not in value["获取途径"],
recruit_only=True
if "干员寻访" not in value["获取途径"] and "公开招募" in value["获取途径"]
else False,
event_only=True if "活动获取" in value["获取途径"] else False,
)
for key, value in self.load_data().items()
if "阿米娅" not in key
]
self.load_up_char()
def load_up_char(self):
try:
if (DRAW_PATH / "draw_card_up" / f"{self.game_name}_up_char.json").exists():
data = self.load_data(f"draw_card_up/{self.game_name}_up_char.json")
self.UP_EVENT = UpEvent.parse_obj(data.get("char", {}))
except ValidationError:
logger.warning(f"{self.game_name}_up_char 解析出错")
def dump_up_char(self):
if self.UP_EVENT:
data = {"char": json.loads(self.UP_EVENT.json())}
self.dump_data(data, f"draw_card_up/{self.game_name}_up_char.json")
async def _update_info(self):
info = {}
url = "https://wiki.biligame.com/arknights/干员数据表"
result = await self.get_url(url)
if not result:
logger.warning(f"更新 {self.game_name_cn} 出错")
return
dom = etree.HTML(result, etree.HTMLParser())
char_list = dom.xpath("//table[@id='CardSelectTr']/tbody/tr")
for char in char_list:
try:
avatar = char.xpath("./td[1]/div/div/div/a/img/@srcset")[0]
name = char.xpath("./td[2]/a/text()")[0]
star = char.xpath("./td[5]/text()")[0]
"""这里sources修好了干员获取标签有问题的bug如三星只能抽到卡缇就是这个原因"""
sources = [_.strip('\n') for _ in char.xpath("./td[8]/text()")]
except IndexError:
continue
member_dict = {
"头像": unquote(str(avatar).split(" ")[-2]),
"名称": remove_prohibited_str(str(name).strip()),
"星级": int(str(star).strip()),
"获取途径": sources,
}
info[member_dict["名称"]] = member_dict
self.dump_data(info)
logger.info(f"{self.game_name_cn} 更新成功")
# 下载头像
for value in info.values():
await self.download_img(value["头像"], value["名称"])
# 下载星星
await self.download_img(
"https://patchwiki.biligame.com/images/pcr/0/02/s75ys2ecqhu2xbdw1wf1v9ccscnvi5g.png",
"star",
)
await self.update_up_char()
async def update_up_char(self):
announcement_url = "https://ak.hypergryph.com/news.html"
result = await self.get_url(announcement_url)
if not result:
logger.warning(f"{self.game_name_cn}获取公告出错")
return
dom = etree.HTML(result, etree.HTMLParser())
activity_urls = dom.xpath(
"//ol[@class='articleList' and @data-category-key='ACTIVITY']/li/a/@href"
)
start_time = None
end_time = None
up_chars = []
pool_img = ""
title = ""
for activity_url in activity_urls:
activity_url = f"https://ak.hypergryph.com{activity_url}"
result = await self.get_url(activity_url)
if not result:
logger.warning(f"{self.game_name_cn}获取公告 {activity_url} 出错")
continue
soup = BeautifulSoup(result, "lxml")
contents = soup.find_all("p")
for index, content in enumerate(contents):
if re.search("(.*)(寻访|复刻).*?开启", content.text):
title = content.text
if "" in title and "" in title:
title = re.split(r"[【】]", title)[1]
lines = [str(contents[index + i + 1].text) for i in range(5)]
time = ""
chars: List[str] = []
for line in lines:
match = re.search(
r"(\d{1,2}月\d{1,2}日.*?-.*?\d{1,2}月\d{1,2}日.*?$)", line
)
if match:
time = match.group(1)
if "" in line:
"""这里修复了某些池子六星名称显示错误的问题(如奔崖号角)"""
if line[0] != '':
idx = line.find('')
line = line[idx:]
chars.append(line)
if not time:
continue
start, end = time.replace("", "/").replace("", "").split("-")[:2]
start_time = dateparser.parse(start)
end_time = dateparser.parse(end)
pool_img = content.find_previous("img")["src"]
for char in chars:
star = char.split("")[0].count("")
name = re.split(r"[]", char)[1]
names = name.split("/") if "/" in name else [name]
names = [name.replace("[限定]", "").strip() for name in names]
if "权值" in char:
match = re.search(r"(在.*?以.*?(\d+).*?倍权值.*?", char)
else:
match = re.search(r"(占.*?的.*?(\d+).*?%", char)
zoom = 1
if match:
zoom = float(match.group(1))
zoom = zoom / 100 if zoom > 10 else zoom
for name in names:
up_chars.append(
UpChar(name=name, star=star, limited=False, zoom=zoom)
)
break
if title and start_time and end_time:
if start_time <= datetime.now() <= end_time:
self.UP_EVENT = UpEvent(
title=title,
pool_img=pool_img,
start_time=start_time,
end_time=end_time,
up_char=up_chars,
)
self.dump_up_char()
logger.info(f"成功获取{self.game_name_cn}当前up信息...当前up池: {title}")
break
async def _reload_pool(self) -> Optional[Message]:
await self.update_up_char()
self.load_up_char()
if self.UP_EVENT:
return f"重载成功!\n当前UP池子{self.UP_EVENT.title}" + image(
self.UP_EVENT.pool_img
)