#coding:utf-8 import aiohttp from .config import DRAW_PATH from asyncio.exceptions import TimeoutError from bs4 import BeautifulSoup from .util import download_img from urllib.parse import unquote import bs4 import re from util.utils import get_local_proxy try: import ujson as json except ModuleNotFoundError: import json headers = {'User-Agent': '"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)"'} async def update_info(url: str, game_name: str, info_list: list = None) -> 'dict, int': try: with open(DRAW_PATH + f'{game_name}.json', 'r', encoding='utf8') as f: data = json.load(f) except (ValueError, FileNotFoundError): data = {} try: async with aiohttp.ClientSession(headers=headers) as session: async with session.get(url, proxy=get_local_proxy(), timeout=7) as response: soup = BeautifulSoup(await response.text(), 'lxml') _tbody = get_tbody(soup, game_name, url) trs = _tbody.find_all('tr') att_dict, start_index, index = init_attr(game_name) if game_name == 'guardian': start_index = 1 for th in trs[0].find_all('th')[start_index:]: text = th.text if text[-1] == '\n': text = text[:-1] att_dict[text] = index index += 1 for tr in trs[1:]: member_dict = {} tds = tr.find_all('td') if not info_list: info_list = att_dict.keys() for key in info_list: key, attr = parse_key(key, game_name) td = tds[att_dict[key]] last_tag = unquote(_find_last_tag(td, attr, game_name), 'utf-8') member_dict[key] = last_tag member_dict = intermediate_check(member_dict, key, game_name, td) avatar_img = await _modify_avatar_url(session, game_name, member_dict["名称"]) member_dict['头像'] = avatar_img if avatar_img else member_dict['头像'] name = replace_name(member_dict, game_name) await download_img(member_dict['头像'], game_name, name) data[name] = member_dict print(f'{name} is update...') data = await _last_check(data, game_name, session) except TimeoutError: return {}, 999 with open(DRAW_PATH + f'{game_name}.json', 'w', encoding='utf8') as wf: wf.write(json.dumps(data, 
ensure_ascii=False, indent=4)) return data, 200 def _find_last_tag(element: bs4.element.Tag, attr: str, game_name: str) -> str: last_tag = [] for des in element.descendants: last_tag.append(des) if len(last_tag) == 1 and last_tag[0] == '\n': last_tag = '' elif last_tag[-1] == '\n': last_tag = last_tag[-2] else: last_tag = last_tag[-1] if attr and str(last_tag): last_tag = last_tag[attr] elif str(last_tag).find('([\s\S]*)'): obtain = obtain.split('
') elif obtain.find('
'): obtain = obtain.split('
') for i in range(len(obtain)): if obtain[i].find(''): r = re.search('>(.*)', msg) if r: text += r.group(1) + ' ' obtain[i] = obtain[i].split('')[-1] print(f'明日方舟获取额外信息....{obtain}') data[key]['获取途径'] = obtain # if game_name == 'genshin': # for key in data.keys(): # async with session.get(f'https://wiki.biligame.com/ys/{key}', timeout=7) as res: # soup = BeautifulSoup(await res.text(), 'lxml') # trs = soup.find('div', {'class': 'poke-bg'}).find('table').find('tbody').find_all('tr') # for tr in trs: # if tr.find('th').text.find('常驻/限定') != -1: # data[key]['常驻/限定'] = tr.find('td').text # break if game_name == 'pretty': for keys in data.keys(): for key in data[keys].keys(): r = re.search(r'.*?40px-(.*)图标.png', str(data[keys][key])) if r: data[keys][key] = r.group(1) print(f'赛马娘额外修改数据....{keys}[{key}]=> {r.group(1)}') if game_name == 'guardian': for keys in data.keys(): for key in data[keys].keys(): r = re.search(r'.*?-star_(.*).png', str(data[keys][key])) if r: data[keys][key] = r.group(1) print(f'坎公骑士剑额外修改数据...{keys}[{key}] => {r.group(1)}') return data # 对抓取每行数据是否需要额外处理? 
def intermediate_check(member_dict: dict, key: str, game_name: str,
                       td: 'bs4.element.Tag') -> dict:
    """Apply game-specific fix-ups to one row's data right after a column is read.

    Args:
        member_dict: Data collected so far for the current table row; it is
            modified in place.
        key: Name of the column that has just been stored in ``member_dict``.
        game_name: Identifier of the game whose table is being parsed.
        td: The table cell the column value was read from.

    Returns:
        The same ``member_dict`` object that was passed in.
    """
    if game_name == 'pretty' and key == '初始星级':
        # The value is stored as the number of <img> elements in the cell.
        member_dict['初始星级'] = len(td.find_all('img'))
    if game_name == 'guardian' and key == '头像':
        # The star level is taken as the fifth character from the end of the
        # alt text of the <img> inside the cell's first <span>.
        member_dict['星级'] = str(td.find('span').find('img')['alt'])[-5]
        try:
            # Use the first whitespace-separated entry of srcset when present.
            member_dict['头像'] = str(td.find('img')['srcset']).split(' ')[0]
        except KeyError:
            # The <img> has no srcset attribute; fall back to src.
            member_dict['头像'] = str(td.find('img')['src'])
    return member_dict


def init_attr(game_name: str) -> tuple:
    """Return the initial column map and header offsets for a game's table.

    Args:
        game_name: Identifier of the game whose table is being parsed.

    Returns:
        A tuple ``(att_dict, start_index, index)``. ``att_dict`` maps the
        avatar ('头像') and name ('名称') keys to column positions. For
        'guardian' both keys map to column 0 and both offsets are 1; for every
        other game they map to columns 0 and 1 and both offsets are 2.
    """
    if game_name == 'guardian':
        return {'头像': 0, '名称': 0}, 1, 1
    return {'头像': 0, '名称': 1}, 2, 2


def parse_key(key: str, game_name) -> tuple:
    """Split a ``column.attribute`` key into its column name and attribute.

    Only keys for 'genshin_arms' are split; for any other game, or when the
    key contains no '.', the key is returned unchanged with an empty attribute.

    Args:
        key: Column key, optionally in ``name.attr`` form.
        game_name: Identifier of the game whose table is being parsed.

    Returns:
        A tuple ``(key, attr)``: the text before the first '.' and the text
        after the last '.', or ``(key, '')`` when no split takes place.
    """
    if game_name == 'genshin_arms' and '.' in key:
        parts = key.split('.')
        return parts[0], parts[-1]
    return key, ''


def replace_name(member_dict: dict, game_name: str):
    """Return the value used as the record name for one row.

    Args:
        member_dict: Data for the current row. It must contain '名称', and
            '中文名' as well when ``game_name`` is 'pretty_card'.
        game_name: Identifier of the game whose table is being parsed.

    Returns:
        ``member_dict['中文名']`` for 'pretty_card', otherwise
        ``member_dict['名称']``.

    Raises:
        KeyError: If a required key is missing from ``member_dict``.
    """
    # '名称' is looked up unconditionally, so it is required for every game.
    name = member_dict['名称']
    if game_name == 'pretty_card':
        name = member_dict['中文名']
    return name


# Fetch the tbody; different games may keep their data in a different tbody.
def get_tbody(soup: 'bs4.BeautifulSoup', game_name: str, url: str):
    """Locate the ``<tbody>`` that holds the data rows for a game's page.

    Args:
        soup: Parsed page.
        game_name: Identifier of the game whose table is being parsed.
        url: URL the page was fetched from; for 'guardian_arms' its last two
            characters select which tab is read.

    Returns:
        For 'guardian_arms', the tbody found in the selected tab. For any
        other game, the first tbody with the most ``<tr>`` descendants, or
        ``None`` when no tbody contains a row.
    """
    if game_name == 'guardian_arms':
        tabs = soup.find('div', {'class': 'resp-tabs-container'}).find_all(
            'div', {'class': 'resp-tab-content'})
        if url.endswith('盾牌'):
            return tabs[1].find('tbody')
        return tabs[0].find('table', {'id': 'CardSelectTr'}).find('tbody')

    best_tbody = None
    max_count = 0
    for tbody in soup.find_all('tbody'):
        # Count the rows once per candidate.
        row_count = len(tbody.find_all('tr'))
        # Strict '>' keeps the first of equally sized tables.
        if row_count > max_count:
            best_tbody = tbody
            max_count = row_count
    return best_tbody