diff --git a/bot.py b/bot.py index 71a794b0..b3009b5c 100644 --- a/bot.py +++ b/bot.py @@ -20,6 +20,7 @@ driver.on_shutdown(disconnect) nonebot.load_builtin_plugins("echo") # 内置插件 nonebot.load_plugins("zhenxun/builtin_plugins") +nonebot.load_plugins("zhenxun/plugins") if __name__ == "__main__": diff --git a/zhenxun/plugins/ai/__init__.py b/zhenxun/plugins/ai/__init__.py new file mode 100644 index 00000000..c66be3ed --- /dev/null +++ b/zhenxun/plugins/ai/__init__.py @@ -0,0 +1,114 @@ +from typing import List + +from nonebot import on_message +from nonebot.plugin import PluginMetadata +from nonebot.rule import to_me +from nonebot_plugin_alconna import UniMsg +from nonebot_plugin_saa import Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import NICKNAME, Config +from zhenxun.configs.utils import PluginExtraData, RegisterConfig +from zhenxun.models.friend_user import FriendUser +from zhenxun.models.group_member_info import GroupInfoUser +from zhenxun.services.log import logger +from zhenxun.utils.depends import UserName + +from .data_source import get_chat_result, hello, no_result + +__zx_plugin_name__ = "AI" +__plugin_usage__ = f""" +usage: + 与{NICKNAME}普普通通的对话吧! +""" +__plugin_version__ = 0.1 +__plugin_author__ = "HibiKier" +__plugin_settings__ = { + "level": 5, + "cmd": ["Ai", "ai", "AI", "aI"], +} +__plugin_configs__ = { + "TL_KEY": {"value": [], "help": "图灵Key", "type": List[str]}, + "ALAPI_AI_CHECK": { + "value": False, + "help": "是否检测青云客骂娘回复", + "default_value": False, + "type": bool, + }, + "TEXT_FILTER": { + "value": ["鸡", "口交"], + "help": "文本过滤器,将敏感词更改为*", + "default_value": [], + "type": List[str], + }, +} + +__plugin_meta__ = PluginMetadata( + name="AI", + description="屑Ai", + usage=""" + 与{NICKNAME}普普通通的对话吧! + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + configs=[ + RegisterConfig( + module="alapi", + key="ALAPI_TOKEN", + value=None, + help="在 https://admin.alapi.cn/user/login 登录后获取token", + ), + RegisterConfig(key="TL_KEY", value=[], help="图灵Key", type=List[str]), + RegisterConfig( + key="ALAPI_AI_CHECK", + value=False, + help="是否检测青云客骂娘回复", + default_value=False, + type=bool, + ), + RegisterConfig( + key="TEXT_FILTER", + value=["鸡", "口交"], + help="文本过滤器,将敏感词更改为*", + type=List[str], + ), + ], + ).dict(), +) + + +ai = on_message(rule=to_me(), priority=998) + + +@ai.handle() +async def _(message: UniMsg, session: EventSession, uname: str = UserName()): + if not message or message.extract_plain_text() in [ + "你好啊", + "你好", + "在吗", + "在不在", + "您好", + "您好啊", + "你好", + "在", + ]: + await hello().finish() + if not session.id1: + await Text("用户id不存在...").finish() + gid = session.id3 or session.id2 + if gid: + nickname = await GroupInfoUser.get_user_nickname(session.id1, gid) + else: + nickname = await FriendUser.get_user_nickname(session.id1) + if not nickname: + nickname = uname + result = await get_chat_result(message, session.id1, nickname) + logger.info(f"问题:{message} ---- 回答:{result}", "ai", session=session) + if result: + result = str(result) + for t in Config.get_config("ai", "TEXT_FILTER"): + result = result.replace(t, "*") + await Text(result).finish() + else: + await no_result().finish() diff --git a/zhenxun/plugins/ai/data_source.py b/zhenxun/plugins/ai/data_source.py new file mode 100644 index 00000000..4b9136c4 --- /dev/null +++ b/zhenxun/plugins/ai/data_source.py @@ -0,0 +1,245 @@ +import os +import random +import re + +import ujson as json +from nonebot_plugin_alconna import UniMsg +from nonebot_plugin_saa import Image, MessageFactory, Text + +from zhenxun.configs.config import NICKNAME, Config +from zhenxun.configs.path_config import DATA_PATH, IMAGE_PATH +from zhenxun.services.log import logger +from zhenxun.utils.http_utils import AsyncHttpx + +from .utils import ai_message_manager + +url = "http://openapi.tuling123.com/openapi/api/v2" + +check_url = "https://v2.alapi.cn/api/censor/text" + +index = 0 + +anime_data = json.load(open(DATA_PATH / "anime.json", "r", encoding="utf8")) + + +async def get_chat_result( + message: UniMsg, user_id: str, nickname: str +) -> Text | MessageFactory: + """获取 AI 返回值,顺序: 特殊回复 -> 图灵 -> 青云客 + + 参数: + text: 问题 + img_url: 图片链接 + user_id: 用户id + nickname: 用户昵称 + + 返回 + str: 回答 + """ + global index + text = message.extract_plain_text() + ai_message_manager.add_message(user_id, text) + special_rst = await ai_message_manager.get_result(user_id, nickname) + if special_rst: + ai_message_manager.add_result(user_id, special_rst) + return Text(special_rst) + if index == 5: + index = 0 + if len(text) < 6 and random.random() < 0.6: + keys = anime_data.keys() + for key in keys: + if text.find(key) != -1: + return random.choice(anime_data[key]).replace("你", nickname) + rst = await tu_ling(text, "", user_id) + if not rst: + rst = await xie_ai(text) + if not rst: + return no_result() + if nickname: + if len(nickname) < 5: + if random.random() < 0.5: + nickname = "~".join(nickname) + "~" + if random.random() < 0.2: + if nickname.find("大人") == -1: + nickname += "大~人~" + rst = str(rst).replace("小主人", nickname).replace("小朋友", nickname) + ai_message_manager.add_result(user_id, rst) + for t in Config.get_config("ai", "TEXT_FILTER"): + rst = rst.replace(t, "*") + return Text(rst) + + +# 图灵接口 +async def tu_ling(text: str, img_url: str, user_id: str) -> str | None: + """获取图灵接口的回复 + + 参数: + text: 问题 + img_url: 图片链接 + user_id: 用户id + + 返回 + str: 图灵回复 + """ + global index + TL_KEY = Config.get_config("ai", "TL_KEY") + req = None + if not TL_KEY: + return None + try: + if text: + req = { + "perception": { + "inputText": {"text": text}, + "selfInfo": { + "location": { + "city": "陨石坑", + "province": "火星", + "street": "第5坑位", + } + }, + }, + "userInfo": {"apiKey": TL_KEY[index], "userId": str(user_id)}, + } + elif img_url: + req = { + "reqType": 1, + "perception": { + "inputImage": {"url": img_url}, + "selfInfo": { + "location": { + "city": "陨石坑", + "province": "火星", + "street": "第5坑位", + } + }, + }, + "userInfo": {"apiKey": TL_KEY[index], "userId": str(user_id)}, + } + except IndexError: + index = 0 + return None + text = "" + response = await AsyncHttpx.post(url, json=req) + if response.status_code != 200: + return None + resp_payload = json.loads(response.text) + if int(resp_payload["intent"]["code"]) in [4003]: + return None + if resp_payload["results"]: + for result in resp_payload["results"]: + if result["resultType"] == "text": + text = result["values"]["text"] + if "请求次数超过" in text: + text = "" + return text + + +# 屑 AI +async def xie_ai(text: str) -> str: + """获取青云客回复 + + 参数: + text: 问题 + + 返回: + str: 青云可回复 + """ + res = await AsyncHttpx.get( + f"http://api.qingyunke.com/api.php?key=free&appid=0&msg={text}" + ) + content = "" + try: + data = json.loads(res.text) + if data["result"] == 0: + content = data["content"] + if "菲菲" in content: + content = content.replace("菲菲", NICKNAME) + if "艳儿" in content: + content = content.replace("艳儿", NICKNAME) + if "公众号" in content: + content = "" + if "{br}" in content: + content = content.replace("{br}", "\n") + if "提示" in content: + content = content[: content.find("提示")] + if "淘宝" in content or "taobao.com" in content: + return "" + while True: + r = re.search("{face:(.*)}", content) + if r: + id_ = r.group(1) + content = content.replace("{" + f"face:{id_}" + "}", "") + else: + break + return ( + content + if not content and not Config.get_config("ai", "ALAPI_AI_CHECK") + else await check_text(content) + ) + except Exception as e: + logger.error(f"Ai xie_ai 发生错误", e=e) + return "" + + +def hello() -> MessageFactory: + """一些打招呼的内容""" + result = random.choice( + ( + "哦豁?!", + "你好!Ov<", + f"库库库,呼唤{NICKNAME}做什么呢", + "我在呢!", + "呼呼,叫俺干嘛", + ) + ) + img = random.choice(os.listdir(IMAGE_PATH / "zai")) + return MessageFactory([Image(IMAGE_PATH / "zai" / img), Text(result)]) + + +def no_result() -> MessageFactory: + """ + 没有回答时的回复 + """ + return MessageFactory( + [ + Text( + random.choice( + [ + "你在说啥子?", + f"纯洁的{NICKNAME}没听懂", + "下次再告诉你(下次一定)", + "你觉得我听懂了吗?嗯?", + "我!不!知!道!", + ] + ) + ), + Image( + IMAGE_PATH + / "noresult" + / random.choice(os.listdir(IMAGE_PATH / "noresult")) + ), + ] + ) + + +async def check_text(text: str) -> str: + """ALAPI文本检测,主要针对青云客API,检测为恶俗文本改为无回复的回答 + + 参数: + text: 回复 + + 返回: + str: 检测文本 + """ + if not Config.get_config("alapi", "ALAPI_TOKEN"): + return text + params = {"token": Config.get_config("alapi", "ALAPI_TOKEN"), "text": text} + try: + data = (await AsyncHttpx.get(check_url, timeout=2, params=params)).json() + if data["code"] == 200: + if data["data"]["conclusion_type"] == 2: + return "" + except Exception as e: + logger.error(f"检测违规文本错误...", e=e) + return text diff --git a/zhenxun/plugins/ai/utils.py b/zhenxun/plugins/ai/utils.py new file mode 100644 index 00000000..946bc234 --- /dev/null +++ b/zhenxun/plugins/ai/utils.py @@ -0,0 +1,153 @@ +import random +import time + +from zhenxun.configs.config import NICKNAME +from zhenxun.models.ban_console import BanConsole + + +class AiMessageManager: + def __init__(self): + self._data = {} + self._same_message = [ + "为什么要发一样的话?", + "请不要再重复对我说一句话了,不然我就要生气了!", + "别再发这句话了,我已经知道了...", + "你是只会说这一句话吗?", + "[*],你发我也发!", + "[uname],[*]", + f"救命!有笨蛋一直给{NICKNAME}发一样的话!", + "这句话你已经给我发了{}次了,再发就生气!", + ] + self._repeat_message = [ + f"请不要学{NICKNAME}说话", + f"为什么要一直学{NICKNAME}说话?", + "你再学!你再学我就生气了!", + f"呜呜,你是想欺负{NICKNAME}嘛..", + "[uname]不要再学我说话了!", + "再学我说话,我就把你拉进黑名单(生气", + "你再学![uname]是个笨蛋!", + "你已经学我说话{}次了!别再学了!", + ] + + def add_message(self, user_id: str, message: str): + """添加用户消息 + + 参数: + user_id: 用户id + message: 消息内容 + """ + if message: + if self._data.get(user_id) is None: + self._data[user_id] = { + "time": time.time(), + "message": [], + "result": [], + "repeat_count": 0, + } + if time.time() - self._data[user_id]["time"] > 60 * 10: + self._data[user_id]["message"].clear() + self._data[user_id]["time"] = time.time() + self._data[user_id]["message"].append(message.strip()) + + def add_result(self, user_id: str, message: str): + """添加回复用户的消息 + + 参数: + user_id: 用户id + message: 回复消息内容 + """ + if message: + if self._data.get(user_id) is None: + self._data[user_id] = { + "time": time.time(), + "message": [], + "result": [], + "repeat_count": 0, + } + if time.time() - self._data[user_id]["time"] > 60 * 10: + self._data[user_id]["result"].clear() + self._data[user_id]["repeat_count"] = 0 + self._data[user_id]["time"] = time.time() + self._data[user_id]["result"].append(message.strip()) + + async def get_result(self, user_id: str, nickname: str) -> str | None: + """特殊消息特殊回复 + + 参数: + user_id: 用户id + nickname: 用户昵称 + + 返回: + str | None: 回答 + """ + try: + if len(self._data[user_id]["message"]) < 2: + return None + except KeyError: + return None + msg = await self._get_user_repeat_message_result(user_id) + if not msg: + msg = await self._get_user_same_message_result(user_id) + if msg: + if "[uname]" in msg: + msg = msg.replace("[uname]", nickname) + if not msg.startswith("生气了!你好烦,闭嘴!") and "[*]" in msg: + msg = msg.replace("[*]", self._data[user_id]["message"][-1]) + return msg + + async def _get_user_same_message_result(self, user_id: str) -> str | None: + """重复消息回复 + + 参数: + user_id: 用户id + + 返回: + str | None: 回答 + """ + msg = self._data[user_id]["message"][-1] + cnt = 0 + _tmp = self._data[user_id]["message"][:-1] + _tmp.reverse() + for s in _tmp: + if s == msg: + cnt += 1 + else: + break + if cnt > 1: + if random.random() < 0.5 and cnt > 3: + rand = random.randint(60, 300) + await BanConsole.ban(user_id, None, 9, rand, None) + self._data[user_id]["message"].clear() + return f"生气了!你好烦,闭嘴!给我老实安静{rand}秒" + return random.choice(self._same_message).format(cnt) + return None + + async def _get_user_repeat_message_result(self, user_id: str) -> str | None: + """复读真寻的消息回复 + + 参数: + user_id: 用户id + + 返回: + str | None: 回答 + """ + msg = self._data[user_id]["message"][-1] + if self._data[user_id]["result"]: + rst = self._data[user_id]["result"][-1] + else: + return None + if msg == rst: + self._data[user_id]["repeat_count"] += 1 + cnt = self._data[user_id]["repeat_count"] + if cnt > 1: + if random.random() < 0.5 and cnt > 3: + rand = random.randint(60, 300) + await BanConsole.ban(user_id, None, 9, rand, None) + self._data[user_id]["result"].clear() + self._data[user_id]["repeat_count"] = 0 + return f"生气了!你好烦,闭嘴!给我老实安静{rand}秒" + return random.choice(self._repeat_message).format(cnt) + return None + + +ai_message_manager = AiMessageManager() diff --git a/zhenxun/utils/depends/__init__.py b/zhenxun/utils/depends/__init__.py new file mode 100644 index 00000000..a993ee45 --- /dev/null +++ b/zhenxun/utils/depends/__init__.py @@ -0,0 +1,29 @@ +from nonebot.internal.params import Depends +from nonebot.params import Command +from nonebot_plugin_userinfo import EventUserInfo, UserInfo + + +def OneCommand(): + """ + 获取单个命令Command + """ + + async def dependency( + cmd: tuple[str, ...] = Command(), + ): + return cmd[0] if cmd else None + + return Depends(dependency) + + +def UserName(): + """ + 用户名称 + """ + + async def dependency(user_info: UserInfo = EventUserInfo()): + return ( + user_info.user_displayname or user_info.user_remark or user_info.user_name + ) + + return Depends(dependency)