# zhenxun_bot/zhenxun/plugins/draw_card/handles/genshin_handle.py
#
# 466 lines
# 19 KiB
# Python
# Raw Normal View History
#
# 2024-07-28 03:37:37 +08:00
import random
from datetime import datetime, timedelta
from urllib.parse import unquote
import dateparser
import ujson as json
from lxml import etree
from nonebot_plugin_saa import Image as SaaImage
from nonebot_plugin_saa import MessageFactory, Text
from PIL import Image, ImageDraw
from pydantic import ValidationError
from zhenxun.services.log import logger
from zhenxun.utils.image_utils import BuildImage
from ..config import draw_config
from ..count_manager import GenshinCountManager
from ..util import cn2py, load_font, remove_prohibited_str
from .base_handle import BaseData, BaseHandle, UpChar, UpEvent
class GenshinData(BaseData):
    """Base record type for entries handled by the Genshin draw-card handler."""
class GenshinChar(GenshinData):
    """Genshin record representing a character."""
class GenshinArms(GenshinData):
    """Genshin record representing a weapon."""
class GenshinHandle(BaseHandle[GenshinData]):
    """Draw-card handler for the "genshin" ("原神") game."""

    def __init__(self):
        """Register the game with the base handler and set empty pool state."""
        super().__init__("genshin", "原神")
        # Weapons live in their own data file next to the character file.
        self.data_files.append("genshin_arms.json")
        self.max_star = 5
        self.game_card_color = "#ebebeb"
        self.config = draw_config.genshin
        self.ALL_CHAR: list[GenshinData] = []
        self.ALL_ARMS: list[GenshinData] = []
        self.UP_CHAR: UpEvent | None = None
        # One entry per active character UP pool; indexed by ``card_index``.
        self.UP_CHAR_LIST: list[UpEvent] = []
        self.UP_ARMS: UpEvent | None = None
        # NOTE(review): arguments presumably are (4-star, 5-star) guarantee
        # counts, the star labels, and a hard cap - confirm against
        # GenshinCountManager.
        self.count_manager = GenshinCountManager((10, 90), ("4", "5"), 180)
# Draw a single card from a pool.
def get_card(
    self,
    pool_name: str,
    mode: int = 1,
    add: float = 0.0,
    is_up: bool = False,
    card_index: int = 0,
) -> GenshinData:
    """Draw one card from the requested pool.

    Args:
        pool_name: "char" uses a character UP pool, "arms" uses the weapon UP
            pool; any other value draws from all weapons and characters
            without an UP event.
        mode: 1 = normal draw, 2 = four-star guarantee, 3 = five-star
            guarantee.
        add: Extra weight added to the five-star probability.
        is_up: When true, the UP item is taken without the 50% roll.
        card_index: Index into ``UP_CHAR_LIST`` for the "char" pool. An index
            with no matching event is treated as "no UP event".

    Returns:
        The drawn character or weapon.

    Raises:
        IndexError: If the pool holds no non-limited entry of the rolled star
            level (``random.choice`` on an empty list).
    """
    if mode == 1:
        star = self.get_star(
            [5, 4, 3],
            [
                self.config.GENSHIN_FIVE_P + add,
                self.config.GENSHIN_FOUR_P,
                self.config.GENSHIN_THREE_P,
            ],
        )
    elif mode == 2:
        star = self.get_star(
            [5, 4],
            [self.config.GENSHIN_G_FIVE_P + add, self.config.GENSHIN_G_FOUR_P],
        )
    else:
        star = 5

    if pool_name == "char":
        try:
            up_event = self.UP_CHAR_LIST[card_index]
        except IndexError:
            # No such UP pool loaded: draw as if there were no UP event.
            up_event = None
        # Below five stars the other category shares the pool.
        all_list = self.ALL_CHAR + [
            x for x in self.ALL_ARMS if x.star == star and x.star < 5
        ]
    elif pool_name == "arms":
        up_event = self.UP_ARMS
        all_list = self.ALL_ARMS + [
            x for x in self.ALL_CHAR if x.star == star and x.star < 5
        ]
    else:
        up_event = None
        all_list = self.ALL_ARMS + self.ALL_CHAR

    acquire_char = None
    if up_event and star > 3:
        up_names = [x.name for x in up_event.up_char if x.star == star]
        # Guard on up_names: random.choice([]) would raise IndexError when the
        # event lists nothing for the rolled star level.
        if up_names and (random.random() < 0.5 or is_up):
            up_name = random.choice(up_names)
            # The UP name may be absent from the loaded data; fall through to
            # the standard pool in that case.
            acquire_char = next((x for x in all_list if x.name == up_name), None)
    if acquire_char is None:
        chars = [x for x in all_list if x.star == star and not x.limited]
        acquire_char = random.choice(chars)
    return acquire_char
