This commit is contained in:
hibiki 2021-06-17 09:43:03 +08:00
parent c10cb9987a
commit 55070cf54a
15 changed files with 490 additions and 232 deletions

View File

@ -17,7 +17,6 @@ RSSHUBAPP: str = "https://docs.rsshub.app/" # rsshub
TL_KEY: List[str] = []
# 数据库(必要)
# 如果填写了bind就不需要再填写后面的字段了#
# 示例:"bind": "postgresql://user:password@127.0.0.1:5432/database"
bind: str = '' # 数据库连接url
sql_name: str = 'postgresql'
@ -55,6 +54,7 @@ FGO_FLAG = True # 命运-冠位指定FGO
ONMYOJI_FLAG = True # 阴阳师
PCR_TAI = True # pcr是否开启台服卡池
SEMAPHORE = 5 # 限制更新碧蓝航线和FGO数据并发数
ADMIN_DEFAULT_AUTH: int = 5 # 默认群管理员权限
@ -84,10 +84,10 @@ MUTE_LEVEL: int = 5 # 更改禁言设置权限
# 需要为哪些群更新最新版gocq吗上传最新版gocq
# 示例:[434995955, 239483248]
UPDATE_GOCQ_GROUP: List[int] = []
UPDATE_GOCQ_GROUP: List[int] = [774261838]
# 代理
system_proxy: str = ''
system_proxy: str = 'http://127.0.0.1:7890'
buff_proxy: str = ''
# 是否存储色图

View File

