mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 14:22:55 +08:00
293 lines
12 KiB
Python
293 lines
12 KiB
Python
import re
|
||
import random
|
||
import dateparser
|
||
from lxml import etree
|
||
from PIL import ImageDraw
|
||
from bs4 import BeautifulSoup
|
||
from datetime import datetime
|
||
from urllib.parse import unquote
|
||
from typing import List, Optional, Tuple
|
||
from pydantic import ValidationError
|
||
from nonebot.adapters.onebot.v11 import Message
|
||
from utils.message_builder import image
|
||
from nonebot.log import logger
|
||
import asyncio
|
||
|
||
try:
|
||
import ujson as json
|
||
except ModuleNotFoundError:
|
||
import json
|
||
|
||
from .base_handle import BaseHandle, BaseData, UpChar, UpEvent
|
||
from ..config import draw_config
|
||
from ..util import remove_prohibited_str, cn2py, load_font
|
||
from utils.image_utils import BuildImage
|
||
|
||
|
||
class Operator(BaseData):
    """Arknights operator record used by the draw simulation."""

    # Obtainable only through public recruitment (not through headhunting).
    recruit_only: bool
    # Operator obtained from events.
    event_only: bool
    # special_only: bool  # promoted / alter operators (field currently disabled)
|
||
|
||
|
||
class PrtsHandle(BaseHandle[Operator]):
    """Draw-card handler for Arknights ("prts").

    Holds the operator pool and the current rate-up (UP) event, simulates
    draws, renders result images, and refreshes both data sets from the web.
    """

    def __init__(self):
        super().__init__("prts", "明日方舟", "#eff2f5")
        self.max_star = 6
        self.config = draw_config.prts

        # Filled by _init_data() / load_up_char().
        self.ALL_OPERATOR: List[Operator] = []
        self.UP_EVENT: Optional[UpEvent] = None

    def get_card(self, add: float) -> Operator:
        """Draw a single operator.

        Args:
            add: extra probability added to the configured 6-star rate.

        Returns:
            The drawn operator: a rate-up operator when the UP roll succeeds,
            otherwise a random regular operator of the rolled star level.

        Raises:
            IndexError: if no regular operator of the rolled star level is
                loaded (``random.choice`` on an empty list).
        """
        star = self.get_star(
            [6, 5, 4, 3],
            [
                self.config.PRTS_SIX_P + add,
                self.config.PRTS_FIVE_P,
                self.config.PRTS_FOUR_P,
                self.config.PRTS_THREE_P,
            ],
        )

        # Regular pool: exclude limited, event-only and recruit-only operators.
        all_operators = [
            x
            for x in self.ALL_OPERATOR
            if x.star == star and not any([x.limited, x.event_only, x.recruit_only])
        ]
        acquire_operator = None

        if self.UP_EVENT:
            up_operators = [x for x in self.UP_EVENT.up_char if x.star == star]
            zooms = [x.zoom for x in up_operators]
            # With no UP operator of this star level the sum is 0 and the
            # roll can never succeed, so random.choices is never called on
            # an empty population.
            if random.random() < sum(zooms):
                up_name = random.choices(up_operators, weights=zooms, k=1)[0].name
                # An UP name missing from the operator data falls through to
                # the regular pool below.
                acquire_operator = next(
                    (x for x in self.ALL_OPERATOR if x.name == up_name), None
                )
        if not acquire_operator:
            acquire_operator = random.choice(all_operators)
        return acquire_operator

    def get_cards(self, count: int) -> List[Tuple[Operator, int]]:
        """Draw ``count`` operators in sequence.

        Once more than 50 draws in a row have missed the top star level,
        every further miss raises the 6-star bonus by 0.02; drawing a
        top-star operator resets both the counter and the bonus.

        Returns:
            ``(operator, draw_number)`` tuples, ``draw_number`` starting at 1.
        """
        card_list = []
        add = 0.0
        count_idx = 0
        for draw_number in range(1, count + 1):
            count_idx += 1
            card = self.get_card(add)
            if card.star == self.max_star:
                add = 0.0
                count_idx = 0
            elif count_idx > 50:
                add += 0.02
            card_list.append((card, draw_number))
        return card_list

    def format_pool_info(self) -> str:
        """Return a text summary of the current UP pool, or "" when there is none."""
        info = ""
        if self.UP_EVENT:
            star6_list = [x.name for x in self.UP_EVENT.up_char if x.star == 6]
            star5_list = [x.name for x in self.UP_EVENT.up_char if x.star == 5]
            star4_list = [x.name for x in self.UP_EVENT.up_char if x.star == 4]
            if star6_list:
                info += f"六星UP:{' '.join(star6_list)}\n"
            if star5_list:
                info += f"五星UP:{' '.join(star5_list)}\n"
            if star4_list:
                info += f"四星UP:{' '.join(star4_list)}\n"
            info = f"当前up池: {self.UP_EVENT.title}\n{info}"
        return info.strip()

    async def draw(self, count: int, **kwargs) -> Message:
        """Run the blocking draw and image rendering in the default executor."""
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._draw, count)

    def _draw(self, count: int, **kwargs) -> Message:
        """Draw ``count`` cards and build the reply (pool info + image + summary)."""
        index2card = self.get_cards(count)
        # Reuse the same draw for the image. Drawing a second time would
        # render cards that do not match the text summary.
        cards = [card for card, _ in index2card]
        up_list = [x.name for x in self.UP_EVENT.up_char] if self.UP_EVENT else []
        result = self.format_result(index2card, up_list=up_list)
        pool_info = self.format_pool_info()
        return pool_info + image(b64=self.generate_img(cards).pic2bs4()) + result

    def generate_card_img(self, card: Operator) -> BuildImage:
        """Render one operator tile: avatar, star icons and the name below."""
        sep_w = 5
        sep_h = 5
        star_h = 15
        img_w = 120
        img_h = 120
        font_h = 20
        bg = BuildImage(img_w + sep_w * 2, img_h + font_h + sep_h * 2, color="#EFF2F5")
        star_path = str(self.img_path / "star.png")
        star = BuildImage(star_h, star_h, background=star_path)
        img_path = str(self.img_path / f"{cn2py(card.name)}.png")
        img = BuildImage(img_w, img_h, background=img_path)
        bg.paste(img, (sep_w, sep_h), alpha=True)
        # Stars are laid out right-to-left along the top edge of the avatar.
        for i in range(card.star):
            bg.paste(star, (sep_w + img_w - 5 - star_h * (i + 1), sep_h), alpha=True)
        # Add the name, shortened with an ellipsis when longer than 8 characters.
        text = card.name[:7] + "..." if len(card.name) > 8 else card.name
        font = load_font(fontsize=16)
        # NOTE(review): if load_font returns a PIL FreeTypeFont, getsize() is
        # gone in Pillow >= 10 -- confirm the pinned Pillow version.
        text_w, text_h = font.getsize(text)
        draw = ImageDraw.Draw(bg.markImg)
        draw.text(
            (sep_w + (img_w - text_w) / 2, sep_h + img_h + (font_h - text_h) / 2),
            text,
            font=font,
            fill="gray",
        )
        return bg

    def _init_data(self):
        """Load operator data into ``ALL_OPERATOR`` and then the saved UP event."""
        self.ALL_OPERATOR = [
            Operator(
                name=value["名称"],
                star=int(value["星级"]),
                limited="干员寻访" not in value["获取途径"],
                recruit_only=(
                    "干员寻访" not in value["获取途径"]
                    and "公开招募" in value["获取途径"]
                ),
                event_only="活动获取" in value["获取途径"],
            )
            for key, value in self.load_data().items()
            if "阿米娅" not in key
        ]
        self.load_up_char()

    def load_up_char(self):
        """Load the saved UP event; on a validation error only log a warning."""
        try:
            data = self.load_data(f"draw_card_up/{self.game_name}_up_char.json")
            self.UP_EVENT = UpEvent.parse_obj(data.get("char", {}))
        except ValidationError:
            logger.warning(f"{self.game_name}_up_char 解析出错")

    def dump_up_char(self):
        """Persist the current UP event, if any, under the ``char`` key."""
        if self.UP_EVENT:
            data = {"char": json.loads(self.UP_EVENT.json())}
            self.dump_data(data, f"draw_card_up/{self.game_name}_up_char.json")

    async def _update_info(self):
        """Scrape the operator table, save it, download images, refresh the UP pool."""
        info = {}
        url = "https://wiki.biligame.com/arknights/干员数据表"
        result = await self.get_url(url)
        if not result:
            logger.warning(f"更新 {self.game_name_cn} 出错")
            return
        dom = etree.HTML(result, etree.HTMLParser())
        char_list = dom.xpath("//table[@id='CardSelectTr']/tbody/tr")
        for char in char_list:
            try:
                avatar = char.xpath("./td[1]/div/div/div/a/img/@srcset")[0]
                name = char.xpath("./td[2]/a/text()")[0]
                star = char.xpath("./td[5]/text()")[0]
                sources = str(char.xpath("./td[8]/text()")[0]).split("\n")
                member_dict = {
                    # The second-to-last space-separated token of the srcset
                    # value is used as the avatar URL.
                    "头像": unquote(str(avatar).split(" ")[-2]),
                    "名称": remove_prohibited_str(str(name).strip()),
                    "星级": int(str(star).strip()),
                    "获取途径": [s for s in sources if s],
                }
            except (IndexError, ValueError):
                # Skip a malformed row instead of aborting the whole update.
                continue
            info[member_dict["名称"]] = member_dict
        self.dump_data(info)
        logger.info(f"{self.game_name_cn} 更新成功")
        # Download avatars.
        for value in info.values():
            await self.download_img(value["头像"], value["名称"])
        # Download the star icon.
        await self.download_img(
            "https://patchwiki.biligame.com/images/pcr/0/02/s75ys2ecqhu2xbdw1wf1v9ccscnvi5g.png",
            "star",
        )
        await self.update_up_char()

    @staticmethod
    def _parse_up_line(line: str) -> List[UpChar]:
        """Parse one "★" announcement line into its rate-up characters.

        Both ASCII and full-width colons, parentheses and percent signs are
        accepted. Returns an empty list when the line has no name separator.
        """
        star = re.split(r"[(（]", line)[0].count("★")
        parts = re.split(r"[:：(（]", line)
        if len(parts) < 2:
            return []
        names = [n.replace("[限定]", "").strip() for n in parts[1].split("/")]
        # Only the digits are captured, so group(1) is always numeric.
        if "权值" in line:
            match = re.search(r"[(（]在.*?以.*?(\d+).*?倍权值.*?[)）]", line)
        else:
            match = re.search(r"[(（]占.*?的.*?(\d+).*?[%％][)）]", line)
        zoom = 1
        if match:
            zoom = float(match.group(1))
            # Values above 10 are treated as percentages, smaller ones as
            # weight multipliers.
            zoom = zoom / 100 if zoom > 10 else zoom
        return [UpChar(name=n, star=star, limited=False, zoom=zoom) for n in names]

    async def update_up_char(self):
        """Scan the activity announcements and store the currently running UP pool.

        Announcements are visited in page order. The first one whose pool
        period contains the current time becomes ``UP_EVENT`` and is saved.
        """
        announcement_url = "https://ak.hypergryph.com/news.html"
        result = await self.get_url(announcement_url)
        if not result:
            logger.warning(f"{self.game_name_cn}获取公告出错")
            return
        dom = etree.HTML(result, etree.HTMLParser())
        activity_urls = dom.xpath(
            "//ol[@class='articleList' and @data-category-key='ACTIVITY']/li/a/@href"
        )
        for activity_url in activity_urls:
            activity_url = f"https://ak.hypergryph.com{activity_url}"
            result = await self.get_url(activity_url)
            if not result:
                logger.warning(f"{self.game_name_cn}获取公告 {activity_url} 出错")
                continue

            # Per-announcement state, so data from a rejected announcement
            # never leaks into the next one.
            start_time = None
            end_time = None
            up_chars: List[UpChar] = []
            pool_img = ""
            title = ""

            soup = BeautifulSoup(result, "lxml")
            contents = soup.find_all("p")
            for index, content in enumerate(contents):
                if not re.search("(.*)(寻访|复刻).*?开启", content.text):
                    continue
                title = content.text
                if "【" in title and "】" in title:
                    title = re.split(r"[【】]", title)[1]
                # Up to five following paragraphs; slicing avoids an
                # IndexError near the end of the page.
                lines = [str(p.text) for p in contents[index + 1 : index + 6]]
                time_text = ""
                chars: List[str] = []
                for line in lines:
                    match = re.search(
                        r"(\d{1,2}月\d{1,2}日.*?-.*?\d{1,2}月\d{1,2}日.*?$)", line
                    )
                    if match:
                        time_text = match.group(1)
                    if "★" in line:
                        chars.append(line)
                if not time_text:
                    continue
                start, end = (
                    time_text.replace("月", "/").replace("日", "").split("-")[:2]
                )
                start_time = dateparser.parse(start)
                end_time = dateparser.parse(end)
                prev_img = content.find_previous("img")
                pool_img = prev_img.get("src", "") if prev_img is not None else ""
                for char in chars:
                    up_chars.extend(self._parse_up_line(char))
                break

            if title and start_time and end_time:
                if start_time <= datetime.now() <= end_time:
                    self.UP_EVENT = UpEvent(
                        title=title,
                        pool_img=pool_img,
                        start_time=start_time,
                        end_time=end_time,
                        up_char=up_chars,
                    )
                    self.dump_up_char()
                    logger.info(f"成功获取{self.game_name_cn}当前up信息...当前up池: {title}")
                    break

    async def _reload_pool(self) -> Optional[Message]:
        """Refresh the UP pool; return a confirmation message, or None without a pool."""
        await self.update_up_char()
        self.load_up_char()
        if self.UP_EVENT:
            return f"重载成功!\n当前UP池子:{self.UP_EVENT.title}" + image(
                self.UP_EVENT.pool_img
            )
|