def get_cards(
    self, count: int, user_id: int, pool_name: str, card_index: int = 0
) -> list[tuple[GenshinData, int]]:
    """Draw ``count`` cards for ``user_id`` and update the guarantee counters.

    Args:
        count: Number of draws to perform.
        user_id: Key used with the count manager.
        pool_name: "char" selects ``UP_CHAR_LIST[card_index]``; anything else
            uses ``UP_ARMS`` as the UP event for the "is it an UP item" check.
        card_index: Index of the character UP pool. An index with no matching
            event is treated as "no UP event".

    Returns:
        One ``(card, user_draw_count)`` tuple per draw, where the count is the
        count manager's value right after that draw.
    """
    card_list = []
    add = 0.0
    count_manager = self.count_manager
    count_manager.check_count(user_id, count)

    if pool_name == "char":
        try:
            pool = self.UP_CHAR_LIST[card_index]
        except IndexError:
            pool = None
    else:
        pool = self.UP_ARMS
    # The pool does not change during the loop, so collect its top-star UP
    # names once.
    up_max_star_names = (
        {x.name for x in pool.up_char if x.star == self.max_star} if pool else set()
    )

    for _ in range(count):
        count_manager.increase(user_id)
        # Non-zero when a four- or five-star guarantee is due.
        guaranteed_star = count_manager.check(user_id)
        draws_since_five = count_manager.get_user_count(
            user_id
        ) - count_manager.get_user_five_index(user_id)
        if draws_since_five % count_manager.get_max_guarantee() >= 72:
            add += draw_config.genshin.I72_ADD

        if guaranteed_star == 4:
            card = self.get_card(pool_name, 2, add=add, card_index=card_index)
        else:
            # 3 = five-star guarantee, 1 = normal draw.
            mode = 3 if guaranteed_star else 1
            card = self.get_card(
                pool_name,
                mode,
                add,
                count_manager.is_up(user_id),
                card_index=card_index,
            )

        if card.star == 4:
            count_manager.mark_four_index(user_id)
        elif card.star == self.max_star:
            add = 0.0
            # A five-star resets both guarantee counters.
            count_manager.mark_five_index(user_id)
            count_manager.mark_four_index(user_id)
            count_manager.set_is_up(user_id, card.name in up_max_star_names)
        card_list.append((card, count_manager.get_user_count(user_id)))
    return card_list
