From 56ea3964a738eb6d6c472b1b7642d11222f297a2 Mon Sep 17 00:00:00 2001 From: yajiwa <839790708@qq.com> Date: Sun, 19 Jun 2022 17:24:50 +0800 Subject: [PATCH] update word_bank --- plugins/word_bank/_rule.py | 12 +++--- plugins/word_bank/message_handle.py | 38 ++++++++++++----- plugins/word_bank/model.py | 64 ++++++++++------------------- plugins/word_bank/word_hanlde.py | 39 ++++++++---------- 4 files changed, 71 insertions(+), 82 deletions(-) diff --git a/plugins/word_bank/_rule.py b/plugins/word_bank/_rule.py index fc6e7a6a..a452aa20 100644 --- a/plugins/word_bank/_rule.py +++ b/plugins/word_bank/_rule.py @@ -1,12 +1,10 @@ import re - -from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, Event +from nonebot.adapters.onebot.v11 import GroupMessageEvent, Event from utils.utils import get_message_img_file -from nonebot.typing import T_State from .model import WordBank -async def check(bot: Bot, event: Event, state: T_State) -> bool: +async def check(event: Event) -> bool: if isinstance(event, GroupMessageEvent): msg = event.raw_message list_img = get_message_img_file(event.json()) @@ -14,7 +12,7 @@ async def check(bot: Bot, event: Event, state: T_State) -> bool: for img_file in list_img: strinfo = re.compile(f"{img_file},.*?]") msg = strinfo.sub(f'{img_file}]', msg) - return bool( - await WordBank.check(event.group_id, msg, event.is_tome()) - ) + strinfo_face = re.compile(f",type=sticker]") + msg = strinfo_face.sub(f']', msg) + return bool(await WordBank.check(event.group_id, msg,)) return False diff --git a/plugins/word_bank/message_handle.py b/plugins/word_bank/message_handle.py index 92c838c2..43b41cd9 100644 --- a/plugins/word_bank/message_handle.py +++ b/plugins/word_bank/message_handle.py @@ -1,9 +1,10 @@ -from utils.message_builder import image, at +from utils.message_builder import image, at, face +from typing import Tuple from ._rule import check from .model import WordBank from configs.path_config import DATA_PATH from nonebot.adapters.onebot.v11 import GroupMessageEvent -from utils.utils import get_message_at, get_message_img +from utils.utils import get_message_at, get_message_img, change_img_md5 from nonebot import on_message from models.group_member_info import GroupInfoUser from utils.utils import get_message_img_file, is_number @@ -25,14 +26,14 @@ async def _(event: GroupMessageEvent): for img_file in list_img: strinfo = re.compile(f"{img_file},.*?]") msg = strinfo.sub(f'{img_file}]', msg) - q = await WordBank.check( - event.group_id, msg, event.is_tome() - ) + strinfo_face = re.compile(f",type=sticker]") + msg = strinfo_face.sub(f']', msg) + q = await WordBank.check(event.group_id, msg, ) await message_handle.send(await get_one_answer(event, q.format, q.answer)) # 处理单条回答 -async def get_one_answer(event, format, _answer, all=1): +async def get_one_answer(event, format: str, _answer: str, all: bool = True) -> str: path = data_dir / f"{event.group_id}" placeholder_list = ( [ @@ -50,12 +51,13 @@ async def get_one_answer(event, format, _answer, all=1): else: for idx, placeholder in placeholder_list: if placeholder.endswith("jpg"): + change_img_md5(path / placeholder) answer += _a[: _a.find(f"[__placeholder_{idx}]")] + image( path / placeholder ) else: - if all == 1: - answer += _a[: _a.find(f"[__placeholder_{idx}]")] + at(placeholder) + if all: + answer += _a[: _a.find(f"[__placeholder_{idx}]")] + at(int(placeholder)) else: q = await GroupInfoUser.get_member_info( int(placeholder), event.group_id) @@ -65,7 +67,7 @@ async def get_one_answer(event, format, _answer, all=1): # 处理单条问题 -async def get_one_problem(event, problem): +async def get_one_problem(event, problem: str, ) -> Tuple[str, str]: strinfo = re.compile(f",subType=\d") problem = strinfo.sub('', problem) _problem = problem @@ -93,7 +95,7 @@ async def get_one_problem(event, problem): # 显示单条数据库问题 -async def get_one_image_problem(event, problem): +async def get_one_image_problem(event, problem: str) -> str: path = data_dir / f"{event.group_id}" / "problem" placeholder_list = [] idx = 0 @@ -127,3 +129,19 @@ async def get_one_image_problem(event, problem): _p = _p[_p.find(f"[__placeholder_{idx}]") + len(f"[__placeholder_{idx}]"):] return problem + _p + + +# 替换cq码 +async def replace_cq(group_id, msg: str, is_face: bool = True) -> str: + strinfo_img = re.compile(f"\[CQ:image.*?]") + msg = strinfo_img.sub('[图片]', msg) + at_list = re.findall(rf"\[CQ:at,qq=(.*?)]", msg) + if at_list: + for ats in at_list: + q = await GroupInfoUser.get_member_info( + int(ats), group_id) + msg = msg.replace(f'[CQ:at,qq={ats}]', "@" + q.user_name) + if is_face: + strinfo_face = re.compile(f"\[CQ:face,id=.*?]") + msg = strinfo_face.sub('[表情]', msg) + return msg diff --git a/plugins/word_bank/model.py b/plugins/word_bank/model.py index 8c9e5af7..5408782a 100644 --- a/plugins/word_bank/model.py +++ b/plugins/word_bank/model.py @@ -1,9 +1,7 @@ from services.db_context import db from typing import Optional, List, Union, Tuple from datetime import datetime -from pathlib import Path from configs.path_config import DATA_PATH -import re import random from configs.config import Config @@ -125,58 +123,40 @@ class WordBank(db.Model): q.sort() _tmp = [] for problem in q: - if "[_to_me" in problem: - r = re.search(r"\[_to_me\|(.*?)](.*)", problem) - if r: - bot_name = r.group(1) - problem = problem.replace(f"[_to_me|{bot_name}]", bot_name) _tmp.append(problem) return list(set(_tmp)) @classmethod - async def check(cls, group_id: int, problem: str, is_tome: bool = False) -> Optional["WordBank"]: + async def check(cls, group_id: int, problem: str) -> Optional["WordBank"]: """ 检测词条并随机返回 :param group_id: 群号 :param problem: 问题 - :param is_tome:是否at真寻 """ - if is_tome: + if problem: + FUZZY = Config.get_config("word_bank", "WORD_BANK_FUZZY") + KEY = Config.get_config("word_bank", "WORD_BANK_KEY") q = await cls.query.where( - (cls.group_id == group_id) + (cls.group_id == group_id) & (cls.problem == problem) ).gino.all() - q = [x for x in q if "[_to_me" in x.problem] - if q: - for x in q: - r = re.search(r"\[_to_me\|(.*?)](.*)", x.problem) - if r and r.group(2) == problem: - return x - return None + if KEY and FUZZY: + q_fuzzy = await cls.query.where( + (cls.group_id == group_id) & (cls.search_type == 2) & ( + cls.problem.contains(f'{problem}'))).gino.all() + q_key = await cls.query.where((cls.group_id == group_id) & (cls.search_type == 1)).gino.all() + q_key = [x for x in q_key if str(x.problem) in (problem)] + q += q_fuzzy + q_key + elif FUZZY: + q_fuzzy = await cls.query.where( + (cls.group_id == group_id) & (cls.search_type == 2) & ( + cls.problem.contains(f'{problem}'))).gino.all() + q += q_fuzzy + elif KEY: + q_key = await cls.query.where((cls.group_id == group_id) & (cls.search_type == 1)).gino.all() + q_key = [x for x in q_key if str(x.problem) in (problem)] + q += q_key else: - if problem: - FUZZY = Config.get_config("word_bank", "WORD_BANK_FUZZY") - KEY = Config.get_config("word_bank", "WORD_BANK_KEY") - q = await cls.query.where( - (cls.group_id == group_id) & (cls.problem == problem) - ).gino.all() - if KEY and FUZZY: - q_fuzzy = await cls.query.where( - (cls.group_id == group_id) & (cls.search_type == 2) & ( - cls.problem.contains(f'{problem}'))).gino.all() - q_key = await cls.query.where((cls.group_id == group_id) & (cls.search_type == 1)).gino.all() - q_key = [x for x in q_key if str(x.problem) in (problem)] - q += q_fuzzy + q_key - elif FUZZY: - q_fuzzy = await cls.query.where( - (cls.group_id == group_id) & (cls.search_type == 2) & ( - cls.problem.contains(f'{problem}'))).gino.all() - q += q_fuzzy - elif KEY: - q_key = await cls.query.where((cls.group_id == group_id) & (cls.search_type == 1)).gino.all() - q_key = [x for x in q_key if str(x.problem) in (problem)] - q += q_key - else: - return None + return None return random.choice(q) if q else None diff --git a/plugins/word_bank/word_hanlde.py b/plugins/word_bank/word_hanlde.py index 17d4e7b3..a0464586 100644 --- a/plugins/word_bank/word_hanlde.py +++ b/plugins/word_bank/word_hanlde.py @@ -6,14 +6,13 @@ from utils.http_utils import AsyncHttpx from ._data_source import WordBankBuilder from utils.message_builder import image from utils.image_utils import text2image -from .message_handle import get_one_answer, get_one_problem, get_one_image_problem +from .message_handle import get_one_answer, get_one_problem, get_one_image_problem, replace_cq from .model import WordBank from nonebot.adapters.onebot.v11 import ( Bot, GroupMessageEvent, Message ) -from nonebot.typing import T_State from nonebot import on_command import random import os @@ -69,7 +68,7 @@ show_word = on_command("显示词条", aliases={"查看词条"}, priority=5, blo @add_word.handle() -async def _(bot: Bot, event: GroupMessageEvent, state: T_State, arg: Message = CommandArg()): +async def _(event: GroupMessageEvent, arg: Message = CommandArg()): msg = str(arg) r = re.search(r"问(.+)\s?答([\s\S]*)", msg) if not r: @@ -81,12 +80,7 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State, arg: Message = C if not answer: await add_word.finish("未检测到词条回答...") idx = 0 - for n in bot.config.nickname: - if n and problem.startswith(n): - _problem = f"[_to_me|{n}]" + problem[len(n):] - break - else: - _problem = problem + _problem = problem search_type = 0 if re.search("^关键字|词(.*)", msg): search_type = 1 @@ -102,7 +96,7 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State, arg: Message = C @delete_word.handle() async def _(event: GroupMessageEvent, arg: Message = CommandArg()): - msg = str(arg).strip() + msg = str(arg) if not msg: await delete_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看") index = None @@ -124,7 +118,10 @@ async def _(event: GroupMessageEvent, arg: Message = CommandArg()): if answer := await WordBank.delete_problem_answer( event.user_id, event.group_id, _problem, index ): - await delete_word.send("删除词条成功:" + problem + f"\n回答:\n{answer}") + + await delete_word.send(Message( + "删除词条成功:\n问" + await replace_cq(event.group_id, problem, False) + f"\n回答:\n" + await replace_cq( + event.group_id, answer, False) + "\n")) logger.info( f"(USER {event.user_id}, GROUP " f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})" @@ -137,7 +134,7 @@ async def _(event: GroupMessageEvent, arg: Message = CommandArg()): @update_word.handle() -async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()): +async def _(event: GroupMessageEvent, arg: Message = CommandArg()): msg = str(arg) if not msg: await update_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看") @@ -157,12 +154,7 @@ async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()): else: await update_word.finish("此命令之后需要跟随修改内容") idx = 0 - for n in bot.config.nickname: - if n and problem.startswith(n): - _problem = f"[_to_me|{n}]" + problem[len(n):] - break - else: - _problem = problem + _problem = problem _builder = await get__builder(event, _problem, new_answer, idx) try: @@ -185,8 +177,8 @@ async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()): if not msg: _problem_list = await WordBank.get_group_all_problem(event.group_id) if not _problem_list: - await show_word.finish("该群未收录任何词条..") - _problem_list = [f"\t{i}. {x}" for i, x in enumerate(_problem_list)] + await show_word.finish("该群未收录任replace_cq何词条..") + _problem_list = [f"\t{i}. {await replace_cq(event.group_id, x)}" for i, x in enumerate(_problem_list)] long_problem_list = len(_problem_list) max_line = Config.get_config("word_bank", "WORD_BANK_MIX") if long_problem_list > max_line: @@ -245,10 +237,11 @@ async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()): else: # 解析图片和@ - _answer_img_nu_list = [await get_one_answer(event, format, answer, 0) for answer, format in _answer_list] + _answer_img_nu_list = [await get_one_answer(event, format, answer, False) for answer, format in + _answer_list] word_nu = len(_answer_img_nu_list) img_nu = 0 - answer = "词条" + msg + "回答:" + answer = "词条" + msg + "\n回答:" for i, x, in enumerate(_answer_img_nu_list): r = re.findall(rf"\[CQ:image,file=", str(x)) if r: @@ -265,7 +258,7 @@ async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()): # await show_word.send(f"词条 {msg} 回答:\n" + "\n".join(_answer_list)) -async def get__builder(event, _problem, answer, idx): +async def get__builder(event, _problem: str, answer: str, idx: int): (data_dir / f"{event.group_id}").mkdir(exist_ok=True, parents=True) (data_dir / f"{event.group_id}" / "problem").mkdir(exist_ok=True, parents=True) _builder = WordBankBuilder(event.user_id, event.group_id, _problem)