@ -4,6 +4,8 @@ import re
from datetime import datetime
from .config import DRAW_PATH
from pathlib import Path
from asyncio.exceptions import TimeoutError
from services.log import logger
try:
import ujson as json
except ModuleNotFoundError:
@ -13,9 +15,11 @@ headers = {'User-Agent': '"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Te
prts_up_char = Path(DRAW_PATH + "/draw_card_up/prts_up_char.json")
genshin_up_char = Path(DRAW_PATH + "/draw_card_up/genshin_up_char.json")
pretty_up_char = Path(DRAW_PATH + "/draw_card_up/pretty_up_char.json")
prts_url = "https://wiki.biligame.com/arknights/%E6%96%B0%E9%97%BB%E5%85%AC%E5%91%8A"
genshin_url = "https://wiki.biligame.com/ys/%E7%A5%88%E6%84%BF"
pretty_url = "https://wiki.biligame.com/umamusume/%E5%85%AC%E5%91%8A"
# 是否过时
@ -89,47 +93,52 @@ class PrtsAnnouncement:
async def update_up_char():
prts_up_char.parent.mkdir(parents=True, exist_ok=True)
data = {'up_char': {'6': {}, '5': {}, '4': {}}, 'title': '', 'time': '', 'pool_img': ''}
text, title = await PrtsAnnouncement.get_announcement_text()
soup = BeautifulSoup(text, 'lxml')
data['title'] = title
context = soup.find('div', {'id': 'mw-content-text'}).find('div')
data['pool_img'] = str(context.find('div', {'class': 'center'}).find('div').find('a').
find('img').get('srcset')).split(' ')[-2]
# print(context.find_all('p'))
for p in context.find_all('p')[1:]:
if p.text.find('活动时间') != -1:
pr = re.search(r'.*?活动时间:(.*)', p.text)
data['time'] = pr.group(1)
elif p.text.find('★★★★★★') != -1:
chars, probability = _get_up_char(r'.*?★★★★★★:(.*?).*?出率的?(.*?)%.*?.*?', p.text)
slt = '/'
if chars.find('\\') != -1:
slt = '\\'
for char in chars.split(slt):
data['up_char']['6'][char.strip()] = probability.strip()
elif p.text.find('★★★★★') != -1:
chars, probability = _get_up_char(r'.*?★★★★★:(.*?).*?出率的?(.*?)%.*?.*?', p.text)
slt = '/'
if chars.find('\\') != -1:
slt = '\\'
for char in chars.split(slt):
data['up_char']['5'][char.strip()] = probability.strip()
elif p.text.find('★★★★') != -1:
chars, probability = _get_up_char(r'.*?★★★★:(.*?).*?出率的?(.*?)%.*?.*?', p.text)
slt = '/'
if chars.find('\\') != -1:
slt = '\\'
for char in chars.split(slt):
data['up_char']['4'][char.strip()] = probability.strip()
break
pr = re.search(r'.*?★:(.*?)(在(.*?)★.*?以(.*?)倍权值.*?.*?', p.text)
if pr:
char = pr.group(1)
star = pr.group(2)
weight = pr.group(3)
char = char.replace('[限定]', '').replace('[', '').replace(']', '')
data['up_char'][star][char.strip()] = f'{weight}'
# data['time'] = '03月09日16:00 - 05月23日03:59'
try:
text, title = await PrtsAnnouncement.get_announcement_text()
soup = BeautifulSoup(text, 'lxml')
data['title'] = title
context = soup.find('div', {'id': 'mw-content-text'}).find('div')
data['pool_img'] = str(context.find('div', {'class': 'center'}).find('div').find('a').
find('img').get('srcset')).split(' ')[-2]
# print(context.find_all('p'))
for p in context.find_all('p')[1:]:
if p.text.find('活动时间') != -1:
pr = re.search(r'.*?活动时间:(.*)', p.text)
data['time'] = pr.group(1)
elif p.text.find('★★★★★★') != -1:
chars, probability = _get_up_char(r'.*?★★★★★★:(.*?).*?出率的?(.*?)%.*?.*?', p.text)
slt = '/'
if chars.find('\\') != -1:
slt = '\\'
for char in chars.split(slt):
data['up_char']['6'][char.strip()] = probability.strip()
elif p.text.find('★★★★★') != -1:
chars, probability = _get_up_char(r'.*?★★★★★:(.*?).*?出率的?(.*?)%.*?.*?', p.text)
slt = '/'
if chars.find('\\') != -1:
slt = '\\'
for char in chars.split(slt):
data['up_char']['5'][char.strip()] = probability.strip()
elif p.text.find('★★★★') != -1:
chars, probability = _get_up_char(r'.*?★★★★:(.*?).*?出率的?(.*?)%.*?.*?', p.text)
slt = '/'
if chars.find('\\') != -1:
slt = '\\'
for char in chars.split(slt):
data['up_char']['4'][char.strip()] = probability.strip()
break
pr = re.search(r'.*?★:(.*?)(在(.*?)★.*?以(.*?)倍权值.*?.*?', p.text)
if pr:
char = pr.group(1)
star = pr.group(2)
weight = pr.group(3)
char = char.replace('[限定]', '').replace('[', '').replace(']', '')
data['up_char'][star][char.strip()] = f'{weight}'
# data['time'] = '03月09日16:00 - 05月23日03:5
except TimeoutError:
print(f'更新明日方舟UP池信息超时...')
with open(prts_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
return check_write(data, prts_up_char)
@ -180,6 +189,10 @@ class GenshinAnnouncement:
for a in trs[3].find('td').find_all('a'):
char_name = a['title']
data[itype]['up_char']['4'][char_name] = "50"
except TimeoutError as e:
logger.warning(f'更新原神UP池信息超时...')
with open(genshin_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
except Exception as e:
print(f'更新原神UP失败疑似UP池已结束 e{e}')
with open(genshin_up_char, 'r', encoding='utf8') as f:
@ -192,4 +205,91 @@ class GenshinAnnouncement:
return check_write(data, genshin_up_char, 'genshin')
class PrettyAnnouncement:
@staticmethod
async def get_announcement_text():
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get(pretty_url, timeout=7) as res:
soup = BeautifulSoup(await res.text(), 'lxml')
divs = soup.find('div', {'id': 'mw-content-text'}).find('div').find_all('div')
for div in divs:
a = div.find('a')
try:
title = a['title']
except (KeyError, TypeError):
continue
if title.find('新角色追加') != -1:
url = a['href']
break
async with session.get(f'https://wiki.biligame.com/{url}', timeout=7) as res:
return await res.text(), title[:-2]
@staticmethod
async def update_up_char():
data = {
'char': {'up_char': {'3': {}, '2': {}, '1': {}}, 'title': '', 'time': '', 'pool_img': ''},
'card': {'up_char': {'3': {}, '2': {}, '1': {}}, 'title': '', 'time': '', 'pool_img': ''}
}
try:
text, title = await PrettyAnnouncement.get_announcement_text()
soup = BeautifulSoup(text, 'lxml')
context = soup.find('div', {'class': 'toc-sticky'})
if not context:
context = soup.find('div', {'class': 'mw-parser-output'})
data['char']['title'] = title
data['card']['title'] = title
time = str(context.find_all('big')[1].text)
time = time.replace('', '-').replace('/', '').split(' ')
time = time[0] + '' + time[1] + ' - ' + time[3] + '' + time[4]
data['char']['time'] = time
data['card']['time'] = time
for p in context.find_all('p'):
if str(p).find('当期UP赛马娘') != -1:
data['char']['pool_img'] = p.find('img')['src']
r = re.findall(r'.*?当期UP赛马娘([\s\S]*)<奖励内容>.*?', str(p))
if r:
for x in r:
x = str(x).split('\n')
for msg in x:
if msg.find('') != -1:
msg = msg.replace('<br/>', '')
msg = msg.split(' ')
if (star := len(msg[0].strip())) == 3:
data['char']['up_char']['3'][msg[1]] = '70'
elif star == 2:
data['char']['up_char']['2'][msg[1]] = '70'
elif star == 1:
data['char']['up_char']['1'][msg[1]] = '70'
if str(p).find('当期UP对象') != -1 and str(p).find('赛马娘') == -1:
data['card']['pool_img'] = p.find('img')['src']
r = re.search(r'■全?新?支援卡当期UP对象([\s\S]*)</p>', str(p))
if r:
rmsg = r.group(1)
rmsg = rmsg.split('<br/>')
for x in rmsg[1:]:
x = x.replace('\n', '').replace('', '')
x = x.split(' ')
if x[0] == 'SSR':
data['card']['up_char']['3'][x[1]] = '70'
if x[0] == 'SR':
data['card']['up_char']['2'][x[1]] = '70'
if x[0] == 'R':
data['card']['up_char']['1'][x[1]] = '70'
# 日文->中文
with open(DRAW_PATH + 'pretty_card.json', 'r', encoding='utf8') as f:
all_data = json.load(f)
for star in data['card']['up_char'].keys():
for name in list(data['card']['up_char'][star].keys()):
char_name = name.split(']')[1].strip()
tp_name = name[name.find('['): name.find(']') + 1].strip().replace('[', '').replace(']', '')
for x in all_data.keys():
if all_data[x]['名称'].find(tp_name) != -1 and all_data[x]['关联角色'] == char_name:
data['card']['up_char'][star].pop(name)
data['card']['up_char'][star][all_data[x]['中文名']] = '70'
except TimeoutError:
logger.warning(f'更新赛马娘UP池信息超时...')
with open(pretty_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
except Exception as e:
logger.error(f'赛马娘up更新失败 {type(e)}{e}')
return check_write(data, pretty_up_char, 'pretty')

View File

@ -2,7 +2,7 @@ import nonebot
from pathlib import Path
from configs.path_config import DATA_PATH
from configs.config import FGO_FLAG, PCR_FLAG, AZUR_FLAG, PRTS_FLAG,\
PRETTY_FLAG, GUARDIAN_FLAG, GENSHIN_FLAG, ONMYOJI_FLAG, PCR_TAI
PRETTY_FLAG, GUARDIAN_FLAG, GENSHIN_FLAG, ONMYOJI_FLAG, PCR_TAI, SEMAPHORE
try:
import ujson as json
except ModuleNotFoundError:
@ -25,6 +25,7 @@ FGO_FLAG = FGO_FLAG
ONMYOJI_FLAG = ONMYOJI_FLAG
PCR_TAI = PCR_TAI
SEMAPHORE = SEMAPHORE
# 方舟概率
PRTS_SIX_P = 0.02

View File

@ -8,6 +8,7 @@ def init_game_pool(game: str, data: dict, Operator: Any):
limited = False
recruit_only = False
event_only = False
print(key)
if '限定寻访' in data[key]['获取途径']:
limited = True
if '干员寻访' not in data[key]['获取途径'] and '公开招募' in data[key]['获取途径']:

View File

@ -1,14 +1,15 @@
import os
import nonebot
from nonebot.adapters.cqhttp import MessageSegment
from .update_game_info import update_info
from .announcement import PrettyAnnouncement
from .util import download_img, init_star_rst, generate_img, max_card, BaseData, \
set_list, get_star, format_card_information
set_list, get_star, format_card_information, UpEvent
import random
from .config import PRETTY_THREE_P, PRETTY_TWO_P, DRAW_PATH, PRETTY_ONE_P, PRETTY_FLAG
from dataclasses import dataclass
from .init_card_pool import init_game_pool
import asyncio
from util.init_result import image
from configs.path_config import IMAGE_PATH
try:
import ujson as json
@ -20,6 +21,12 @@ driver: nonebot.Driver = nonebot.get_driver()
ALL_CHAR = []
ALL_CARD = []
_CURRENT_CHAR_POOL_TITLE = ""
_CURRENT_CARD_POOL_TITLE = ""
UP_CHAR = []
UP_CARD = []
POOL_IMG = []
@dataclass
class PrettyChar(BaseData):
@ -34,11 +41,41 @@ async def pretty_draw(count: int, pool_name):
star_list = [0, 0, 0]
obj_list, obj_dict, three_list, star_list, three_olist = format_card_information(count, star_list,
_get_pretty_card, pool_name)
rst = init_star_rst(star_list, cnlist, three_list, three_olist)
up_type = []
up_list = []
title = ''
if pool_name == 'char' and _CURRENT_CHAR_POOL_TITLE:
up_type = UP_CHAR
title = _CURRENT_CHAR_POOL_TITLE
elif pool_name == 'card' and _CURRENT_CARD_POOL_TITLE:
up_type = UP_CARD
title = _CURRENT_CARD_POOL_TITLE
tmp = ''
if up_type:
for x in up_type:
for operator in x.operators:
up_list.append(operator.split(']')[1] if pool_name == 'char' else operator)
if x.star == 3:
if pool_name == 'char':
tmp += f'三星UP{" ".join(x.operators)} \n'
else:
tmp += f'SSR UP{" ".join(x.operators)} \n'
elif x.star == 2:
if pool_name == 'char':
tmp += f'二星UP{" ".join(x.operators)} \n'
else:
tmp += f'SR UP{" ".join(x.operators)} \n'
elif x.star == 1:
if pool_name == 'char':
tmp += f'一星UP{" ".join(x.operators)} '
else:
tmp += f'R UP{" ".join(x.operators)} '
tmp = tmp[:-1] if tmp and tmp[-1] == '\n' else tmp
pool_info = f'当前up池{title}\n{tmp}' if title else ''
rst = init_star_rst(star_list, cnlist, three_list, three_olist, up_list)
if count > 90:
obj_list = set_list(obj_list)
return MessageSegment.image(
"base64://" + await generate_img(obj_list, 'pretty', star_list)) \
return pool_info + image(b64=await generate_img(obj_list, 'pretty', star_list)) \
+ '\n' + rst[:-1] + '\n' + max_card(obj_dict)
@ -67,7 +104,67 @@ async def init_pretty_data():
# 抽取卡池
def _get_pretty_card(pool_name: str):
global ALL_CHAR, ALL_CARD
global ALL_CHAR, ALL_CARD, _CURRENT_CHAR_POOL_TITLE, _CURRENT_CARD_POOL_TITLE
star = get_star([3, 2, 1], [PRETTY_THREE_P, PRETTY_TWO_P, PRETTY_ONE_P])
chars = [x for x in (ALL_CARD if pool_name == 'card' else ALL_CHAR) if x.star == star]
return random.choice(chars), 3 - star
if pool_name == 'card':
title = _CURRENT_CARD_POOL_TITLE
up_data = UP_CARD
data = ALL_CARD
else:
title = _CURRENT_CHAR_POOL_TITLE
up_data = UP_CHAR
data = ALL_CHAR
# 有UP池子
if title and star in [x.star for x in up_data]:
all_char_lst = [x for x in data if x.star == star and not x.limited]
# 抽到UP
if random.random() < 1 / len(all_char_lst) * (0.7 / 0.1385):
all_up_star = [x.operators for x in up_data if x.star == star][0]
acquire_operator = random.choice(all_up_star)
if pool_name == 'char':
acquire_operator = acquire_operator.split(']')[1]
acquire_operator = [x for x in data if x.name == acquire_operator][0]
else:
acquire_operator = random.choice([x for x in data if x.star == star and not x.limited])
else:
acquire_operator = random.choice([x for x in data if x.star == star and not x.limited])
return acquire_operator, 3 - star
# 获取up和概率
async def _init_up_char():
global _CURRENT_CHAR_POOL_TITLE, _CURRENT_CARD_POOL_TITLE, UP_CHAR, UP_CARD, POOL_IMG
UP_CHAR = []
UP_CARD = []
up_char_dict = await PrettyAnnouncement.update_up_char()
_CURRENT_CHAR_POOL_TITLE = up_char_dict['char']['title']
_CURRENT_CARD_POOL_TITLE = up_char_dict['card']['title']
if _CURRENT_CHAR_POOL_TITLE and _CURRENT_CARD_POOL_TITLE:
await download_img(up_char_dict['char']['pool_img'], 'pretty', 'up_char_pool_img')
await download_img(up_char_dict['card']['pool_img'], 'pretty', 'up_card_pool_img')
POOL_IMG = image('up_char_pool_img.png', '/draw_card/pretty/') + \
image('up_card_pool_img.png', '/draw_card/pretty/')
else:
if os.path.exists(IMAGE_PATH + '/draw_card/genshin/up_char_pool_img.png'):
os.remove(IMAGE_PATH + '/draw_card/pretty/up_char_pool_img.png')
if os.path.exists(IMAGE_PATH + '/draw_card/genshin/up_arms_pool_img.png'):
os.remove(IMAGE_PATH + '/draw_card/pretty/up_card_pool_img.png')
print(f'成功获取赛马娘当前up信息...当前up池: {_CURRENT_CHAR_POOL_TITLE} & {_CURRENT_CARD_POOL_TITLE}')
for key in up_char_dict.keys():
for star in up_char_dict[key]['up_char'].keys():
up_lst = []
for char in up_char_dict[key]['up_char'][star].keys():
up_lst.append(char)
if key == 'char':
if up_lst:
UP_CHAR.append(UpEvent(star=int(star), operators=up_lst, zoom=0))
else:
if up_lst:
UP_CARD.append(UpEvent(star=int(star), operators=up_lst, zoom=0))
async def reload_pretty_pool():
await _init_up_char()
return f'当前UP池子{_CURRENT_CHAR_POOL_TITLE} & {_CURRENT_CARD_POOL_TITLE} {POOL_IMG}'

View File

@ -54,7 +54,7 @@ async def prts_draw(count: int = 300):
async def update_prts_info():
global prts_dict, ALL_OPERATOR
url = 'https://wiki.biligame.com/arknights/干员数据表'
data, code = await update_info(url, 'prts', ['头像', '名称', '阵营', '星级', '性别', '是否感染', '初始生命', '初始防御',
data, code = await update_info(url, 'prts', ['头像', '名称', '阵营', '星级', '性别', '是否感染', '获取途径', '初始生命', '初始防御',
'初始法抗', '再部署', '部署费用', '阻挡数', '攻击速度', '标签'])
if code == 200:
prts_dict = data

View File

@ -98,14 +98,14 @@ def _find_last_tag(element: bs4.element.Tag, attr: str, game_name: str) -> str:
# 获取大图(小图快爬)
async def _modify_avatar_url(session: aiohttp.ClientSession, game_name: str, char_name: str):
if game_name == 'prts':
async with session.get(f'https://wiki.biligame.com/arknights/{char_name}', timeout=7) as res:
soup = BeautifulSoup(await res.text(), 'lxml')
try:
img_url = str(soup.find('img', {'class': 'img-bg'})['srcset']).split(' ')[-2]
except KeyError:
img_url = str(soup.find('img', {'class': 'img-bg'})['src'])
return img_url
# if game_name == 'prts':
# async with session.get(f'https://wiki.biligame.com/arknights/{char_name}', timeout=7) as res:
# soup = BeautifulSoup(await res.text(), 'lxml')
# try:
# img_url = str(soup.find('img', {'class': 'img-bg'})['srcset']).split(' ')[-2]
# except KeyError:
# img_url = str(soup.find('img', {'class': 'img-bg'})['src'])
# return img_url
if game_name == 'genshin':
return None
if game_name == 'pretty_card':
@ -150,6 +150,7 @@ async def _last_check(data: dict, game_name: str, session: aiohttp.ClientSession
_trs = table.find('tbody').find_all('tr')
break
for tr in _trs:
data[key]['常驻/限定'] = '未知'
if str(tr).find('限定UP') != -1:
data[key]['常驻/限定'] = '限定UP'
logger.info(f'原神获取额外数据 {key}...{data[key]["常驻/限定"]}')
@ -252,31 +253,31 @@ def get_tbody(soup: bs4.BeautifulSoup, game_name: str, url: str):
return _tbody
async def _async_update_prts_extra_info(url: str, key: str, session: aiohttp.ClientSession):
for i in range(10):
try:
async with session.get(f'https://wiki.biligame.com/arknights/{key}', timeout=7) as res:
soup = BeautifulSoup(await res.text(), 'lxml')
obtain = str(soup.find('table', {'class': 'wikitable'}).find('tbody').find_all('td')[-1])
obtain = re.search(r'<td.*?>([\s\S]*)</.*?', obtain).group(1)
obtain = obtain[:-1] if obtain[-1] == '\n' else obtain
if obtain.find('<br/>'):
obtain = obtain.split('<br/>')
elif obtain.find('<br>'):
obtain = obtain.split('<br>')
for i in range(len(obtain)):
if obtain[i].find('<a') != -1:
text = ''
for msg in obtain[i].split('</a>'):
r = re.search('>(.*)', msg)
if r:
text += r.group(1) + ' '
obtain[i] = obtain[i].split('<a')[0] + text[:-1] + obtain[i].split('</a>')[-1]
logger.info(f'明日方舟获取额外信息 {key}...{obtain}')
x = {key: {}}
x[key]['获取途径'] = obtain
return x
except TimeoutError:
logger.warning(f'访问{url}{key}{i}次 超时...已再次访问')
return {}
# async def _async_update_prts_extra_info(url: str, key: str, session: aiohttp.ClientSession):
# for i in range(10):
# try:
# async with session.get(f'https://wiki.biligame.com/arknights/{key}', timeout=7) as res:
# soup = BeautifulSoup(await res.text(), 'lxml')
# obtain = str(soup.find('table', {'class': 'wikitable'}).find('tbody').find_all('td')[-1])
# obtain = re.search(r'<td.*?>([\s\S]*)</.*?', obtain).group(1)
# obtain = obtain[:-1] if obtain[-1] == '\n' else obtain
# if obtain.find('<br/>'):
# obtain = obtain.split('<br/>')
# elif obtain.find('<br>'):
# obtain = obtain.split('<br>')
# for i in range(len(obtain)):
# if obtain[i].find('<a') != -1:
# text = ''
# for msg in obtain[i].split('</a>'):
# r = re.search('>(.*)', msg)
# if r:
# text += r.group(1) + ' '
# obtain[i] = obtain[i].split('<a')[0] + text[:-1] + obtain[i].split('</a>')[-1]
# logger.info(f'明日方舟获取额外信息 {key}...{obtain}')
# x = {key: {}}
# x[key]['获取途径'] = obtain
# return x
# except TimeoutError:
# logger.warning(f'访问{url}{key} 第 {i}次 超时...已再次访问')
# return {}

View File

@ -1,5 +1,5 @@
import aiohttp
from .config import DRAW_PATH
from .config import DRAW_PATH, SEMAPHORE
from asyncio.exceptions import TimeoutError
from .util import download_img
from bs4 import BeautifulSoup
@ -101,8 +101,9 @@ async def _last_check(data: dict, game_name: str, session: aiohttp.ClientSession
if game_name == 'fgo':
url = 'http://fgo.vgtime.com/servant/'
tasks = []
semaphore = asyncio.Semaphore(SEMAPHORE)
for key in data.keys():
tasks.append(asyncio.ensure_future(_async_update_fgo_extra_info(url, key, data[key]['id'], session)))
tasks.append(asyncio.ensure_future(_async_update_fgo_extra_info(url, key, data[key]['id'], session, semaphore)))
asyResult = await asyncio.gather(*tasks)
for x in asyResult:
for key in x.keys():
@ -117,28 +118,29 @@ async def _last_check(data: dict, game_name: str, session: aiohttp.ClientSession
return data
async def _async_update_fgo_extra_info(url: str, key: str, _id: str, session: aiohttp.ClientSession):
async def _async_update_fgo_extra_info(url: str, key: str, _id: str, session: aiohttp.ClientSession, semaphore):
# 防止访问超时
for i in range(10):
try:
async with session.get(f'{url}{_id}', timeout=7) as response:
soup = BeautifulSoup(await response.text(), 'lxml')
obtain = soup.find('table', {'class': 'uk-table uk-codex-table'}).find_all('td')[-1].text
if obtain.find('限时活动免费获取 活动结束后无法获得') != -1:
obtain = ['活动获取']
elif obtain.find('非限时UP无法获得') != -1:
obtain = ['限时召唤']
else:
if obtain.find('&') != -1:
obtain = obtain.strip().split('&')
async with semaphore:
for i in range(10):
try:
async with session.get(f'{url}{_id}', timeout=7) as response:
soup = BeautifulSoup(await response.text(), 'lxml')
obtain = soup.find('table', {'class': 'uk-table uk-codex-table'}).find_all('td')[-1].text
if obtain.find('限时活动免费获取 活动结束后无法获得') != -1:
obtain = ['活动获取']
elif obtain.find('非限时UP无法获得') != -1:
obtain = ['限时召唤']
else:
obtain = obtain.strip().split(' ')
logger.info(f'Fgo获取额外信息 {key}....{obtain}')
x = {key: {}}
x[key]['入手方式'] = obtain
return x
except TimeoutError:
logger.warning(f'访问{url}{_id}{i}次 超时...已再次访问')
if obtain.find('&') != -1:
obtain = obtain.strip().split('&')
else:
obtain = obtain.strip().split(' ')
logger.info(f'Fgo获取额外信息 {key}....{obtain}')
x = {key: {}}
x[key]['入手方式'] = obtain
return x
except TimeoutError:
logger.warning(f'访问{url}{_id}{i}次 超时...已再次访问')
return {}

View File

@ -1,5 +1,5 @@
import aiohttp
from .config import DRAW_PATH
from .config import DRAW_PATH, SEMAPHORE
from asyncio.exceptions import TimeoutError
from bs4 import BeautifulSoup
from .util import download_img
@ -87,8 +87,9 @@ async def _last_check(data: dict, game_name: str, session: aiohttp.ClientSession
await download_img(url, 'azur', f'{idx}_star')
idx += 1
tasks = []
semaphore = asyncio.Semaphore(SEMAPHORE)
for key in data.keys():
tasks.append(asyncio.ensure_future(_async_update_azur_extra_info(key, session)))
tasks.append(asyncio.ensure_future(_async_update_azur_extra_info(key, session, semaphore)))
asyResult = await asyncio.gather(*tasks)
for x in asyResult:
for key in x.keys():
@ -150,24 +151,25 @@ async def retrieve_char_data(char: bs4.element.Tag, game_name: str, data: dict,
return data
async def _async_update_azur_extra_info(key: str, session: aiohttp.ClientSession):
async def _async_update_azur_extra_info(key: str, session: aiohttp.ClientSession, semaphore):
if key[-1] == '':
return {key: {'获取途径': ['无法建造']}}
for i in range(20):
try:
async with session.get(f'https://wiki.biligame.com/blhx/{key}', timeout=7) as res:
soup = BeautifulSoup(await res.text(), 'lxml')
construction_time = str(soup.find('table', {'class': 'wikitable sv-general'}).find('tbody'))
x = {key: {'获取途径': []}}
if construction_time.find('无法建造') != -1:
x[key]['获取途径'].append('无法建造')
elif construction_time.find('活动已关闭') != -1:
x[key]['获取途径'].append('活动限定')
else:
x[key]['获取途径'].append('可以建造')
logger.info(f'碧蓝航线获取额外信息 {key}...{x[key]["获取途径"]}')
return x
except TimeoutError:
logger.warning(f'访问 https://wiki.biligame.com/blhx/{key}{i}次 超时...已再次访问')
async with semaphore:
for i in range(20):
try:
async with session.get(f'https://wiki.biligame.com/blhx/{key}', timeout=7) as res:
soup = BeautifulSoup(await res.text(), 'lxml')
construction_time = str(soup.find('table', {'class': 'wikitable sv-general'}).find('tbody'))
x = {key: {'获取途径': []}}
if construction_time.find('无法建造') != -1:
x[key]['获取途径'].append('无法建造')
elif construction_time.find('活动已关闭') != -1:
x[key]['获取途径'].append('活动限定')
else:
x[key]['获取途径'].append('可以建造')
logger.info(f'碧蓝航线获取额外信息 {key}...{x[key]["获取途径"]}')
return x
except TimeoutError:
logger.warning(f'访问 https://wiki.biligame.com/blhx/{key}{i}次 超时...已再次访问')
return {}

View File

@ -16,6 +16,7 @@ import random
from dataclasses import dataclass
import os
import asyncio
from PIL import UnidentifiedImageError
try:
import ujson as json
except ModuleNotFoundError:
@ -130,9 +131,15 @@ def _pst(h: int, img_list: list, game_name: str, background_list: list):
bk.paste(b, (1, 5))
b = bk
else:
b = CreateImg(100, 100, background=img)
try:
b = CreateImg(100, 100, background=img)
except UnidentifiedImageError as e:
logger.warning(f'无法识别图片 已删除图片,下次更新重新下载... e{e}')
if os.path.exists(img):
os.remove(img)
b = CreateImg(100, 100, color='black')
except FileNotFoundError:
print(f'{img} not exists')
logger.warning(f'{img} not exists')
b = CreateImg(100, 100, color='black')
card_img.paste(b)
idx += 1

View File

@ -49,8 +49,8 @@ def create_help_img():
A.paste(e, (0, 0))
A.paste(u, (0, u_height))
A.paste(o, (0, o_height))
A.text((10, h * 0.76), '大部分交互功能可以通过输入‘取消’,‘算了’来取消当前交互\n我说 “指令名 帮助” 获取对应详细帮助\n'
'可以通过 “滴滴滴- 后接内容” 联系管理员(有趣的想法尽管来吧!<还有Bug和建议>'
A.text((10, h * 0.76), '大部分交互功能可以通过输入‘取消’,‘算了’来取消当前交互\n真寻说 “真寻帮助 指令名” 获取对应详细帮助\n'
'可以通过 “滴滴滴- [消息]” 联系管理员(有趣的想法尽管来吧!<还有Bug和建议>'
'\n[群管理员请看 管理员帮助(群主与管理员自带 5 级权限)]\n\n'
'\t「如果真寻回复了一些不符合人设的话那是因为每日白嫖的图灵次数已用完使用的是备用接口【QAQ】」')
@ -99,8 +99,8 @@ def create_group_help_img(group_id: int):
A.paste(u, (0, u_height))
A.paste(o, (0, o_height))
# A.text((width, 10), f'总开关【{"√" if data["总开关"] else "×"}】')
A.text((10, h * 0.76), '大部分交互功能可以通过输入‘取消’,‘算了’来取消当前交互\n我说 “指令名 帮助” 获取对应详细帮助\n'
'可以通过 “滴滴滴- 后接内容” 联系管理员(有趣的想法尽管来吧!<还有Bug和建议>'
A.text((10, h * 0.76), '大部分交互功能可以通过输入‘取消’,‘算了’来取消当前交互\n真寻说 “真寻帮助 指令名” 获取对应详细帮助\n'
'可以通过 “滴滴滴- [消息]” 联系管理员(有趣的想法尽管来吧!<还有Bug和建议>'
f'\n[群管理员请看 管理员帮助(群主与管理员自带 {ADMIN_DEFAULT_AUTH} 级权限)]')
A.text((10, h * 0.81), f"【注】「色图概率:好感度 + {int(INITIAL_SETU_PROBABILITY*100)}%\n"
f"\t\t每 3 点好感度 + 1次开箱初始 {INITIAL_OPEN_CASE_COUNT}\n"

View File

@ -28,7 +28,6 @@ async def get_pic(qq):
@one_friend.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
arr = []
msg = get_message_text(event.json())
qq = get_message_at(event.json())
if not qq:

37
plugins/server_ip.py Normal file
View File

@ -0,0 +1,37 @@
from nonebot import on_command
from nonebot.adapters.cqhttp import Bot, PrivateMessageEvent, GroupMessageEvent
from nonebot.typing import T_State
from nonebot.rule import to_me
__plugin_name__ = '服务器'
__plugin_usage__ = '用法: 无'
server_ip = on_command("服务器", aliases={'ip'}, rule=to_me(), priority=5, block=True)
@server_ip.handle()
async def _(bot: Bot, event: PrivateMessageEvent, state: T_State):
await server_ip.finish("|* 请不要发给其他人! *|\n"
"\t121.40.195.22\n"
"|* 请不要发给其他人! *|\n"
"csgo ~号键控制台输入 connect 121.40.195.22 进入服务器\n"
"然后再公屏输入 !diy 来使用皮肤(英文感叹号,注意)\n"
"【说不定可以凑到内战噢,欢迎~{娱乐为主}", at_sender=True)
@server_ip.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
if event.group_id == 698279647:
await server_ip.finish("嗨呀!当前服务器地址是:"
"\ncsgo:\n\tay: 121.40.195.22\n\t"
"wzt: 101.200.199.143\n\t夜之北枭: 101.132.170.254\n我的世界:\n\t47.111.1.22025565")
elif event.group_id == 1046451860:
await server_ip.finish("嗨呀!当前服务器地址是:\n121.40.195.22\n !diy")
else:
await server_ip.finish("不好意思呀小真寻不能为你使用此功能因为服务器IP是真寻的小秘密呀", at_sender=True)

View File

@ -21,7 +21,7 @@ __plugin_usage__ = '金币红包帮助:\n' \
'\t塞红包 [金币数] [红包个数](默认5)\n' \
'示例:\n' \
'\t塞红包 500 5\n' \
'抢红包 --> 戳一戳,开' \
'抢红包 --> 戳一戳,开\n' \
'退还剩余红包 --> 退还'
gold_redbag = on_command('塞红包', aliases={'金币红包'}, priority=5, block=True, permission=GROUP)

View File

@ -17,98 +17,109 @@ except ModuleNotFoundError:
async def update_setu_img():
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
for file_name in ['setu_url.json', 'setu_r18_url.json']:
if file_name == 'setu_url.json':
json_name = 'setu_data.json'
path = '_setu/'
rar_path = 'setu_rar/'
else:
json_name = 'r18_setu_data.json'
path = '_r18/'
rar_path = 'r18_rar/'
if not os.path.exists(IMAGE_PATH + path):
os.mkdir(IMAGE_PATH + path)
if not os.path.exists(IMAGE_PATH + rar_path):
os.mkdir(IMAGE_PATH + rar_path)
try:
data = json.load(open(TXT_PATH + json_name, encoding='utf8'))
if not data:
try:
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
for file_name in ['setu_url.json', 'setu_r18_url.json']:
if file_name == 'setu_url.json':
json_name = 'setu_data.json'
path = '_setu/'
rar_path = 'setu_rar/'
else:
json_name = 'r18_setu_data.json'
path = '_r18/'
rar_path = 'r18_rar/'
if not os.path.exists(IMAGE_PATH + path):
os.mkdir(IMAGE_PATH + path)
if not os.path.exists(IMAGE_PATH + rar_path):
os.mkdir(IMAGE_PATH + rar_path)
try:
data = json.load(open(TXT_PATH + json_name, encoding='utf8'))
if not data:
data = {}
except (FileNotFoundError, TypeError):
data = {}
except (FileNotFoundError, TypeError):
data = {}
_success = 0
_similar = 0
try:
with open(TXT_PATH + file_name, 'r', encoding='utf8') as f:
txt_data = json.load(f)
if not txt_data:
continue
except (FileNotFoundError, ValueError):
continue
total = len(txt_data)
urls = [data[x]['img_url'] for x in data.keys()]
for pid in txt_data.keys():
index = str(len(os.listdir(IMAGE_PATH + path)))
url = txt_data[pid]["img_url"].replace('img-master', 'img-original').replace('_master1200', '')
if url in urls or txt_data[pid]["img_url"] in urls:
continue
logger.info(f'开始更新 index:{index} --> {url}')
for _ in range(3):
try:
async with session.get(url, proxy=get_local_proxy(), timeout=15) as response:
if response.status == 200:
async with aiofiles.open(IMAGE_PATH + rar_path + index + ".jpg", 'wb') as f:
await f.write(await response.read())
_success += 1
else:
logger.info(f'{url} 不存在使用更新原url')
url = txt_data[pid]["img_url"]
async with session.get(txt_data[pid]["img_url"], proxy=get_local_proxy(),
timeout=15) as response:
if response.status == 200:
async with aiofiles.open(IMAGE_PATH + rar_path + index + ".jpg", 'wb') as f:
await f.write(await response.read())
_success += 1
if os.path.getsize(IMAGE_PATH + rar_path + str(index) + ".jpg") > 1024 * 1024 * 1.5:
rar_imgs(
rar_path,
path,
in_file_name=index,
out_file_name=index
)
else:
logger.info('不需要压缩,移动图片 ' + IMAGE_PATH + rar_path + index + ".jpg --> "
+ IMAGE_PATH + path + index + ".jpg")
os.rename(IMAGE_PATH + rar_path + index + ".jpg",
IMAGE_PATH + path + index + ".jpg")
img_hash = str(get_img_hash(f'{IMAGE_PATH}{path}{index}.jpg'))
if img_hash in [data[x]['img_hash'] for x in data.keys()]:
logger.info(f'index:{index}'
f'{[data[x]["img_hash"] for x in data.keys()].index(img_hash)} 存在重复,删除')
os.remove(IMAGE_PATH + path + index + ".jpg")
_similar += 1
else:
data[index] = {
'title': txt_data[pid]['title'],
'author': txt_data[pid]['author'],
'pid': txt_data[pid]['pid'],
'img_hash': img_hash,
'img_url': url,
'tags': txt_data[pid]['tags'],
}
break
except (TimeoutError, ClientConnectorError) as e:
logger.warning(f'{url} 更新失败 ..{type(e)}{e}')
_success = 0
_similar = 0
try:
with open(TXT_PATH + file_name, 'r', encoding='utf8') as f:
txt_data = json.load(f)
if not txt_data:
continue
with open(TXT_PATH + json_name, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
open(TXT_PATH + file_name, 'w')
logger.info(
f'{str(datetime.now()).split(".")[0]} 更新 {file_name.split(".")[0]}完成,预计更新 {total} 张,'
f'实际更新 {_success} 张,相似 {_similar} 张,实际存入 {_success - _similar}')
await get_bot().send_private_msg(
user_id=int(list(get_bot().config.superusers)[0]),
message=f'{str(datetime.now()).split(".")[0]} 更新{file_name.split(".")[0]}完成,预计更新 {total} 张,'
f'实际更新 {_success} 张,相似 {_similar} 张,实际存入 {_success - _similar}'
)
except (FileNotFoundError, ValueError):
continue
total = len(txt_data)
urls = [data[x]['img_url'] for x in data.keys()]
for pid in txt_data.keys():
index = str(len(os.listdir(IMAGE_PATH + path)))
url = txt_data[pid]["img_url"].replace('img-master', 'img-original').replace('_master1200', '')
if url in urls or txt_data[pid]["img_url"] in urls:
continue
logger.info(f'开始更新 index:{index} --> {url}')
for _ in range(3):
try:
async with session.get(url, proxy=get_local_proxy(), timeout=15) as response:
if response.status == 200:
async with aiofiles.open(IMAGE_PATH + rar_path + index + ".jpg", 'wb') as f:
await f.write(await response.read())
_success += 1
else:
logger.info(f'{url} 不存在使用更新原url')
url = txt_data[pid]["img_url"]
async with session.get(txt_data[pid]["img_url"], proxy=get_local_proxy(),
timeout=15) as response:
if response.status == 200:
async with aiofiles.open(IMAGE_PATH + rar_path + index + ".jpg", 'wb') as f:
await f.write(await response.read())
_success += 1
try:
if os.path.getsize(IMAGE_PATH + rar_path + str(index) + ".jpg") > 1024 * 1024 * 1.5:
rar_imgs(
rar_path,
path,
in_file_name=index,
out_file_name=index
)
else:
logger.info('不需要压缩,移动图片 ' + IMAGE_PATH + rar_path + index + ".jpg --> "
+ IMAGE_PATH + path + index + ".jpg")
os.rename(IMAGE_PATH + rar_path + index + ".jpg",
IMAGE_PATH + path + index + ".jpg")
except FileNotFoundError:
_success -= 1
continue
img_hash = str(get_img_hash(f'{IMAGE_PATH}{path}{index}.jpg'))
if img_hash in [data[x]['img_hash'] for x in data.keys()]:
logger.info(f'index:{index}'
f'{[data[x]["img_hash"] for x in data.keys()].index(img_hash)} 存在重复,删除')
os.remove(IMAGE_PATH + path + index + ".jpg")
_similar += 1
else:
data[index] = {
'title': txt_data[pid]['title'],
'author': txt_data[pid]['author'],
'pid': txt_data[pid]['pid'],
'img_hash': img_hash,
'img_url': url,
'tags': txt_data[pid]['tags'],
}
break
except (TimeoutError, ClientConnectorError) as e:
logger.warning(f'{url} 更新失败 ..{type(e)}{e}')
continue
with open(TXT_PATH + json_name, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
open(TXT_PATH + file_name, 'w')
logger.info(
f'{str(datetime.now()).split(".")[0]} 更新 {file_name.split(".")[0]}完成,预计更新 {total} 张,'
f'实际更新 {_success} 张,相似 {_similar} 张,实际存入 {_success - _similar}')
await get_bot().send_private_msg(
user_id=int(list(get_bot().config.superusers)[0]),
message=f'{str(datetime.now()).split(".")[0]} 更新{file_name.split(".")[0]}完成,预计更新 {total} 张,'
f'实际更新 {_success} 张,相似 {_similar} 张,实际存入 {_success - _similar}'
)
except Exception as e:
await get_bot().send_private_msg(
user_id=int(list(get_bot().config.superusers)[0]),
message=f'更新色图错误 {type(e)}: {e}'
)
logger.error(f'更新色图错误 {type(e)}: {e}')