feat: Ai

This commit is contained in:
HibiKier 2024-03-09 23:42:59 +08:00
parent 5a50a2bff4
commit 960b665ba4
5 changed files with 542 additions and 0 deletions

1
bot.py
View File

@ -20,6 +20,7 @@ driver.on_shutdown(disconnect)
nonebot.load_builtin_plugins("echo") # 内置插件
nonebot.load_plugins("zhenxun/builtin_plugins")
nonebot.load_plugins("zhenxun/plugins")
if __name__ == "__main__":

View File

@ -0,0 +1,114 @@
from typing import List
from nonebot import on_message
from nonebot.plugin import PluginMetadata
from nonebot.rule import to_me
from nonebot_plugin_alconna import UniMsg
from nonebot_plugin_saa import Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import NICKNAME, Config
from zhenxun.configs.utils import PluginExtraData, RegisterConfig
from zhenxun.models.friend_user import FriendUser
from zhenxun.models.group_member_info import GroupInfoUser
from zhenxun.services.log import logger
from zhenxun.utils.depends import UserName
from .data_source import get_chat_result, hello, no_result
__zx_plugin_name__ = "AI"
__plugin_usage__ = f"""
usage
{NICKNAME}普普通通的对话吧
"""
__plugin_version__ = 0.1
__plugin_author__ = "HibiKier"
__plugin_settings__ = {
"level": 5,
"cmd": ["Ai", "ai", "AI", "aI"],
}
__plugin_configs__ = {
"TL_KEY": {"value": [], "help": "图灵Key", "type": List[str]},
"ALAPI_AI_CHECK": {
"value": False,
"help": "是否检测青云客骂娘回复",
"default_value": False,
"type": bool,
},
"TEXT_FILTER": {
"value": ["", "口交"],
"help": "文本过滤器,将敏感词更改为*",
"default_value": [],
"type": List[str],
},
}
__plugin_meta__ = PluginMetadata(
name="AI",
description="屑Ai",
usage="""
{NICKNAME}普普通通的对话吧
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
configs=[
RegisterConfig(
module="alapi",
key="ALAPI_TOKEN",
value=None,
help="在 https://admin.alapi.cn/user/login 登录后获取token",
),
RegisterConfig(key="TL_KEY", value=[], help="图灵Key", type=List[str]),
RegisterConfig(
key="ALAPI_AI_CHECK",
value=False,
help="是否检测青云客骂娘回复",
default_value=False,
type=bool,
),
RegisterConfig(
key="TEXT_FILTER",
value=["", "口交"],
help="文本过滤器,将敏感词更改为*",
type=List[str],
),
],
).dict(),
)
ai = on_message(rule=to_me(), priority=998)
@ai.handle()
async def _(message: UniMsg, session: EventSession, uname: str = UserName()):
if not message or message.extract_plain_text() in [
"你好啊",
"你好",
"在吗",
"在不在",
"您好",
"您好啊",
"你好",
"",
]:
await hello().finish()
if not session.id1:
await Text("用户id不存在...").finish()
gid = session.id3 or session.id2
if gid:
nickname = await GroupInfoUser.get_user_nickname(session.id1, gid)
else:
nickname = await FriendUser.get_user_nickname(session.id1)
if not nickname:
nickname = uname
result = await get_chat_result(message, session.id1, nickname)
logger.info(f"问题:{message} ---- 回答:{result}", "ai", session=session)
if result:
result = str(result)
for t in Config.get_config("ai", "TEXT_FILTER"):
result = result.replace(t, "*")
await Text(result).finish()
else:
await no_result().finish()

View File

@ -0,0 +1,245 @@
import os
import random
import re
import ujson as json
from nonebot_plugin_alconna import UniMsg
from nonebot_plugin_saa import Image, MessageFactory, Text
from zhenxun.configs.config import NICKNAME, Config
from zhenxun.configs.path_config import DATA_PATH, IMAGE_PATH
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from .utils import ai_message_manager
url = "http://openapi.tuling123.com/openapi/api/v2"
check_url = "https://v2.alapi.cn/api/censor/text"
index = 0
anime_data = json.load(open(DATA_PATH / "anime.json", "r", encoding="utf8"))
async def get_chat_result(
message: UniMsg, user_id: str, nickname: str
) -> Text | MessageFactory:
"""获取 AI 返回值,顺序: 特殊回复 -> 图灵 -> 青云客
参数:
text: 问题
img_url: 图片链接
user_id: 用户id
nickname: 用户昵称
返回
str: 回答
"""
global index
text = message.extract_plain_text()
ai_message_manager.add_message(user_id, text)
special_rst = await ai_message_manager.get_result(user_id, nickname)
if special_rst:
ai_message_manager.add_result(user_id, special_rst)
return Text(special_rst)
if index == 5:
index = 0
if len(text) < 6 and random.random() < 0.6:
keys = anime_data.keys()
for key in keys:
if text.find(key) != -1:
return random.choice(anime_data[key]).replace("", nickname)
rst = await tu_ling(text, "", user_id)
if not rst:
rst = await xie_ai(text)
if not rst:
return no_result()
if nickname:
if len(nickname) < 5:
if random.random() < 0.5:
nickname = "~".join(nickname) + "~"
if random.random() < 0.2:
if nickname.find("大人") == -1:
nickname += "大~人~"
rst = str(rst).replace("小主人", nickname).replace("小朋友", nickname)
ai_message_manager.add_result(user_id, rst)
for t in Config.get_config("ai", "TEXT_FILTER"):
rst = rst.replace(t, "*")
return Text(rst)
# 图灵接口
async def tu_ling(text: str, img_url: str, user_id: str) -> str | None:
"""获取图灵接口的回复
参数:
text: 问题
img_url: 图片链接
user_id: 用户id
返回
str: 图灵回复
"""
global index
TL_KEY = Config.get_config("ai", "TL_KEY")
req = None
if not TL_KEY:
return None
try:
if text:
req = {
"perception": {
"inputText": {"text": text},
"selfInfo": {
"location": {
"city": "陨石坑",
"province": "火星",
"street": "第5坑位",
}
},
},
"userInfo": {"apiKey": TL_KEY[index], "userId": str(user_id)},
}
elif img_url:
req = {
"reqType": 1,
"perception": {
"inputImage": {"url": img_url},
"selfInfo": {
"location": {
"city": "陨石坑",
"province": "火星",
"street": "第5坑位",
}
},
},
"userInfo": {"apiKey": TL_KEY[index], "userId": str(user_id)},
}
except IndexError:
index = 0
return None
text = ""
response = await AsyncHttpx.post(url, json=req)
if response.status_code != 200:
return None
resp_payload = json.loads(response.text)
if int(resp_payload["intent"]["code"]) in [4003]:
return None
if resp_payload["results"]:
for result in resp_payload["results"]:
if result["resultType"] == "text":
text = result["values"]["text"]
if "请求次数超过" in text:
text = ""
return text
# 屑 AI
async def xie_ai(text: str) -> str:
"""获取青云客回复
参数:
text: 问题
返回:
str: 青云可回复
"""
res = await AsyncHttpx.get(
f"http://api.qingyunke.com/api.php?key=free&appid=0&msg={text}"
)
content = ""
try:
data = json.loads(res.text)
if data["result"] == 0:
content = data["content"]
if "菲菲" in content:
content = content.replace("菲菲", NICKNAME)
if "艳儿" in content:
content = content.replace("艳儿", NICKNAME)
if "公众号" in content:
content = ""
if "{br}" in content:
content = content.replace("{br}", "\n")
if "提示" in content:
content = content[: content.find("提示")]
if "淘宝" in content or "taobao.com" in content:
return ""
while True:
r = re.search("{face:(.*)}", content)
if r:
id_ = r.group(1)
content = content.replace("{" + f"face:{id_}" + "}", "")
else:
break
return (
content
if not content and not Config.get_config("ai", "ALAPI_AI_CHECK")
else await check_text(content)
)
except Exception as e:
logger.error(f"Ai xie_ai 发生错误", e=e)
return ""
def hello() -> MessageFactory:
"""一些打招呼的内容"""
result = random.choice(
(
"哦豁?!",
"你好Ov<",
f"库库库,呼唤{NICKNAME}做什么呢",
"我在呢!",
"呼呼,叫俺干嘛",
)
)
img = random.choice(os.listdir(IMAGE_PATH / "zai"))
return MessageFactory([Image(IMAGE_PATH / "zai" / img), Text(result)])
def no_result() -> MessageFactory:
"""
没有回答时的回复
"""
return MessageFactory(
[
Text(
random.choice(
[
"你在说啥子?",
f"纯洁的{NICKNAME}没听懂",
"下次再告诉你(下次一定)",
"你觉得我听懂了吗?嗯?",
"我!不!知!道!",
]
)
),
Image(
IMAGE_PATH
/ "noresult"
/ random.choice(os.listdir(IMAGE_PATH / "noresult"))
),
]
)
async def check_text(text: str) -> str:
"""ALAPI文本检测主要针对青云客API检测为恶俗文本改为无回复的回答
参数:
text: 回复
返回:
str: 检测文本
"""
if not Config.get_config("alapi", "ALAPI_TOKEN"):
return text
params = {"token": Config.get_config("alapi", "ALAPI_TOKEN"), "text": text}
try:
data = (await AsyncHttpx.get(check_url, timeout=2, params=params)).json()
if data["code"] == 200:
if data["data"]["conclusion_type"] == 2:
return ""
except Exception as e:
logger.error(f"检测违规文本错误...", e=e)
return text

153
zhenxun/plugins/ai/utils.py Normal file
View File

@ -0,0 +1,153 @@
import random
import time
from zhenxun.configs.config import NICKNAME
from zhenxun.models.ban_console import BanConsole
class AiMessageManager:
def __init__(self):
self._data = {}
self._same_message = [
"为什么要发一样的话?",
"请不要再重复对我说一句话了,不然我就要生气了!",
"别再发这句话了,我已经知道了...",
"你是只会说这一句话吗?",
"[*],你发我也发!",
"[uname][*]",
f"救命!有笨蛋一直给{NICKNAME}发一样的话!",
"这句话你已经给我发了{}次了,再发就生气!",
]
self._repeat_message = [
f"请不要学{NICKNAME}说话",
f"为什么要一直学{NICKNAME}说话?",
"你再学!你再学我就生气了!",
f"呜呜,你是想欺负{NICKNAME}嘛..",
"[uname]不要再学我说话了!",
"再学我说话,我就把你拉进黑名单(生气",
"你再学![uname]是个笨蛋!",
"你已经学我说话{}次了!别再学了!",
]
def add_message(self, user_id: str, message: str):
"""添加用户消息
参数:
user_id: 用户id
message: 消息内容
"""
if message:
if self._data.get(user_id) is None:
self._data[user_id] = {
"time": time.time(),
"message": [],
"result": [],
"repeat_count": 0,
}
if time.time() - self._data[user_id]["time"] > 60 * 10:
self._data[user_id]["message"].clear()
self._data[user_id]["time"] = time.time()
self._data[user_id]["message"].append(message.strip())
def add_result(self, user_id: str, message: str):
"""添加回复用户的消息
参数:
user_id: 用户id
message: 回复消息内容
"""
if message:
if self._data.get(user_id) is None:
self._data[user_id] = {
"time": time.time(),
"message": [],
"result": [],
"repeat_count": 0,
}
if time.time() - self._data[user_id]["time"] > 60 * 10:
self._data[user_id]["result"].clear()
self._data[user_id]["repeat_count"] = 0
self._data[user_id]["time"] = time.time()
self._data[user_id]["result"].append(message.strip())
async def get_result(self, user_id: str, nickname: str) -> str | None:
"""特殊消息特殊回复
参数:
user_id: 用户id
nickname: 用户昵称
返回:
str | None: 回答
"""
try:
if len(self._data[user_id]["message"]) < 2:
return None
except KeyError:
return None
msg = await self._get_user_repeat_message_result(user_id)
if not msg:
msg = await self._get_user_same_message_result(user_id)
if msg:
if "[uname]" in msg:
msg = msg.replace("[uname]", nickname)
if not msg.startswith("生气了!你好烦,闭嘴!") and "[*]" in msg:
msg = msg.replace("[*]", self._data[user_id]["message"][-1])
return msg
async def _get_user_same_message_result(self, user_id: str) -> str | None:
"""重复消息回复
参数:
user_id: 用户id
返回:
str | None: 回答
"""
msg = self._data[user_id]["message"][-1]
cnt = 0
_tmp = self._data[user_id]["message"][:-1]
_tmp.reverse()
for s in _tmp:
if s == msg:
cnt += 1
else:
break
if cnt > 1:
if random.random() < 0.5 and cnt > 3:
rand = random.randint(60, 300)
await BanConsole.ban(user_id, None, 9, rand, None)
self._data[user_id]["message"].clear()
return f"生气了!你好烦,闭嘴!给我老实安静{rand}"
return random.choice(self._same_message).format(cnt)
return None
async def _get_user_repeat_message_result(self, user_id: str) -> str | None:
"""复读真寻的消息回复
参数:
user_id: 用户id
返回:
str | None: 回答
"""
msg = self._data[user_id]["message"][-1]
if self._data[user_id]["result"]:
rst = self._data[user_id]["result"][-1]
else:
return None
if msg == rst:
self._data[user_id]["repeat_count"] += 1
cnt = self._data[user_id]["repeat_count"]
if cnt > 1:
if random.random() < 0.5 and cnt > 3:
rand = random.randint(60, 300)
await BanConsole.ban(user_id, None, 9, rand, None)
self._data[user_id]["result"].clear()
self._data[user_id]["repeat_count"] = 0
return f"生气了!你好烦,闭嘴!给我老实安静{rand}"
return random.choice(self._repeat_message).format(cnt)
return None
ai_message_manager = AiMessageManager()

View File

@ -0,0 +1,29 @@
from nonebot.internal.params import Depends
from nonebot.params import Command
from nonebot_plugin_userinfo import EventUserInfo, UserInfo
def OneCommand():
"""
获取单个命令Command
"""
async def dependency(
cmd: tuple[str, ...] = Command(),
):
return cmd[0] if cmd else None
return Depends(dependency)
def UserName():
"""
用户名称
"""
async def dependency(user_info: UserInfo = EventUserInfo()):
return (
user_info.user_displayname or user_info.user_remark or user_info.user_name
)
return Depends(dependency)