async def generate_card_img(self, card: GenshinData) -> BuildImage:
    """Compose the tile for one card: frame, name, avatar and star icon.

    Reads ``avatar_frame.png``, ``<cn2py(name)>.png`` and ``<star>_star.png``
    from ``self.img_path`` and returns the assembled ``BuildImage``.
    """
    frame_w, frame_h = 112, 132
    img_w, img_h = 106, 106
    sep_w, sep_h = 10, 5

    bg = BuildImage(frame_w + sep_w * 2, frame_h + sep_h * 2, color="#EBEBEB")
    frame = Image.open(str(self.img_path / "avatar_frame.png"))

    # Write the card name, horizontally centred near the bottom of the frame.
    text = card.name
    font = load_font(fontsize=14)
    text_w, text_h = BuildImage.get_text_size(text, font)
    text_pos = ((frame_w - text_w) / 2, frame_h - 15 - text_h / 2)
    ImageDraw.Draw(frame).text(text_pos, text, font=font, fill="gray")

    avatar_file = str(self.img_path / f"{cn2py(card.name)}.png")
    img = BuildImage(img_w, img_h, background=avatar_file)
    if isinstance(card, GenshinArms):
        # Weapon card backgrounds are not transparent; knock out the two top
        # corners by pasting a zero-filled patch over each in the alpha mask.
        r = 12
        cutout = Image.new("L", (r * 2, r * 2), 0)
        alpha = Image.new("L", img.size, 255)
        for corner_x in (-r - 3, img_h - r + 3):  # top-left, then top-right
            alpha.paste(cutout, (corner_x, -r - 3))
        img.markImg.putalpha(alpha)

    star = Image.open(str(self.img_path / f"{card.star}_star.png"))
    star_x = sep_w + int((frame_w - star.width) / 2)

    await bg.paste(frame, (sep_w, sep_h))
    await bg.paste(img, (sep_w + 3, sep_h + 3))
    await bg.paste(star, (star_x, sep_h - 6))
    return bg
def format_pool_info(self, pool_name: str, card_index: int = 0) -> str:
    """Describe the current UP pool as display text.

    Args:
        pool_name: "char" for a character UP pool, "arms" for the weapon UP
            pool; any other value has no UP event.
        card_index: Index into ``UP_CHAR_LIST`` for the "char" pool.

    Returns:
        The pool title followed by its five- and four-star UP names, or an
        empty string when no UP event is available (including a
        ``card_index`` with no matching pool).
    """
    if pool_name == "char":
        try:
            up_event = self.UP_CHAR_LIST[card_index]
        except IndexError:
            up_event = None
    elif pool_name == "arms":
        up_event = self.UP_ARMS
    else:
        up_event = None
    if not up_event:
        return ""

    star5_list = [x.name for x in up_event.up_char if x.star == 5]
    star4_list = [x.name for x in up_event.up_char if x.star == 4]
    info = f"当前up池{up_event.title}\n"
    if star5_list:
        info += f"五星UP{' '.join(star5_list)}\n"
    if star4_list:
        info += f"四星UP{' '.join(star4_list)}\n"
    return info.strip()
async def draw(
    self, count: int, user_id: int, pool_name: str = "", **kwargs
) -> Text | MessageFactory:
    """Perform ``count`` draws and build the reply message.

    Args:
        count: Number of draws.
        user_id: Key used with the count manager.
        pool_name: Pool selector. A "1" anywhere in it selects the second
            character UP pool and is removed before further use.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        A ``Text`` notice when the second character UP pool was requested but
        is not available, otherwise a ``MessageFactory`` holding the pool
        description, the rendered image and the result summary.
    """
    card_index = 0
    if "1" in pool_name:
        card_index = 1
        pool_name = pool_name.replace("1", "")

    up_event = None
    if pool_name == "char":
        # Check BEFORE drawing: get_cards indexes the same list and advances
        # the user's counters, so a missing pool must be rejected first.
        if card_index == 1 and len(self.UP_CHAR_LIST) <= 1:
            return Text("当前没有第二个角色UP池")
        if self.UP_CHAR_LIST:
            up_event = self.UP_CHAR_LIST[card_index]
    elif pool_name == "arms":
        up_event = self.UP_ARMS

    index2cards = self.get_cards(count, user_id, pool_name, card_index)
    cards = [card for card, _ in index2cards]
    up_list = [x.name for x in up_event.up_char] if up_event else []

    result = self.format_star_result(cards)
    max_star_str = self.format_max_star(index2cards, up_list=up_list)
    if max_star_str:
        result += "\n" + max_star_str
    result += f"\n距离保底发还剩 {self.count_manager.get_user_guarantee_count(user_id)}"

    pool_info = self.format_pool_info(pool_name, card_index)
    img = await self.generate_img(cards)
    # Extra 50px strip under the cards for the probability footnote.
    bk = BuildImage(img.width, img.height + 50, font_size=20, color="#ebebeb")
    await bk.paste(img)
    await bk.text(
        (0, img.height + 10),
        "【五星0.6%四星5.1%第72抽开始五星概率每抽加0.585%",
    )
    return MessageFactory([Text(pool_info), SaaImage(bk.pic2bytes()), Text(result)])
