Update data_source.py

This commit is contained in:
AkashiCoin 2021-10-03 18:34:31 +08:00 committed by GitHub
parent 918adc4b1c
commit 46b55e4224
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,56 +1,124 @@
import aiohttp
import aiofiles
from utils.utils import get_local_proxy
import feedparser
import platform
from utils.message_builder import image
from configs.path_config import IMAGE_PATH
from utils.user_agent import get_user_agent
from asyncio.exceptions import TimeoutError
from configs.config import RSSHUBAPP
if platform.system() == "Windows":
import asyncio
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
from httpx import AsyncClient
from datetime import datetime
from nonebot.log import logger
from nonebot.adapters.cqhttp import Bot, Event, Message, MessageEvent, GroupMessageEvent
from configs.config import NICKNAME
url = f"{RSSHUBAPP}/epicgames/freegames"
async def get_epic_game() -> "str, int":
result = "今天没有游戏可以白嫖了!"
code = 999
# 获取所有 Epic Game Store 促销游戏
# 方法参考RSSHub /epicgames 路由
# https://github.com/DIYgod/RSSHub/blob/master/lib/routes/epicgames/index.js
async def get_Epicgame():
epic_url = "https://www.epicgames.com/store/backend/graphql-proxy"
headers = {
"Referer": "https://www.epicgames.com/store/zh-CN/",
"Content-Type": "application/json; charset=utf-8",
}
data = {
"query":
"query searchStoreQuery($allowCountries: String, $category: String, $count: Int, $country: String!, $keywords: String, $locale: String, $namespace: String, $sortBy: String, $sortDir: String, $start: Int, $tag: String, $withPrice: Boolean = false, $withPromotions: Boolean = false) {\n Catalog {\n searchStore(allowCountries: $allowCountries, category: $category, count: $count, country: $country, keywords: $keywords, locale: $locale, namespace: $namespace, sortBy: $sortBy, sortDir: $sortDir, start: $start, tag: $tag) {\n elements {\n title\n id\n namespace\n description\n effectiveDate\n keyImages {\n type\n url\n }\n seller {\n id\n name\n }\n productSlug\n urlSlug\n url\n items {\n id\n namespace\n }\n customAttributes {\n key\n value\n }\n categories {\n path\n }\n price(country: $country) @include(if: $withPrice) {\n totalPrice {\n discountPrice\n originalPrice\n voucherDiscount\n discount\n currencyCode\n currencyInfo {\n decimals\n }\n fmtPrice(locale: $locale) {\n originalPrice\n discountPrice\n intermediatePrice\n }\n }\n lineOffers {\n appliedRules {\n id\n endDate\n discountSetting {\n discountType\n }\n }\n }\n }\n promotions(category: $category) @include(if: $withPromotions) {\n promotionalOffers {\n promotionalOffers {\n startDate\n endDate\n discountSetting {\n discountType\n discountPercentage\n }\n }\n }\n upcomingPromotionalOffers {\n promotionalOffers {\n startDate\n endDate\n discountSetting {\n discountType\n discountPercentage\n }\n }\n }\n }\n }\n paging {\n count\n total\n }\n }\n }\n}\n",
"variables": {
"allowCountries": "CN",
"category": "freegames",
"count": 1000,
"country": "CN",
"locale": "zh-CN",
"sortBy": "effectiveDate",
"sortDir": "asc",
"withPrice": True,
"withPromotions": True
}
}
async with AsyncClient(headers=headers) as client:
try:
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
async with session.get(url, proxy=get_local_proxy(), timeout=7) as response:
data = feedparser.parse(await response.text())["entries"]
if len(data) == 0:
return result
index = 0
for item in data:
title = item["title"]
img_url = item["summary"][
item["summary"].find('src="') + 5 : item["summary"].rfind('"')
]
async with session.get(
img_url, proxy=get_local_proxy(), timeout=7
) as res:
async with aiofiles.open(
IMAGE_PATH + f"temp/epic_{index}.jpg", "wb"
) as f:
await f.write(await res.read())
link = item["link"]
result += (
image(f"epic_{index}.jpg", "temp")
+ f"\n【游戏】| {title}\n【链接】 | {link}\n"
)
code = 200
index += 1
if result != "":
result = "epic限免游戏速速白嫖\n" + result
res = await client.post(epic_url, json=data, timeout=10.0)
resJson = res.json()
games = resJson['data']['Catalog']['searchStore']['elements']
return games
except Exception as e:
logger.error(str(e))
return None
# 获取 Epic Game Store 免费游戏信息
# 处理免费游戏的信息方法借鉴 pip 包 epicstore_api 示例
# https://github.com/SD4RK/epicstore_api/blob/master/examples/free_games_example.py
async def get_Epicfree(bot: Bot, event: Event):
games = await get_Epicgame()
if not games:
return "Epic 可能又抽风啦,请稍后再试(", 404
else:
msg = ""
msg_list = []
for game in games:
game_name = game['title']
game_corp = game['seller']['name']
game_price = game['price']['totalPrice']['fmtPrice']['originalPrice']
# 赋初值以避免 local variable referenced before assignment
game_dev, game_pub, game_thumbnail = (None, None, None)
try:
game_promotions = game['promotions']['promotionalOffers']
upcoming_promotions = game['promotions']['upcomingPromotionalOffers']
if not game_promotions and upcoming_promotions:
# 促销暂未上线,但即将上线
promotion_data = upcoming_promotions[0]['promotionalOffers'][0]
start_date_iso, end_date_iso = (promotion_data['startDate'][:-1],
promotion_data['endDate'][:-1])
# 删除字符串中最后一个 "Z" 使 Python datetime 可处理此时间
start_date = datetime.fromisoformat(start_date_iso).strftime(
'%b.%d %H:%M')
end_date = datetime.fromisoformat(end_date_iso).strftime('%b.%d %H:%M')
if isinstance(event, GroupMessageEvent):
_message = '\n{} 公司发行的游戏 {} ({}) 在 UTC 时间 {} 即将推出免费游玩,预计截至 {}'.format(game_corp, game_name, game_price, start_date, end_date)
data = {
"type": "node",
"data": {
"name": f"这里是{NICKNAME}",
"uin": f"{bot.self_id}",
"content": _message,
},
}
msg_list.append(data)
else:
msg = '\n{} 公司发行的游戏 {} ({}) 在 UTC 时间 {} 即将推出免费游玩,预计截至 {}'.format(
game_corp, game_name, game_price, start_date, end_date)
msg_list.append(msg)
else:
result = "今天没有游戏可以白嫖了!"
except TimeoutError:
return "请求超时!", code
return result, code
for image in game['keyImages']:
if image['type'] == 'Thumbnail':
game_thumbnail = image['url']
for pair in game['customAttributes']:
if pair['key'] == 'developerName':
game_dev = pair['value']
if pair['key'] == 'publisherName':
game_pub = pair['value']
# 如 game['customAttributes'] 未找到则均使用 game_corp 值
game_dev = game_dev if game_dev != None else game_corp
game_pub = game_pub if game_pub != None else game_corp
game_desp = game['description']
end_date_iso = game['promotions']['promotionalOffers'][0]['promotionalOffers'][0]['endDate'][:-1]
end_date = datetime.fromisoformat(end_date_iso).strftime('%b.%d %H:%M')
# API 返回不包含游戏商店 URL此处自行拼接可能出现少数游戏 404 请反馈
game_url_part = (game['productSlug'].replace('/home', '')) if ('/home' in game['productSlug']) else game['productSlug']
game_url = 'https://www.epicgames.com/store/zh-CN/p/{}'.format(
game_url_part)
if isinstance(event, GroupMessageEvent):
_message = '[CQ:image,file={}]\n\nFREE now :: {} ({})\n{}\n此游戏由 {} 开发、{} 发行,将在 UTC 时间 {} 结束免费游玩,戳链接速度加入你的游戏库吧~\n{}\n'.format(game_thumbnail, game_name, game_price, game_desp, game_dev, game_pub,end_date, game_url)
data = {
"type": "node",
"data": {
"name": f"这里是{NICKNAME}",
"uin": f"{bot.self_id}",
"content": _message,
},
}
msg_list.append(data)
else:
msg = '[CQ:image,file={}]\n\nFREE now :: {} ({})\n{}\n此游戏由 {} 开发、{} 发行,将在 UTC 时间 {} 结束免费游玩,戳链接速度加入你的游戏库吧~\n{}\n'.format(
game_thumbnail, game_name, game_price, game_desp, game_dev, game_pub,
end_date, game_url)
msg_list.append(msg)
except TypeError as e:
# logger.info(str(e))
pass
return msg_list, 200