Delete plugins/draw_card directory

This commit is contained in:
HibiKier 2022-02-27 17:29:57 +08:00 committed by GitHub
parent 3c8983e753
commit 222e1719a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 0 additions and 3525 deletions

View File

@ -1,413 +0,0 @@
from nonebot import on_regex, on_keyword
from nonebot.adapters.onebot.v11 import Bot, MessageEvent, Message
from nonebot.permission import SUPERUSER
from nonebot.typing import T_State
from nonebot.params import RegexGroup
from utils.utils import scheduler
from .genshin_handle import genshin_draw, update_genshin_info, reset_count, reload_genshin_pool
from .prts_handle import update_prts_info, prts_draw, reload_prts_pool
from .pretty_handle import update_pretty_info, pretty_draw, reload_pretty_pool
from .guardian_handle import update_guardian_info, guardian_draw, reload_guardian_pool
from .pcr_handle import update_pcr_info, pcr_draw
from .azur_handle import update_azur_info, azur_draw
from .fgo_handle import update_fgo_info, fgo_draw
from .onmyoji_handle import update_onmyoji_info, onmyoji_draw
from .update_game_info import update_info
from .util import is_number, check_num
from .rule import is_switch
from .config import draw_config
from .async_update_game_info import async_update_game
from typing import Tuple, Any
import re
import asyncio
__zx_plugin_name__ = "游戏抽卡"
__plugin_usage__ = """
usage
模拟赛马娘原神明日方舟坎公骑冠剑公主连结(/)碧蓝航线FGO阴阳师进行抽卡
指令
原神[1-300]: 原神常驻池
原神角色[1-300]: 原神角色UP池子
原神武器[1-300]: 原神武器UP池子
重置原神抽卡: 清空当前卡池的抽卡次数[即从0开始计算UP概率]
方舟[1-300]: 方舟卡池当有当期UP时指向UP池
赛马娘[1-200]: 赛马娘卡池当有当期UP时指向UP池
坎公骑冠剑[1-300]: 坎公骑冠剑卡池当有当期UP时指向UP池
pcr/公主连接[1-300]: 公主连接卡池
碧蓝航线/碧蓝[重型/轻型/特型][1-300]: 碧蓝航线重型/轻型/特型卡池
fgo[1-300]: fgo卡池
阴阳师[1-300]: 阴阳师卡池
* 以上指令可以通过 XX一井 来指定最大抽取数量 *
* 示例原神一井 *
""".strip()
__plugin_superuser_usage__ = """
usage
卡池方面的更新
指令
更新方舟信息
重载方舟卡池
更新原神信息
重载原神卡池
更新赛马娘信息
重载赛马娘卡池
更新坎公骑冠剑信息
更新碧蓝航线信息
更新fgo信息
更新阴阳师信息
""".strip()
__plugin_des__ = "就算是模拟抽卡也不能改变自己是个非酋"
__plugin_cmd__ = [
"原神[1-300]抽",
"原神角色[1-300]抽",
"原神武器[1-300]抽",
"重置原神抽卡",
"方舟[1-300]抽",
"赛马娘[1-200]抽",
"坎公骑冠剑[1-300]抽",
"pcr/公主连接[1-300]抽",
"fgo[1-300]抽",
"阴阳师[1-300]抽",
"更新方舟信息 [_superuser]",
"重载方舟卡池 [_superuser]",
"更新原神信息 [_superuser]",
"重载原神卡池 [_superuser]",
"更新赛马娘信息 [_superuser]",
"重载赛马娘卡池 [_superuser]",
"更新坎公骑冠剑信息 [_superuser]",
"更新碧蓝航线信息 [_superuser]",
"更新fgo信息 [_superuser]",
"更新阴阳师信息 [_superuser]",
]
__plugin_type__ = ("抽卡相关", 1)
__plugin_version__ = 0.1
__plugin_author__ = "HibiKier"
__plugin_settings__ = {
"level": 5,
"default_status": True,
"limit_superuser": False,
"cmd": ["游戏抽卡", "抽卡"],
}
prts = on_regex(r'.*?方舟([1-9|一][0-9]{0,2}[抽|井|连])', rule=is_switch('prts'), priority=5, block=True)
prts_update = on_keyword({'更新方舟信息', '更新明日方舟信息'}, permission=SUPERUSER, priority=1, block=True)
prts_up_reload = on_keyword({'重载方舟卡池'}, priority=1, block=True)
genshin = on_regex(r'.*?原神(武器|角色)?池?[1-9|一][0-9]{0,2}[抽|井|连]', rule=is_switch('genshin'), priority=5, block=True)
genshin_update = on_keyword({'更新原神信息'}, permission=SUPERUSER, priority=1, block=True)
genshin_reset = on_keyword({'重置原神抽卡'}, priority=1, block=True)
genshin_up_reload = on_keyword({'重载原神卡池'}, priority=1, block=True)
pretty = on_regex(r'.*?马娘卡?[1-9|一][0-9]{0,2}[抽|井|连]', rule=is_switch('pretty'), priority=5, block=True)
pretty_update = on_keyword({'更新马娘信息', '更新赛马娘信息'}, permission=SUPERUSER, priority=1, block=True)
pretty_up_reload = on_keyword({'重载赛马娘卡池'}, priority=1, block=True)
guardian = on_regex(r'.*?坎公骑冠剑武?器?[1-9|一][0-9]{0,2}[抽|井|连]', rule=is_switch('guardian'), priority=5, block=True)
guardian_update = on_keyword({'更新坎公骑冠剑信息'}, permission=SUPERUSER, priority=1, block=True)
guardian_up_reload = on_keyword({'重载坎公骑冠剑卡池'}, priority=1, block=True)
pcr = on_regex(r'.*?(pcr|公主连结|公主连接|公主链接|公主焊接)[1-9|一][0-9]{0,2}[抽|井|连]', rule=is_switch('pcr'), priority=5, block=True)
pcr_update = on_keyword({'更新pcr信息', '更新公主连结信息'}, permission=SUPERUSER, priority=1, block=True)
azur = on_regex(r'.*?碧蓝航?线?(轻型|重型|特型)池?[1-9|一][0-9]{0,2}[抽|连]', rule=is_switch('azur'), priority=5, block=True)
azur_update = on_keyword({'更新碧蓝信息', '更新碧蓝航线信息'}, permission=SUPERUSER, priority=1, block=True)
fgo = on_regex(r'.*?fgo[1-9|一][0-9]{0,2}[抽|连]', rule=is_switch('fgo'), priority=5, block=True)
fgo_update = on_keyword({'更新fgo信息'}, permission=SUPERUSER, priority=1, block=True)
onmyoji = on_regex(r'.*?阴阳师[1-9|一][0-9]{0,2}[抽|连]', rule=is_switch('onmyoji'), priority=5, block=True)
onmyoji_update = on_keyword({'更新阴阳师信息'}, permission=SUPERUSER, priority=1, block=True)
@prts.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State, reg: Tuple[Any, ...] = RegexGroup()):
msg = str(event.get_message()).strip()
if msg in ['方舟一井', '方舟1井']:
num = 300
else:
rmsg = re.search(r'.*?方舟(.*)[抽|连]', msg)
if rmsg:
num, flag = check_num(rmsg.group(1), 300)
if not flag:
await prts.finish(num, at_sender=True)
else:
return
await prts.send(await prts_draw(int(num)), at_sender=True)
@prts_up_reload.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
text = await reload_prts_pool()
await prts_up_reload.finish(Message(f'重载完成!\n{text}'))
@genshin.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.get_message()).strip()
rmsg = re.search(r'.*?原神(武器|角色)?池?(.*)[抽|井|连]', msg)
if rmsg:
pool_name = rmsg.group(1)
if pool_name == '武器':
pool_name = 'arms'
elif pool_name == '角色':
pool_name = 'char'
else:
pool_name = ''
num = rmsg.group(2)
if msg.find('一井') != -1 or msg.find('1井') != -1:
num = 180
else:
num, flag = check_num(num, 180)
if not flag:
await genshin.finish(num, at_sender=True)
else:
return
await genshin.send(await genshin_draw(event.user_id, int(num), pool_name), at_sender=True)
@genshin_up_reload.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
text = await reload_genshin_pool()
await genshin_reset.finish(Message(f'重载成功!\n{text}'))
@genshin_reset.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
reset_count(event.user_id)
await genshin_reset.send('重置了原神抽卡次数', at_sender=True)
@pretty.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.get_message()).strip()
if msg.find('1井') != -1 or msg.find('一井') != -1:
num = 200
if msg.find("") == -1:
pool_name = 'char'
else:
pool_name = 'card'
else:
rmsg = re.search(r'.*?马娘(.*)[抽|连]', msg)
if rmsg:
num = rmsg.group(1)
if num[0] == '':
num = num[1:]
pool_name = 'card'
else:
pool_name = 'char'
num, flag = check_num(num, 200)
if not flag:
await pretty.finish(num, at_sender=True)
else:
return
await pretty.send(await pretty_draw(int(num), pool_name), at_sender=True)
@pretty_up_reload.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
text = await reload_pretty_pool()
await genshin_reset.finish(Message(f'重载成功!\n{text}'))
@guardian.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.get_message()).strip()
pool_name = 'char'
if msg.find('1井') != -1 or msg.find('一井') != -1:
num = 300
if msg.find('武器') != -1:
pool_name = 'arms'
else:
rmsg = re.search(r'.*?坎公骑冠剑(.*)[抽|连]', msg)
if rmsg:
num = rmsg.group(1)
if num.find('武器') != -1:
pool_name = 'arms'
num = num.replace('武器', '')
num, flag = check_num(num, 300)
if not flag:
await guardian.finish(num, at_sender=True)
else:
return
await guardian.send(await guardian_draw(int(num), pool_name), at_sender=True)
@guardian_up_reload.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
text = await reload_guardian_pool()
await genshin_reset.finish(Message(f'重载成功!\n{text}'))
@pcr.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.get_message()).strip()
if msg.find('1井') != -1 or msg.find('一井') != -1:
num = 300
else:
rmsg = re.search(r'.*?(pcr|公主连结)(.*)[抽|井|连]', msg)
if rmsg:
num, flag = check_num(rmsg.group(2), 300)
if not flag:
await pcr.finish(num, at_sender=True)
else:
return
await pcr.send(await pcr_draw(int(num)), at_sender=True)
@azur.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.get_message()).strip()
rmsg = re.search(r'.*?碧蓝航?线?(轻型|重型|特型)池?(.*)[抽|连]', msg)
if rmsg:
pool_name = rmsg.group(1)
num, flag = check_num(rmsg.group(2), 300)
if not flag:
await azur.finish(num, at_sender=True)
else:
return
await azur.send(await azur_draw(int(num), pool_name), at_sender=True)
@fgo.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.get_message()).strip()
rmsg = re.search(r'.*?fgo(.*)[抽|连]', msg)
if rmsg:
num, flag = check_num(rmsg.group(1), 300)
if not flag:
await fgo.finish(num, at_sender=True)
else:
return
await fgo.send(await fgo_draw(int(num)), at_sender=True)
@onmyoji.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.get_message()).strip()
rmsg = re.search(r'.*?阴阳师(.*)[抽|连]', msg)
if rmsg:
num, flag = check_num(rmsg.group(1), 300)
if not flag:
await onmyoji.finish(num, at_sender=True)
else:
return
await onmyoji.send(await onmyoji_draw(int(num)), at_sender=True)
@prts_update.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
await update_prts_info()
await prts_update.finish('更新完成!')
@genshin_update.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
await update_genshin_info()
await genshin_update.finish('更新完成!')
@pretty_update.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
await update_pretty_info()
await genshin_update.finish('更新完成!')
@guardian_update.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
await update_guardian_info()
await genshin_update.finish('更新完成!')
@pcr_update.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
await update_pcr_info()
await genshin_update.finish('更新完成!')
@azur_update.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
await update_azur_info()
await genshin_update.finish('更新完成!')
@fgo_update.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
await update_fgo_info()
await genshin_update.finish('更新完成!')
@onmyoji_update.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
await update_onmyoji_info()
await genshin_update.finish('更新完成!')
# 更新资源
@scheduler.scheduled_job(
'cron',
hour=4,
minute=1,
)
async def _():
tasks = []
if draw_config.PRTS_FLAG:
tasks.append(asyncio.ensure_future(update_prts_info()))
if draw_config.GENSHIN_FLAG:
tasks.append(asyncio.ensure_future(update_genshin_info()))
if draw_config.PRETTY_FLAG:
tasks.append(asyncio.ensure_future(update_pretty_info()))
if draw_config.GUARDIAN_FLAG:
tasks.append(asyncio.ensure_future(update_guardian_info()))
if draw_config.PCR_FLAG:
tasks.append(asyncio.ensure_future(update_pcr_info()))
if draw_config.AZUR_FLAG:
tasks.append(asyncio.ensure_future(update_azur_info()))
if draw_config.FGO_FLAG:
tasks.append(asyncio.ensure_future(update_fgo_info()))
if draw_config.ONMYOJI_FLAG:
tasks.append(asyncio.ensure_future(update_onmyoji_info()))
await asyncio.gather(*tasks)
# 每天四点重载方舟up卡池
@scheduler.scheduled_job(
'cron',
hour=4,
minute=1,
)
async def _():
if draw_config.PRTS_FLAG:
await reload_prts_pool()
# 每天四点重载赛马娘up卡池
@scheduler.scheduled_job(
'cron',
hour=4,
minute=1,
)
async def _():
if draw_config.PRETTY_FLAG:
await reload_pretty_pool()
# 每天下午六点点重载原神up卡池
@scheduler.scheduled_job(
'cron',
hour=18,
minute=1,
)
async def _():
if draw_config.PRTS_FLAG:
await reload_genshin_pool()
# 重载坎公骑冠剑卡池
@scheduler.scheduled_job(
'cron',
hour=4,
minute=1,
)
async def _():
if draw_config.GUARDIAN_FLAG:
await reload_guardian_pool()

View File

