mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 14:22:55 +08:00
commit
1a4e1e9515
@ -1,12 +1,10 @@
|
||||
import re
|
||||
|
||||
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, Event
|
||||
from nonebot.adapters.onebot.v11 import GroupMessageEvent, Event
|
||||
from utils.utils import get_message_img_file
|
||||
from nonebot.typing import T_State
|
||||
from .model import WordBank
|
||||
|
||||
|
||||
async def check(bot: Bot, event: Event, state: T_State) -> bool:
|
||||
async def check(event: Event) -> bool:
|
||||
if isinstance(event, GroupMessageEvent):
|
||||
msg = event.raw_message
|
||||
list_img = get_message_img_file(event.json())
|
||||
@ -14,7 +12,7 @@ async def check(bot: Bot, event: Event, state: T_State) -> bool:
|
||||
for img_file in list_img:
|
||||
strinfo = re.compile(f"{img_file},.*?]")
|
||||
msg = strinfo.sub(f'{img_file}]', msg)
|
||||
return bool(
|
||||
await WordBank.check(event.group_id, msg, event.is_tome())
|
||||
)
|
||||
strinfo_face = re.compile(f",type=sticker]")
|
||||
msg = strinfo_face.sub(f']', msg)
|
||||
return bool(await WordBank.check(event.group_id, msg,))
|
||||
return False
|
||||
|
||||
@ -1,9 +1,10 @@
|
||||
from utils.message_builder import image, at
|
||||
from utils.message_builder import image, at, face
|
||||
from typing import Tuple
|
||||
from ._rule import check
|
||||
from .model import WordBank
|
||||
from configs.path_config import DATA_PATH
|
||||
from nonebot.adapters.onebot.v11 import GroupMessageEvent
|
||||
from utils.utils import get_message_at, get_message_img
|
||||
from utils.utils import get_message_at, get_message_img, change_img_md5
|
||||
from nonebot import on_message
|
||||
from models.group_member_info import GroupInfoUser
|
||||
from utils.utils import get_message_img_file, is_number
|
||||
@ -25,14 +26,14 @@ async def _(event: GroupMessageEvent):
|
||||
for img_file in list_img:
|
||||
strinfo = re.compile(f"{img_file},.*?]")
|
||||
msg = strinfo.sub(f'{img_file}]', msg)
|
||||
q = await WordBank.check(
|
||||
event.group_id, msg, event.is_tome()
|
||||
)
|
||||
strinfo_face = re.compile(f",type=sticker]")
|
||||
msg = strinfo_face.sub(f']', msg)
|
||||
q = await WordBank.check(event.group_id, msg, )
|
||||
await message_handle.send(await get_one_answer(event, q.format, q.answer))
|
||||
|
||||
|
||||
# 处理单条回答
|
||||
async def get_one_answer(event, format, _answer, all=1):
|
||||
async def get_one_answer(event, format: str, _answer: str, all: bool = True) -> str:
|
||||
path = data_dir / f"{event.group_id}"
|
||||
placeholder_list = (
|
||||
[
|
||||
@ -50,12 +51,13 @@ async def get_one_answer(event, format, _answer, all=1):
|
||||
else:
|
||||
for idx, placeholder in placeholder_list:
|
||||
if placeholder.endswith("jpg"):
|
||||
change_img_md5(path / placeholder)
|
||||
answer += _a[: _a.find(f"[__placeholder_{idx}]")] + image(
|
||||
path / placeholder
|
||||
)
|
||||
else:
|
||||
if all == 1:
|
||||
answer += _a[: _a.find(f"[__placeholder_{idx}]")] + at(placeholder)
|
||||
if all:
|
||||
answer += _a[: _a.find(f"[__placeholder_{idx}]")] + at(int(placeholder))
|
||||
else:
|
||||
q = await GroupInfoUser.get_member_info(
|
||||
int(placeholder), event.group_id)
|
||||
@ -65,7 +67,7 @@ async def get_one_answer(event, format, _answer, all=1):
|
||||
|
||||
|
||||
# 处理单条问题
|
||||
async def get_one_problem(event, problem):
|
||||
async def get_one_problem(event, problem: str, ) -> Tuple[str, str]:
|
||||
strinfo = re.compile(f",subType=\d")
|
||||
problem = strinfo.sub('', problem)
|
||||
_problem = problem
|
||||
@ -93,7 +95,7 @@ async def get_one_problem(event, problem):
|
||||
|
||||
|
||||
# 显示单条数据库问题
|
||||
async def get_one_image_problem(event, problem):
|
||||
async def get_one_image_problem(event, problem: str) -> str:
|
||||
path = data_dir / f"{event.group_id}" / "problem"
|
||||
placeholder_list = []
|
||||
idx = 0
|
||||
@ -127,3 +129,19 @@ async def get_one_image_problem(event, problem):
|
||||
_p = _p[_p.find(f"[__placeholder_{idx}]") + len(f"[__placeholder_{idx}]"):]
|
||||
|
||||
return problem + _p
|
||||
|
||||
|
||||
# 替换cq码
|
||||
async def replace_cq(group_id, msg: str, is_face: bool = True) -> str:
|
||||
strinfo_img = re.compile(f"\[CQ:image.*?]")
|
||||
msg = strinfo_img.sub('[图片]', msg)
|
||||
at_list = re.findall(rf"\[CQ:at,qq=(.*?)]", msg)
|
||||
if at_list:
|
||||
for ats in at_list:
|
||||
q = await GroupInfoUser.get_member_info(
|
||||
int(ats), group_id)
|
||||
msg = msg.replace(f'[CQ:at,qq={ats}]', "@" + q.user_name)
|
||||
if is_face:
|
||||
strinfo_face = re.compile(f"\[CQ:face,id=.*?]")
|
||||
msg = strinfo_face.sub('[表情]', msg)
|
||||
return msg
|
||||
|
||||
@ -1,9 +1,7 @@
|
||||
from services.db_context import db
|
||||
from typing import Optional, List, Union, Tuple
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from configs.path_config import DATA_PATH
|
||||
import re
|
||||
import random
|
||||
from configs.config import Config
|
||||
|
||||
@ -125,34 +123,16 @@ class WordBank(db.Model):
|
||||
q.sort()
|
||||
_tmp = []
|
||||
for problem in q:
|
||||
if "[_to_me" in problem:
|
||||
r = re.search(r"\[_to_me\|(.*?)](.*)", problem)
|
||||
if r:
|
||||
bot_name = r.group(1)
|
||||
problem = problem.replace(f"[_to_me|{bot_name}]", bot_name)
|
||||
_tmp.append(problem)
|
||||
return list(set(_tmp))
|
||||
|
||||
@classmethod
|
||||
async def check(cls, group_id: int, problem: str, is_tome: bool = False) -> Optional["WordBank"]:
|
||||
async def check(cls, group_id: int, problem: str) -> Optional["WordBank"]:
|
||||
"""
|
||||
检测词条并随机返回
|
||||
:param group_id: 群号
|
||||
:param problem: 问题
|
||||
:param is_tome:是否at真寻
|
||||
"""
|
||||
if is_tome:
|
||||
q = await cls.query.where(
|
||||
(cls.group_id == group_id)
|
||||
).gino.all()
|
||||
q = [x for x in q if "[_to_me" in x.problem]
|
||||
if q:
|
||||
for x in q:
|
||||
r = re.search(r"\[_to_me\|(.*?)](.*)", x.problem)
|
||||
if r and r.group(2) == problem:
|
||||
return x
|
||||
return None
|
||||
else:
|
||||
if problem:
|
||||
FUZZY = Config.get_config("word_bank", "WORD_BANK_FUZZY")
|
||||
KEY = Config.get_config("word_bank", "WORD_BANK_KEY")
|
||||
|
||||
@ -6,14 +6,13 @@ from utils.http_utils import AsyncHttpx
|
||||
from ._data_source import WordBankBuilder
|
||||
from utils.message_builder import image
|
||||
from utils.image_utils import text2image
|
||||
from .message_handle import get_one_answer, get_one_problem, get_one_image_problem
|
||||
from .message_handle import get_one_answer, get_one_problem, get_one_image_problem, replace_cq
|
||||
from .model import WordBank
|
||||
from nonebot.adapters.onebot.v11 import (
|
||||
Bot,
|
||||
GroupMessageEvent,
|
||||
Message
|
||||
)
|
||||
from nonebot.typing import T_State
|
||||
from nonebot import on_command
|
||||
import random
|
||||
import os
|
||||
@ -69,7 +68,7 @@ show_word = on_command("显示词条", aliases={"查看词条"}, priority=5, blo
|
||||
|
||||
|
||||
@add_word.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, state: T_State, arg: Message = CommandArg()):
|
||||
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||
msg = str(arg)
|
||||
r = re.search(r"问(.+)\s?答([\s\S]*)", msg)
|
||||
if not r:
|
||||
@ -81,11 +80,6 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State, arg: Message = C
|
||||
if not answer:
|
||||
await add_word.finish("未检测到词条回答...")
|
||||
idx = 0
|
||||
for n in bot.config.nickname:
|
||||
if n and problem.startswith(n):
|
||||
_problem = f"[_to_me|{n}]" + problem[len(n):]
|
||||
break
|
||||
else:
|
||||
_problem = problem
|
||||
search_type = 0
|
||||
if re.search("^关键字|词(.*)", msg):
|
||||
@ -102,7 +96,7 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State, arg: Message = C
|
||||
|
||||
@delete_word.handle()
|
||||
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||
msg = str(arg).strip()
|
||||
msg = str(arg)
|
||||
if not msg:
|
||||
await delete_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
|
||||
index = None
|
||||
@ -124,7 +118,10 @@ async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||
if answer := await WordBank.delete_problem_answer(
|
||||
event.user_id, event.group_id, _problem, index
|
||||
):
|
||||
await delete_word.send("删除词条成功:" + problem + f"\n回答:\n{answer}")
|
||||
|
||||
await delete_word.send(Message(
|
||||
"删除词条成功:\n问" + await replace_cq(event.group_id, problem, False) + f"\n回答:\n" + await replace_cq(
|
||||
event.group_id, answer, False) + "\n"))
|
||||
logger.info(
|
||||
f"(USER {event.user_id}, GROUP "
|
||||
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
|
||||
@ -137,7 +134,7 @@ async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||
|
||||
|
||||
@update_word.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||
msg = str(arg)
|
||||
if not msg:
|
||||
await update_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
|
||||
@ -157,11 +154,6 @@ async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||
else:
|
||||
await update_word.finish("此命令之后需要跟随修改内容")
|
||||
idx = 0
|
||||
for n in bot.config.nickname:
|
||||
if n and problem.startswith(n):
|
||||
_problem = f"[_to_me|{n}]" + problem[len(n):]
|
||||
break
|
||||
else:
|
||||
_problem = problem
|
||||
_builder = await get__builder(event, _problem, new_answer, idx)
|
||||
|
||||
@ -185,8 +177,8 @@ async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||
if not msg:
|
||||
_problem_list = await WordBank.get_group_all_problem(event.group_id)
|
||||
if not _problem_list:
|
||||
await show_word.finish("该群未收录任何词条..")
|
||||
_problem_list = [f"\t{i}. {x}" for i, x in enumerate(_problem_list)]
|
||||
await show_word.finish("该群未收录任replace_cq何词条..")
|
||||
_problem_list = [f"\t{i}. {await replace_cq(event.group_id, x)}" for i, x in enumerate(_problem_list)]
|
||||
long_problem_list = len(_problem_list)
|
||||
max_line = Config.get_config("word_bank", "WORD_BANK_MIX")
|
||||
if long_problem_list > max_line:
|
||||
@ -245,10 +237,11 @@ async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||
|
||||
else:
|
||||
# 解析图片和@
|
||||
_answer_img_nu_list = [await get_one_answer(event, format, answer, 0) for answer, format in _answer_list]
|
||||
_answer_img_nu_list = [await get_one_answer(event, format, answer, False) for answer, format in
|
||||
_answer_list]
|
||||
word_nu = len(_answer_img_nu_list)
|
||||
img_nu = 0
|
||||
answer = "词条" + msg + "回答:"
|
||||
answer = "词条" + msg + "\n回答:"
|
||||
for i, x, in enumerate(_answer_img_nu_list):
|
||||
r = re.findall(rf"\[CQ:image,file=", str(x))
|
||||
if r:
|
||||
@ -265,7 +258,7 @@ async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||
# await show_word.send(f"词条 {msg} 回答:\n" + "\n".join(_answer_list))
|
||||
|
||||
|
||||
async def get__builder(event, _problem, answer, idx):
|
||||
async def get__builder(event, _problem: str, answer: str, idx: int):
|
||||
(data_dir / f"{event.group_id}").mkdir(exist_ok=True, parents=True)
|
||||
(data_dir / f"{event.group_id}" / "problem").mkdir(exist_ok=True, parents=True)
|
||||
_builder = WordBankBuilder(event.user_id, event.group_id, _problem)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user