zhenxun_bot/plugins/ai/data_source.py

177 lines
6.6 KiB
Python
Raw Normal View History

2021-06-24 15:32:06 +08:00
from configs.config import TL_KEY, ALAPI_TOKEN
2021-05-20 19:21:05 +08:00
import aiohttp
import random
import os
from configs.path_config import IMAGE_PATH, DATA_PATH
2021-06-24 15:32:06 +08:00
from services.log import logger
2021-06-30 19:50:55 +08:00
from utils.init_result import image, face
from utils.utils import get_bot
import re
2021-05-20 19:21:05 +08:00
try:
import ujson as json
except ModuleNotFoundError:
import json
# Tuling OpenAPI v2 endpoint that get_qqbot_chat_result posts chat requests to.
url = "http://openapi.tuling123.com/openapi/api/v2"
# ALAPI text-censorship endpoint (presumably the one check_text should query).
check_url = "https://v2.alapi.cn/api/censor/text"
# Position in TL_KEY of the Tuling API key currently in use.
index = 0
# Canned replies: maps a keyword to a list of candidate reply strings.
# Loaded through a context manager so the file handle is closed right away
# instead of being left open until garbage collection.
with open(DATA_PATH + 'anime.json', 'r', encoding='utf8') as _anime_file:
    anime_data = json.load(_anime_file)
# Tuling (Turing) AI chat
# Matches one `{face:<id>}` tag; non-greedy so that several tags in the same
# reply are matched one by one instead of being merged into a single match.
_FACE_TAG = re.compile(r'\{face:(.*?)\}')


def _build_tuling_request(text: str, img_url: str, user_id: int, api_key: str):
    """Build the Tuling v2 request body; return None if there is nothing to send."""
    if text:
        payload = {}
        perception = {"inputText": {"text": text}}
    elif img_url:
        payload = {"reqType": 1}
        perception = {"inputImage": {"url": img_url}}
    else:
        return None
    perception["selfInfo"] = {
        "location": {
            "city": "陨石坑",
            "province": "火星",
            "street": "第5坑位",
        }
    }
    payload["perception"] = perception
    payload["userInfo"] = {"apiKey": api_key, "userId": str(user_id)}
    return payload


def _clean_fallback_reply(content: str) -> str:
    """Normalise a reply from the fallback chat API into a sendable message."""
    if '菲菲' in content:
        # Swap the fallback bot's own name for this bot's first nickname.
        content = content.replace('菲菲', list(get_bot().config.nickname)[0])
    if '公众号' in content:
        # Replies containing this marker are dropped entirely.
        content = ''
    content = content.replace('{br}', '\n')
    cut = content.find('提示')
    if cut != -1:
        # Drop everything from the marker onwards.
        content = content[:cut]
    # Turn every `{face:<id>}` tag into the segment produced by face().
    return _FACE_TAG.sub(lambda m: str(face(m.group(1))), content)


def _personalize_reply(reply: str, user_name: str) -> str:
    """Substitute the user's (randomly decorated) name into a Tuling reply."""
    if not user_name:
        return reply
    reply = reply.replace('小朋友', user_name)
    if len(user_name) < 5 and random.random() < 0.5:
        user_name = "~".join(user_name) + '~'
        if random.random() < 0.2 and user_name.find('大人') == -1:
            user_name += '大~人~'
    return reply.replace('小主人', user_name)


