mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 14:22:55 +08:00
commit
0e1e5a5ebe
@ -24,7 +24,7 @@ __plugin_settings__ = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
quotations = on_regex("^[语录|二次元]$", priority=5, block=True)
|
quotations = on_regex("^(语录|二次元)$", priority=5, block=True)
|
||||||
|
|
||||||
url = "https://international.v1.hitokoto.cn/?c=a"
|
url = "https://international.v1.hitokoto.cn/?c=a"
|
||||||
|
|
||||||
|
|||||||
@ -234,16 +234,27 @@ class WordBank(db.Model):
|
|||||||
query = query.where(cls.word_scope == word_type)
|
query = query.where(cls.word_scope == word_type)
|
||||||
sql_text += f" and word_scope = {word_scope}"
|
sql_text += f" and word_scope = {word_scope}"
|
||||||
# 完全匹配
|
# 完全匹配
|
||||||
if await query.where(cls.problem == problem).gino.first():
|
if await query.where(
|
||||||
|
((cls.word_type == 0) | (cls.word_type == 3)) & (cls.problem == problem)
|
||||||
|
).gino.first():
|
||||||
return query.where(cls.problem == problem)
|
return query.where(cls.problem == problem)
|
||||||
# 模糊匹配
|
# 模糊匹配
|
||||||
if await query.where(cls.problem.contains(problem)).gino.first():
|
if await query.where(
|
||||||
|
(cls.word_type == 1) & (cls.problem.contains(problem))
|
||||||
|
).gino.first():
|
||||||
return query.where(cls.problem.contains(problem))
|
return query.where(cls.problem.contains(problem))
|
||||||
# 正则匹配
|
# 正则匹配
|
||||||
if await db.first(
|
if await db.first(
|
||||||
db.text(sql_text + f" and word_type = 2 and word_scope != 999 and '{problem}' ~ problem;")
|
db.text(
|
||||||
|
sql_text
|
||||||
|
+ f" and word_type = 2 and word_scope != 999 and :problem ~ problem;"
|
||||||
|
),
|
||||||
|
problem=problem,
|
||||||
):
|
):
|
||||||
return sql_text + f" and word_type = 2 and word_scope != 999 and '{problem}' ~ problem;"
|
return (
|
||||||
|
sql_text
|
||||||
|
+ f" and word_type = 2 and word_scope != 999 and '{problem}' ~ problem;"
|
||||||
|
)
|
||||||
# if await db.first(
|
# if await db.first(
|
||||||
# db.text(sql_text + f" and word_type = 1 and word_scope != 999 and '{problem}' ~ problem;")
|
# db.text(sql_text + f" and word_type = 1 and word_scope != 999 and '{problem}' ~ problem;")
|
||||||
# ):
|
# ):
|
||||||
|
|||||||
@ -1,31 +1,38 @@
|
|||||||
import random
|
import imagehash
|
||||||
|
from PIL import Image
|
||||||
|
from io import BytesIO
|
||||||
|
from httpx import TimeoutException
|
||||||
|
|
||||||
|
from nonebot.typing import T_State
|
||||||
from nonebot.adapters.onebot.v11 import MessageEvent
|
from nonebot.adapters.onebot.v11 import MessageEvent
|
||||||
|
|
||||||
from configs.path_config import TEMP_PATH
|
|
||||||
from utils.image_utils import get_img_hash
|
|
||||||
from utils.utils import get_message_text, get_message_img, get_message_at
|
from utils.utils import get_message_text, get_message_img, get_message_at
|
||||||
from ._model import WordBank
|
from ._model import WordBank
|
||||||
from utils.http_utils import AsyncHttpx
|
from utils.http_utils import AsyncHttpx
|
||||||
|
|
||||||
|
|
||||||
async def check(event: MessageEvent) -> bool:
|
async def check(event: MessageEvent, state: T_State) -> bool:
|
||||||
text = get_message_text(event.message)
|
text = get_message_text(event.message)
|
||||||
img = get_message_img(event.message)
|
img = get_message_img(event.message)
|
||||||
at = get_message_at(event.message)
|
at = get_message_at(event.message)
|
||||||
rand = random.randint(1, 100)
|
|
||||||
problem = text
|
problem = text
|
||||||
if not text and len(img) == 1:
|
if not text and len(img) == 1:
|
||||||
if await AsyncHttpx.download_file(img[0], TEMP_PATH / f"{event.user_id}_{rand}_word_bank_check.jpg"):
|
try:
|
||||||
problem = str(get_img_hash(TEMP_PATH / f"{event.user_id}_{rand}_word_bank_check.jpg"))
|
r = await AsyncHttpx.get(img[0])
|
||||||
|
problem = str(imagehash.average_hash(Image.open(BytesIO(r.content))))
|
||||||
|
except TimeoutException:
|
||||||
|
pass
|
||||||
if at:
|
if at:
|
||||||
temp = ''
|
temp = ''
|
||||||
for seg in event.message:
|
for seg in event.message:
|
||||||
if seg.type == 'at':
|
if seg.type == 'at':
|
||||||
temp += f"[at:{seg.data['qq']}]"
|
temp += f"[at:{seg.data['qq']}]"
|
||||||
else:
|
elif isinstance(seg, str):
|
||||||
temp += seg
|
temp += seg
|
||||||
|
elif seg.type == 'text':
|
||||||
|
temp += seg.data["text"]
|
||||||
problem = temp
|
problem = temp
|
||||||
if problem:
|
if problem and (await WordBank.check(event, problem) is not None):
|
||||||
return await WordBank.check(event, problem) is not None
|
state["problem"] = problem
|
||||||
|
return True
|
||||||
return False
|
return False
|
||||||
|
|||||||
@ -1,14 +1,10 @@
|
|||||||
import random
|
|
||||||
|
|
||||||
from services import logger
|
from services import logger
|
||||||
from utils.image_utils import get_img_hash
|
|
||||||
from ._rule import check
|
from ._rule import check
|
||||||
from ._model import WordBank
|
from ._model import WordBank
|
||||||
from configs.path_config import DATA_PATH, TEMP_PATH
|
from configs.path_config import DATA_PATH
|
||||||
from nonebot.adapters.onebot.v11 import GroupMessageEvent, MessageEvent
|
from nonebot.adapters.onebot.v11 import GroupMessageEvent, MessageEvent
|
||||||
from utils.utils import get_message_img, get_message_text, get_message_at
|
|
||||||
from nonebot import on_message
|
from nonebot import on_message
|
||||||
from utils.http_utils import AsyncHttpx
|
from nonebot.typing import T_State
|
||||||
|
|
||||||
__zx_plugin_name__ = "词库问答回复操作 [Hidden]"
|
__zx_plugin_name__ = "词库问答回复操作 [Hidden]"
|
||||||
|
|
||||||
@ -19,26 +15,8 @@ message_handle = on_message(priority=6, block=True, rule=check)
|
|||||||
|
|
||||||
|
|
||||||
@message_handle.handle()
|
@message_handle.handle()
|
||||||
async def _(event: MessageEvent):
|
async def _(event: MessageEvent, state: T_State):
|
||||||
text = get_message_text(event.message)
|
if problem := state.get("problem"):
|
||||||
img = get_message_img(event.message)
|
|
||||||
at = get_message_at(event.message)
|
|
||||||
problem = None
|
|
||||||
if not text and img and len(img) == 1:
|
|
||||||
rand = random.randint(1, 10000)
|
|
||||||
if await AsyncHttpx.download_file(img[0], TEMP_PATH / f"{event.user_id}_{rand}_word_bank.jpg"):
|
|
||||||
problem = str(get_img_hash(TEMP_PATH / f"{event.user_id}_{rand}_word_bank.jpg"))
|
|
||||||
elif at:
|
|
||||||
temp = ''
|
|
||||||
for seg in event.message:
|
|
||||||
if seg.type == 'at':
|
|
||||||
temp += f"[at:{seg.data['qq']}]"
|
|
||||||
else:
|
|
||||||
temp += seg
|
|
||||||
problem = temp
|
|
||||||
elif text:
|
|
||||||
problem = text
|
|
||||||
if problem:
|
|
||||||
if msg := await WordBank.get_answer(event, problem):
|
if msg := await WordBank.get_answer(event, problem):
|
||||||
await message_handle.send(msg)
|
await message_handle.send(msg)
|
||||||
logger.info(
|
logger.info(
|
||||||
@ -46,6 +24,3 @@ async def _(event: MessageEvent):
|
|||||||
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
|
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
|
||||||
f" 触发词条 {problem}"
|
f" 触发词条 {problem}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
|
import re
|
||||||
from typing import Tuple, Any, Optional
|
from typing import Tuple, Any, Optional
|
||||||
|
|
||||||
from nonebot.internal.params import Arg, ArgStr
|
from nonebot.internal.params import Arg, ArgStr
|
||||||
@ -5,11 +6,12 @@ from nonebot.typing import T_State
|
|||||||
|
|
||||||
from utils.utils import get_message_at, is_number, get_message_img
|
from utils.utils import get_message_at, is_number, get_message_img
|
||||||
from nonebot.params import CommandArg, RegexGroup, Command
|
from nonebot.params import CommandArg, RegexGroup, Command
|
||||||
|
from nonebot.exception import FinishedException
|
||||||
from services.log import logger
|
from services.log import logger
|
||||||
from configs.path_config import DATA_PATH
|
from configs.path_config import DATA_PATH
|
||||||
from utils.message_builder import custom_forward_msg
|
from utils.message_builder import custom_forward_msg
|
||||||
from ._model import WordBank
|
from ._model import WordBank
|
||||||
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, Message, MessageEvent, PrivateMessageEvent
|
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, Message, MessageEvent, PrivateMessageEvent, unescape
|
||||||
from nonebot import on_command, on_regex
|
from nonebot import on_command, on_regex
|
||||||
from configs.config import Config
|
from configs.config import Config
|
||||||
from ._data_source import delete_word, update_word, show_word
|
from ._data_source import delete_word, update_word, show_word
|
||||||
@ -23,7 +25,7 @@ usage:
|
|||||||
更推荐使用id方式删除
|
更推荐使用id方式删除
|
||||||
问题回答支持的CQ:at, face, image
|
问题回答支持的CQ:at, face, image
|
||||||
查看词条命令:群聊时为 群词条+全局词条,私聊时为 私聊词条+全局词条
|
查看词条命令:群聊时为 群词条+全局词条,私聊时为 私聊词条+全局词条
|
||||||
添加词条正则:添加词条(模糊|正则|图片)?问\s*?(\S*)\s*?答\s?(\S*)
|
添加词条正则:添加词条(模糊|正则|图片)?问\s*?(\S*\s?\S*)\s*?答\s?(\S*)
|
||||||
指令:
|
指令:
|
||||||
添加词条 ?[模糊|正则|图片]问...答...:添加问答词条,可重复添加相同问题的不同回答
|
添加词条 ?[模糊|正则|图片]问...答...:添加问答词条,可重复添加相同问题的不同回答
|
||||||
删除词条 [问题/下标] ?[下标]:删除指定词条指定或全部回答
|
删除词条 [问题/下标] ?[下标]:删除指定词条指定或全部回答
|
||||||
@ -47,7 +49,7 @@ __plugin_superuser_usage__ = r"""
|
|||||||
usage:
|
usage:
|
||||||
在私聊中超级用户额外设置
|
在私聊中超级用户额外设置
|
||||||
指令:
|
指令:
|
||||||
(全局|私聊)?添加词条\s*?(模糊|正则|图片)?问\s*?(\S*)\s*?答\s?(\S*):添加问答词条,可重复添加相同问题的不同回答
|
(全局|私聊)?添加词条\s*?(模糊|正则|图片)?问\s*?(\S*\s?\S*)\s*?答\s?(\S*):添加问答词条,可重复添加相同问题的不同回答
|
||||||
全局添加词条
|
全局添加词条
|
||||||
私聊添加词条
|
私聊添加词条
|
||||||
(私聊情况下)删除词条: 删除私聊词条
|
(私聊情况下)删除词条: 删除私聊词条
|
||||||
@ -74,7 +76,7 @@ data_dir = DATA_PATH / "word_bank"
|
|||||||
data_dir.mkdir(parents=True, exist_ok=True)
|
data_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
add_word = on_regex(
|
add_word = on_regex(
|
||||||
r"^(全局|私聊)?添加词条\s*?(模糊|正则|图片)?问\s*?(\S*)\s*?答\s?(\S*)", priority=5, block=True
|
r"^(全局|私聊)?添加词条\s*?(模糊|正则|图片)?问\s*?(\S*\s?\S*)\s*?答\s?(\S*)", priority=5, block=True
|
||||||
)
|
)
|
||||||
|
|
||||||
delete_word_matcher = on_command("删除词条", aliases={'删除全局词条'}, priority=5, block=True)
|
delete_word_matcher = on_command("删除词条", aliases={'删除全局词条'}, priority=5, block=True)
|
||||||
@ -113,16 +115,21 @@ async def _(
|
|||||||
if seg.type == 'text' and '答' in str(seg):
|
if seg.type == 'text' and '答' in str(seg):
|
||||||
_problem = event.message[:index]
|
_problem = event.message[:index]
|
||||||
answer = event.message[index:]
|
answer = event.message[index:]
|
||||||
answer[0] = str(answer[0])[str(answer).index('答')+1:]
|
answer[0] = str(answer[0])[str(answer[0]).index('答')+1:]
|
||||||
_problem[0] = str(_problem[0])[str(_problem).index('问')+1:]
|
_problem[0] = str(_problem[0])[str(_problem[0]).index('问')+1:]
|
||||||
|
if _problem[-1].type != 'at' or seg.data['text'][:seg.data['text'].index('答')].lstrip():
|
||||||
|
_problem.append(seg.data['text'][:seg.data['text'].index('答')])
|
||||||
temp = ''
|
temp = ''
|
||||||
for g in _problem:
|
for g in _problem:
|
||||||
if isinstance(g, str) or g.type == 'text':
|
if isinstance(g, str):
|
||||||
temp += g
|
temp += g
|
||||||
|
elif g.type == 'text':
|
||||||
|
temp += g.data['text']
|
||||||
elif g.type == 'at':
|
elif g.type == 'at':
|
||||||
temp += f"[at:{g.data['qq']}]"
|
temp += f"[at:{g.data['qq']}]"
|
||||||
problem = temp
|
problem = temp
|
||||||
break
|
break
|
||||||
|
problem = unescape(problem)
|
||||||
index = len((word_scope or "") + "添加词条" + (word_type or "") + problem) + 1
|
index = len((word_scope or "") + "添加词条" + (word_type or "") + problem) + 1
|
||||||
event.message[0] = event.message[0].data["text"][index + 1 :].strip()
|
event.message[0] = event.message[0].data["text"][index + 1 :].strip()
|
||||||
state["word_scope"] = word_scope
|
state["word_scope"] = word_scope
|
||||||
@ -141,6 +148,11 @@ async def _(
|
|||||||
problem_image: Message = Arg("problem_image"),
|
problem_image: Message = Arg("problem_image"),
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
|
if word_type == "正则":
|
||||||
|
try:
|
||||||
|
re.compile(problem)
|
||||||
|
except re.error:
|
||||||
|
await add_word.finish(f"添加词条失败,正则表达式 {problem} 非法!")
|
||||||
await WordBank.add_problem_answer(
|
await WordBank.add_problem_answer(
|
||||||
event.user_id,
|
event.user_id,
|
||||||
event.group_id if isinstance(event, GroupMessageEvent) and (not word_scope or word_scope == '1') else 0,
|
event.group_id if isinstance(event, GroupMessageEvent) and (not word_scope or word_scope == '1') else 0,
|
||||||
@ -150,6 +162,8 @@ async def _(
|
|||||||
answer,
|
answer,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
if isinstance(e, FinishedException):
|
||||||
|
await add_word.finish()
|
||||||
logger.error(
|
logger.error(
|
||||||
f"(USER {event.user_id}, GROUP "
|
f"(USER {event.user_id}, GROUP "
|
||||||
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
|
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
|
||||||
@ -233,7 +247,7 @@ async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
|
|||||||
not is_number(id_)
|
not is_number(id_)
|
||||||
or int(id_) < 0
|
or int(id_) < 0
|
||||||
or int(id_)
|
or int(id_)
|
||||||
> len(await WordBank.get_group_all_problem(event.group_id))
|
>= len(await WordBank.get_group_all_problem(event.group_id))
|
||||||
):
|
):
|
||||||
await show_word_matcher.finish("id必须为数字且在范围内")
|
await show_word_matcher.finish("id必须为数字且在范围内")
|
||||||
id_ = int(id_)
|
id_ = int(id_)
|
||||||
@ -243,7 +257,7 @@ async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
|
|||||||
not is_number(gid)
|
not is_number(gid)
|
||||||
or int(gid) < 0
|
or int(gid) < 0
|
||||||
or int(gid)
|
or int(gid)
|
||||||
> len(await WordBank.get_problem_by_scope(0))
|
>= len(await WordBank.get_problem_by_scope(0))
|
||||||
):
|
):
|
||||||
await show_word_matcher.finish("gid必须为数字且在范围内")
|
await show_word_matcher.finish("gid必须为数字且在范围内")
|
||||||
gid = int(gid)
|
gid = int(gid)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user