zhenxun_bot/plugins/send_setu/data_source.py
2021-06-17 19:22:07 +08:00

248 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from configs.path_config import IMAGE_PATH, TXT_PATH
import os
import random
from util.init_result import image
from configs.config import LOLICON_KEY
import aiohttp
import aiofiles
from services.log import logger
from util.img_utils import get_img_hash
from util.utils import get_local_proxy, is_number
from asyncio.exceptions import TimeoutError
from models.count_user import UserCount
from configs.config import DOWNLOAD_SETU
try:
import ujson as json
except ModuleNotFoundError:
import json
url = "https://api.lolicon.app/setu/"
path = '_setu/'
async def get_setu_urls(keyword: str, num: int = 1, r18: int = 0):
# print(keyword)
if r18 == 1:
file_name = 'setu_r18_url.json'
else:
file_name = 'setu_url.json'
try:
with open(TXT_PATH + file_name, 'r', encoding='utf8') as f:
txt_data = json.load(f)
except (FileNotFoundError, ValueError):
txt_data = {}
txt_urls = [txt_data[x]['img_url'] for x in txt_data.keys()]
params = {
"apikey": LOLICON_KEY, # 添加apikey
'r18': r18, # 添加r18参数 0为否1为是2为混合
'keyword': keyword, # 若指定关键字,将会返回从插画标题、作者、标签中模糊搜索的结果
'num': 100, # 一次返回的结果数量范围为1到10不提供 APIKEY 时固定为1
'size1200': 1, # 是否使用 master_1200 缩略图,以节省流量或提升加载速度
}
urls = []
text_list = []
for count in range(3):
print(f'get_setu_url: count --> {count}')
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, proxy=get_local_proxy(), timeout=5, params=params) as response:
if response.status == 429:
return '调用达到上限,明日赶早呀~', '', 429
if response.status == 404:
return "网站裂开了...", '', 998
if response.status == 200:
data = await response.json()
if data['code'] == 0:
for i in range(len(data['data'])):
img_url = data['data'][i]['url']
title = data['data'][i]['title']
author = data['data'][i]['author']
pid = data['data'][i]['pid']
urls.append(img_url)
text_list.append(f'title{title}\nauthor{author}\nPID{pid}')
tags = []
for j in range(len(data['data'][i]['tags'])):
tags.append(data['data'][i]['tags'][j])
if img_url not in txt_urls:
save_setu_dict = {
'title': title,
'author': author,
'pid': pid,
'img_url': img_url,
'tags': tags
}
if str(pid) not in txt_data.keys():
txt_data[pid] = save_setu_dict
if DOWNLOAD_SETU:
with open(TXT_PATH + file_name, 'w', encoding='utf8') as f:
json.dump(txt_data, f, ensure_ascii=False, indent=4)
random_idx = random.sample(range(len(data['data'])), num)
x_urls = []
x_text_lst = []
for x in random_idx:
x_urls.append(urls[x])
x_text_lst.append(text_list[x])
return x_urls, x_text_lst, 200
else:
return "没找到符合条件的色图...", '', 401
except TimeoutError:
pass
return '我网线被人拔了..QAQ', '', 999
async def search_online_setu(url: str):
async with aiohttp.ClientSession() as session:
for i in range(3):
print(f'search_online_setu --> {i}')
try:
async with session.get(url, proxy=get_local_proxy(), timeout=7) as res:
if res.status == 200:
index = str(random.randint(1, 100000))
async with aiofiles.open(IMAGE_PATH + 'temp/' + index + "_temp_setu.jpg", 'wb') as f:
try:
await f.write(await res.read())
except TimeoutError:
# return '\n这图没下载过来~(网太差?)', -1, False
continue
logger.info(f"下载 lolicon图片 {url} 成功, id{index}")
return image(f'{index}_temp_setu.jpg', 'temp'), index
else:
logger.warning(f"访问 lolicon图片 {url} 失败 status{res.status}")
# return '\n这图好难下载啊QAQ', -1, False
except TimeoutError:
pass
return '\n图片被小怪兽恰掉啦..!QAQ', -1
def get_setu(index: str, setu_data: dict):
length = len(os.listdir(IMAGE_PATH + path))
if length == 0:
return None, None
if not index:
index = random.randint(0, length - 1)
if is_number(index):
if int(index) > length or int(index) < 0:
return f"超过当前上下限!({length - 1})", 999
else:
if setu_data:
index = str(index)
return f'id{index}\n' \
f'title{setu_data[index]["title"]}\n' \
f'author{setu_data[index]["author"]}\n' \
f'PID{setu_data[index]["pid"]}' + image(f'{index}.jpg', path), index
else:
return f'id{index}' + image(f'{index}.jpg', path), index
return None, None
def get_luoxiang(impression):
probability = impression + 70
if probability < random.randint(1, 101):
return "我为什么要给你发这个?" + image(random.choice(os.listdir(IMAGE_PATH + "luoxiang/")), 'luoxiang') + \
"\n(快向小真寻签到提升好感度吧!)"
return None
async def check_r18_and_keyword(msg: str, user_id) -> 'str, int, int':
msg_list = msg.split(' ')
num = 1
r18 = 0
keyword = ''
if len(msg_list) == 1:
if msg_list[0].strip().lower() in ['r', 'r18']:
r18 = 1
num = 10
else:
keyword = msg_list[0]
elif len(msg_list) == 2:
keyword = msg_list[1].strip()
if msg_list[0].strip().lower() in ['r', 'r18']:
r18 = 1
num = 10
else:
keyword = msg[0]
if r18 == 1:
await UserCount.add_user(user_id)
return keyword, r18, num
async def find_img_index(img_url, user_id):
try:
setu_data = json.load(open(TXT_PATH + 'setu_data.json', encoding='utf8'))
except (FileNotFoundError, ValueError):
setu_data = {}
async with aiohttp.ClientSession() as session:
async with session.get(img_url, proxy=get_local_proxy(), timeout=5) as res:
async with aiofiles.open(IMAGE_PATH + f"temp/{user_id}_find_setu_index.jpg", 'wb') as f:
await f.write(await res.read())
img_hash = str(get_img_hash(IMAGE_PATH + f"temp/{user_id}_find_setu_index.jpg"))
try:
index = str([setu_data[x]['img_hash'] for x in setu_data.keys()].index(img_hash))
return f"id{index}\n" \
f"title{setu_data[index]['title']}\n" \
f"author{setu_data[index]['author']}\n" \
f"PID{setu_data[index]['pid']}"
except ValueError:
return "该图不在色图库中!"
def delete_img(_id: int):
lens = len(os.listdir(IMAGE_PATH + path)) - 1
if _id < 0 or _id > lens:
return False, f'超过上下限限制,上限:{lens}'
try:
os.remove(IMAGE_PATH + path + f'{_id}.jpg')
if _id != lens:
setu_hash_dict = json.load(open(TXT_PATH + 'setu_data.json', encoding='utf8'))
setu_hash_dict[str(_id)] = setu_hash_dict[str(lens)]
os.rename(IMAGE_PATH + path + f'{lens}.jpg', IMAGE_PATH + path + f'{_id}.jpg')
with open(TXT_PATH + 'setu_data.json', 'w', encoding='utf8') as f:
json.dump(setu_hash_dict, f, ensure_ascii=False, indent=4)
return True, ''
except Exception as e:
logger.error(f'删除色图错误 e{e}')
return False, str(type(e))
# 添加涩图
async def add_img(imgs: list):
index = 0
lens = len(os.listdir(IMAGE_PATH + path))
add_count = 0
setu_data_dict = json.load(open(TXT_PATH + 'setu_data.json', encoding='utf8'))
async with aiohttp.ClientSession() as session:
for img in imgs:
async with session.get(img, proxy=get_local_proxy(), timeout=5) as res:
async with aiofiles.open(IMAGE_PATH + f"temp/add_setu_check_{index}.jpg", 'wb') as f:
await f.write(await res.read())
index += 1
index -= 1
for i in range(index, -1, -1):
img_hash = str(get_img_hash(IMAGE_PATH + f"temp/add_setu_check_{index}.jpg"))
if img_hash not in [setu_data_dict[x]['img_hash'] for x in setu_data_dict.keys()]:
os.rename(IMAGE_PATH + f"temp/add_setu_check_{index}.jpg", IMAGE_PATH + path + f'/{lens}.jpg')
setu_data_dict[lens] = {
'title': 'not title',
'author': 'not author',
'pid': 'not pid',
'img_hash': img_hash,
'img_url': 'not url',
}
lens += 1
add_count += 1
if add_count:
with open(TXT_PATH + 'setu_data.json', 'w', encoding='utf8') as f:
json.dump(setu_data_dict, f, ensure_ascii=False, indent=4)
return lens, add_count