@ -1,395 +0,0 @@
from bs4 import BeautifulSoup
import re
from datetime import datetime, timedelta
from .config import DRAW_DATA_PATH
from asyncio.exceptions import TimeoutError
from nonebot.log import logger
from utils.http_utils import AsyncHttpx
try:
import ujson as json
except ModuleNotFoundError:
import json
headers = {'User-Agent': '"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)"'}
prts_up_char = DRAW_DATA_PATH / "draw_card_up" / "prts_up_char.json"
genshin_up_char = DRAW_DATA_PATH / "draw_card_up" / "genshin_up_char.json"
pretty_up_char = DRAW_DATA_PATH / "draw_card_up" / "pretty_up_char.json"
guardian_up_char = DRAW_DATA_PATH / "draw_card_up" / "guardian_up_char.json"
prts_url = "https://ak.hypergryph.com/news.html"
genshin_url = "https://wiki.biligame.com/ys/%E7%A5%88%E6%84%BF"
pretty_url = "https://wiki.biligame.com/umamusume/%E5%85%AC%E5%91%8A"
guardian_url = "https://wiki.biligame.com/gt/%E9%A6%96%E9%A1%B5"
# 是否过时
def is_expired(data: dict):
times = data['time'].split('-')
for i in range(len(times)):
times[i] = str(datetime.now().year) + '-' + times[i].split('')[0].strip().replace('', '-')
start_date = datetime.strptime(times[0], '%Y-%m-%d').date()
end_date = datetime.strptime(times[1], '%Y-%m-%d').date()
now = datetime.now().date()
return not start_date <= now <= end_date
# 检查写入
def check_write(data: dict, up_char_file):
try:
if is_expired(data['char']):
for x in list(data.keys()):
data[x]['title'] = ''
else:
with open(up_char_file, 'w', encoding='utf8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
if not up_char_file.exists():
with open(up_char_file, 'w', encoding='utf8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
else:
with open(up_char_file, 'r', encoding='utf8') as f:
old_data = json.load(f)
if is_expired(old_data['char']):
return old_data
else:
with open(up_char_file, 'w', encoding='utf8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
except ValueError:
pass
return data
class PrtsAnnouncement:
def __init__(self):
self.game_name = '明日方舟'
async def _get_announcement_text(self):
res = await AsyncHttpx.get(prts_url, timeout=7)
soup = BeautifulSoup(res.text, 'lxml')
ol = soup.find('ol', {'class': 'articleList active', 'data-category-key': 'LATEST'})
for li in ol:
type_ = li.find('span', {'class': 'articleItemCate'}).text
if type_ == '活动':
a = li.find('a')['href']
return (await AsyncHttpx.get(f'https://ak.hypergryph.com{a}', timeout=7)).text
async def update_up_char(self):
prts_up_char.parent.mkdir(parents=True, exist_ok=True)
if prts_up_char.exists():
with open(prts_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
if not data.get('char'):
prts_up_char.unlink()
try:
data = {'char': {'up_char': {'6': {}, '5': {}, '4': {}}, 'title': '', 'time': '', 'pool_img': ''}}
text = await self._get_announcement_text()
soup = BeautifulSoup(text, 'lxml')
content = soup.find('div', {'class': 'article-content'})
contents = [x for x in content.contents if x.text or str(x).find('img') != -1]
start_index = -1
end_index = -1
for i in range(len(contents)):
if str(contents[i]).startswith('<p>'):
r = re.search('(.*)(寻访|复刻).*?开启', contents[i].text)
if r:
if str(contents[i+3].text).find('') != -1:
img = contents[i-1].find('img')
if img:
data['char']['pool_img'] = img['src']
start_index = i
for j in range(i, len(contents)):
if str(contents[j]).find('注意') != -1:
end_index = j
break
break
contents = contents[start_index: end_index]
title = contents[0].text
data['char']['title'] = title[title.find(''): title.find('') + 1]
data['char']['time'] = str(contents[1].text).split('', maxsplit=1)[1]
for p in contents[2:]:
p = str(p.text)
r = None
if p.find('') != -1:
if p.find('权值') == -1:
r = re.search(r'.*?(.*)(占(.*)★.*?的(.*)%', p)
else:
r = re.search(r'.*?(.*)(在(.*)★.*?以(.*)倍权值.*?', p)
star = r.group(2)
if r:
chars = r.group(1)
if chars.find('/') != -1:
chars = chars.strip().split('/')
elif chars.find('\\') != -1:
chars = chars.strip().split('\\')
else:
chars = chars.split('\n')
chars = [x.replace('[限定]', '').strip() for x in chars]
probability = r.group(3)
probability = probability if int(probability) > 10 else f'{probability}'
for char in chars:
if char.strip():
data['char']['up_char'][star][char.strip()] = probability
except TimeoutError:
logger.warning(f'更新明日方舟UP池信息超时...')
if prts_up_char.exists():
with open(prts_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
except Exception as e:
logger.error(f'更新明日方舟未知错误 e{e}')
if prts_up_char.exists():
with open(prts_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
return check_write(data, prts_up_char)
class GenshinAnnouncement:
def __init__(self):
self.game_name = '原神'
async def _get_announcement_text(self):
return (await AsyncHttpx.get(genshin_url, timeout=7)).text
async def update_up_char(self):
genshin_up_char.parent.mkdir(exist_ok=True, parents=True)
data = {
'char': {'up_char': {'5': {}, '4': {}}, 'title': '', 'time': '', 'pool_img': ''},
'arms': {'up_char': {'5': {}, '4': {}}, 'title': '', 'time': '', 'pool_img': ''}
}
text = await self._get_announcement_text()
soup = BeautifulSoup(text, 'lxml')
try:
div = soup.find_all('div', {'class': 'row'})[1]
tables = div.find_all('table', {'class': 'wikitable'})
for table in tables:
trs = table.find('tbody').find_all('tr')
pool_img = trs[0].find('th').find('img')
if pool_img['title'].find('角色活动') == -1:
type_ = 'arms'
else:
type_ = 'char'
try:
data[type_]['pool_img'] = str(pool_img['srcset']).split(' ')[0]
except KeyError:
data[type_]['pool_img'] = pool_img['src']
data[type_]['title'] = str(pool_img['title']).split(f'{"角色" if type_ == "char" else "武器"}')[0][:-3]
data[type_]['time'] = trs[1].find('td').text
if data[type_]['time'][-1] == '\n':
data[type_]['time'] = data[type_]['time'][:-1]
if '版本更新后' in data[type_]['time']:
sp = data[type_]['time'].split('~')
end_time = datetime.strptime(sp[1].strip(), "%Y/%m/%d %H:%M")
start_time = end_time - timedelta(days=20)
data[type_]['time'] = start_time.strftime('%Y/%m/%d') + ' ~ ' + end_time.strftime('%Y/%m/%d')
tmp = ''
for tm in data[type_]['time'].split('~'):
date_time_sp = tm.split('/')
date_time_sp[2] = date_time_sp[2].strip().replace(' ', '')
tmp += date_time_sp[1] + '' + date_time_sp[2] + ' - '
data[type_]['time'] = tmp[:-2].strip()
for a in trs[2].find('td').find_all('a'):
char_name = a['title']
data[type_]['up_char']['5'][char_name] = "50"
for a in trs[3].find('td').find_all('a'):
char_name = a['title']
data[type_]['up_char']['4'][char_name] = "50"
except TimeoutError:
logger.warning(f'更新原神UP池信息超时...')
if genshin_up_char.exists():
with open(genshin_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
except Exception as e:
logger.error(f'更新原神UP失败疑似UP池已结束 e{e}')
if genshin_up_char.exists():
with open(genshin_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
data['char']['title'] = ''
data['arms']['title'] = ''
with open(genshin_up_char, 'w', encoding='utf8') as wf:
json.dump(data, wf, ensure_ascii=False, indent=4)
return data
return check_write(data, genshin_up_char)
class PrettyAnnouncement:
def __init__(self):
self.game_name = '赛马娘'
async def _get_announcement_text(self):
res = await AsyncHttpx.get(pretty_url, timeout=7)
soup = BeautifulSoup(res.text, 'lxml')
divs = soup.find('div', {'id': 'mw-content-text'}).find('div').find_all('div')
for div in divs:
a = div.find('a')
try:
title = a['title']
except (KeyError, TypeError):
continue
if title.find('新角色追加') != -1:
url = a['href']
break
return (await AsyncHttpx.get(f'https://wiki.biligame.com/{url}', timeout=7)).text, title[:-2]
async def update_up_char(self):
data = {
'char': {'up_char': {'3': {}, '2': {}, '1': {}}, 'title': '', 'time': '', 'pool_img': ''},
'card': {'up_char': {'3': {}, '2': {}, '1': {}}, 'title': '', 'time': '', 'pool_img': ''}
}
try:
text, title = await self._get_announcement_text()
soup = BeautifulSoup(text, 'lxml')
context = soup.find('div', {'class': 'toc-sticky'})
if not context:
context = soup.find('div', {'class': 'mw-parser-output'})
data['char']['title'] = title
data['card']['title'] = title
for big in context.find_all('big'):
r = re.search(r'\d{1,2}/\d{1,2} \d{1,2}:\d{1,2}', str(big.text))
if r:
time = str(big.text)
break
else:
logger.warning('赛马娘UP无法找到活动日期....取消更新UP池子...')
return
time = time.replace('', '-').replace('/', '').split(' ')
time = time[0] + '' + time[1] + ' - ' + time[3] + '' + time[4]
data['char']['time'] = time
data['card']['time'] = time
for p in context.find_all('p'):
if str(p).find('当期UP赛马娘') != -1 and str(p).find('') != -1:
if not data['char']['pool_img']:
try:
data['char']['pool_img'] = p.find('img')['src']
except TypeError:
for center in context.find_all('center'):
try:
img = center.find('img')
if img and str(img['alt']).find('新马娘') != -1 and str(img['alt']).find('总览') == 1:
data['char']['pool_img'] = img['src']
except (TypeError, KeyError):
pass
r = re.findall(r'.*?当期UP赛马娘([\s\S]*)<奖励内容>.*?', str(p))
if r:
for x in r:
x = str(x).split('\n')
for msg in x:
if msg.find('') != -1:
msg = msg.replace('<br/>', '')
char_name = msg[msg.find('['):].strip()
if (star := len(msg[:msg.find('[')].strip())) == 3:
data['char']['up_char']['3'][char_name] = '70'
elif star == 2:
data['char']['up_char']['2'][char_name] = '70'
elif star == 1:
data['char']['up_char']['1'][char_name] = '70'
if str(p).find('当期UP对象') != -1 and str(p).find('赛马娘') == -1 and str(p).find('') != -1:
# data['card']['pool_img'] = p.find('img')['src']
if not data['char']['pool_img']:
try:
data['char']['pool_img'] = p.find('img')['src']
except TypeError:
for center in context.find_all('center'):
try:
img = center.find('img')
if img and str(img['alt']).find('新卡') != -1 and str(img['alt']).find('总览') == 1:
data['card']['pool_img'] = img['src']
except (TypeError, KeyError):
pass
r = re.search(r'■全?新?支援卡当期UP对象([\s\S]*)</p>', str(p))
if r:
rmsg = r.group(1).strip()
rmsg = rmsg.split('<br/>')
rmsg = [x for x in rmsg if x]
for x in rmsg:
x = x.replace('\n', '').replace('', '')
star = x[:x.find('[')].strip()
char_name = x[x.find('['):].strip()
if star == 'SSR':
data['card']['up_char']['3'][char_name] = '70'
if star == 'SR':
data['card']['up_char']['2'][char_name] = '70'
if star == 'R':
data['card']['up_char']['1'][char_name] = '70'
# 日文->中文
with open(DRAW_DATA_PATH / 'pretty_card.json', 'r', encoding='utf8') as f:
all_data = json.load(f)
for star in data['card']['up_char'].keys():
for name in list(data['card']['up_char'][star].keys()):
char_name = name.split(']')[1].strip()
tp_name = name[name.find('['): name.find(']') + 1].strip().replace('[', '').replace(']', '')
for x in all_data.keys():
if all_data[x]['名称'].find(tp_name) != -1 and all_data[x]['关联角色'] == char_name:
data['card']['up_char'][star].pop(name)
data['card']['up_char'][star][all_data[x]['中文名']] = '70'
except TimeoutError:
logger.warning(f'更新赛马娘UP池信息超时...')
if pretty_up_char.exists():
with open(pretty_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
except Exception as e:
logger.error(f'赛马娘up更新未知错误 {type(e)}{e}')
if pretty_up_char.exists():
with open(pretty_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
return check_write(data, pretty_up_char)
class GuardianAnnouncement:
def __init__(self):
self.game_name = '坎公骑冠剑'
async def _get_announcement_text(self):
return (await AsyncHttpx.get(guardian_url, timeout=7)).text
async def update_up_char(self):
data = {
'char': {'up_char': {'3': {}}, 'title': '', 'time': '', 'pool_img': ''},
'arms': {'up_char': {'5': {}}, 'title': '', 'time': '', 'pool_img': ''}
}
try:
text = await self._get_announcement_text()
soup = BeautifulSoup(text, 'lxml')
context = soup.select('div.col-sm-3:nth-child(3) > div:nth-child(2) > div:nth-child(1) '
'> div:nth-child(2) > div:nth-child(3) > font:nth-child(1)')[0]
title = context.find('p').find('b').text
tmp = title.split('')
time = ''
for msg in tmp:
r = re.search(r'[从|至](.*)(开始|结束)', msg)
if r:
time += r.group(1).strip() + ' - '
time = time[:-3]
title = time.split(' - ')[0] + 'UP卡池'
data['char']['title'] = title
data['arms']['title'] = title
data['char']['time'] = time
data['arms']['time'] = time
start_idx = -1
end_idx = -1
index = 0
divs = context.find_all('div')
for x in divs:
if x.text == '角色':
start_idx = index
if x.text == '武器':
end_idx = index
break
index += 1
for x in divs[start_idx + 1: end_idx]:
name = x.find('p').find_all('a')[-1].text
data['char']['up_char']['3'][name] = '0'
for x in divs[end_idx + 1:]:
name = x.find('p').find_all('a')[-1].text
data['arms']['up_char']['5'][name] = '0'
except TimeoutError:
logger.warning(f'更新坎公骑冠剑UP池信息超时...')
if guardian_up_char.exists():
with open(guardian_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
except Exception as e:
logger.error(f'坎公骑冠剑up更新未知错误 {type(e)}{e}')
return check_write(data, guardian_up_char)

View File

@ -1,63 +0,0 @@
import asyncio
import nonebot
from nonebot.log import logger
from .pcr_handle import update_pcr_info, init_pcr_data
from .azur_handle import update_azur_info, init_azur_data
from .prts_handle import update_prts_info, init_prts_data
from .pretty_handle import update_pretty_info, init_pretty_data
from .guardian_handle import update_guardian_info, init_guardian_data
from .genshin_handle import update_genshin_info, init_genshin_data
from .fgo_handle import update_fgo_info, init_fgo_data
from .onmyoji_handle import update_onmyoji_info, init_onmyoji_data
from .config import draw_config, DRAW_DATA_PATH
driver = nonebot.get_driver()
@driver.on_startup
async def async_update_game():
tasks = []
init_lst = [init_pcr_data, init_pretty_data, init_azur_data, init_prts_data, init_genshin_data, init_guardian_data,
init_fgo_data, init_onmyoji_data]
if draw_config.PRTS_FLAG and not (DRAW_DATA_PATH / 'prts.json').exists():
tasks.append(asyncio.ensure_future(update_prts_info()))
init_lst.remove(init_prts_data)
if draw_config.PRETTY_FLAG and (not (DRAW_DATA_PATH / 'pretty.json').exists() or
not (DRAW_DATA_PATH / 'pretty_card.json').exists()):
tasks.append(asyncio.ensure_future(update_pretty_info()))
init_lst.remove(init_pretty_data)
if draw_config.GUARDIAN_FLAG and not (DRAW_DATA_PATH / 'guardian.json').exists():
tasks.append(asyncio.ensure_future(update_guardian_info()))
init_lst.remove(init_guardian_data)
if draw_config.PCR_FLAG and not (DRAW_DATA_PATH / 'pcr.json').exists():
tasks.append(asyncio.ensure_future(update_pcr_info()))
init_lst.remove(init_pcr_data)
if draw_config.GENSHIN_FLAG and (not (DRAW_DATA_PATH / 'genshin.json').exists() or
not (DRAW_DATA_PATH / 'genshin_arms.json').exists()):
tasks.append(asyncio.ensure_future(update_genshin_info()))
init_lst.remove(init_genshin_data)
if draw_config.AZUR_FLAG and not (DRAW_DATA_PATH / 'azur.json').exists():
tasks.append(asyncio.ensure_future(update_azur_info()))
init_lst.remove(init_azur_data)
if draw_config.FGO_FLAG and (not (DRAW_DATA_PATH / 'fgo.json').exists() or
not (DRAW_DATA_PATH / 'fgo_card.json').exists()):
tasks.append(asyncio.ensure_future(update_fgo_info()))
init_lst.remove(init_fgo_data)
if draw_config.ONMYOJI_FLAG and not (DRAW_DATA_PATH / 'onmyoji.json').exists():
tasks.append(asyncio.ensure_future(update_onmyoji_info()))
init_lst.remove(init_onmyoji_data)
try:
await asyncio.gather(*tasks)
for func in init_lst:
await func()
except asyncio.exceptions.CancelledError:
logger.warning('更新异常CancelledError再次更新...')
await async_update_game()

View File

@ -1,62 +0,0 @@
from nonebot.adapters.onebot.v11 import MessageSegment
import random
from .update_game_simple_info import update_simple_info
from .util import generate_img, init_star_rst, BaseData, set_list, get_star, max_card, format_card_information
from .config import draw_config, DRAW_DATA_PATH
from dataclasses import dataclass
from .init_card_pool import init_game_pool
try:
import ujson as json
except ModuleNotFoundError:
import json
ALL_CHAR = []
@dataclass
class AzurChar(BaseData):
type_: str # 舰娘类型
async def azur_draw(count: int, pool_name: str):
# 0 1 2
cnlist = ['', '', '', '']
star_list = [0, 0, 0, 0]
char_list, char_dict, max_star_list, star_list, max_star_index_list = \
format_card_information(count, star_list, _get_azur_card, pool_name, guaranteed=False)
rst = init_star_rst(star_list, cnlist, max_star_list, max_star_index_list)
if count > 90:
char_list = set_list(char_list)
return MessageSegment.image("base64://" + await generate_img(char_list, 'azur', star_list)) \
+ '\n' + rst[:-1] + '\n' + max_card(char_dict)
async def update_azur_info():
global ALL_CHAR
url = 'https://wiki.biligame.com/blhx/舰娘图鉴'
data, code = await update_simple_info(url, 'azur')
if code == 200:
ALL_CHAR = init_game_pool('azur', data, AzurChar)
async def init_azur_data():
global ALL_CHAR
if draw_config.AZUR_FLAG:
with (DRAW_DATA_PATH / 'azur.json').open('r', encoding='utf8') as f:
azur_dict = json.load(f)
ALL_CHAR = init_game_pool('azur', azur_dict, AzurChar)
# 抽取卡池
def _get_azur_card(pool_name: str, mode: int = 1):
global ALL_CHAR
azur_config = draw_config.azur
if pool_name == '轻型':
type_ = ['驱逐', '轻巡', '维修']
elif pool_name == '重型':
type_ = ['重巡', '战列', '战巡', '重炮']
else:
type_ = ['维修', '潜艇', '重巡', '轻航', '航母']
star = get_star([4, 3, 2, 1], [azur_config.AZUR_FOUR_P, azur_config.AZUR_THREE_P, azur_config.AZUR_TWO_P, azur_config.AZUR_ONE_P])
chars = [x for x in ALL_CHAR if x.star == star and x.type_ in type_ and not x.limited]
return random.choice(chars), 4 - star

View File

@ -1,163 +0,0 @@
import nonebot
from nonebot.log import logger
from pydantic import BaseModel, Extra, ValidationError
from configs.path_config import IMAGE_PATH, DATA_PATH
from configs.config import Config
try:
import ujson as json
except ModuleNotFoundError:
import json
# 原神
class GenshinConfig(BaseModel, extra=Extra.ignore):
GENSHIN_FIVE_P: float = 0.006
GENSHIN_FOUR_P: float = 0.051
GENSHIN_THREE_P: float = 0.43
GENSHIN_G_FIVE_P: float = 0.13
GENSHIN_G_FOUR_P: float = 0.016
I72_ADD: float = 0.0585
# 明日方舟
class PrtsConfig(BaseModel, extra=Extra.ignore):
PRTS_SIX_P: float = 0.02
PRTS_FIVE_P: float = 0.08
PRTS_FOUR_P: float = 0.48
PRTS_THREE_P: float = 0.42
# 赛马娘
class PrettyConfig(BaseModel, extra=Extra.ignore):
PRETTY_THREE_P: float = 0.03
PRETTY_TWO_P: float = 0.18
PRETTY_ONE_P: float = 0.79
# 坎公骑冠剑
class GuardianConfig(BaseModel, extra=Extra.ignore):
GUARDIAN_THREE_CHAR_P: float = 0.0275
GUARDIAN_TWO_CHAR_P: float = 0.19
GUARDIAN_ONE_CHAR_P: float = 0.7825
GUARDIAN_THREE_CHAR_UP_P: float = 0.01375
GUARDIAN_THREE_CHAR_OTHER_P: float = 0.01375
GUARDIAN_EXCLUSIVE_ARMS_P: float = 0.03
GUARDIAN_FIVE_ARMS_P: float = 0.03
GUARDIAN_FOUR_ARMS_P: float = 0.09
GUARDIAN_THREE_ARMS_P: float = 0.27
GUARDIAN_TWO_ARMS_P: float = 0.58
GUARDIAN_EXCLUSIVE_ARMS_UP_P: float = 0.01
GUARDIAN_EXCLUSIVE_ARMS_OTHER_P: float = 0.02
# 公主连结
class PcrConfig(BaseModel, extra=Extra.ignore):
PCR_THREE_P: float = 0.025
PCR_TWO_P: float = 0.18
PCR_ONE_P: float = 0.795
PCR_G_THREE_P: float = 0.025
PCR_G_TWO_P: float = 0.975
# 碧蓝航线
class AzurConfig(BaseModel, extra=Extra.ignore):
AZUR_FIVE_P: float = 0.012
AZUR_FOUR_P: float = 0.07
AZUR_THREE_P: float = 0.12
AZUR_TWO_P: float = 0.51
AZUR_ONE_P: float = 0.3
# 命运-冠位指定
class FgoConfig(BaseModel, extra=Extra.ignore):
FGO_SERVANT_FIVE_P: float = 0.01
FGO_SERVANT_FOUR_P: float = 0.03
FGO_SERVANT_THREE_P: float = 0.4
FGO_CARD_FIVE_P: float = 0.04
FGO_CARD_FOUR_P: float = 0.12
FGO_CARD_THREE_P: float = 0.4
# 阴阳师
class OnmyojiConfig(BaseModel, extra=Extra.ignore):
ONMYOJI_SP: float = 0.0025
ONMYOJI_SSR: float = 0.01
ONMYOJI_SR: float = 0.2
ONMYOJI_R: float = 0.7875
class PathDict(BaseModel, extra=Extra.ignore):
genshin: str = "原神"
prts: str = "明日方舟"
pretty: str = "赛马娘"
guardian: str = "坎公骑冠剑"
pcr: str = "公主连结"
azur: str = "碧蓝航线"
fgo: str = "命运-冠位指定"
onmyoji: str = "阴阳师"
class DrawConfig(BaseModel, extra=Extra.ignore):
# 开关
PRTS_FLAG: bool = Config.get_config("draw_card", "PRTS_FLAG")
GENSHIN_FLAG: bool = Config.get_config("draw_card", "GENSHIN_FLAG")
PRETTY_FLAG: bool = Config.get_config("draw_card", "PRETTY_FLAG")
GUARDIAN_FLAG: bool = Config.get_config("draw_card", "GUARDIAN_FLAG")
PCR_FLAG: bool = Config.get_config("draw_card", "PCR_FLAG")
AZUR_FLAG: bool = Config.get_config("draw_card", "AZUR_FLAG")
FGO_FLAG: bool = Config.get_config("draw_card", "FGO_FLAG")
ONMYOJI_FLAG: bool = Config.get_config("draw_card", "ONMYOJI_FLAG")
# 其他配置
PCR_TAI: bool = Config.get_config("draw_card", "PCR_TAI")
SEMAPHORE: int = Config.get_config("draw_card", "SEMAPHORE")
# 路径
path_dict: dict = {
"genshin": "原神",
"prts": "明日方舟",
"pretty": "赛马娘",
"guardian": "坎公骑冠剑",
"pcr": "公主连结",
"azur": "碧蓝航线",
"fgo": "命运-冠位指定",
"onmyoji": "阴阳师",
}
# 抽卡概率
prts: PrtsConfig = PrtsConfig()
genshin: GenshinConfig = GenshinConfig()
pretty: PrettyConfig = PrettyConfig()
guardian: GuardianConfig = GuardianConfig()
pcr: PcrConfig = PcrConfig()
azur: AzurConfig = AzurConfig()
fgo: FgoConfig = FgoConfig()
onmyoji: OnmyojiConfig = OnmyojiConfig()
driver = nonebot.get_driver()
global_config = driver.config
DRAW_DATA_PATH = DATA_PATH / "draw_card"
DRAW_IMAGE_PATH = IMAGE_PATH / "draw_card"
# DRAW_PATH = Path(draw_path) if draw_path else Path("data/draw_card").absolute()
config_path = DRAW_DATA_PATH / "draw_card_config" / "draw_card_config.json"
draw_config: Config = DrawConfig()
@driver.on_startup
def check_config():
global draw_config
if not config_path.exists():
config_path.parent.mkdir(parents=True, exist_ok=True)
draw_config = DrawConfig()
logger.warning("draw_card配置文件不存在已重新生成配置文件.....")
json.dump(
draw_config.dict(),
config_path.open("w", encoding="utf8"),
indent=4,
ensure_ascii=False,
)

View File

@ -1,113 +0,0 @@
from typing import Optional, Union
class DrawCountManager:
"""
抽卡统计保底
"""
def __init__(self, game_draw_count_rule: tuple, star2name: tuple):
"""
初始化保底统计
例如DrawCountManager((10, 90, 180), ("4", "5", "5"))
抽卡保底需要的次数和返回的对应名称例如星级等
"""
# 只有保底
self._data = {}
self._guarantee_tuple = game_draw_count_rule
self._star2name = star2name
def increase(self, key: int, value: int = 1):
"""
用户抽卡次数加1
"""
if self._data.get(key) is None:
self._data[key] = {
"count": value,
}
for x in range(len(self._guarantee_tuple)):
self._data[key][f"count_{x}"] = 0
else:
self._data[key][f"count"] += value
if self._data[key][f"count"] > self._guarantee_tuple[-1]:
self._data[key][f"count"] = self._data[key][f"count"] % self._guarantee_tuple[-1]
def reset(self, key: int):
"""
清空记录
"""
del self._data[key]
def set_count(self, key: int, type_: int, count: int):
if self._data.get(key):
self._data[key][f"count_{type_}"] = count
def check(self, key: int, *args) -> Optional[Union[str, int]]:
"""
是否保底
"""
pass
def get_user_count(self, key: int, type_: Optional[int] = None) -> int:
"""
获取用户当前抽卡次数
"""
if self._data.get(key):
if type_ is None:
return self._data[key]["count"]
return self._data[key][f"count_{type_}"]
return 0
def record_count(self, key: int, type_: int):
"""
抽出对应星级后记录当前次数
"""
if self._data.get(key):
self._data[key][f"count_{type_}"] = self._data[key]["count"]
class GenshinCountManager(DrawCountManager):
def increase(self, key: int, value: int = 1):
"""
用户抽卡次数加1
"""
if self._data.get(key) is None:
self._data[key] = {
"is_up": False,
"count": value,
}
for x in range(len(self._guarantee_tuple)):
self._data[key][f"count_{x}"] = 0
else:
self._data[key][f"count"] += value
if self._data[key][f"count"] > self._guarantee_tuple[-1]:
self._data[key][f"count"] = self._data[key][f"count"] % 180
def set_is_up(self, key: int, value: bool):
if self._data.get(key):
self._data[key]["is_up"] = value
def is_up(self, key: int) -> bool:
if self._data.get(key):
return self._data[key]["is_up"]
return False
def check(self, key: int, *args) -> Optional[Union[str, int]]:
"""
是否保底
"""
# print(self._data)
if self._data.get(key):
for i in [1, 0]:
count = self._data[key]["count"]
if count - self._data[key][f"count_{i}"] == self._guarantee_tuple[i]:
if i in [2, 1]:
# print("clean four count")
self._data[key][f"count_0"] = self._data[key]['count']
self._data[key][f"count_{i}"] = self._data[key]['count']
return self._star2name[i]
return None

View File

@ -1,111 +0,0 @@
from nonebot.adapters.onebot.v11 import MessageSegment
import random
from .update_game_requests_info import update_requests_info
from .util import generate_img, init_star_rst, BaseData, set_list, get_star, max_card
from .config import DRAW_DATA_PATH, draw_config
from dataclasses import dataclass
from .init_card_pool import init_game_pool
try:
import ujson as json
except ModuleNotFoundError:
import json
ALL_CHAR = []
ALL_CARD = []
@dataclass
class FgoChar(BaseData):
pass
async def fgo_draw(count: int):
# 0 1 2
cnlist = ['★★★★★', '★★★★', '★★★']
obj_list, obj_dict, max_star_list, star_list, max_star_index_list = _format_card_information(count)
rst = init_star_rst(star_list, cnlist, max_star_list, max_star_index_list)
if count > 90:
obj_list = set_list(obj_list)
return MessageSegment.image("base64://" + await generate_img(obj_list, 'fgo', star_list)) \
+ '\n' + rst[:-1] + '\n' + max_card(obj_dict)
async def update_fgo_info():
global ALL_CHAR, ALL_CARD
data, code = await update_requests_info('fgo')
if code == 200:
ALL_CHAR = init_game_pool('fgo', data, FgoChar)
data, code = await update_requests_info('fgo_card')
if code == 200:
ALL_CARD = init_game_pool('fgo_card', data, FgoChar)
async def init_fgo_data():
global ALL_CHAR, ALL_CARD
if draw_config.FGO_FLAG:
with (DRAW_DATA_PATH / 'fgo.json').open('r', encoding='utf8') as f:
fgo_dict = json.load(f)
ALL_CHAR = init_game_pool('fgo', fgo_dict, FgoChar)
with (DRAW_DATA_PATH / 'fgo_card.json').open('r', encoding='utf8') as f:
fgo_dict = json.load(f)
ALL_CARD = init_game_pool('fgo', fgo_dict, FgoChar)
# 抽取卡池
def _get_fgo_card(mode: int = 1):
global ALL_CHAR, ALL_CARD
fgo_config = draw_config.fgo
if mode == 1:
star = get_star([8, 7, 6, 5, 4, 3], [fgo_config.FGO_SERVANT_FIVE_P, fgo_config.FGO_SERVANT_FOUR_P, fgo_config.FGO_SERVANT_THREE_P,
fgo_config.FGO_CARD_FIVE_P, fgo_config.FGO_CARD_FOUR_P, fgo_config.FGO_CARD_THREE_P])
elif mode == 2:
star = get_star([5, 4], [fgo_config.FGO_CARD_FIVE_P, fgo_config.FGO_CARD_FOUR_P])
else:
star = get_star([8, 7, 6], [fgo_config.FGO_SERVANT_FIVE_P, fgo_config.FGO_SERVANT_FOUR_P, fgo_config.FGO_SERVANT_THREE_P])
if star > 5:
type_ = 'servant'
star -= 3
chars = [x for x in ALL_CHAR if x.star == star if not x.limited]
else:
type_ = 'card'
chars = [x for x in ALL_CARD if x.star == star if not x.limited]
return random.choice(chars), 5 - star, type_
# 整理数据
def _format_card_information(count: int):
max_star_lst = [] # 获取的最高星级角色列表
max_index_lst = [] # 获取最高星级角色的次数
star_list = [0, 0, 0]
obj_list = [] # 获取所有角色
obj_dict = {} # 获取角色次数字典
servant_count = 0 # 保底计算
card_count = 0 # 保底计算
for i in range(count):
servant_count += 1
card_count += 1
# 四星卡片保底
if card_count == 9:
obj, code, type_ = _get_fgo_card(2)
# 三星从者保底
elif servant_count == 10:
obj, code, type_ = _get_fgo_card(3)
_count = 0
# 普通抽
else:
obj, code, type_ = _get_fgo_card()
star_list[code] += 1
if type_ == 'card' and code < 2:
card_count = 0
if type_ == 'servant':
servant_count = 0
if code == 0:
max_star_lst.append(obj.name)
max_index_lst.append(i)
try:
obj_dict[obj.name] += 1
except KeyError:
obj_dict[obj.name] = 1
obj_list.append(obj)
return obj_list, obj_dict, max_star_lst, star_list, max_index_lst

View File

@ -1,288 +0,0 @@
from nonebot.adapters.onebot.v11 import MessageSegment, Message
import random
from .update_game_info import update_info
from .util import (
generate_img,
init_rst,
BaseData,
set_list,
get_star,
init_up_char,
)
from .config import DRAW_DATA_PATH, draw_config
from .count_manager import GenshinCountManager
from dataclasses import dataclass
from .init_card_pool import init_game_pool
from .announcement import GenshinAnnouncement
try:
import ujson as json
except ModuleNotFoundError:
import json
announcement = GenshinAnnouncement()
draw_count_manager = GenshinCountManager((10, 90), ("4", "5"))
# genshin_five = {}
# genshin_count = {}
# genshin_pl_count = {}
ALL_CHAR = []
ALL_ARMS = []
UP_CHAR = []
UP_ARMS = []
_CURRENT_CHAR_POOL_TITLE = ""
_CURRENT_ARMS_POOL_TITLE = ""
POOL_IMG = ""
@dataclass
class GenshinChar(BaseData):
pass
async def genshin_draw(user_id: int, count: int, pool_name: str):
# 0 1 2
star = ["★★★★★", "★★★★", "★★★"]
(
char_list,
five_dict,
star_num_list,
) = _format_card_information(count, user_id, pool_name)
title = ""
up_type = []
up_list = []
if pool_name == "char" and _CURRENT_CHAR_POOL_TITLE:
up_type = UP_CHAR
title = _CURRENT_CHAR_POOL_TITLE
elif pool_name == "arms" and _CURRENT_ARMS_POOL_TITLE:
up_type = UP_ARMS
title = _CURRENT_ARMS_POOL_TITLE
tmp = ""
if up_type:
for x in up_type:
for operator in x.operators:
up_list.append(operator)
if x.star == 5:
tmp += f'五星UP{" ".join(x.operators)} \n'
elif x.star == 4:
tmp += f'四星UP{" ".join(x.operators)}'
rst = init_rst(five_dict, star_num_list, star, up_list)
pool_info = f"当前up池{title}\n{tmp}" if title else ""
if count > 90:
char_list = set_list(char_list)
return (
pool_info
+ "\n"
+ MessageSegment.image(
"base64://" + await generate_img(char_list, "genshin", star_num_list)
)
+ rst
+ f'\n距离保底发还剩 {draw_count_manager.get_user_count(user_id, 1) % 90}'
+ "\n【五星0.6%四星5.1%\n第72抽开始五星概率每抽加0.585%"
)
async def update_genshin_info():
global ALL_CHAR, ALL_ARMS
url = "https://wiki.biligame.com/ys/角色筛选"
data, code = await update_info(url, "genshin")
if code == 200:
ALL_CHAR = init_game_pool("genshin", data, GenshinChar)
url = "https://wiki.biligame.com/ys/武器图鉴"
data, code = await update_info(
url,
"genshin_arms",
)
if code == 200:
ALL_ARMS = init_game_pool("genshin_arms", data, GenshinChar)
await _genshin_init_up_char()
async def init_genshin_data():
global ALL_CHAR, ALL_ARMS
if draw_config.GENSHIN_FLAG:
if not (DRAW_DATA_PATH / "genshin.json").exists() or not (DRAW_DATA_PATH / "genshin_arms.json").exists():
await update_genshin_info()
else:
with (DRAW_DATA_PATH / "genshin.json").open("r", encoding="utf8") as f:
genshin_dict = json.load(f)
with (DRAW_DATA_PATH / "genshin_arms.json").open("r", encoding="utf8") as f:
genshin_ARMS_dict = json.load(f)
ALL_CHAR = init_game_pool("genshin", genshin_dict, GenshinChar)
ALL_ARMS = init_game_pool("genshin_arms", genshin_ARMS_dict, GenshinChar)
await _genshin_init_up_char()
# 抽取卡池
def _get_genshin_card(mode: int = 1, pool_name: str = "", add: float = 0.0, is_up: bool = False):
"""
mode 1普通抽 2四星保底 3五星保底
"""
global ALL_ARMS, ALL_CHAR, UP_ARMS, UP_CHAR, _CURRENT_ARMS_POOL_TITLE, _CURRENT_CHAR_POOL_TITLE
genshin_config = draw_config.genshin
if mode == 1:
star = get_star(
[5, 4, 3], [genshin_config.GENSHIN_FIVE_P + add, genshin_config.GENSHIN_FOUR_P, genshin_config.GENSHIN_THREE_P]
)
elif mode == 2:
star = get_star([5, 4], [genshin_config.GENSHIN_G_FIVE_P + add, genshin_config.GENSHIN_G_FOUR_P])
else:
star = 5
if pool_name == "char":
data_lst = UP_CHAR
flag = _CURRENT_CHAR_POOL_TITLE
type_all_lst = ALL_CHAR + [
x for x in ALL_ARMS if x.star == star and x.star < 5
]
elif pool_name == "arms":
data_lst = UP_ARMS
flag = _CURRENT_ARMS_POOL_TITLE
type_all_lst = ALL_ARMS + [
x for x in ALL_CHAR if x.star == star and x.star < 5
]
else:
data_lst = ""
flag = ""
type_all_lst = ""
all_lst = ALL_ARMS + ALL_CHAR
# 是否UP
try:
if flag and star > 3:
# 获取up角色列表
up_char_lst = [x.operators for x in data_lst if x.star == star][0]
# print(up_char_lst)
# 成功获取up角色
if random.random() < 0.5 or is_up:
up_char_name = random.choice(up_char_lst)
acquire_char = [x for x in all_lst if x.name == up_char_name][0]
else:
# 无up
all_char_lst = [
x
for x in type_all_lst
if x.star == star and x.name not in up_char_lst and not x.limited
]
acquire_char = random.choice(all_char_lst)
else:
chars = [x for x in all_lst if x.star == star and not x.limited]
acquire_char = random.choice(chars)
except IndexError:
chars = [x for x in all_lst if x.star == star and not x.limited]
acquire_char = random.choice(chars)
return acquire_char, 5 - star
def _format_card_information(_count: int, user_id, pool_name):
char_list = [] # 获取角色列表
star_num_list = [0, 0, 0] # 各个星级数量
five_dict = {} # 五星数量
add = 0.0
pool = UP_CHAR if pool_name == 'char' else UP_ARMS
for _ in range(_count):
draw_count_manager.increase(user_id)
star = draw_count_manager.check(user_id)
if (draw_count_manager.get_user_count(user_id) - draw_count_manager.get_user_count(user_id, 1)) % 90 >= 72:
add += draw_config.genshin.I72_ADD
if star:
star = int(star)
if star == 4:
char, code = _get_genshin_card(2, pool_name, add=add)
draw_count_manager.record_count(user_id, 0)
else:
char, code = _get_genshin_card(3, pool_name, add, draw_count_manager.is_up(user_id))
else:
char, code = _get_genshin_card(1, pool_name, add, draw_count_manager.is_up(user_id))
if code == 0:
add = 0
if not five_dict.get(char.name):
five_dict[char.name] = [draw_count_manager.get_user_count(user_id)]
else:
five_dict[char.name].append(draw_count_manager.get_user_count(user_id) % 90)
draw_count_manager.set_count(user_id, 2, draw_count_manager.get_user_count(user_id, 1))
draw_count_manager.record_count(user_id, 0)
draw_count_manager.record_count(user_id, 1)
if char.name not in [x.operators for x in pool if x.star == 5][0]:
draw_count_manager.set_is_up(user_id, True)
else:
draw_count_manager.set_is_up(user_id, False)
star_num_list[code] += 1
char_list.append(char)
return char_list, five_dict, star_num_list
# if genshin_count.get(user_id) and _count <= 90:
# f_count = genshin_count[user_id]
# else:
# f_count = 0
# if genshin_pl_count.get(user_id) and _count <= 90:
# count = genshin_pl_count[user_id]
# else:
# count = 0
# for i in range(_count):
# count += 1
# f_count += 1
# # 十连保底
# if count == 10 and f_count != 90:
# if f_count >= 72:
# add += I72_ADD
# char, code = _get_genshin_card(2, pool_name, add=add)
# count = 0
# # 大保底
# elif f_count == 90:
# char, code = _get_genshin_card(3, pool_name)
# else:
# if f_count >= 72:
# add += I72_ADD
# char, code = _get_genshin_card(pool_name=pool_name, add=add)
# if code == 1:
# count = 0
# star_list[code] += 1
# if code == 0:
# if _count <= 90:
# genshin_five[user_id] = f_count
# add = 0.0
# f_count = 0
# five_list.append(char.name)
# five_index_list.append(i)
# try:
# five_dict[char.name] += 1
# except KeyError:
# five_dict[char.name] = 1
# char_list.append(char)
# if _count <= 90:
# genshin_count[user_id] = f_count
# genshin_pl_count[user_id] = count
# return char_list, five_list, five_index_list, five_dict, star_list
def reset_count(user_id: int):
draw_count_manager.reset(user_id)
# 获取up和概率
async def _genshin_init_up_char():
global _CURRENT_CHAR_POOL_TITLE, _CURRENT_ARMS_POOL_TITLE, UP_CHAR, UP_ARMS, POOL_IMG
(
_CURRENT_CHAR_POOL_TITLE,
_CURRENT_ARMS_POOL_TITLE,
POOL_IMG,
UP_CHAR,
UP_ARMS,
) = await init_up_char(announcement)
async def reload_genshin_pool():
await _genshin_init_up_char()
return Message(
f"当前UP池子{_CURRENT_CHAR_POOL_TITLE} & {_CURRENT_ARMS_POOL_TITLE} {POOL_IMG}"
)

View File

@ -1,159 +0,0 @@
from nonebot.adapters.onebot.v11 import MessageSegment, Message
from .update_game_info import update_info
from .util import init_star_rst, generate_img, max_card, BaseData,\
set_list, get_star, format_card_information, init_up_char
import random
from .config import DRAW_DATA_PATH, draw_config
from dataclasses import dataclass
from .init_card_pool import init_game_pool
from .announcement import GuardianAnnouncement
try:
import ujson as json
except ModuleNotFoundError:
import json
announcement = GuardianAnnouncement()
ALL_CHAR = []
ALL_ARMS = []
_CURRENT_CHAR_POOL_TITLE = ''
_CURRENT_ARMS_POOL_TITLE = ''
UP_CHAR = []
UP_ARMS = []
POOL_IMG = ''
@dataclass
class GuardianChar(BaseData):
pass
@dataclass
class GuardianArms(BaseData):
pass
async def guardian_draw(count: int, pool_name):
if pool_name == 'arms':
cnlist = ['★★★★★', '★★★★', '★★★', '★★']
star_list = [0, 0, 0, 0]
else:
cnlist = ['★★★', '★★', '']
star_list = [0, 0, 0]
title = ''
up_type = []
up_list = []
if pool_name == 'char' and _CURRENT_CHAR_POOL_TITLE:
up_type = UP_CHAR
title = _CURRENT_CHAR_POOL_TITLE
elif pool_name == 'arms' and _CURRENT_ARMS_POOL_TITLE:
up_type = UP_ARMS
title = _CURRENT_ARMS_POOL_TITLE
tmp = ''
if up_type:
for x in up_type:
for operator in x.operators:
up_list.append(operator)
if pool_name == 'char':
if x.star == 3:
tmp += f'三星UP{" ".join(x.operators)} \n'
else:
if x.star == 5:
tmp += f'五星UP{" ".join(x.operators)}'
obj_list, obj_dict, max_list, star_list, max_index_list = format_card_information(count, star_list,
_get_guardian_card, pool_name)
rst = init_star_rst(star_list, cnlist, max_list, max_index_list, up_list)
pool_info = f'当前up池{title}\n{tmp}' if title else ''
if count > 90:
obj_list = set_list(obj_list)
return pool_info + '\n' + MessageSegment.image(
"base64://" + await generate_img(obj_list, 'guardian', star_list)) \
+ '\n' + rst[:-1] + '\n' + max_card(obj_dict)
async def update_guardian_info():
global ALL_CHAR, ALL_ARMS
url = 'https://wiki.biligame.com/gt/英雄筛选表'
data, code = await update_info(url, 'guardian')
if code == 200:
ALL_CHAR = init_game_pool('guardian', data, GuardianChar)
url = 'https://wiki.biligame.com/gt/武器'
tmp, code_1 = await update_info(url, 'guardian_arms')
url = 'https://wiki.biligame.com/gt/盾牌'
data, code_2 = await update_info(url, 'guardian_arms')
if code_1 == 200 and code_2 == 200:
data.update(tmp)
ALL_ARMS = init_game_pool('guardian_arms', data, GuardianArms)
await _guardian_init_up_char()
async def init_guardian_data():
global ALL_CHAR, ALL_ARMS
if draw_config.GUARDIAN_FLAG:
if not (DRAW_DATA_PATH / 'guardian.json').exists() or not (DRAW_DATA_PATH / 'guardian_arms.json').exists():
await update_guardian_info()
else:
with (DRAW_DATA_PATH / 'guardian.json').open('r', encoding='utf8') as f:
guardian_char_dict = json.load(f)
with (DRAW_DATA_PATH / 'guardian_arms.json').open('r', encoding='utf8') as f:
guardian_arms_dict = json.load(f)
ALL_CHAR = init_game_pool('guardian', guardian_char_dict, GuardianChar)
ALL_ARMS = init_game_pool('guardian_arms', guardian_arms_dict, GuardianArms)
await _guardian_init_up_char()
# 抽取卡池
def _get_guardian_card(pool_name: str = '', mode: int = 1):
global ALL_ARMS, ALL_CHAR, UP_ARMS, UP_CHAR, _CURRENT_ARMS_POOL_TITLE, _CURRENT_CHAR_POOL_TITLE
guardian_config = draw_config.guardian
if pool_name == 'char':
if mode == 1:
star = get_star([3, 2, 1], [guardian_config.GUARDIAN_THREE_CHAR_P, guardian_config.GUARDIAN_TWO_CHAR_P, guardian_config.GUARDIAN_ONE_CHAR_P])
else:
star = get_star([3, 2], [guardian_config.GUARDIAN_THREE_CHAR_P, guardian_config.GUARDIAN_TWO_CHAR_P])
up_lst = UP_CHAR
flag = _CURRENT_CHAR_POOL_TITLE
_max_star = 3
all_data = ALL_CHAR
else:
if mode == 1:
star = get_star([5, 4, 3, 2], [guardian_config.GUARDIAN_FIVE_ARMS_P, guardian_config.GUARDIAN_FOUR_ARMS_P,
guardian_config.GUARDIAN_THREE_ARMS_P, guardian_config.GUARDIAN_TWO_ARMS_P])
else:
star = get_star([5, 4], [guardian_config.GUARDIAN_FIVE_ARMS_P, guardian_config.GUARDIAN_FOUR_ARMS_P])
up_lst = UP_ARMS
flag = _CURRENT_ARMS_POOL_TITLE
_max_star = 5
all_data = ALL_ARMS
# 是否UP
if flag and star == _max_star and pool_name:
# 获取up角色列表
up_char_lst = [x.operators for x in up_lst if x.star == star][0]
# 成功获取up角色
if random.random() < 0.5:
up_char_name = random.choice(up_char_lst)
acquire_char = [x for x in all_data if x.name == up_char_name][0]
else:
# 无up
all_char_lst = [x for x in all_data if x.star == star and x.name not in up_char_lst and not x.limited]
acquire_char = random.choice(all_char_lst)
else:
chars = [x for x in all_data if x.star == star and not x.limited]
acquire_char = random.choice(chars)
return acquire_char, _max_star - star
# 获取up和概率
async def _guardian_init_up_char():
global _CURRENT_CHAR_POOL_TITLE, _CURRENT_ARMS_POOL_TITLE, UP_CHAR, UP_ARMS, POOL_IMG
_CURRENT_CHAR_POOL_TITLE, _CURRENT_ARMS_POOL_TITLE, POOL_IMG, UP_CHAR, UP_ARMS = await init_up_char(announcement)
async def reload_guardian_pool():
await _guardian_init_up_char()
return Message(f'当前UP池子{_CURRENT_CHAR_POOL_TITLE} & {_CURRENT_ARMS_POOL_TITLE}')

View File

@ -1,204 +0,0 @@
from typing import Any
from .config import DRAW_DATA_PATH
from .util import is_number
from nonebot.log import logger
try:
import ujson as json
except ModuleNotFoundError:
import json
def init_game_pool(game: str, data: dict, operator: Any):
tmp_lst = []
if game == "prts":
for key in data.keys():
limited = False
recruit_only = False
event_only = False
if "限定寻访" in data[key]["获取途径"]:
limited = True
if "干员寻访" not in data[key]["获取途径"] and "公开招募" in data[key]["获取途径"]:
recruit_only = True
if "活动获取" in data[key]["获取途径"]:
event_only = True
if "干员寻访" not in data[key]["获取途径"] and "凭证交易所" == data[key]["获取途径"][0]:
limited = True
if "干员寻访" not in data[key]["获取途径"] and "信用累计奖励" == data[key]["获取途径"][0]:
limited = True
if key.find("阿米娅") != -1:
continue
try:
tmp_lst.append(
operator(
name=key,
star=int(data[key]["星级"]),
limited=limited,
recruit_only=recruit_only,
event_only=event_only,
)
)
except Exception as e:
logger.warning(f"明日方舟导入角色 {key} 数据错误:{type(e)}{e}")
if game == "genshin":
for key in data.keys():
if key.find("旅行者") != -1:
continue
limited = False
if data[key]["常驻/限定"] == "限定UP":
limited = True
try:
tmp_lst.append(
operator(name=key, star=int(data[key]["稀有度"][:1]), limited=limited)
)
except Exception as e:
logger.warning(f"原神导入角色 {key} 数据错误:{type(e)}{e}")
if game == "genshin_arms":
for key in data.keys():
if data[key]["获取途径"].find("祈愿") != -1:
limited = False
if data[key]["获取途径"].find("限定祈愿") != -1:
limited = True
try:
tmp_lst.append(
operator(
name=key, star=int(data[key]["稀有度"][:1]), limited=limited
)
)
except Exception as e:
logger.warning(f"原神导入武器 {key} 数据错误:{type(e)}{e}")
if game == "pretty":
for key in data.keys():
try:
tmp_lst.append(
operator(name=key, star=data[key]["初始星级"], limited=False)
)
except Exception as e:
logger.warning(f"赛马娘导入角色 {key} 数据错误:{type(e)}{e}")
if game == "pretty_card":
for key in data.keys():
limited = False
if "卡池" not in data[key]["获取方式"]:
limited = True
if not data[key]["获取方式"]:
limited = False
try:
tmp_lst.append(
operator(
name=data[key]["中文名"],
star=len(data[key]["稀有度"]),
limited=limited,
)
)
except Exception as e:
logger.warning(f"赛马娘导入卡片 {key} 数据错误:{type(e)}{e}")
if game in ["guardian", "guardian_arms"]:
for key in data.keys():
tmp_lst.append(
operator(name=data[key]["名称"], star=int(data[key]["星级"]), limited=False)
)
if game == "pcr":
for key in data.keys():
limited = False
if key.find("") != -1:
limited = True
try:
tmp_lst.append(
operator(
name=data[key]["名称"], star=int(data[key]["星级"]), limited=limited
)
)
except Exception as e:
logger.warning(f"公主连接导入角色 {key} 数据错误:{type(e)}{e}")
if game == "azur":
for key in data.keys():
if is_number(data[key]["星级"]):
limited = False
if "可以建造" not in data[key]["获取途径"]:
limited = True
try:
tmp_lst.append(
operator(
name=data[key]["名称"],
star=int(data[key]["星级"]),
limited=limited,
type_=data[key]["类型"],
)
)
except Exception as e:
logger.warning(f"碧蓝航线导入角色 {key} 数据错误:{type(e)}{e}")
if game in ["fgo", "fgo_card"]:
for key in data.keys():
limited = False
try:
if (
"圣晶石召唤" not in data[key]["入手方式"]
and "圣晶石召唤Story卡池" not in data[key]["入手方式"]
):
limited = True
except KeyError:
pass
try:
tmp_lst.append(
operator(
name=data[key]["名称"], star=int(data[key]["星级"]), limited=limited
)
)
except Exception as e:
logger.warning(f"FGO导入角色 {key} 数据错误:{type(e)}{e}")
if game == "onmyoji":
for key in data.keys():
limited = False
if key in [
"奴良陆生",
"卖药郎",
"鬼灯",
"阿香",
"蜜桃&芥子",
"犬夜叉",
"杀生丸",
"桔梗",
"朽木露琪亚",
"黑崎一护",
"灶门祢豆子",
"灶门炭治郎",
]:
limited = True
try:
tmp_lst.append(
operator(
name=data[key]["名称"], star=data[key]["星级"], limited=limited
)
)
except Exception as e:
logger.warning(f"阴阳师导入角色 {key} 数据错误:{type(e)}{e}")
# print(tmp_lst)
char_name_lst = [x.name for x in tmp_lst]
up_char_file = (
DRAW_DATA_PATH
/ f"draw_card"
/ "draw_card_up"
/ f"{game.split('_')[0]}_up_char.json"
)
if up_char_file.exists():
data = json.load(open(up_char_file, "r", encoding="utf8"))
if len(game.split("_")) == 1:
key = "char"
else:
key = list(data.keys())[1]
for x in data[key]["up_char"]:
for char in data[key]["up_char"][x]:
if char not in char_name_lst:
if game.find("prts") != -1:
tmp_lst.append(
operator(
name=char,
star=int(x),
recruit_only=False,
event_only=False,
limited=False,
)
)
else:
tmp_lst.append(operator(name=char, star=int(x), limited=False))
return tmp_lst

View File

@ -1,81 +0,0 @@
from nonebot.adapters.onebot.v11 import MessageSegment
import random
from .update_game_requests_info import update_requests_info
from .util import generate_img, init_star_rst, BaseData, set_list, get_star, max_card
from .config import DRAW_DATA_PATH, draw_config
from dataclasses import dataclass
from .init_card_pool import init_game_pool
try:
import ujson as json
except ModuleNotFoundError:
import json
ALL_CHAR = []
@dataclass
class OnmyojiChar(BaseData):
pass
async def onmyoji_draw(count: int):
# 0 1 2
cnlist = ['SP', 'SSR', 'SR', 'R']
obj_list, obj_dict, star_list, rst = format_card_information(count)
rst = init_star_rst(star_list, cnlist, [], []) + rst
if count > 90:
obj_list = set_list(obj_list)
return MessageSegment.image("base64://" + await generate_img(obj_list, 'onmyoji', star_list)) \
+ '\n' + rst[:-1] + '\n' + max_card(obj_dict)
async def update_onmyoji_info():
global ALL_CHAR
data, code = await update_requests_info('onmyoji')
if code == 200:
ALL_CHAR = init_game_pool('onmyoji', data, OnmyojiChar)
async def init_onmyoji_data():
global ALL_CHAR
if draw_config.ONMYOJI_FLAG:
with (DRAW_DATA_PATH / 'onmyoji.json').open('r', encoding='utf8') as f:
azur_dict = json.load(f)
ALL_CHAR = init_game_pool('onmyoji', azur_dict, OnmyojiChar)
onmyoji_star = {
5: 'SP',
4: 'SSR',
3: 'SR',
2: 'R',
}
# 抽取卡池
def _get_onmyoji_card():
global ALL_CHAR
onmyoji_config = draw_config.onmyoji
star = get_star([5, 4, 3, 2], [onmyoji_config.ONMYOJI_SP, onmyoji_config.ONMYOJI_SSR, onmyoji_config.ONMYOJI_SR, onmyoji_config.ONMYOJI_R])
chars = [x for x in ALL_CHAR if x.star == onmyoji_star[star] and not x.limited]
return random.choice(chars), 5 - star
def format_card_information(count: int):
star_list = [0, 0, 0, 0]
obj_list = [] # 获取所有角色
obj_dict = {} # 获取角色次数字典
rst = ''
for i in range(count):
obj, code = _get_onmyoji_card()
star_list[code] += 1
if code == 0:
rst += f'{i+1} 抽获取SP {obj.name}\n'
elif code == 1:
rst += f'{i+1} 抽获取SSR {obj.name}\n'
try:
obj_dict[obj.name] += 1
except KeyError:
obj_dict[obj.name] = 1
obj_list.append(obj)
return obj_list, obj_dict, star_list, rst

View File

@ -1,93 +0,0 @@
from nonebot.adapters.onebot.v11 import MessageSegment
import random
from .update_game_info import update_info
from .update_game_simple_info import update_simple_info
from .util import generate_img, init_star_rst, BaseData, set_list, get_star, max_card
from .config import DRAW_DATA_PATH, draw_config
from dataclasses import dataclass
from .init_card_pool import init_game_pool
try:
import ujson as json
except ModuleNotFoundError:
import json
ALL_CHAR = []
@dataclass
class PcrChar(BaseData):
pass
async def pcr_draw(count: int):
# 0 1 2
cnlist = ['★★★', '★★', '']
char_list, three_list, three_index_list, char_dict, star_list = _format_card_information(count)
rst = init_star_rst(star_list, cnlist, three_list, three_index_list)
if count > 90:
char_list = set_list(char_list)
return MessageSegment.image("base64://" + await generate_img(char_list, 'pcr', star_list)) \
+ '\n' + rst[:-1] + '\n' + max_card(char_dict)
async def update_pcr_info():
global ALL_CHAR
if draw_config.PCR_TAI:
url = 'https://wiki.biligame.com/pcr/角色图鉴'
data, code = await update_simple_info(url, 'pcr')
else:
url = 'https://wiki.biligame.com/pcr/角色筛选表'
data, code = await update_info(url, 'pcr')
if code == 200:
ALL_CHAR = init_game_pool('pcr', data, PcrChar)
async def init_pcr_data():
global ALL_CHAR
if draw_config.PCR_FLAG:
with (DRAW_DATA_PATH / 'pcr.json').open('r', encoding='utf8') as f:
pcr_dict = json.load(f)
ALL_CHAR = init_game_pool('pcr', pcr_dict, PcrChar)
# 抽取卡池
def _get_pcr_card(mode: int = 1):
global ALL_CHAR
pcr_config = draw_config.pcr
if mode == 2:
star = get_star([3, 2], [pcr_config.PCR_G_THREE_P, pcr_config.PCR_G_TWO_P])
else:
star = get_star([3, 2, 1], [pcr_config.PCR_THREE_P, pcr_config.PCR_TWO_P, pcr_config.PCR_ONE_P])
chars = [x for x in ALL_CHAR if x.star == star and not x.limited]
return random.choice(chars), 3 - star
def _format_card_information(_count: int):
char_list = []
star_list = [0, 0, 0]
three_index_list = []
three_list = []
char_dict = {}
# 保底计算
count = 0
for i in range(_count):
count += 1
# 十连保底
if count == 10:
char, code = _get_pcr_card(2)
count = 0
else:
char, code = _get_pcr_card()
if code < 2:
count = 0
star_list[code] += 1
if code == 0:
three_list.append(char.name)
three_index_list.append(i)
try:
char_dict[char.name] += 1
except KeyError:
char_dict[char.name] = 1
char_list.append(char)
return char_list, three_list, three_index_list, char_dict, star_list

View File

@ -1,148 +0,0 @@
from nonebot.adapters.onebot.v11 import MessageSegment
from .announcement import PrettyAnnouncement
from .update_game_info import update_info
from .util import init_star_rst, generate_img, max_card, BaseData, \
set_list, get_star, format_card_information, init_up_char
import random
from .config import DRAW_DATA_PATH, draw_config
from dataclasses import dataclass
from .init_card_pool import init_game_pool
try:
import ujson as json
except ModuleNotFoundError:
import json
announcement = PrettyAnnouncement()
ALL_CHAR = []
ALL_CARD = []
_CURRENT_CHAR_POOL_TITLE = ""
_CURRENT_CARD_POOL_TITLE = ""
UP_CHAR = []
UP_CARD = []
POOL_IMG = []
@dataclass
class PrettyChar(BaseData):
pass
async def pretty_draw(count: int, pool_name):
if pool_name == 'card':
cnlist = ['SSR', 'SR', 'R']
else:
cnlist = ['★★★', '★★', '']
star_list = [0, 0, 0]
obj_list, obj_dict, three_list, star_list, three_olist = format_card_information(count, star_list,
_get_pretty_card, pool_name)
up_type = []
up_list = []
title = ''
if pool_name == 'char' and _CURRENT_CHAR_POOL_TITLE:
up_type = UP_CHAR
title = _CURRENT_CHAR_POOL_TITLE
elif pool_name == 'card' and _CURRENT_CARD_POOL_TITLE:
up_type = UP_CARD
title = _CURRENT_CARD_POOL_TITLE
tmp = ''
if up_type:
for x in up_type:
for operator in x.operators:
up_list.append(operator.split(']')[1] if pool_name == 'char' else operator)
if x.star == 3:
if pool_name == 'char':
tmp += f'三星UP{" ".join(x.operators)} \n'
else:
tmp += f'SSR UP{" ".join(x.operators)} \n'
elif x.star == 2:
if pool_name == 'char':
tmp += f'二星UP{" ".join(x.operators)} \n'
else:
tmp += f'SR UP{" ".join(x.operators)} \n'
elif x.star == 1:
if pool_name == 'char':
tmp += f'一星UP{" ".join(x.operators)} '
else:
tmp += f'R UP{" ".join(x.operators)} '
tmp = tmp[:-1] if tmp and tmp[-1] == '\n' else tmp
pool_info = f'当前up池{title}\n{tmp}' if title else ''
rst = init_star_rst(star_list, cnlist, three_list, three_olist, up_list)
if count > 90:
obj_list = set_list(obj_list)
return pool_info + MessageSegment.image(
"base64://" + await generate_img(obj_list, 'pretty', star_list)) \
+ '\n' + rst[:-1] + '\n' + max_card(obj_dict)
async def update_pretty_info():
global ALL_CHAR, ALL_CARD
url = 'https://wiki.biligame.com/umamusume/赛马娘图鉴'
data, code = await update_info(url, 'pretty')
if code == 200:
ALL_CHAR = init_game_pool('pretty', data, PrettyChar)
url = 'https://wiki.biligame.com/umamusume/支援卡图鉴'
data, code = await update_info(url, 'pretty_card')
if code == 200:
ALL_CARD = init_game_pool('pretty_card', data, PrettyChar)
await _pretty_init_up_char()
async def init_pretty_data():
global ALL_CHAR, ALL_CARD
if draw_config.PRETTY_FLAG:
with (DRAW_DATA_PATH / 'pretty.json').open('r', encoding='utf8') as f:
pretty_char_dict = json.load(f)
with (DRAW_DATA_PATH / 'pretty_card.json').open('r', encoding='utf8') as f:
pretty_card_dict = json.load(f)
ALL_CHAR = init_game_pool('pretty', pretty_char_dict, PrettyChar)
ALL_CARD = init_game_pool('pretty_card', pretty_card_dict, PrettyChar)
await _pretty_init_up_char()
# 抽取卡池
def _get_pretty_card(pool_name: str, mode: int = 1):
global ALL_CHAR, ALL_CARD, _CURRENT_CHAR_POOL_TITLE, _CURRENT_CARD_POOL_TITLE
pretty_config = draw_config.pretty
if mode == 1:
star = get_star([3, 2, 1], [pretty_config.PRETTY_THREE_P, pretty_config.PRETTY_TWO_P, pretty_config.PRETTY_ONE_P])
else:
star = get_star([3, 2], [pretty_config.PRETTY_THREE_P, pretty_config.PRETTY_TWO_P])
if pool_name == 'card':
title = _CURRENT_CARD_POOL_TITLE
up_data = UP_CARD
data = ALL_CARD
else:
title = _CURRENT_CHAR_POOL_TITLE
up_data = UP_CHAR
data = ALL_CHAR
# 有UP池子
if title and star in [x.star for x in up_data]:
all_char_lst = [x for x in data if x.star == star and not x.limited]
# 抽到UP
if random.random() < 1 / len(all_char_lst) * (0.7 / 0.1385):
all_up_star = [x.operators for x in up_data if x.star == star][0]
acquire_operator = random.choice(all_up_star)
if pool_name == 'char':
acquire_operator = acquire_operator.split(']')[1]
acquire_operator = [x for x in data if x.name == acquire_operator][0]
else:
acquire_operator = random.choice([x for x in data if x.star == star and not x.limited])
else:
acquire_operator = random.choice([x for x in data if x.star == star and not x.limited])
return acquire_operator, 3 - star
# 获取up和概率
async def _pretty_init_up_char():
global _CURRENT_CHAR_POOL_TITLE, _CURRENT_CARD_POOL_TITLE, UP_CHAR, UP_CARD, POOL_IMG
_CURRENT_CHAR_POOL_TITLE, _CURRENT_CARD_POOL_TITLE, POOL_IMG, UP_CHAR, UP_CARD = await init_up_char(announcement)
async def reload_pretty_pool():
await _pretty_init_up_char()
return f'当前UP池子{_CURRENT_CHAR_POOL_TITLE} & {_CURRENT_CARD_POOL_TITLE} {POOL_IMG}'

View File

@ -1,171 +0,0 @@
from nonebot.adapters.onebot.v11 import MessageSegment, Message
import random
from .config import DRAW_DATA_PATH, draw_config
from .update_game_info import update_info
from .util import generate_img, init_star_rst, max_card, BaseData, UpEvent, set_list, get_star
from .init_card_pool import init_game_pool
from .announcement import PrtsAnnouncement
from dataclasses import dataclass
from nonebot.log import logger
try:
import ujson as json
except ModuleNotFoundError:
import json
announcement = PrtsAnnouncement()
prts_dict = {}
UP_OPERATOR = []
ALL_OPERATOR = []
_CURRENT_POOL_TITLE = ''
POOL_IMG = ''
@dataclass
class Operator(BaseData):
recruit_only: bool # 公招限定
event_only: bool # 活动获得干员
# special_only: bool # 升变/异格干员
async def prts_draw(count: int = 300):
cnlist = ['★★★★★★', '★★★★★', '★★★★', '★★★']
star_list = [0, 0, 0, 0]
operator_list, operator_dict, six_list, star_list, six_index_list = format_card_information(count, star_list)
up_list = []
tmp = ''
if _CURRENT_POOL_TITLE:
for x in UP_OPERATOR:
for operator in x.operators:
up_list.append(operator)
if x.star == 6:
tmp += f'六星UP{" ".join(x.operators)} \n'
elif x.star == 5:
tmp += f'五星UP{" ".join(x.operators)} \n'
elif x.star == 4:
tmp += f'四星UP{" ".join(x.operators)}'
rst = init_star_rst(star_list, cnlist, six_list, six_index_list, up_list)
if count > 90:
operator_list = set_list(operator_list)
pool_info = f"当前up池: {_CURRENT_POOL_TITLE}\n{tmp}" if _CURRENT_POOL_TITLE else ""
return pool_info + MessageSegment.image(
"base64://" + await generate_img(operator_list, 'prts', star_list)) \
+ '\n' + rst[:-1] + '\n' + max_card(operator_dict)
async def update_prts_info():
global prts_dict, ALL_OPERATOR
url = 'https://wiki.biligame.com/arknights/干员数据表'
data, code = await update_info(url, 'prts', ['头像', '名称', '阵营', '星级', '性别', '是否感染', '获取途径', '初始生命', '初始防御',
'初始法抗', '再部署', '部署费用', '阻挡数', '攻击速度', '标签'])
if code == 200:
prts_dict = data
ALL_OPERATOR = init_game_pool('prts', prts_dict, Operator)
await _init_up_char()
async def init_prts_data():
global prts_dict, ALL_OPERATOR
if draw_config.PRTS_FLAG:
with (DRAW_DATA_PATH / 'prts.json').open('r', encoding='utf8') as f:
prts_dict = json.load(f)
ALL_OPERATOR = init_game_pool('prts', prts_dict, Operator)
await _init_up_char()
# 抽取干员
def _get_operator_card(add: float):
prts_config = draw_config.prts
star = get_star([6, 5, 4, 3], [prts_config.PRTS_SIX_P + add, prts_config.PRTS_FIVE_P, prts_config.PRTS_FOUR_P, prts_config.PRTS_THREE_P])
if _CURRENT_POOL_TITLE:
zooms = [x.zoom for x in UP_OPERATOR if x.star == star]
zoom = 0
weight = 0
# 分配概率和权重
for z in zooms:
if z < 1:
zoom = z
else:
weight = z
up_operator_name = ""
# UPs
try:
if 0 < zoom:
up_operators = [x.operators for x in UP_OPERATOR if x.star == star and x.zoom < 1][0]
up_operator_name = random.choice(up_operators)
acquire_operator = [x for x in ALL_OPERATOR if x.name == up_operator_name][0]
else:
all_star_operators = [x for x in ALL_OPERATOR if x.star == star
and not any([x.limited, x.event_only, x.recruit_only])]
weight_up_operators = [x.operators for x in UP_OPERATOR if x.star == star and x.zoom > 1]
# 权重
if weight_up_operators and random.random() < 1.0 / float(len(all_star_operators)) * weight:
up_operator_name = random.choice(weight_up_operators[0])
acquire_operator = [x for x in ALL_OPERATOR if x.name == up_operator_name][0]
else:
acquire_operator = random.choice(all_star_operators)
except IndexError:
acquire_operator = Operator(up_operator_name, star, True, False, False)
else:
acquire_operator = random.choice([x for x in ALL_OPERATOR if x.star == star
and not any([x.limited, x.event_only, x.recruit_only])])
return acquire_operator, 6 - star
# 整理数据
def format_card_information(count: int, star_list: list):
max_star_lst = [] # 获取的最高星级角色列表
max_index_lst = [] # 获取最高星级角色的次数
obj_list = [] # 获取所有角色
obj_dict = {} # 获取角色次数字典
add = 0.0
count_idx = 0
for i in range(count):
count_idx += 1
obj, code = _get_operator_card(add)
star_list[code] += 1
if code == 0:
max_star_lst.append(obj.name)
max_index_lst.append(i)
add = 0.0
count_idx = 0
elif count_idx > 50:
add += 0.02
try:
obj_dict[obj.name] += 1
except KeyError:
obj_dict[obj.name] = 1
obj_list.append(obj)
return obj_list, obj_dict, max_star_lst, star_list, max_index_lst
# 获取up干员和概率
async def _init_up_char():
global _CURRENT_POOL_TITLE, POOL_IMG, UP_OPERATOR
UP_OPERATOR = []
up_char_dict = await announcement.update_up_char()
_CURRENT_POOL_TITLE = up_char_dict['char']['title']
if _CURRENT_POOL_TITLE:
POOL_IMG = MessageSegment.image(up_char_dict['char']['pool_img'])
up_char_dict = up_char_dict['char']['up_char']
logger.info(f'成功获取明日方舟当前up信息...当前up池: {_CURRENT_POOL_TITLE}')
average_dict = {'6': {}, '5': {}, '4': {}}
for star in up_char_dict.keys():
for key in up_char_dict[star].keys():
if average_dict[star].get(up_char_dict[star][key]):
average_dict[star][up_char_dict[star][key]].append(key)
else:
average_dict[star][up_char_dict[star][key]] = [key]
for star in average_dict.keys():
for str_zoom in average_dict[star].keys():
if str_zoom[0] == '':
zoom = float(str_zoom[1:])
else:
zoom = float(str_zoom) / 100
UP_OPERATOR.append(UpEvent(star=int(star), operators=average_dict[star][str_zoom], zoom=zoom))
async def reload_prts_pool():
await _init_up_char()
return Message(f'当前UP池{_CURRENT_POOL_TITLE} {POOL_IMG}')

View File

@ -1,29 +0,0 @@
from nonebot.rule import Rule
from nonebot.adapters.onebot.v11 import Bot, MessageEvent
from nonebot.typing import T_State
from .config import draw_config
def is_switch(game_name: str) -> Rule:
async def _is_switch(bot: Bot, event: MessageEvent, state: T_State) -> bool:
if game_name == 'prts':
return draw_config.PRTS_FLAG
if game_name == 'genshin':
return draw_config.GENSHIN_FLAG
if game_name == 'pretty':
return draw_config.PRETTY_FLAG
if game_name == 'guardian':
return draw_config.GUARDIAN_FLAG
if game_name == 'pcr':
return draw_config.PCR_FLAG
if game_name == 'azur':
return draw_config.AZUR_FLAG
if game_name == 'fgo':
return draw_config.FGO_FLAG
if game_name == 'onmyoji':
return draw_config.ONMYOJI_FLAG
else:
return False
return Rule(_is_switch)

View File

@ -1,295 +0,0 @@
from typing import Tuple
from .config import DRAW_DATA_PATH
from asyncio.exceptions import TimeoutError
from bs4 import BeautifulSoup
from .util import download_img
from urllib.parse import unquote
from .util import remove_prohibited_str
from utils.http_utils import AsyncHttpx
from nonebot.log import logger
import bs4
import re
try:
import ujson as json
except ModuleNotFoundError:
import json
headers = {'User-Agent': '"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)"'}
async def update_info(url: str, game_name: str, info_list: list = None) -> Tuple[dict, int]:
info_path = DRAW_DATA_PATH / f"{game_name}.json"
try:
with info_path.open('r', encoding='utf8') as f:
data = json.load(f)
except (ValueError, FileNotFoundError):
data = {}
try:
response = await AsyncHttpx.get(url, timeout=7)
soup = BeautifulSoup(response.text, 'lxml')
_tbody = get_tbody(soup, game_name, url)
trs = _tbody.find_all('tr')
att_dict, start_index, index = init_attr(game_name)
if game_name == 'guardian':
start_index = 1
if game_name == 'azur':
start_index = 0
for th in trs[0].find_all('th')[start_index:]:
text = th.text
if text[-1] == '\n':
text = text[:-1]
att_dict[text] = index
index += 1
for tr in trs[1:]:
member_dict = {}
tds = tr.find_all('td')
if not info_list:
info_list = att_dict.keys()
for key in info_list:
key, attr = parse_key(key, game_name)
td = tds[att_dict[key]]
last_tag = unquote(_find_last_tag(td, attr, game_name), 'utf-8')
member_dict[key] = last_tag
member_dict = intermediate_check(member_dict, key, game_name, td)
avatar_img = await _modify_avatar_url(game_name, member_dict["名称"])
member_dict['头像'] = avatar_img if avatar_img else member_dict['头像']
member_dict, name = replace_update_name(member_dict, game_name)
await download_img(member_dict['头像'], game_name, name)
data[name] = member_dict
logger.info(f'{name} is update...')
data = await _last_check(data, game_name)
except TimeoutError:
logger.warning(f'更新 {game_name} 超时...')
return {}, 999
with info_path.open('w', encoding='utf8') as wf:
wf.write(json.dumps(data, ensure_ascii=False, indent=4))
return data, 200
def _find_last_tag(element: bs4.element.Tag, attr: str, game_name: str) -> str:
last_tag = []
for des in element.descendants:
last_tag.append(des)
if len(last_tag) == 1 and last_tag[0] == '\n':
last_tag = ''
elif last_tag[-1] == '\n':
last_tag = last_tag[-2]
else:
last_tag = last_tag[-1]
if attr and str(last_tag):
last_tag = last_tag[attr]
elif str(last_tag).find('<img') != -1:
if last_tag.get('srcset'):
last_tag = str(last_tag.get('srcset')).strip().split(' ')[-2].strip()
else:
last_tag = last_tag['src']
else:
last_tag = str(last_tag)
if str(last_tag) and str(last_tag)[-1] == '\n':
last_tag = str(last_tag)[:-1]
if game_name not in ['pretty', 'pretty_card', 'guardian'] and last_tag.find('http') == -1:
last_tag = last_tag.split('.')[0]
return last_tag
# 获取大图(小图快爬)
async def _modify_avatar_url(game_name: str, char_name: str):
if game_name == 'prts':
res = await AsyncHttpx.get(f'https://wiki.biligame.com/arknights/{char_name}', timeout=7)
soup = BeautifulSoup(res.text, 'lxml')
try:
img_url = str(soup.find('img', {'class': 'img-bg'})['srcset']).split(' ')[-2]
except KeyError:
img_url = str(soup.find('img', {'class': 'img-bg'})['src'])
return img_url
if game_name == 'genshin':
return None
if game_name == 'pretty_card':
res = await AsyncHttpx.get(f'https://wiki.biligame.com/umamusume/{char_name}', timeout=7)
soup = BeautifulSoup(res.text, 'lxml')
try:
img_url = soup.find('div', {'class': 'support_card-left'}).find('div').find('img').get('src')
return img_url
except AttributeError:
logger.warning("pretty_card 获取大图像失败")
return None
if game_name == 'guardian':
# 未上传图片太多,换成像素图
# async with session.get(f'https://wiki.biligame.com/gt/{char_name}', timeout=7) as res:
# soup = BeautifulSoup(await res.text(), 'lxml')
# soup = soup.find('table', {'class': 'wikitable'}).find('tbody').find('tr')
# try:
# img_url = str(soup.find('img', {'class': 'img-kk'})['srcset']).split(' ')[-2]
# except KeyError:
# img_url = str(soup.find('img', {'class': 'img-kk'})['src'])
# except TypeError:
# print(f'{char_name} 图片还未上传,跳过...')
# img_url = ''
# return img_url
return None
# 数据最后处理(是否需要额外数据或处理数据)
async def _last_check(data: dict, game_name: str):
# if game_name == 'prts':
# tasks = []
# for key in data.keys():
# tasks.append(asyncio.ensure_future(_async_update_prts_extra_info(key, session)))
# asyResult = await asyncio.gather(*tasks)
# for x in asyResult:
# for key in x.keys():
# data[key]['获取途径'] = x[key]['获取途径']
if game_name == 'genshin':
for key in data.keys():
res = await AsyncHttpx.get(f'https://wiki.biligame.com/ys/{key}', timeout=7)
soup = BeautifulSoup(res.text, 'lxml')
_trs = ''
for table in soup.find_all('table', {'class': 'wikitable'}):
if str(table).find('常驻/限定') != -1:
_trs = table.find('tbody').find_all('tr')
break
for tr in _trs:
data[key]['常驻/限定'] = '未知'
if str(tr).find('限定UP') != -1:
data[key]['常驻/限定'] = '限定UP'
logger.info(f'原神获取额外数据 {key}...{data[key]["常驻/限定"]}')
break
elif str(tr).find('常驻UP') != -1:
data[key]['常驻/限定'] = '常驻UP'
logger.info(f'原神获取额外数据 {key}...{data[key]["常驻/限定"]}')
break
if game_name == 'pretty':
for keys in data.keys():
for key in data[keys].keys():
r = re.search(r'.*?40px-(.*)图标.png', str(data[keys][key]))
if r:
data[keys][key] = r.group(1)
logger.info(f'赛马娘额外修改数据...{keys}[{key}]=> {r.group(1)}')
if game_name == 'guardian':
for keys in data.keys():
for key in data[keys].keys():
r = re.search(r'.*?-star_(.*).png', str(data[keys][key]))
if r:
data[keys][key] = r.group(1)
logger.info(f'坎公骑士剑额外修改数据...{keys}[{key}] => {r.group(1)}')
return data
# 对抓取每行数据是否需要额外处理?
def intermediate_check(member_dict: dict, key: str, game_name: str, td: bs4.element.Tag):
if game_name == "genshin_arms":
if key == "稀有度":
member_dict["稀有度"] = td.find("img")["alt"].split('.')[0]
if game_name == 'prts':
if key == '获取途径':
msg = re.search('<td.*?>([\\s\\S]*)</td>', str(td)).group(1).strip()
msg = msg[:-1] if msg and msg[-1] == '\n' else msg
if msg.find('<a') != -1:
for a in td.find_all('a'):
msg = msg.replace(str(a), a.text)
member_dict['获取途径'] = msg.split('<br/>')
if game_name == 'pretty':
if key == '初始星级':
member_dict['初始星级'] = len(td.find_all('img'))
if game_name == 'pretty_card':
if key == '获取方式':
obtain = []
for x in str(td.text).replace('\n', '').strip().split(''):
if x:
obtain.append(x)
member_dict['获取方式'] = obtain
if game_name == 'guardian':
if key == '头像':
member_dict['星级'] = str(td.find('span').find('img')['alt'])[-5]
try:
member_dict['头像'] = str(td.find('img')['srcset']).split(' ')[0]
except KeyError:
member_dict['头像'] = str(td.find('img')['src'])
return member_dict
def init_attr(game_name: str):
att_dict = {'头像': 0, '名称': 1}
start_index = 2
index = 2
if game_name == 'guardian':
att_dict = {'头像': 0, '名称': 0}
start_index = 1
index = 1
return att_dict, start_index, index
# 解析key
def parse_key(key: str, game_name):
attr = ''
if game_name == 'genshin_arms':
if key.find('.') != -1:
key = key.split('.')
attr = key[-1]
key = key[0]
return key, attr
# 拿到名称
def replace_update_name(member_dict: dict, game_name: str):
name = member_dict['名称']
if game_name == 'pretty_card':
name = member_dict['中文名']
name = remove_prohibited_str(name)
member_dict['中文名'] = name
else:
name = remove_prohibited_str(name)
member_dict['名称'] = name
return member_dict, name
# 拿到tbody不同游戏tbody可能不同
def get_tbody(soup: bs4.BeautifulSoup, game_name: str, url: str):
max_count = 0
_tbody = None
if game_name == 'guardian_arms':
if url[-2:] == '盾牌':
div = soup.find('div', {'class': 'resp-tabs-container'}).find_all('div', {'class': 'resp-tab-content'})[1]
_tbody = div.find('tbody')
else:
div = soup.find('div', {'class': 'resp-tabs-container'}).find_all('div', {'class': 'resp-tab-content'})[0]
_tbody = div.find('table', {'id': 'CardSelectTr'}).find('tbody')
else:
for tbody in soup.find_all('tbody'):
if len(tbody.find_all('tr')) > max_count:
_tbody = tbody
max_count = len(tbody.find_all('tr'))
return _tbody
# async def _async_update_prts_extra_info(key: str, session: aiohttp.ClientSession):
# for i in range(10):
# try:
# async with session.get(f'https://wiki.biligame.com/arknights/{key}', timeout=7) as res:
# soup = BeautifulSoup(await res.text(), 'lxml')
# obtain = str(soup.find('table', {'class': 'wikitable'}).find('tbody').find_all('td')[-1])
# obtain = re.search(r'<td.*?>([\s\S]*)</.*?', obtain).group(1)
# obtain = obtain[:-1] if obtain[-1] == '\n' else obtain
# if obtain.find('<br/>'):
# obtain = obtain.split('<br/>')
# elif obtain.find('<br>'):
# obtain = obtain.split('<br>')
# for i in range(len(obtain)):
# if obtain[i].find('<a') != -1:
# text = ''
# for msg in obtain[i].split('</a>'):
# r = re.search('>(.*)', msg)
# if r:
# text += r.group(1) + ' '
# obtain[i] = obtain[i].split('<a')[0] + text[:-1] + obtain[i].split('</a>')[-1]
# print(f'明日方舟获取额外信息 {key}...{obtain}')
# x = {key: {}}
# x[key]['获取途径'] = obtain
# return x
# except TimeoutError:
# print(f'访问 https://wiki.biligame.com/arknights/{key} 第 {i}次 超时...已再次访问')
# return {}

View File

@ -1,156 +0,0 @@
from .config import DRAW_DATA_PATH, draw_config
from asyncio.exceptions import TimeoutError
from .util import download_img
from bs4 import BeautifulSoup
from .util import remove_prohibited_str
from utils.http_utils import AsyncHttpx
from nonebot.log import logger
import asyncio
try:
import ujson as json
except ModuleNotFoundError:
import json
headers = {'User-Agent': '"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)"'}
async def update_requests_info(game_name: str):
info_path = DRAW_DATA_PATH / f"{game_name}.json"
try:
with info_path.open('r', encoding='utf8') as f:
data = json.load(f)
except (ValueError, FileNotFoundError):
data = {}
try:
if game_name in ['fgo', 'fgo_card']:
if game_name == 'fgo':
url = 'http://fgo.vgtime.com/servant/ajax?card=&wd=&ids=&sort=12777&o=desc&pn='
else:
url = 'http://fgo.vgtime.com/equipment/ajax?wd=&ids=&sort=12958&o=desc&pn='
for i in range(9999):
response = await AsyncHttpx.get(f'{url}{i}', timeout=7)
fgo_data = json.loads(response.text)
if int(fgo_data['nums']) == 0:
break
for x in fgo_data['data']:
x['name'] = remove_prohibited_str(x['name'])
key = x['name']
data = add_to_data(data, x, game_name)
await download_img(data[key]['头像'], game_name, key)
logger.info(f'{key} is update...')
if game_name == 'onmyoji':
url = 'https://yys.res.netease.com/pc/zt/20161108171335/js/app/all_shishen.json?v74='
response = await AsyncHttpx.get(url, timeout=7)
onmyoji_data = response.json()
for x in onmyoji_data:
x['name'] = remove_prohibited_str(x['name'])
key = x['name']
data = add_to_data(data, x, game_name)
logger.info(f'{key} is update...')
data = await _last_check(data, game_name)
except TimeoutError:
logger.warning(f'更新 {game_name} 超时...')
return {}, 999
except Exception as e:
logger.warning(f'更新 {game_name} 失败 {type(e)}{e}...')
return {}, 999
with info_path.open('w', encoding='utf8') as wf:
json.dump(data, wf, ensure_ascii=False, indent=4)
return data, 200
# 添加到字典
def add_to_data(data: dict, x: dict, game_name: str) -> dict:
member_dict = {}
if game_name == 'fgo':
member_dict = {
'id': x['id'],
'card_id': x['charid'],
'头像': x['icon'],
'名称': x['name'],
'职阶': x['classes'],
'星级': x['star'],
'hp': x['lvmax4hp'],
'atk': x['lvmax4atk'],
'card_quick': x['cardquick'],
'card_arts': x['cardarts'],
'card_buster': x['cardbuster'],
'宝具': x['tprop'],
}
if game_name == 'fgo_card':
member_dict = {
'id': x['id'],
'card_id': x['equipid'],
'头像': x['icon'],
'名称': x['name'],
'星级': x['star'],
'hp': x['lvmax_hp'],
'atk': x['lvmax_atk'],
'skill_e': x['skill_e'].split('<br />')[: -1],
}
if game_name == 'onmyoji':
member_dict = {
'id': x['id'],
'名称': x['name'],
'星级': x['level'],
}
data[member_dict['名称']] = member_dict
return data
# 获取额外数据
async def _last_check(data: dict, game_name: str) -> dict:
if game_name == 'fgo':
url = 'http://fgo.vgtime.com/servant/'
tasks = []
semaphore = asyncio.Semaphore(draw_config.SEMAPHORE)
for key in data.keys():
tasks.append(asyncio.ensure_future(
_async_update_fgo_extra_info(url, key, data[key]['id'], semaphore)))
result = await asyncio.gather(*tasks)
for x in result:
for key in x.keys():
data[key]['入手方式'] = x[key]['入手方式']
if game_name == 'onmyoji':
url = 'https://yys.163.com/shishen/{}.html'
for key in data.keys():
response = await AsyncHttpx.get(f'{url.format(data[key]["id"])}', timeout=7)
soup = BeautifulSoup(response.text, 'lxml')
data[key]['头像'] = "https:" + soup.find('div', {'class': 'pic_wrap'}).find('img')['src']
await download_img(data[key]['头像'], game_name, key)
return data
async def _async_update_fgo_extra_info(url: str, key: str, _id: str, semaphore):
# 防止访问超时
async with semaphore:
for i in range(10):
try:
response = await AsyncHttpx.get(f'{url}{_id}', timeout=7)
soup = BeautifulSoup(response.text, 'lxml')
obtain = soup.find('table', {'class': 'uk-table uk-codex-table'}).find_all('td')[-1].text
if obtain.find('限时活动免费获取 活动结束后无法获得') != -1:
obtain = ['活动获取']
elif obtain.find('非限时UP无法获得') != -1:
obtain = ['限时召唤']
else:
if obtain.find('&') != -1:
obtain = obtain.strip().split('&')
else:
obtain = obtain.strip().split(' ')
logger.info(f'Fgo获取额外信息 {key}....{obtain}')
x = {key: {}}
x[key]['入手方式'] = obtain
return x
except TimeoutError:
logger.warning(f'访问{url}{_id}{i}次 超时...已再次访问')
except Exception as e:
logger.warning(f'访问{url}{_id}{i}次 发生错误:{e}...已再次访问')
return {}

View File

@ -1,225 +0,0 @@
from typing import Tuple
from .config import DRAW_DATA_PATH, draw_config
from asyncio.exceptions import TimeoutError
from bs4 import BeautifulSoup
from .util import download_img
from .util import remove_prohibited_str
from urllib.parse import unquote
from utils.http_utils import AsyncHttpx
from nonebot.log import logger
import bs4
import asyncio
try:
import ujson as json
except ModuleNotFoundError:
import json
headers = {
"User-Agent": '"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)"'
}
async def update_simple_info(url: str, game_name: str) -> Tuple[dict, int]:
info_path = DRAW_DATA_PATH / f"{game_name}.json"
try:
with info_path.open("r", encoding="utf8") as f:
data = json.load(f)
except (ValueError, FileNotFoundError):
data = {}
try:
response = await AsyncHttpx.get(url, timeout=7)
soup = BeautifulSoup(response.text, "lxml")
divs = get_char_divs(soup, game_name)
for div in divs:
type_lst = get_type_lst(div, game_name)
index = 0
for char_lst in type_lst:
try:
contents = get_char_lst_contents(char_lst, game_name)
except AttributeError:
continue
for char in contents[1:]:
try:
data = await retrieve_char_data(
char, game_name, data, index
)
except AttributeError:
continue
index += 1
data = await _last_check(data, game_name)
except TimeoutError:
logger.warning(f"更新 {game_name} 超时...")
return {}, 999
with info_path.open("w", encoding="utf8") as wf:
wf.write(json.dumps(data, ensure_ascii=False, indent=4))
return data, 200
# 获取所有包含需要图片的divs
def get_char_divs(soup: bs4.BeautifulSoup, game_name: str) -> bs4.element.ResultSet:
# if game_name == "pcr":
# return soup.find_all("div", {"class": "tabbertab"})
if game_name in ["azur", "pcr"]:
return soup.find_all("div", {"class": "resp-tabs"})
# 拿到所有类型
def get_type_lst(div: bs4.element.Tag, game_name: str):
if game_name in ["pcr", "azur"]:
return div.find("div", {"class": "resp-tabs-container"}).find_all(
"div", {"class": "resp-tab-content"}
)
# 获取所有角色div
def get_char_lst_contents(char_lst: bs4.element.Tag, game_name: str):
contents = []
# print(len(char_lst.find_all('tr')))
if game_name == "pcr":
contents = char_lst.contents
if game_name == "azur":
contents = char_lst.find("table").find("tbody").contents[-1].find("td").contents
return [x for x in contents if x != "\n"]
# 额外数据
async def _last_check(
data: dict, game_name: str
) -> dict:
if game_name == "azur":
idx = 1
for url in [
"https://patchwiki.biligame.com/images/blhx/thumb/1/15/pxho13xsnkyb546tftvh49etzdh74cf.png/60px"
"-舰娘头像外框普通.png",
"https://patchwiki.biligame.com/images/blhx/thumb/a/a9/k8t7nx6c8pan5vyr8z21txp45jxeo66.png/60px"
"-舰娘头像外框稀有.png",
"https://patchwiki.biligame.com/images/blhx/thumb/a/a5/5whkzvt200zwhhx0h0iz9qo1kldnidj.png/60px"
"-舰娘头像外框精锐.png",
"https://patchwiki.biligame.com/images/blhx/thumb/a/a2/ptog1j220x5q02hytpwc8al7f229qk9.png/60px-"
"舰娘头像外框超稀有.png",
]:
await download_img(url, "azur", f"{idx}_star")
idx += 1
tasks = []
semaphore = asyncio.Semaphore(draw_config.SEMAPHORE)
for key in data.keys():
tasks.append(
asyncio.ensure_future(
_async_update_azur_extra_info(key, semaphore)
)
)
result = await asyncio.gather(*tasks)
for x in result:
for key in x.keys():
data[key]["获取途径"] = x[key]["获取途径"]
return data
azur_type = {
"0": "驱逐",
"1": "轻巡",
"2": "重巡",
"3": "超巡",
"4": "战巡",
"5": "战列",
"6": "航母",
"7": "航站",
"8": "轻航",
"9": "重炮",
"10": "维修",
"11": "潜艇",
"12": "运输",
}
# 整理数据
async def retrieve_char_data(
char: bs4.element.Tag,
game_name: str,
data: dict,
index: int = 0,
) -> dict:
member_dict = {}
if game_name == "pcr":
member_dict = {
"头像": unquote(char.find("a").find("img")["src"]),
"名称": remove_prohibited_str(char.find("a")["title"]),
"星级": 3 - index,
}
if game_name == "azur":
char = char.find("div").find("div").find("div")
avatar_img = char.find("a").find("img")
char = char.find("div")
try:
member_dict["名称"] = remove_prohibited_str(char.find("a")["title"])
except TypeError:
member_dict["名称"] = char.find("a")["title"][:-4]
try:
member_dict["头像"] = unquote(str(avatar_img["srcset"]).split(" ")[-2])
except KeyError:
member_dict["头像"] = unquote(str(avatar_img["src"]).split(" ")[-2])
except TypeError:
member_dict["头像"] = "img link not find..."
logger.warning(f'{member_dict["名称"]} 图片缺失....')
star = char.find("img")["alt"]
if star == "舰娘头像外框普通.png":
star = 1
elif star == "舰娘头像外框稀有.png":
star = 2
elif star == "舰娘头像外框精锐.png":
star = 3
elif star == "舰娘头像外框超稀有.png":
star = 4
elif star == "舰娘头像外框海上传奇.png":
star = 5
elif star in [
"舰娘头像外框最高方案.png",
"舰娘头像外框决战方案.png",
"舰娘头像外框超稀有META.png",
"舰娘头像外框精锐META.png",
]:
star = 6
else:
star = 6
member_dict["星级"] = star
member_dict["类型"] = azur_type[str(index)]
await download_img(member_dict["头像"], game_name, member_dict["名称"])
data[member_dict["名称"]] = member_dict
logger.info(f'{member_dict["名称"]} is update...')
return data
async def _async_update_azur_extra_info(
key: str, semaphore
):
if key[-1] == "":
return {key: {"获取途径": ["无法建造"]}}
async with semaphore:
for i in range(20):
try:
res = await AsyncHttpx.get(f"https://wiki.biligame.com/blhx/{key}", timeout=7)
soup = BeautifulSoup(res.text, "lxml")
try:
construction_time = str(
soup.find("table", {"class": "wikitable sv-general"}).find(
"tbody"
)
)
x = {key: {"获取途径": []}}
if construction_time.find("无法建造") != -1:
x[key]["获取途径"].append("无法建造")
elif construction_time.find("活动已关闭") != -1:
x[key]["获取途径"].append("活动限定")
else:
x[key]["获取途径"].append("可以建造")
logger.info(f'碧蓝航线获取额外信息 {key}...{x[key]["获取途径"]}')
except AttributeError:
x = {key: {"获取途径": []}}
logger.warning(f"碧蓝航线获取额外信息错误 {key}...{[]}")
return x
except TimeoutError:
logger.warning(
f"访问 https://wiki.biligame.com/blhx/{key}{i}次 超时...已再次访问"
)
return {}

View File

@ -1,356 +0,0 @@
import platform
from asyncio.exceptions import TimeoutError
from utils.utils import cn2py
from utils.http_utils import AsyncHttpx
from typing import List, Tuple, Union, Set
from .config import draw_config, DRAW_IMAGE_PATH
import nonebot
from PIL import UnidentifiedImageError
from utils.image_utils import BuildImage
from nonebot.adapters.onebot.v11 import MessageSegment
from nonebot.log import logger
import random
from dataclasses import dataclass
import os
import asyncio
driver = nonebot.get_driver()
loop = asyncio.get_event_loop()
@dataclass
class BaseData:
name: str
star: int
limited: bool # 限定
@dataclass
class UpEvent:
star: int # 对应up星级
operators: List[BaseData] # 干员列表
zoom: float # up提升倍率
async def download_img(url: str, path: str, name: str) -> bool:
path = path.split("_")[0]
codename = cn2py(name)
img_path = DRAW_IMAGE_PATH / f"{path}" / f"{codename}.png"
if not img_path.exists():
try:
if await AsyncHttpx.download_file(url, img_path):
logger.info(
f"下载 {draw_config.path_dict[path]} 图片成功,名称:{name}url{url}"
)
return True
except TimeoutError:
logger.warning(f"下载 {draw_config.path_dict[path]} 图片超时,名称:{name}url{url}")
# logger.info(f'{path_dict[path]} 图片 {name} 已存在')
return False
@driver.on_startup
def _check_dir():
for dir_name in draw_config.path_dict.keys():
dir_path = DRAW_IMAGE_PATH / f"{dir_name}"
if not dir_path.exists():
dir_path.mkdir(parents=True, exist_ok=True)
async def generate_img(
card_set: Union[Set[BaseData], List[BaseData]], game_name: str, star_list: list
) -> str:
# try:
img_list = []
background_list = []
for x in card_set:
if game_name == "prts":
if x.star == 6:
background_list.append("#FFD700")
elif x.star == 5:
background_list.append("#DAA520")
elif x.star == 4:
background_list.append("#9370D8")
else:
background_list.append("white")
img_path = DRAW_IMAGE_PATH / f"{game_name}" / f"{x.star}_star.png"
if game_name == "azur":
if img_path.exists():
background_list.append(str(img_path))
py_name = cn2py(x.name)
img_list.append(str(DRAW_IMAGE_PATH / f"{game_name}" / f"{py_name}.png"))
img_len = len(img_list)
w = 100 * 10
if img_len <= 10:
w = 100 * img_len
h = 100
elif img_len % 10 == 0:
h = 100 * int(img_len / 10)
else:
h = 100 * int(img_len / 10) + 100
card_img = await asyncio.get_event_loop().run_in_executor(
None, _pst, h, img_list, game_name, background_list
)
num = 0
for n in star_list:
num += n
A = BuildImage(w, h)
A.paste(card_img)
return A.pic2bs4()
def _pst(h: int, img_list: list, game_name: str, background_list: list):
card_img = BuildImage(100 * 10, h, 100, 100)
idx = 0
for img in img_list:
try:
if game_name == "prts":
bk = BuildImage(100, 100, color=background_list[idx])
b = BuildImage(94, 94, background=img)
bk.paste(b, (3, 3))
b = bk
elif game_name == "azur" and background_list:
bk = BuildImage(100, 100, background=background_list[idx])
b = BuildImage(98, 90, background=img)
bk.paste(b, (1, 5))
b = bk
else:
try:
b = BuildImage(100, 100, background=img)
except UnidentifiedImageError as e:
logger.warning(f"无法识别图片 已删除图片,下次更新重新下载... e{e}")
if os.path.exists(img):
os.remove(img)
b = BuildImage(100, 100, color="black")
except FileNotFoundError:
logger.warning(f"{img} not exists")
b = BuildImage(100, 100, color="black")
card_img.paste(b)
idx += 1
return card_img
# 初始化输出数据
def init_star_rst(
star_list: list,
cnlist: list,
max_star_list: list,
max_star_index_list: list,
up_list: list = None,
) -> str:
if not up_list:
up_list = []
rst = ""
for i in range(len(star_list)):
if star_list[i]:
rst += f"[{cnlist[i]}×{star_list[i]}] "
rst += "\n"
for i in range(len(max_star_list)):
if max_star_list[i] in up_list:
rst += f"{max_star_index_list[i]+1} 抽获取UP {max_star_list[i]}\n"
else:
rst += f"{max_star_index_list[i]+1} 抽获取 {max_star_list[i]}\n"
return rst
# 更好的初始化
def init_rst(
max_star_char_dict: dict,
star_num_list: List[int],
star: List[str],
up_list: list = None,
):
# print(max_star_char_dict)
# print(star_num_list)
# print(up_list)
up_list = up_list if up_list else []
rst = ""
for i in range(len(star_num_list)):
if star_num_list[i]:
rst += f"[{star[i]}×{star_num_list[i]}] "
rst += "\n"
_tmp = []
for name in max_star_char_dict.keys():
_tmp += max_star_char_dict[name]
for index in sorted(_tmp):
for name in max_star_char_dict.keys():
if index in max_star_char_dict[name]:
if name in up_list:
rst += f"{index} 抽获取UP {name}\n"
else:
rst += f"{index} 抽获取 {name}\n"
return rst[:-1] if rst else ""
def max_card(_dict: dict):
_max_value = max(_dict.values())
_max_user = list(_dict.keys())[list(_dict.values()).index(_max_value)]
return f"抽取到最多的是{_max_user},共抽取了{_max_value}"
# ThreeHighest = nlargest(3, operator_dict, key=operator_dict.get)
# rst = '最喜欢你的前三位是干员是:\n'
# for name in ThreeHighest:
# rst += f'{name} 共投了 {operator_dict[name]} 份简历\n'
# return rst[:-1]
# 获取up和概率
async def init_up_char(announcement):
UP_CHAR = []
UP_ARMS = []
tmp = ""
up_char_dict = await announcement.update_up_char()
for x in list(up_char_dict.keys()):
tmp += up_char_dict[x]["title"] + "[\n]"
tmp = tmp.split("[\n]")
_CURRENT_CHAR_POOL_TITLE = tmp[0]
if len(up_char_dict) > 1:
_CURRENT_ARMS_POOL_TITLE = tmp[1]
else:
_CURRENT_ARMS_POOL_TITLE = ""
POOL_IMG = ""
x = [x for x in list(up_char_dict.keys())]
if _CURRENT_CHAR_POOL_TITLE:
POOL_IMG += MessageSegment.image(up_char_dict[x[0]]["pool_img"])
try:
if _CURRENT_ARMS_POOL_TITLE:
POOL_IMG += MessageSegment.image(up_char_dict[x[1]]["pool_img"])
except (IndexError, KeyError):
pass
logger.info(
f"成功获取{announcement.game_name}当前up信息...当前up池: {_CURRENT_CHAR_POOL_TITLE} & {_CURRENT_ARMS_POOL_TITLE}"
)
for key in up_char_dict.keys():
for star in up_char_dict[key]["up_char"].keys():
up_char_lst = []
for char in up_char_dict[key]["up_char"][star].keys():
up_char_lst.append(char)
if up_char_lst:
if key == "char":
UP_CHAR.append(
UpEvent(star=int(star), operators=up_char_lst, zoom=0)
)
else:
UP_ARMS.append(
UpEvent(star=int(star), operators=up_char_lst, zoom=0)
)
return (
_CURRENT_CHAR_POOL_TITLE,
_CURRENT_ARMS_POOL_TITLE,
POOL_IMG,
UP_CHAR,
UP_ARMS,
)
def is_number(s) -> bool:
try:
float(s)
return True
except ValueError:
pass
try:
import unicodedata
unicodedata.numeric(s)
return True
except (TypeError, ValueError):
pass
return False
def set_list(lst: List[BaseData]) -> list:
tmp = []
name_lst = []
for x in lst:
if x.name not in name_lst:
tmp.append(x)
name_lst.append(x.name)
return tmp
# 获取星级
def get_star(star_lst: List[int], probability_lst: List[float]) -> int:
rand = random.random()
add = 0
tmp_lst = [(0, probability_lst[0])]
for i in range(1, len(probability_lst) - 1):
add += probability_lst[i - 1]
tmp_lst.append((tmp_lst[i - 1][1], probability_lst[i] + add))
tmp_lst.append((tmp_lst[-1][1], 1))
for i in range(len(tmp_lst)):
if tmp_lst[i][0] <= rand <= tmp_lst[i][1]:
return star_lst[i]
# 整理数据
def format_card_information(
count: int, star_list: List[int], func, pool_name: str = "", guaranteed: bool = True
):
max_star_lst = [] # 获取的最高星级角色列表
max_index_lst = [] # 获取最高星级角色的次数
obj_list = [] # 获取所有角色
obj_dict = {} # 获取角色次数字典
_count = -1
if guaranteed:
_count = 0
for i in range(count):
if guaranteed:
_count += 1
if pool_name:
if _count == 10:
obj, code = func(pool_name, 2)
_count = 0
else:
obj, code = func(pool_name)
else:
if _count == 10:
obj, code = func(mode=2)
_count = 0
else:
obj, code = func()
star_list[code] += 1
if code == 0:
max_star_lst.append(obj.name)
max_index_lst.append(i)
if guaranteed:
_count = 0
if code == 1:
if guaranteed:
_count = 0
try:
obj_dict[obj.name] += 1
except KeyError:
obj_dict[obj.name] = 1
obj_list.append(obj)
return obj_list, obj_dict, max_star_lst, star_list, max_index_lst
# 检测次数是否合法
def check_num(num: str, max_num: int) -> Tuple[str, bool]:
if is_number(num):
try:
num = int(num)
except ValueError:
return "必!须!是!数!字!", False
if num > max_num:
return "一井都满不足不了你嘛!快爬开!", False
if num < 1:
return "虚空抽卡???", False
else:
return str(num), True
# 移除windows和linux下特殊字符
def remove_prohibited_str(name: str) -> str:
if platform.system().lower() == "windows":
tmp = ""
for i in name:
if i not in ["\\", "/", ":", "*", "?", '"', "<", ">", "|"]:
tmp += i
name = tmp
else:
name = name.replace("/", "\\")
return name