修复词条At时bug与模糊查询时无法替换占位符问题

This commit is contained in:
HibiKier 2023-03-25 21:16:48 +08:00
parent f72988a71a
commit b2a2776fe3
5 changed files with 68 additions and 44 deletions

View File

@ -335,6 +335,7 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
* 删除BUFF_SKIN表约束新增`skin_id`字段
* 开箱新增更新指定刀具皮肤命令(某些箱子金色无法通过api获取)
* 修复词条At时bug与模糊查询时无法替换占位符问题
### 2023/3/20

View File

@ -220,16 +220,16 @@ async def _(event: MessageEvent, arg: Message = CommandArg(), cmd: str = OneComm
if msg in ["ALL", "ALL1"]:
if msg == "ALL":
case_list = list(CASE2ID.keys())
result = "武器箱"
type_ = "武器箱"
else:
case_list = list(KNIFE2ID.keys())
result = "罕见皮肤"
await update_case.send(f"即将更新所有{result}, 请稍等")
type_ = "罕见皮肤"
await update_case.send(f"即将更新所有{type_}, 请稍等")
for i, case_name in enumerate(case_list):
try:
await update_skin_data(case_name)
rand = random.randint(300, 500)
result = f"更新全部{result}完成"
result = f"更新全部{type_}完成"
if i < len(case_list):
next_case = case_list[i + 1]
result = f"将在 {rand} 秒后更新下一{result}: {next_case}"
@ -237,9 +237,9 @@ async def _(event: MessageEvent, arg: Message = CommandArg(), cmd: str = OneComm
logger.info(f"成功更新{result}: {case_name}, {result}", "更新武器箱")
await asyncio.sleep(rand)
except Exception as e:
logger.error(f"更新{result}: {case_name}", e=e)
await update_case.send(f"更新{result}: {case_name} 发生错误: {type(e)}: {e}")
await update_case.send(f"更新全部{result}完成")
logger.error(f"更新{type_}: {case_name}", e=e)
await update_case.send(f"更新{type_}: {case_name} 发生错误: {type(e)}: {e}")
await update_case.send(f"更新全部{type_}完成")
else:
await update_case.send(f"开始{cmd}: {msg}, 请稍等")
try:

View File

@ -138,7 +138,7 @@ class WordBank(Model):
word_scope=word_scope,
word_type=word_type,
status=True,
problem=problem,
problem=str(problem).strip(),
answer=answer,
image_path=image_path,
placeholder=",".join(_list),
@ -220,17 +220,17 @@ class WordBank(Model):
answer=answer,
)
if query and query.placeholder:
type_list = re.findall(rf"\[(.*?):placeholder_.*?]", answer)
temp_answer = re.sub(rf"\[(.*?):placeholder_.*?]", "{}", answer)
type_list = re.findall(rf"\[(.*?):placeholder_.*?]", str(answer))
temp_answer = re.sub(rf"\[(.*?):placeholder_.*?]", "{}", str(answer))
seg_list = []
for t, p in zip(type_list, query.placeholder.split(",")):
if t == "image":
seg_list.append(image(path / p))
elif t == "face":
seg_list.append(face(p))
seg_list.append(face(int(p)))
elif t == "at":
seg_list.append(at(p))
return MessageTemplate(temp_answer, Message).format(*seg_list)
return MessageTemplate(temp_answer, Message).format(*seg_list) # type: ignore
return answer
@classmethod
@ -302,7 +302,7 @@ class WordBank(Model):
answer = random.choice(data_list)
return (
await cls._format2answer(
problem, answer.answer, answer.user_qq, answer.group_id
answer.problem, answer.answer, answer.user_qq, answer.group_id
)
if answer.placeholder
else answer.answer

View File

@ -1,4 +1,5 @@
from io import BytesIO
from typing import List
import imagehash
from nonebot.adapters.onebot.v11 import Bot, MessageEvent
@ -6,24 +7,28 @@ from nonebot.typing import T_State
from PIL import Image
from services.log import logger
from utils.depends import ImageList
from utils.http_utils import AsyncHttpx
from utils.utils import get_message_at, get_message_img, get_message_text
from ._model import WordBank
async def check(bot: Bot, event: MessageEvent, state: T_State) -> bool:
async def check(
bot: Bot,
event: MessageEvent,
state: T_State,
) -> bool:
text = get_message_text(event.message)
img = get_message_img(event.message)
at = get_message_at(event.message)
img_list = get_message_img(event.message)
problem = text
if not text and len(img) == 1:
if not text and len(img_list) == 1:
try:
r = await AsyncHttpx.get(img[0])
r = await AsyncHttpx.get(img_list[0])
problem = str(imagehash.average_hash(Image.open(BytesIO(r.content))))
except Exception as e:
logger.warning(f"word_bank rule 获取图片失败 {type(e)}{e}")
if at:
logger.warning(f"获取图片失败", "词条检测", e=e)
if get_message_at(event.message):
temp = ""
for seg in event.message:
if seg.type == "at":

View File

@ -1,5 +1,5 @@
import re
from typing import Any, Optional, Tuple
from typing import Any, List, Optional, Tuple
from nonebot import on_command, on_regex
from nonebot.adapters.onebot.v11 import (
@ -18,6 +18,7 @@ from nonebot.typing import T_State
from configs.config import Config
from configs.path_config import DATA_PATH
from services.log import logger
from utils.depends import AtList, ImageList
from utils.message_builder import custom_forward_msg
from utils.utils import get_message_at, get_message_img, is_number
@ -100,6 +101,8 @@ async def _(
event: MessageEvent,
state: T_State,
reg_group: Tuple[Any, ...] = RegexGroup(),
img_list: List[str] = ImageList(),
at_list: List[int] = AtList(),
):
if (
isinstance(event, PrivateMessageEvent)
@ -117,35 +120,50 @@ async def _(
await add_word.finish("权限不足,无法添加该范围词条")
if (not problem or not problem.strip()) and word_type != "图片":
await add_word.finish("词条问题不能为空!")
if (not answer or not answer.strip()) and not len(get_message_img(event.message)):
if (not answer or not answer.strip()) and not len(img_list) and not len(at_list):
await add_word.finish("词条回答不能为空!")
if word_type != "图片":
state["problem_image"] = "YES"
answer = event.message
# 对at问题对额外处理
if get_message_at(event.message):
if at_list:
is_first = True
cur_p = None
answer = ""
problem = ""
for index, seg in enumerate(event.message):
if seg.type == "text" and "" in str(seg):
_problem = event.message[:index]
answer = event.message[index:]
answer[0] = str(answer[0])[str(answer[0]).index("") + 1 :]
_problem[0] = str(_problem[0])[str(_problem[0]).index("") + 1 :]
if (
_problem[-1].type != "at"
or seg.data["text"][: seg.data["text"].index("")].lstrip()
):
_problem.append(seg.data["text"][: seg.data["text"].index("")])
temp = ""
for g in _problem:
if isinstance(g, str):
temp += g
elif g.type == "text":
temp += g.data["text"]
elif g.type == "at":
temp += f"[at:{g.data['qq']}]"
problem = temp
break
problem = unescape(problem)
if seg.type == "text" and "添加词条问" in str(seg) and is_first:
is_first = False
seg_ = str(seg).split("添加词条问")[-1]
cur_p = "problem"
# 纯文本
if "" in seg_:
answer_index = seg_.index("")
problem = unescape(seg_[:answer_index]).strip()
answer = unescape(seg_[answer_index + 1 :]).strip()
cur_p = "answer"
else:
problem = unescape(seg_)
continue
if cur_p == "problem":
if seg.type == "text" and "" in str(seg):
seg_ = str(seg)
answer_index = seg_.index("")
problem += seg_[:answer_index]
answer += seg_[answer_index + 1 :]
cur_p = "answer"
else:
if seg.type == "at":
problem += f"[at:{seg.data['qq']}]"
else:
problem += (
unescape(str(seg)).strip() if seg.type == "text" else seg
)
else:
if seg.type == "text":
answer += unescape(str(seg))
else:
answer += seg
event.message[0] = event.message[0].data["text"].split("", maxsplit=1)[-1].strip()
state["word_scope"] = word_scope
state["word_type"] = word_type