async def get_qqbot_chat_result(text: str, img_url: str, user_id: int, user_name: str) -> str:
    """Produce a chat reply for a user message.

    Short messages may be answered from the local ``anime_data`` keyword
    table. Otherwise the message (or image URL) is posted to the Tuling API
    using the key at ``TL_KEY[index]``. When Tuling answers with intent code
    4003 the module-level ``index`` is advanced to the next key and a
    fallback chat API is queried instead.

    :param text: message text; takes precedence over ``img_url`` when non-empty
    :param img_url: image URL, used only when ``text`` is empty
    :param user_id: sender id, forwarded to Tuling as a string
    :param user_name: display name substituted into replies
    :return: the reply; ``no_result()`` when there is nothing to send or no
        API key is available at ``index``; ``''`` when Tuling responds with a
        non-200 status. Implicitly ``None`` when no text result is found.
    """
    global index
    if index == 5:
        index = 0
    if len(text) < 6 and random.random() < 0.6:
        for key in anime_data:
            if key in text:
                # NOTE(review): replacing the empty string inserts user_name
                # between every character; the placeholder literal looks like
                # it was lost. Kept as-is; confirm the intended token.
                return random.choice(anime_data[key]).replace('', user_name)
    if not text and not img_url:
        # Nothing to send: previously this left `req` unbound and crashed.
        return no_result()
    try:
        api_key = TL_KEY[index]
    except IndexError:
        index = 0
        return no_result()
    req = _build_tuling_request(text, img_url, user_id, api_key)
    async with aiohttp.ClientSession() as sess:
        async with sess.post(url, json=req) as response:
            if response.status != 200:
                return ''
            resp_payload = json.loads(await response.text())
            intent = resp_payload['intent']
            if intent and int(intent['code']) in [4003]:
                # Presumably 4003 means the current key is exhausted (confirm
                # against the Tuling docs): rotate keys and use the fallback.
                index += 1
                # Pass the message via `params` so it is URL-encoded rather
                # than spliced raw into the query string.
                async with sess.get(
                    'http://api.qingyunke.com/api.php',
                    params={'key': 'free', 'appid': '0', 'msg': text},
                ) as res:
                    data = json.loads(await res.text())
                    if data['result'] == 0:
                        return _clean_fallback_reply(data['content'])
            if resp_payload['results']:
                for result in resp_payload['results']:
                    if result['resultType'] == 'text':
                        return _personalize_reply(result['values']['text'], user_name)
def hello() -> str:
    """Return a random greeting followed by a random image from the "zai" folder."""
    greetings = (
        "哦豁?!",
        "你好Ov<",
        f"库库库,呼唤{list(get_bot().config.nickname)[0]}做什么呢",
        "我在呢!",
        "呼呼,叫俺干嘛"
    )
    greeting = random.choice(greetings)
    picture = random.choice(os.listdir(IMAGE_PATH + "zai/"))
    # GIF and non-GIF files are attached in exactly the same way.
    return greeting + image(picture, "zai")
def no_result() -> str:
    """Return a random "did not understand" phrase plus a random "noresult" image."""
    phrases = [
        '你在说啥子?',
        f'纯洁的{list(get_bot().config.nickname)[0]}没听懂',
        '下次再告诉你(下次一定)',
        '你觉得我听懂了吗?嗯?',
        '我!不!知!道!'
    ]
    phrase = random.choice(phrases)
    picture = random.choice(os.listdir(IMAGE_PATH + "noresult/"))
    return phrase + image(picture, "noresult")
2021-06-24 15:32:06 +08:00
async def check_text(text: str):
    """Run ``text`` through the ALAPI text-censorship service.

    :param text: the text to check
    :return: ``text`` unchanged when no ``ALAPI_TOKEN`` is configured, when
        the request fails, or when the service does not flag it;
        ``no_result()`` when the service answers with code 200 and
        ``conclusion_type`` 2.
    """
    if not ALAPI_TOKEN:
        return text
    params = {
        'token': ALAPI_TOKEN,
        'text': text
    }
    async with aiohttp.ClientSession() as session:
        try:
            # Query the censorship endpoint (check_url); the chat endpoint
            # (url) was used here by mistake.
            async with session.get(check_url, timeout=2, params=params) as response:
                data = await response.json()
                if data['code'] == 200:
                    if data['conclusion_type'] == 2:
                        return no_result()
        except Exception as e:
            # Best effort: a failed check must not block the reply.
            logger.error(f'检测违规文本错误...e{e}')
        return text