def _init_data(self):
    """Build ``ALL_CHAR`` / ``ALL_ARMS`` from the data files, then load UP pools.

    Character entries whose key contains "旅行者" are skipped. A character is
    limited when its "常驻/限定" field equals "限定UP"; a weapon is limited when
    "祈愿" is not among its "获取途径".
    """
    char_table = self.load_data()
    self.ALL_CHAR = [
        GenshinChar(
            name=entry["名称"],
            star=int(entry["星级"]),
            limited=entry["常驻/限定"] == "限定UP",
        )
        for char_key, entry in char_table.items()
        if "旅行者" not in char_key
    ]

    arms_table = self.load_data("genshin_arms.json")
    self.ALL_ARMS = [
        GenshinArms(
            name=entry["名称"],
            star=int(entry["星级"]),
            limited="祈愿" not in entry["获取途径"],
        )
        for entry in arms_table.values()
    ]

    self.load_up_char()
def load_up_char(self):
    """Load saved UP events from ``draw_card_up/<game_name>_up_char.json``.

    Only keys present in the file are parsed: "char" and the optional "char1"
    become ``UP_CHAR_LIST`` (replacing, not extending, the current list, so
    repeated calls do not duplicate pools) and "arms" becomes ``UP_ARMS``.
    State is left untouched for anything the file does not provide, and
    entirely untouched if any present entry fails validation.
    """
    data = self.load_data(f"draw_card_up/{self.game_name}_up_char.json")
    try:
        # "char1" is only written when a second pool exists, so it is parsed
        # only when present; parsing a placeholder {} would presumably fail
        # validation and abort before "arms" is loaded.
        char_events = [
            UpEvent.parse_obj(data[key]) for key in ("char", "char1") if data.get(key)
        ]
        arms_event = UpEvent.parse_obj(data["arms"]) if data.get("arms") else None
    except ValidationError:
        logger.warning(f"{self.game_name}_up_char 解析出错")
        return
    if char_events:
        self.UP_CHAR_LIST = char_events
    if arms_event is not None:
        self.UP_ARMS = arms_event
def dump_up_char(self):
    """Write the current UP events to ``draw_card_up/<game_name>_up_char.json``.

    Nothing is written unless at least one character pool and the weapon pool
    are set. The second character pool, when present, is stored as "char1".
    """
    if not (self.UP_CHAR_LIST and self.UP_ARMS):
        return
    first_pool, *other_pools = self.UP_CHAR_LIST
    payload = {
        "char": json.loads(first_pool.json()),
        "arms": json.loads(self.UP_ARMS.json()),
    }
    if other_pools:
        payload["char1"] = json.loads(other_pools[0].json())
    self.dump_data(payload, f"draw_card_up/{self.game_name}_up_char.json")
