From 137870b698e45e585b4394d7f87052ce8bd5da3b Mon Sep 17 00:00:00 2001 From: HibiKier <775757368@qq.com> Date: Sat, 27 Jul 2024 04:30:03 +0800 Subject: [PATCH] =?UTF-8?q?feat=E2=9C=A8:=20=E6=B7=BB=E5=8A=A0=E8=AF=8D?= =?UTF-8?q?=E6=9D=A1word=5Fbank?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zhenxun/builtin_plugins/admin/admin_help.py | 2 +- .../builtin_plugins/superuser/super_help.py | 2 +- zhenxun/plugins/word_bank/__init__.py | 18 + zhenxun/plugins/word_bank/_config.py | 24 + zhenxun/plugins/word_bank/_data_source.py | 288 +++++++++ zhenxun/plugins/word_bank/_model.py | 566 ++++++++++++++++++ zhenxun/plugins/word_bank/_rule.py | 59 ++ zhenxun/plugins/word_bank/command.py | 54 ++ zhenxun/plugins/word_bank/message_handle.py | 31 + zhenxun/plugins/word_bank/word_handle.py | 314 ++++++++++ zhenxun/utils/_image_template.py | 22 +- 11 files changed, 1371 insertions(+), 9 deletions(-) create mode 100644 zhenxun/plugins/word_bank/__init__.py create mode 100644 zhenxun/plugins/word_bank/_config.py create mode 100644 zhenxun/plugins/word_bank/_data_source.py create mode 100644 zhenxun/plugins/word_bank/_model.py create mode 100644 zhenxun/plugins/word_bank/_rule.py create mode 100644 zhenxun/plugins/word_bank/command.py create mode 100644 zhenxun/plugins/word_bank/message_handle.py create mode 100644 zhenxun/plugins/word_bank/word_handle.py diff --git a/zhenxun/builtin_plugins/admin/admin_help.py b/zhenxun/builtin_plugins/admin/admin_help.py index 6cad3cc0..b151ed09 100644 --- a/zhenxun/builtin_plugins/admin/admin_help.py +++ b/zhenxun/builtin_plugins/admin/admin_help.py @@ -125,7 +125,7 @@ async def build_help() -> BuildImage: ) if task_list := await TaskInfo.all(): task_str = "\n".join([task.name for task in task_list]) - task_str = "通过 开启/关闭 来控制群被动\n----------\n" + task_str + task_str = "通过 开启/关闭群被动 来控制群被动\n----------\n" + task_str task_image = await text2image(task_str, padding=5, color=(255, 255, 255)) await task_image.circle_corner(10) A = BuildImage(task_image.width + 50, task_image.height + 85, "#EAEDF2") diff --git a/zhenxun/builtin_plugins/superuser/super_help.py b/zhenxun/builtin_plugins/superuser/super_help.py index a47943d7..5136bb5c 100644 --- a/zhenxun/builtin_plugins/superuser/super_help.py +++ b/zhenxun/builtin_plugins/superuser/super_help.py @@ -121,7 +121,7 @@ async def build_help() -> BuildImage: ) if task_list := await TaskInfo.all(): task_str = "\n".join([task.name for task in task_list]) - task_str = "通过 开启/关闭 来控制群被动\n----------\n" + task_str + task_str = "通过 开启/关闭群被动 来控制群被动\n----------\n" + task_str task_image = await text2image(task_str, padding=5, color=(255, 255, 255)) await task_image.circle_corner(10) A = BuildImage(task_image.width + 50, task_image.height + 85, "#EAEDF2") diff --git a/zhenxun/plugins/word_bank/__init__.py b/zhenxun/plugins/word_bank/__init__.py new file mode 100644 index 00000000..c3ef6546 --- /dev/null +++ b/zhenxun/plugins/word_bank/__init__.py @@ -0,0 +1,18 @@ +from pathlib import Path + +import nonebot + +from zhenxun.configs.config import Config + +Config.add_plugin_config( + "word_bank", + "WORD_BANK_LEVEL", + 5, + help="设置增删词库的权限等级", + default_value=5, + type=int, +) +Config.set_name("word_bank", "词库问答") + + +nonebot.load_plugins(str(Path(__file__).parent.resolve())) diff --git a/zhenxun/plugins/word_bank/_config.py b/zhenxun/plugins/word_bank/_config.py new file mode 100644 index 00000000..3d074c18 --- /dev/null +++ b/zhenxun/plugins/word_bank/_config.py @@ -0,0 +1,24 @@ +from zhenxun.configs.path_config import DATA_PATH + +data_dir = DATA_PATH / "word_bank" +data_dir.mkdir(parents=True, exist_ok=True) + +scope2int = { + "全局": 0, + "群聊": 1, + "私聊": 2, +} + +type2int = { + "精准": 0, + "模糊": 1, + "正则": 2, + "图片": 3, +} + +int2type = { + 0: "精准", + 1: "模糊", + 2: "正则", + 3: "图片", +} diff --git a/zhenxun/plugins/word_bank/_data_source.py b/zhenxun/plugins/word_bank/_data_source.py new file mode 100644 index 00000000..efd7052d --- /dev/null +++ b/zhenxun/plugins/word_bank/_data_source.py @@ -0,0 +1,288 @@ +import re + +from nonebot.adapters.onebot.v11 import unescape +from nonebot_plugin_alconna import At as alcAt +from nonebot_plugin_alconna import Image as alcImage +from nonebot_plugin_alconna import Text as alcText +from nonebot_plugin_alconna import UniMessage, UniMsg +from nonebot_plugin_saa import Image, Mention, MessageFactory, Text + +from zhenxun.utils.image_utils import ImageTemplate + +from ._model import WordBank + + +def get_img_and_at_list(message: UniMsg) -> tuple[list[str], list[str]]: + """获取图片和at数据 + + 参数: + message: UniMsg + + 返回: + tuple[list[str], list[str]]: 图片列表,at列表 + """ + img_list, at_list = [], [] + for msg in message: + if isinstance(msg, alcImage): + img_list.append(msg.url) + elif isinstance(msg, alcAt): + at_list.append(msg.target) + return img_list, at_list + + +def get_problem(message: UniMsg) -> str: + """获取问题内容 + + 参数: + message: UniMsg + + 返回: + str: 问题文本 + """ + problem = "" + a, b = True, True + for msg in message: + if isinstance(msg, alcText) or isinstance(msg, str): + msg = str(msg) + if "问" in str(msg) and a: + a = False + split_text = msg.split("问") + if len(split_text) > 1: + problem += "问".join(split_text[1:]) + if b: + if "答" in problem: + b = False + problem = problem.split("答")[0] + elif "答" in msg and b: + b = False + # problem += "答".join(msg.split("答")[:-1]) + problem += msg.split("答")[0] + if not a and not b: + break + if isinstance(msg, alcAt): + problem += f"[at:{msg.target}]" + return problem + + +def get_answer(message: UniMsg) -> UniMessage | None: + """获取at时回答 + + 参数: + message: UniMsg + + 返回: + str: 回答内容 + """ + temp_message = None + answer = "" + index = 0 + for msg in message: + index += 1 + if isinstance(msg, alcText) or isinstance(msg, str): + msg = str(msg) + if "答" in msg: + answer += "答".join(msg.split("答")[1:]) + break + if answer: + temp_message = message[index:] + temp_message.insert(0, alcText(answer)) + return temp_message + + +class WordBankManage: + + @classmethod + async def update_word( + cls, + replace: str, + problem: str = "", + index: int | None = None, + group_id: str | None = None, + word_scope: int = 1, + ) -> tuple[str, str]: + """修改群词条 + + 参数: + params: 参数 + group_id: 群号 + word_scope: 词条范围 + + 返回: + tuple[str, str]: 处理消息,替换的旧词条 + """ + return await cls.__word_handle( + problem, group_id, "update", index, None, word_scope, replace + ) + + @classmethod + async def delete_word( + cls, + problem: str, + index: int | None = None, + aid: int | None = None, + group_id: str | None = None, + word_scope: int = 1, + ) -> tuple[str, str]: + """删除群词条 + + 参数: + params: 参数 + index: 指定下标 + aid: 指定回答下标 + group_id: 群号 + word_scope: 词条范围 + + 返回: + tuple[str, str]: 处理消息,空 + """ + return await cls.__word_handle( + problem, group_id, "delete", index, aid, word_scope + ) + + @classmethod + async def __word_handle( + cls, + problem: str, + group_id: str | None, + handle_type: str, + index: int | None = None, + aid: int | None = None, + word_scope: int = 0, + replace_problem: str = "", + ) -> tuple[str, str]: + """词条操作 + + 参数: + problem: 参数 + group_id: 群号 + handle_type: 类型 + index: 指定回答下标 + aid: 指定回答下标 + word_scope: 词条范围 + replace_problem: 替换问题内容 + + 返回: + tuple[str, str]: 处理消息,替换的旧词条 + """ + if index is not None: + problem, code = await cls.__get_problem_str(index, group_id, word_scope) + if code != 200: + return problem, "" + if handle_type == "delete": + if index: + problem, _problem_list = await WordBank.get_problem_all_answer( + problem, None, group_id, word_scope + ) + if not _problem_list: + return problem, "" + if await WordBank.delete_group_problem(problem, group_id, aid, word_scope): # type: ignore + return "删除词条成功!", "" + return "词条不存在", "" + if handle_type == "update": + old_problem = await WordBank.update_group_problem( + problem, replace_problem, group_id, word_scope=word_scope + ) + return f"修改词条成功!\n{old_problem} -> {replace_problem}", old_problem + return "类型错误", "" + + @classmethod + async def __get_problem_str( + cls, idx: int, group_id: str | None = None, word_scope: int = 1 + ) -> tuple[str, int]: + """通过id获取问题字符串 + + 参数: + idx: 下标 + group_id: 群号 + word_scope: 获取类型 + """ + if word_scope in [0, 2]: + all_problem = await WordBank.get_problem_by_scope(word_scope) + elif group_id: + all_problem = await WordBank.get_group_all_problem(group_id) + else: + raise Exception("词条类型与群组id不能为空") + if idx < 0 or idx >= len(all_problem): + return "问题下标id必须在范围内", 999 + return all_problem[idx][0], 200 + + @classmethod + async def show_word( + cls, + problem: str | None, + index: int | None = None, + group_id: str | None = None, + word_scope: int | None = 1, + ) -> Text | MessageFactory | Image: + """获取群词条 + + 参数: + problem: 问题 + group_id: 群组id + word_scope: 词条范围 + index: 指定回答下标 + """ + if problem or index != None: + msg_list = [] + problem, _problem_list = await WordBank.get_problem_all_answer( + problem, # type: ignore + index, + group_id if group_id is None else None, + word_scope, + ) + if not _problem_list: + return Text(problem) + for msg in _problem_list: + _text = str(msg) + if isinstance(msg, Mention): + _text = f"[at:{msg.data}]" + elif isinstance(msg, Image): + _text = msg.data + elif isinstance(msg, list): + _text = [] + for m in msg: + __text = str(m) + if isinstance(m, Mention): + __text = f"[at:{m.data['user_id']}]" + elif isinstance(m, Image): + # TODO: 显示词条回答图片 + # __text = (m.data["image"], 30, 30) + __text = "[图片]" + _text.append(__text) + msg_list.append("".join(_text)) + column_name = ["序号", "回答内容"] + data_list = [] + for index, msg in enumerate(msg_list): + data_list.append([index, msg]) + template_image = await ImageTemplate.table_page( + f"词条 {problem} 的回答", None, column_name, data_list + ) + return Image(template_image.pic2bytes()) + else: + result = [] + if group_id: + _problem_list = await WordBank.get_group_all_problem(group_id) + elif word_scope is not None: + _problem_list = await WordBank.get_problem_by_scope(word_scope) + else: + raise Exception("群组id和词条范围不能都为空") + global_problem_list = await WordBank.get_problem_by_scope(0) + if not _problem_list and not global_problem_list: + return Text("未收录任何词条...") + column_name = ["序号", "关键词", "匹配类型", "收录用户"] + data_list = [list(s) for s in _problem_list] + for i in range(len(data_list)): + data_list[i].insert(0, i) + group_image = await ImageTemplate.table_page( + "群组内词条" if group_id else "私聊词条", None, column_name, data_list + ) + result.append(Image(group_image.pic2bytes())) + if global_problem_list: + data_list = [list(s) for s in global_problem_list] + for i in range(len(data_list)): + data_list[i].insert(0, i) + global_image = await ImageTemplate.table_page( + "全局词条", None, column_name, data_list + ) + result.append(Image(global_image.pic2bytes())) + return MessageFactory(result) diff --git a/zhenxun/plugins/word_bank/_model.py b/zhenxun/plugins/word_bank/_model.py new file mode 100644 index 00000000..eef86941 --- /dev/null +++ b/zhenxun/plugins/word_bank/_model.py @@ -0,0 +1,566 @@ +import random +import re +import time +import uuid +from datetime import datetime +from typing import Any + +from nonebot_plugin_alconna import At as alcAt +from nonebot_plugin_alconna import Image as alcImage +from nonebot_plugin_alconna import Text as alcText +from nonebot_plugin_saa import Image, Mention, MessageFactory, Text +from tortoise import Tortoise, fields +from tortoise.expressions import Q +from typing_extensions import Self + +from zhenxun.configs.path_config import DATA_PATH +from zhenxun.services.db_context import Model +from zhenxun.utils.http_utils import AsyncHttpx +from zhenxun.utils.image_utils import get_img_hash + +from ._config import int2type + +path = DATA_PATH / "word_bank" + + +class WordBank(Model): + + id = fields.IntField(pk=True, generated=True, auto_increment=True) + """自增id""" + user_id = fields.CharField(255) + """用户id""" + group_id = fields.CharField(255, null=True) + """群聊id""" + word_scope = fields.IntField(default=0) + """生效范围 0: 全局 1: 群聊 2: 私聊""" + word_type = fields.IntField(default=0) + """词条类型 0: 完全匹配 1: 模糊 2: 正则 3: 图片""" + status = fields.BooleanField() + """词条状态""" + problem = fields.TextField() + """问题,为图片时使用图片hash""" + answer = fields.TextField() + """回答""" + placeholder = fields.TextField(null=True) + """占位符""" + image_path = fields.TextField(null=True) + """使用图片作为问题时图片存储的路径""" + to_me = fields.CharField(255, null=True) + """昵称开头时存储的昵称""" + create_time = fields.DatetimeField(auto_now=True) + """创建时间""" + update_time = fields.DatetimeField(auto_now_add=True) + """更新时间""" + platform = fields.CharField(255, default="qq") + """平台""" + author = fields.CharField(255, null=True, default="") + """收录人""" + + class Meta: + table = "word_bank2" + table_description = "词条数据库" + + @classmethod + async def exists( + cls, + user_id: str | None, + group_id: str | None, + problem: str, + answer: str | None, + word_scope: int | None = None, + word_type: int | None = None, + ) -> bool: + """检测问题是否存在 + + 参数: + user_id: 用户id + group_id: 群号 + problem: 问题 + answer: 回答 + word_scope: 词条范围 + word_type: 词条类型 + """ + query = cls.filter(problem=problem) + if user_id: + query = query.filter(user_id=user_id) + if group_id: + query = query.filter(group_id=group_id) + if answer: + query = query.filter(answer=answer) + if word_type is not None: + query = query.filter(word_type=word_type) + if word_scope is not None: + query = query.filter(word_scope=word_scope) + return await query.exists() + + @classmethod + async def add_problem_answer( + cls, + user_id: str, + group_id: str | None, + word_scope: int, + word_type: int, + problem: str, + answer: list[str | alcText | alcAt | alcImage], + to_me_nickname: str | None = None, + platform: str = "", + author: str = "", + ): + """添加或新增一个问答 + + 参数: + user_id: 用户id + group_id: 群号 + word_scope: 词条范围, + word_type: 词条类型, + problem: 问题, 为图片时是URl + answer: 回答 + to_me_nickname: at真寻名称 + platform: 所属平台 + author: 收录人id + """ + # 对图片做额外处理 + image_path = None + if word_type == 3: + _file = ( + path / "problem" / f"{group_id}" / f"{user_id}_{int(time.time())}.jpg" + ) + _file.parent.mkdir(exist_ok=True, parents=True) + await AsyncHttpx.download_file(problem, _file) + problem = get_img_hash(_file) + image_path = f"problem/{group_id}/{user_id}_{int(time.time())}.jpg" + new_answer, placeholder_list = await cls._answer2format( + answer, user_id, group_id + ) + if not await cls.exists( + user_id, group_id, problem, new_answer, word_scope, word_type + ): + await cls.create( + user_id=user_id, + group_id=group_id, + word_scope=word_scope, + word_type=word_type, + status=True, + problem=str(problem).strip(), + answer=new_answer, + image_path=image_path, + placeholder=",".join(placeholder_list), + create_time=datetime.now().replace(microsecond=0), + update_time=datetime.now().replace(microsecond=0), + to_me=to_me_nickname, + platform=platform, + author=author, + ) + + @classmethod + async def _answer2format( + cls, + answer: list[str | alcText | alcAt | alcImage], + user_id: str, + group_id: str | None, + ) -> tuple[str, list[Any]]: + """将特殊字段转化为占位符,图片,at等 + + 参数: + answer: 回答内容 + user_id: 用户id + group_id: 群号 + + 返回: + tuple[str, list[Any]]: 替换后的文本回答内容,占位符 + """ + placeholder_list = [] + text = "" + index = 0 + for seg in answer: + placeholder = uuid.uuid1() + if isinstance(seg, str): + text += seg + elif isinstance(seg, alcText): + text += seg.text + elif seg.type == "face": # TODO: face貌似无用... + text += f"[face:placeholder_{placeholder}]" + placeholder_list.append(seg.data["id"]) + elif isinstance(seg, alcAt): + text += f"[at:placeholder_{placeholder}]" + placeholder_list.append(seg.target) + elif isinstance(seg, alcImage) and seg.url: + text += f"[image:placeholder_{placeholder}]" + index += 1 + _file = ( + path + / "answer" + / f"{group_id or user_id}" + / f"{user_id}_{placeholder}.jpg" + ) + _file.parent.mkdir(exist_ok=True, parents=True) + await AsyncHttpx.download_file(seg.url, _file) + placeholder_list.append( + f"answer/{group_id or user_id}/{user_id}_{placeholder}.jpg" + ) + return text, placeholder_list + + @classmethod + async def _format2answer( + cls, + problem: str, + answer: str, + user_id: int, + group_id: int, + query: Self | None = None, + ) -> MessageFactory | Text: + """将占位符转换为实际内容 + + 参数: + problem: 问题内容 + answer: 回答内容 + user_id: 用户id + group_id: 群组id + """ + result_list = [] + if not query: + query = await cls.get_or_none( + problem=problem, + user_id=user_id, + group_id=group_id, + answer=answer, + ) + if not answer: + answer = str(query.answer) # type: ignore + if query and query.placeholder: + type_list = re.findall(rf"\[(.*?):placeholder_.*?]", answer) + answer_split = re.split(rf"\[.*:placeholder_.*?]", answer) + placeholder_split = query.placeholder.split(",") + for index, ans in enumerate(answer_split): + result_list.append(Text(ans)) + if index < len(type_list): + t = type_list[index] + p = placeholder_split[index] + if t == "image": + result_list.append(Image(path / p)) + elif t == "at": + result_list.append(Mention(p)) + return MessageFactory(result_list) + return Text(answer) + + @classmethod + async def check_problem( + cls, + group_id: str | None, + problem: str, + word_scope: int | None = None, + word_type: int | None = None, + ) -> Any: + """检测是否包含该问题并获取所有回答 + + 参数: + group_id: 群组id + problem: 问题内容 + word_scope: 词条范围 + word_type: 词条类型 + """ + query = cls + if group_id: + if word_scope: + query = query.filter(word_scope=word_scope) + else: + query = query.filter(Q(group_id=group_id) | Q(word_scope=0)) + else: + query = query.filter(Q(word_scope=2) | Q(word_scope=0)) + if word_type: + query = query.filter(word_scope=word_type) + # 完全匹配 + if data_list := await query.filter( + Q(Q(word_type=0) | Q(word_type=3)), Q(problem=problem) + ).all(): + return data_list + db = Tortoise.get_connection("default") + # 模糊匹配 + sql = query.filter(word_type=1).sql() + " and POSITION(problem in $1) > 0" + data_list = await db.execute_query_dict(sql, [problem]) + if data_list: + return [cls(**data) for data in data_list] + # 正则 + sql = ( + query.filter(word_type=2, word_scope__not=999).sql() + " and $1 ~ problem;" + ) + data_list = await db.execute_query_dict(sql, [problem]) + if data_list: + return [cls(**data) for data in data_list] + return None + + @classmethod + async def get_answer( + cls, + group_id: str | None, + problem: str, + word_scope: int | None = None, + word_type: int | None = None, + ) -> Text | MessageFactory | None: + """根据问题内容获取随机回答 + + 参数: + user_id: 用户id + group_id: 群组id + problem: 问题内容 + word_scope: 词条范围 + word_type: 词条类型 + """ + data_list = await cls.check_problem(group_id, problem, word_scope, word_type) + if data_list: + random_answer = random.choice(data_list) + if random_answer.word_type == 2: + r = re.search(random_answer.problem, problem) + has_placeholder = re.search(rf"\$(\d)", random_answer.answer) + if r and r.groups() and has_placeholder: + pats = re.sub(r"\$(\d)", r"\\\1", random_answer.answer) + random_answer.answer = re.sub(random_answer.problem, pats, problem) + return ( + await cls._format2answer( + random_answer.problem, + random_answer.answer, + random_answer.user_id, + random_answer.group_id, + random_answer, + ) + if random_answer.placeholder + else Text(random_answer.answer) + ) + + @classmethod + async def get_problem_all_answer( + cls, + problem: str, + index: int | None = None, + group_id: str | None = None, + word_scope: int | None = 0, + ) -> tuple[str, list[Text | MessageFactory]]: + """获取指定问题所有回答 + + 参数: + problem: 问题 + index: 下标 + group_id: 群号 + word_scope: 词条范围 + + 返回: + tuple[str, list[Text | MessageFactory]]: 问题和所有回答 + """ + if index is not None: + # TODO: group_by和order_by不能同时使用 + if group_id: + _problem = ( + await cls.filter(group_id=group_id).order_by("create_time") + # .group_by("problem") + .values_list("problem", flat=True) + ) + else: + _problem = ( + await cls.filter(word_scope=(word_scope or 0)).order_by( + "create_time" + ) + # .group_by("problem") + .values_list("problem", flat=True) + ) + # if index is None and problem not in _problem: + # return "词条不存在...", [] + sort_problem = [] + for p in _problem: + if p not in sort_problem: + sort_problem.append(p) + if index > len(sort_problem) - 1: + return "下标错误,必须小于问题数量...", [] + problem = sort_problem[index] # type: ignore + f = cls.filter(problem=problem, word_scope=(word_scope or 0)) + if group_id: + f = f.filter(group_id=group_id) + answer_list = await f.all() + if not answer_list: + return "词条不存在...", [] + return problem, [await cls._format2answer("", "", 0, 0, a) for a in answer_list] + + @classmethod + async def delete_group_problem( + cls, + problem: str, + group_id: str | None, + index: int | None = None, + word_scope: int = 1, + ): + """删除指定问题全部或指定回答 + + 参数: + problem: 问题文本 + group_id: 群号 + index: 回答下标 + word_scope: 词条范围 + """ + if await cls.exists(None, group_id, problem, None, word_scope): + if index is not None: + if group_id: + query = await cls.filter( + group_id=group_id, problem=problem, word_scope=word_scope + ).all() + else: + query = await cls.filter( + word_scope=word_scope, problem=problem + ).all() + await query[index].delete() + else: + if group_id: + await WordBank.filter( + group_id=group_id, problem=problem, word_scope=word_scope + ).delete() + else: + await WordBank.filter( + word_scope=word_scope, problem=problem + ).delete() + return True + return False + + @classmethod + async def update_group_problem( + cls, + problem: str, + replace_str: str, + group_id: str | None, + index: int | None = None, + word_scope: int = 1, + ) -> str: + """修改词条问题 + + 参数: + problem: 问题 + replace_str: 替换问题 + group_id: 群号 + index: 问题下标 + word_scope: 词条范围 + + 返回: + str: 修改前的问题 + """ + if index is not None: + if group_id: + query = await cls.filter(group_id=group_id, problem=problem).all() + else: + query = await cls.filter(word_scope=word_scope, problem=problem).all() + tmp = query[index].problem + query[index].problem = replace_str + await query[index].save(update_fields=["problem"]) + return tmp + else: + if group_id: + await cls.filter(group_id=group_id, problem=problem).update( + problem=replace_str + ) + else: + await cls.filter(word_scope=word_scope, problem=problem).update( + problem=replace_str + ) + return problem + + @classmethod + async def get_group_all_problem(cls, group_id: str) -> list[tuple[Any | str]]: + """获取群聊所有词条 + + 参数: + group_id: 群号 + """ + return cls._handle_problem( + await cls.filter(group_id=group_id).order_by("create_time").all() # type: ignore + ) + + @classmethod + async def get_problem_by_scope(cls, word_scope: int): + """通过词条范围获取词条 + + 参数: + word_scope: 词条范围 + """ + return cls._handle_problem( + await cls.filter(word_scope=word_scope).order_by("create_time").all() # type: ignore + ) + + @classmethod + async def get_problem_by_type(cls, word_type: int): + """通过词条类型获取词条 + + 参数: + word_type: 词条类型 + """ + return cls._handle_problem( + await cls.filter(word_type=word_type).order_by("create_time").all() # type: ignore + ) + + @classmethod + def _handle_problem(cls, problem_list: list["WordBank"]): + """格式化处理问题 + + 参数: + msg_list: 消息列表 + """ + _tmp = [] + result_list = [] + for q in problem_list: + if q.problem not in _tmp: + # TODO: 获取收录人名称 + problem = ( + (path / q.image_path, 30, 30) if q.image_path else q.problem, + int2type[q.word_type], + # q.author, + "-", + ) + result_list.append(problem) + _tmp.append(q.problem) + return result_list + + @classmethod + async def _move( + cls, + user_id: str, + group_id: str | None, + problem: str, + answer: str, + placeholder: str, + ): + """旧词条图片移动方法 + + 参数: + user_id: 用户id + group_id: 群号 + problem: 问题 + answer: 回答 + placeholder: 占位符 + """ + word_scope = 0 + word_type = 0 + # 对图片做额外处理 + if not await cls.exists( + user_id, group_id, problem, answer, word_scope, word_type + ): + await cls.create( + user_id=user_id, + group_id=group_id, + word_scope=word_scope, + word_type=word_type, + status=True, + problem=problem, + answer=answer, + image_path=None, + placeholder=placeholder, + create_time=datetime.now().replace(microsecond=0), + update_time=datetime.now().replace(microsecond=0), + ) + + @classmethod + async def _run_script(cls): + return [ + "ALTER TABLE word_bank2 ADD to_me varchar(255);", # 添加 to_me 字段 + "ALTER TABLE word_bank2 ALTER COLUMN create_time TYPE timestamp with time zone USING create_time::timestamp with time zone;", + "ALTER TABLE word_bank2 ALTER COLUMN update_time TYPE timestamp with time zone USING update_time::timestamp with time zone;", + "ALTER TABLE word_bank2 RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id + "ALTER TABLE word_bank2 ALTER COLUMN user_id TYPE character varying(255);", + "ALTER TABLE word_bank2 ALTER COLUMN group_id TYPE character varying(255);", + "ALTER TABLE word_bank2 ADD platform varchar(255) DEFAULT 'qq';", + "ALTER TABLE word_bank2 ADD author varchar(255) DEFAULT '';", + ] diff --git a/zhenxun/plugins/word_bank/_rule.py b/zhenxun/plugins/word_bank/_rule.py new file mode 100644 index 00000000..c61d7a1e --- /dev/null +++ b/zhenxun/plugins/word_bank/_rule.py @@ -0,0 +1,59 @@ +from io import BytesIO + +import imagehash +from nonebot.adapters import Bot, Event +from nonebot.typing import T_State +from nonebot_plugin_alconna import At as alcAt +from nonebot_plugin_alconna import Text as alcText +from nonebot_plugin_alconna import UniMsg +from nonebot_plugin_session import EventSession +from PIL import Image +from requests import session + +from zhenxun.services.log import logger +from zhenxun.utils.http_utils import AsyncHttpx + +from ._data_source import get_img_and_at_list +from ._model import WordBank + + +async def check( + bot: Bot, + event: Event, + message: UniMsg, + session: EventSession, + state: T_State, +) -> bool: + text = message.extract_plain_text().strip() + img_list, at_list = get_img_and_at_list(message) + problem = text + if not text and len(img_list) == 1: + try: + r = await AsyncHttpx.get(img_list[0]) + problem = str(imagehash.average_hash(Image.open(BytesIO(r.content)))) + except Exception as e: + logger.warning(f"获取图片失败", "词条检测", session=session, e=e) + if at_list: + temp = "" + # TODO: 支持更多消息类型 + for msg in message: + if isinstance(msg, alcAt): + temp += f"[at:{msg.target}]" + elif isinstance(msg, alcText): + temp += msg.text + problem = temp + if event.is_tome() and bot.config.nickname: + if isinstance(message[0], alcAt) and message[0].target == bot.self_id: + problem = f"[at:{bot.self_id}]" + problem + else: + if problem and bot.config.nickname: + nickname = [ + nk for nk in bot.config.nickname if str(message).startswith(nk) + ] + problem = nickname[0] + problem if nickname else problem + if problem and ( + await WordBank.check_problem(session.id3 or session.id2, problem) is not None + ): + state["problem"] = problem # type: ignore + return True + return False diff --git a/zhenxun/plugins/word_bank/command.py b/zhenxun/plugins/word_bank/command.py new file mode 100644 index 00000000..880de48d --- /dev/null +++ b/zhenxun/plugins/word_bank/command.py @@ -0,0 +1,54 @@ +from nonebot import on_regex +from nonebot_plugin_alconna import ( + Alconna, + Args, + Option, + Subcommand, + on_alconna, + store_true, +) + +from zhenxun.utils.rules import admin_check, ensure_group + +_add_matcher = on_regex( + r"^(全局|私聊)?添加词条\s*?(模糊|正则|图片)?问\s*?(\S*\s?\S*)\s*?答\s?(\S*)", + priority=5, + block=True, + rule=admin_check("word_bank", "WORD_BANK_LEVEL"), +) + + +_del_matcher = on_alconna( + Alconna( + "删除词条", + Args["problem?", str], + Option("--all", action=store_true, help_text="所有词条"), + Option("--id", Args["index", int], help_text="下标id"), + Option("--aid", Args["answer_id", int], help_text="回答下标id"), + ), + priority=5, + block=True, +) + + +_update_matcher = on_alconna( + Alconna( + "修改词条", + Args["replace", str]["problem?", str], + Option("--id", Args["index", int], help_text="词条id"), + Option("--all", action=store_true, help_text="全局词条"), + ) +) + +_show_matcher = on_alconna( + Alconna( + "显示词条", + Args["problem?", str], + Option("-g|--group", Args["gid", str], help_text="群组id"), + Option("--id", Args["index", int], help_text="词条id"), + Option("--all", action=store_true, help_text="全局词条"), + ), + aliases={"查看词条"}, + priority=5, + block=True, +) diff --git a/zhenxun/plugins/word_bank/message_handle.py b/zhenxun/plugins/word_bank/message_handle.py new file mode 100644 index 00000000..a9bf624a --- /dev/null +++ b/zhenxun/plugins/word_bank/message_handle.py @@ -0,0 +1,31 @@ +from nonebot import on_message +from nonebot.plugin import PluginMetadata +from nonebot.typing import T_State +from nonebot_plugin_session import EventSession + +from zhenxun.configs.utils import PluginExtraData +from zhenxun.services import logger +from zhenxun.utils.enum import PluginType + +from ._model import WordBank +from ._rule import check + +__plugin_meta__ = PluginMetadata( + name="词库问答回复操作", + description="", + usage="""""", + extra=PluginExtraData( + author="HibiKier", version="0.1", plugin_type=PluginType.HIDDEN + ).dict(), +) + +_matcher = on_message(priority=6, block=True, rule=check) + + +@_matcher.handle() +async def _(session: EventSession, state: T_State): + if problem := state.get("problem"): + gid = session.id3 or session.id2 + if result := await WordBank.get_answer(gid, problem): + await result.send() + logger.info(f" 触发词条 {problem}", "词条检测", session=session) diff --git a/zhenxun/plugins/word_bank/word_handle.py b/zhenxun/plugins/word_bank/word_handle.py new file mode 100644 index 00000000..3bea68ad --- /dev/null +++ b/zhenxun/plugins/word_bank/word_handle.py @@ -0,0 +1,314 @@ +import re +from typing import Any + +from nonebot.adapters import Bot, Message +from nonebot.adapters.onebot.v11 import unescape +from nonebot.exception import FinishedException +from nonebot.internal.params import Arg, ArgStr +from nonebot.params import RegexGroup +from nonebot.plugin import PluginMetadata +from nonebot.typing import T_State +from nonebot_plugin_alconna import AlconnaQuery, Arparma +from nonebot_plugin_alconna import Image as alcImage +from nonebot_plugin_alconna import Match, Query +from nonebot_plugin_alconna import Text as alcText +from nonebot_plugin_alconna import UniMsg +from nonebot_plugin_saa import Image, MessageFactory, Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import Config +from zhenxun.configs.utils import PluginExtraData +from zhenxun.services.log import logger + +from ._config import scope2int, type2int +from ._data_source import WordBankManage, get_answer, get_img_and_at_list, get_problem +from ._model import WordBank +from .command import _add_matcher, _del_matcher, _show_matcher, _update_matcher + +base_config = Config.get("word_bank") + + +__plugin_meta__ = PluginMetadata( + name="词库问答", + description="自定义词条内容随机回复", + usage=""" + usage: + 对指定问题的随机回答,对相同问题可以设置多个不同回答 + 删除词条后每个词条的id可能会变化,请查看后再删除 + 更推荐使用id方式删除 + 问题回答支持的类型:at, image + 查看词条命令:群聊时为 群词条+全局词条,私聊时为 私聊词条+全局词条 + 添加词条正则:添加词条(模糊|正则|图片)?问\s*?(\S*\s?\S*)\s*?答\s?(\S*) + 正则问可以通过$1类推()捕获的组 + 指令: + 添加词条 ?[模糊|正则|图片]问...答...:添加问答词条,可重复添加相同问题的不同回答 + 删除词条 [问题/下标] ?[下标]:删除指定词条指定或全部回答 + 修改词条 [问题/下标] [新问题]:修改词条问题 + 查看词条 ?[问题/下标]:查看全部词条或对应词条回答 + 示例:添加图片词条问答嗨嗨嗨 + [图片]... + 示例:添加词条@萝莉 我来啦 + 示例:添加词条问谁是萝莉答是我 + 示例:添加词条正则问那个(.+)是萝莉答没错$1是萝莉 + 示例:删除词条 谁是萝莉 + 示例:删除词条 谁是萝莉 0 + 示例:删除词条 id:0 1 + 示例:修改词条 谁是萝莉 是你 + 示例:修改词条 id:0 是你 + 示例:查看词条 + 示例:查看词条 谁是萝莉 + 示例:查看词条 id:0 (群/私聊词条) + 示例:查看词条 gid:0 (全局词条) + """.strip(), + extra=PluginExtraData( + author="HibiKier & yajiwa", + version="0.1", + superuser_help=""" + 在私聊中超级用户额外设置 + 指令: + (全局|私聊)?添加词条\s*?(模糊|正则|图片)?问\s*?(\S*\s?\S*)\s*?答\s?(\S*):添加问答词条,可重复添加相同问题的不同回答 + 全局添加词条 + 私聊添加词条 + (私聊情况下)删除词条: 删除私聊词条 + (私聊情况下)删除全局词条 + (私聊情况下)修改词条: 修改私聊词条 + (私聊情况下)修改全局词条 + 用法与普通用法相同 + """, + admin_level=base_config.get("WORD_BANK_LEVEL"), + ).dict(), +) + + +@_add_matcher.handle() +async def _( + bot: Bot, + session: EventSession, + state: T_State, + message: UniMsg, + reg_group: tuple[Any, ...] = RegexGroup(), +): + img_list, at_list = get_img_and_at_list(message) + user_id = session.id1 + group_id = session.id3 or session.id2 + if not group_id and user_id not in bot.config.superusers: + await Text("权限不足捏...").finish(reply=True) + word_scope, word_type, problem, answer = reg_group + if not word_scope and not group_id: + word_scope = "私聊" + if ( + word_scope + and word_scope in ["全局", "私聊"] + and user_id not in bot.config.superusers + ): + await Text("权限不足,无法添加该范围词条...").finish(reply=True) + if (not problem or not problem.strip()) and word_type != "图片": + await Text("词条问题不能为空!").finish(reply=True) + if (not answer or not answer.strip()) and not len(img_list) and not len(at_list): + await Text("词条回答不能为空!").finish(reply=True) + if word_type != "图片": + state["problem_image"] = "YES" + temp_problem = message.copy() + # answer = message.copy() + # 对at问题对额外处理 + # if at_list: + answer = get_answer(message.copy()) + # text = str(message.pop(0)).split("答", maxsplit=1)[-1].strip() + # temp_problem.insert(0, alcText(text)) + state["word_scope"] = word_scope + state["word_type"] = word_type + state["problem"] = get_problem(temp_problem) + state["answer"] = answer + logger.info( + f"添加词条 范围: {word_scope} 类型: {word_type} 问题: {problem} 回答: {answer}", + "添加词条", + session=session, + ) + + +@_add_matcher.got("problem_image", prompt="请发送该回答设置的问题图片") +async def _( + bot: Bot, + session: EventSession, + message: UniMsg, + word_scope: str | None = ArgStr("word_scope"), + word_type: str | None = ArgStr("word_type"), + problem: str | None = ArgStr("problem"), + answer: Any = Arg("answer"), +): + if not session.id1: + await Text("用户id不存在...").finish() + user_id = session.id1 + group_id = session.id3 or session.id2 + try: + if word_type == "图片": + problem = [m for m in message if isinstance(m, alcImage)][0].url + elif word_type == "正则" and problem: + problem = unescape(problem) + try: + re.compile(problem) + except re.error: + await Text(f"添加词条失败,正则表达式 {problem} 非法!").finish( + reply=True + ) + # if str(event.user_id) in bot.config.superusers and isinstance(event, PrivateMessageEvent): + # word_scope = "私聊" + nickname = None + if problem and bot.config.nickname: + nickname = [nk for nk in bot.config.nickname if problem.startswith(nk)] + if not problem: + await Text("获取问题失败...").finish(reply=True) + await WordBank.add_problem_answer( + user_id, + ( + group_id + if group_id and (not word_scope or word_scope == "私聊") + else "0" + ), + scope2int[word_scope] if word_scope else 1, + type2int[word_type] if word_type else 0, + problem, + answer, + nickname[0] if nickname else None, + session.platform, + session.id1, + ) + except Exception as e: + if isinstance(e, FinishedException): + await _add_matcher.finish() + logger.error( + f"添加词条 {problem} 错误...", + "添加词条", + session=session, + e=e, + ) + await Text( + f"添加词条 {problem if word_type != '图片' else '图片'} 发生错误!" + ).finish(reply=True) + if word_type == "图片": + result = MessageFactory([Text("添加词条 "), Image(problem), Text(" 成功!")]) + else: + result = Text(f"添加词条 {problem} 成功!") + await result.send() + logger.info( + f"添加词条 {problem} 成功!", + "添加词条", + session=session, + ) + + +@_del_matcher.handle() +async def _( + bot: Bot, + session: EventSession, + problem: Match[str], + index: Match[int], + answer_id: Match[int], + arparma: Arparma, + all: Query[bool] = AlconnaQuery("all.value", False), +): + if not problem.available and not index.available: + await Text("此命令之后需要跟随指定词条或id,通过“显示词条“查看").finish( + reply=True + ) + word_scope = 1 if session.id3 or session.id2 else 2 + if all.result: + word_scope = 0 + if gid := session.id3 or session.id2: + result, _ = await WordBankManage.delete_word( + problem.result, + index.result if index.available else None, + answer_id.result if answer_id.available else None, + gid, + word_scope, + ) + else: + if session.id1 not in bot.config.superusers: + await Text("权限不足捏...").finish(reply=True) + result, _ = await WordBankManage.delete_word( + problem.result, + index.result if index.available else None, + answer_id.result if answer_id.available else None, + None, + word_scope, + ) + await Text(result).send(reply=True) + logger.info(f"删除词条: {problem.result}", arparma.header_result, session=session) + + +@_update_matcher.handle() +async def _( + bot: Bot, + session: EventSession, + replace: str, + problem: Match[str], + index: Match[int], + arparma: Arparma, + all: Query[bool] = AlconnaQuery("all.value", False), +): + if not problem.available and not index.available: + await Text("此命令之后需要跟随指定词条或id,通过“显示词条“查看").finish( + reply=True + ) + word_scope = 1 if session.id3 or session.id2 else 2 + if all.result: + word_scope = 0 + if gid := session.id3 or session.id2: + result, old_problem = await WordBankManage.update_word( + replace, + problem.result if problem.available else "", + index.result if index.available else None, + gid, + word_scope, + ) + else: + if session.id1 not in bot.config.superusers: + await Text("权限不足捏...").finish(reply=True) + result, old_problem = await WordBankManage.update_word( + replace, + problem.result if problem.available else "", + index.result if index.available else None, + session.id3 or session.id2, + word_scope, + ) + await Text(result).send(reply=True) + logger.info( + f"更新词条词条: {old_problem} -> {replace}", + arparma.header_result, + session=session, + ) + + +@_show_matcher.handle() +async def _( + session: EventSession, + problem: Match[str], + index: Match[int], + gid: Match[str], + arparma: Arparma, + all: Query[bool] = AlconnaQuery("all.value", False), +): + word_scope = 1 if session.id3 or session.id2 else 2 + if all.result: + word_scope = 0 + group_id = session.id3 or session.id2 + if gid.available: + group_id = gid.result + if problem.available: + if index.available: + if index.result < 0 or index.result > len( + await WordBank.get_problem_by_scope(2) + ): + await Text("id必须在范围内...").finish(reply=True) + result = await WordBankManage.show_word( + problem.result, + index.result if index.available else None, + group_id, + word_scope, + ) + else: + result = await WordBankManage.show_word( + None, index.result if index.available else None, group_id, word_scope + ) + await result.send() + logger.info(f"查看词条回答: {problem}", arparma.header_result, session=session) diff --git a/zhenxun/utils/_image_template.py b/zhenxun/utils/_image_template.py index 6f160090..399c054c 100644 --- a/zhenxun/utils/_image_template.py +++ b/zhenxun/utils/_image_template.py @@ -110,11 +110,20 @@ class ImageTemplate: 返回: BuildImage: 表格图片 """ + font = BuildImage.load_font(font_size=50) + min_width, _ = BuildImage.get_text_size(head_text, font) table = await cls.table( - column_name, data_list, row_space, column_space, padding, text_style + column_name, + data_list, + row_space, + column_space, + padding, + text_style, ) await table.circle_corner() - table_bk = BuildImage(table.width + 100, table.height + 50, "#EAEDF2") + table_bk = BuildImage( + max(table.width, min_width) + 100, table.height + 50, "#EAEDF2" + ) await table_bk.paste(table, center_type="center") height = table_bk.height + 200 background = BuildImage(table_bk.width, height, (255, 255, 255), font_size=50) @@ -144,13 +153,12 @@ class ImageTemplate: column_space: 列间距. padding: 文本内间距. text_style: 文本样式. + min_width: 最低宽度 返回: BuildImage: 表格图片 """ font = BuildImage.load_font("HYWenHei-85W.ttf", 20) - column_num = max([len(l) for l in data_list]) - list_data = [] column_data = [] for i in range(len(column_name)): c = [] @@ -163,7 +171,7 @@ class ImageTemplate: build_data_list = [] _, base_h = BuildImage.get_text_size("A", font) for i, column_list in enumerate(column_data): - name_width, name_height = BuildImage.get_text_size(column_name[i], font) + name_width, _ = BuildImage.get_text_size(column_name[i], font) _temp = {"width": name_width, "data": column_list} for s in column_list: if isinstance(s, tuple): @@ -207,8 +215,8 @@ class ImageTemplate: ) cur_h += base_h + row_space column_image_list.append(background) - height = max([bk.height for bk in column_image_list]) - width = sum([bk.width for bk in column_image_list]) + # height = max([bk.height for bk in column_image_list]) + # width = sum([bk.width for bk in column_image_list]) return await BuildImage.auto_paste( column_image_list, len(column_image_list), column_space )