async def _update_info(self):
    """Refresh character and weapon data, images and UP pools from the wiki.

    Steps, in order: scrape the character list and each character's page for
    its "常驻/限定" field and dump the result; scrape the weapon list and dump
    it to ``genshin_arms.json``; download avatars, the five star icons and
    the avatar frame; finally refresh the UP pools.
    """
    # genshin.json
    char_info = {}
    char_page = await self.get_url("https://wiki.biligame.com/ys/角色筛选")
    if char_page:
        dom = etree.HTML(char_page, etree.HTMLParser())
        for row in dom.xpath("//table[@id='CardSelectTr']/tbody/tr"):
            try:
                name = row.xpath("./td[1]/a/@title")[0]
                avatar = row.xpath("./td[1]/a/img/@srcset")[0]
                star = row.xpath("./td[3]/text()")[0]
            except IndexError:
                # Row lacks one of the expected cells; skip it.
                continue
            member_dict = {
                "头像": unquote(str(avatar).split(" ")[-2]),
                "名称": remove_prohibited_str(name),
                "星级": int(str(star).strip()[:1]),
            }
            char_info[member_dict["名称"]] = member_dict
        # Fetch the extra per-character info.
        for key, member in char_info.items():
            detail_page = await self.get_url(f"https://wiki.biligame.com/ys/{key}")
            if not detail_page:
                member["常驻/限定"] = "未知"
                logger.warning(f"{self.game_name_cn} 获取额外信息错误 {key}")
                continue
            try:
                dom = etree.HTML(detail_page, etree.HTMLParser())
                limit = dom.xpath(
                    "//table[contains(string(.),'常驻/限定')]/tbody/tr[6]/td/text()"
                )[0]
                member["常驻/限定"] = str(limit).strip()
            except IndexError:
                member["常驻/限定"] = "未知"
                logger.warning(f"{self.game_name_cn} 获取额外信息错误 {key}")
        self.dump_data(char_info)
        logger.info(f"{self.game_name_cn} 更新成功")
    else:
        logger.warning(f"更新 {self.game_name_cn} 出错")

    # genshin_arms.json
    arms_info = {}
    arms_page = await self.get_url("https://wiki.biligame.com/ys/武器图鉴")
    if arms_page:
        dom = etree.HTML(arms_page, etree.HTMLParser())
        for row in dom.xpath("//table[@id='CardSelectTr']/tbody/tr"):
            try:
                name = row.xpath("./td[1]/a/@title")[0]
                avatar = row.xpath("./td[1]/a/img/@srcset")[0]
                star = row.xpath("./td[4]/img/@alt")[0]
                sources = str(row.xpath("./td[5]/text()")[0]).split(",")
            except IndexError:
                continue
            member_dict = {
                "头像": unquote(str(avatar).split(" ")[-2]),
                "名称": remove_prohibited_str(name),
                "星级": int(str(star).strip()[:1]),
                "获取途径": [s.strip() for s in sources if s.strip()],
            }
            arms_info[member_dict["名称"]] = member_dict
        self.dump_data(arms_info, "genshin_arms.json")
        logger.info(f"{self.game_name_cn} 武器更新成功")
    else:
        logger.warning(f"更新 {self.game_name_cn} 出错")

    # Download avatars (characters first, then weapons).
    for member in char_info.values():
        await self.download_img(member["头像"], member["名称"])
    for member in arms_info.values():
        await self.download_img(member["头像"], member["名称"])

    # Download the star icons, saved as 1_star .. 5_star in list order.
    YS_URL = "https://patchwiki.biligame.com/images/ys"
    star_paths = [
        "/1/13/7xzg7tgf8dsr2hjpmdbm5gn9wvzt2on.png",
        "/b/bc/sd2ige6d7lvj7ugfumue3yjg8gyi0d1.png",
        "/e/ec/l3mnhy56pyailhn3v7r873htf2nofau.png",
        "/9/9c/sklp02ffk3aqszzvh8k1c3139s0awpd.png",
        "/c/c7/qu6xcndgj6t14oxvv7yz2warcukqv1m.png",
    ]
    for idx, path in enumerate(star_paths, start=1):
        await self.download_img(YS_URL + path, f"{idx}_star")

    # Download the avatar frame.
    await self.download_img(
        YS_URL + "/2/2e/opbcst4xbtcq0i4lwerucmosawn29ti.png", "avatar_frame"
    )
    await self.update_up_char()
async def update_up_char(self):
    """Scrape the wish page and refresh the currently active UP pools.

    ``UP_CHAR_LIST`` is cleared first. Each wish table whose period contains
    the current time becomes an ``UpEvent``: tables whose title contains
    "神铸赋形" set ``UP_ARMS``, all others are appended to ``UP_CHAR_LIST``.
    When both a character pool and the weapon pool were found the result is
    saved through ``dump_up_char``. Any parsing error is logged, not raised.
    """
    self.UP_CHAR_LIST = []
    url = "https://wiki.biligame.com/ys/祈愿"
    result = await self.get_url(url)
    if not result:
        logger.warning(f"{self.game_name_cn}获取祈愿页面出错")
        return
    dom = etree.HTML(result, etree.HTMLParser())
    tables = dom.xpath(
        "//div[@class='mw-parser-output']/div[@class='row']/div/table[@class='wikitable']/tbody"
    )
    if len(tables) < 2:
        logger.warning(f"{self.game_name_cn}获取活动祈愿出错")
        return

    # NOTE(review): the delimiter literal was empty in this copy of the file,
    # and str.split("") always raises ValueError, which made every update
    # fail. The closing corner bracket (U+300D) is assumed from the
    # bracketed banner titles (cf. the "神铸赋形" check below) - confirm
    # against upstream.
    closing_bracket = "\u300d"
    try:
        for table in tables:
            raw_title = table.xpath("./tr[1]/th/img/@title")[0]
            # Keep the title up to and including the first closing bracket.
            head, found, _ = str(raw_title).partition(closing_bracket)
            title = head + found if found else raw_title
            pool_img = str(table.xpath("./tr[1]/th/img/@srcset")[0]).split(" ")[-2]
            period = table.xpath("./tr[2]/td/text()")[0]
            star5_list = table.xpath("./tr[3]/td/a/@title")
            star4_list = table.xpath("./tr[4]/td/a/@title")
            start, end = str(period).split("~")
            start_time = dateparser.parse(start)
            end_time = dateparser.parse(end)
            if not start_time and end_time:
                # Unparseable start: assume the pool opened 20 days before
                # it ends.
                start_time = end_time - timedelta(days=20)
            if start_time and end_time and start_time <= datetime.now() <= end_time:
                up_event = UpEvent(
                    title=title,
                    pool_img=pool_img,
                    start_time=start_time,
                    end_time=end_time,
                    up_char=[
                        UpChar(name=name, star=5, limited=False, zoom=50)
                        for name in star5_list
                    ]
                    + [
                        UpChar(name=name, star=4, limited=False, zoom=50)
                        for name in star4_list
                    ],
                )
                if "神铸赋形" not in title:
                    self.UP_CHAR_LIST.append(up_event)
                else:
                    self.UP_ARMS = up_event
        if self.UP_CHAR_LIST and self.UP_ARMS:
            self.dump_up_char()
            char_title = " & ".join([x.title for x in self.UP_CHAR_LIST])
            logger.info(
                f"成功获取{self.game_name_cn}当前up信息...当前up池: {char_title} & {self.UP_ARMS.title}"
            )
    except Exception as e:
        # Scraping boundary: page layout changes must not crash the caller.
        logger.warning(f"{self.game_name_cn}UP更新出错", e=e)
def reset_count(self, user_id: str) -> bool:
    """Reset the count manager's state for ``user_id``; always returns True."""
    manager = self.count_manager
    manager.reset(user_id)
    return True
async def _reload_pool(self) -> MessageFactory | None:
    """Refresh the UP pools and report the result.

    Returns:
        A message with the pool titles and one cover image per pool (at most
        the first two character pools, then the weapon pool), or ``None``
        when no character pool or no weapon pool is available afterwards.
    """
    await self.update_up_char()
    self.load_up_char()
    if not (self.UP_CHAR_LIST and self.UP_ARMS):
        return None

    char_pools = self.UP_CHAR_LIST[:2]
    char_title = " & ".join(pool.title for pool in char_pools)
    return MessageFactory(
        [
            Text(f"重载成功!\n当前UP池子{char_title} & {self.UP_ARMS.title}"),
            # SaaImage is the message segment; the bare name ``Image`` in this
            # module is PIL's Image module and is not callable.
            *(SaaImage(pool.pool_img) for pool in char_pools),
            SaaImage(self.UP_ARMS.pool_img),
        ]
    )