mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 14:22:55 +08:00
Update Modify the entry
This commit is contained in:
parent
14d1512b1e
commit
4f9445a7e9
@ -5,7 +5,6 @@ from typing import Union
|
|||||||
class WordBankBuilder:
|
class WordBankBuilder:
|
||||||
|
|
||||||
def __init__(self, user_id: int, group_id: int, problem: str):
|
def __init__(self, user_id: int, group_id: int, problem: str):
|
||||||
|
|
||||||
self._data = {
|
self._data = {
|
||||||
"user_id": user_id,
|
"user_id": user_id,
|
||||||
"group_id": group_id,
|
"group_id": group_id,
|
||||||
@ -37,16 +36,13 @@ class WordBankBuilder:
|
|||||||
placeholder = self._data.get("placeholder")
|
placeholder = self._data.get("placeholder")
|
||||||
await WordBank.add_problem_answer(user_id, group_id, problem, answer, placeholder)
|
await WordBank.add_problem_answer(user_id, group_id, problem, answer, placeholder)
|
||||||
|
|
||||||
|
async def update(self, index):
|
||||||
|
user_id = self._data["user_id"]
|
||||||
|
group_id = self._data["group_id"]
|
||||||
|
problem = self._data["problem"]
|
||||||
|
answer = self._data["answer"]
|
||||||
|
placeholder = self._data.get("placeholder")
|
||||||
|
return await WordBank.update_problem_answer(user_id, group_id, problem, answer, index, placeholder)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return str(self._data)
|
return str(self._data)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -21,12 +21,12 @@ class WordBank(db.Model):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def add_problem_answer(
|
async def add_problem_answer(
|
||||||
cls,
|
cls,
|
||||||
user_id: int,
|
user_id: int,
|
||||||
group_id: Optional[int],
|
group_id: Optional[int],
|
||||||
problem: str,
|
problem: str,
|
||||||
answer: str,
|
answer: str,
|
||||||
format_: Optional[List[Tuple[int, Union[int, str]]]],
|
format_: Optional[List[Tuple[int, Union[int, str]]]],
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""
|
"""
|
||||||
添加或新增一个问答
|
添加或新增一个问答
|
||||||
@ -47,7 +47,7 @@ class WordBank(db.Model):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def delete_problem_answer(
|
async def delete_problem_answer(
|
||||||
cls, user_id: int, group_id: Optional[int], problem: str, index: Optional[int]
|
cls, user_id: int, group_id: Optional[int], problem: str, index: Optional[int]
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
删除某问题一个或全部回答
|
删除某问题一个或全部回答
|
||||||
@ -60,9 +60,35 @@ class WordBank(db.Model):
|
|||||||
user_id, group_id, problem, "delete", index=index
|
user_id, group_id, problem, "delete", index=index
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def update_problem_answer(
|
||||||
|
cls,
|
||||||
|
user_id: int,
|
||||||
|
group_id: Optional[int],
|
||||||
|
problem: str,
|
||||||
|
answer: str,
|
||||||
|
index: Optional[int],
|
||||||
|
format_: Optional[List[Tuple[int, Union[int, str]]]],
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
修改某问题一个或全部回答
|
||||||
|
:param user_id: 用户id
|
||||||
|
:param group_id: 群号
|
||||||
|
:param problem: 问题
|
||||||
|
:param index: 回答下标
|
||||||
|
"""
|
||||||
|
_str = None
|
||||||
|
if format_:
|
||||||
|
_str = ""
|
||||||
|
for x, y in format_:
|
||||||
|
_str += f"{x}<_s>{y}<format>"
|
||||||
|
return await cls._problem_answer_handle(
|
||||||
|
user_id, group_id, problem, "update", answer=answer,index=index, format_=_str
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def get_problem_answer(
|
async def get_problem_answer(
|
||||||
cls, user_id: int, group_id: Optional[int], problem: str
|
cls, user_id: int, group_id: Optional[int], problem: str
|
||||||
) -> List[str]:
|
) -> List[str]:
|
||||||
"""
|
"""
|
||||||
获取问题的所有回答
|
获取问题的所有回答
|
||||||
@ -135,15 +161,15 @@ class WordBank(db.Model):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def _problem_answer_handle(
|
async def _problem_answer_handle(
|
||||||
cls,
|
cls,
|
||||||
user_id: int,
|
user_id: int,
|
||||||
group_id: Optional[int],
|
group_id: Optional[int],
|
||||||
problem: str,
|
problem: str,
|
||||||
type_: str,
|
type_: str,
|
||||||
*,
|
*,
|
||||||
answer: Optional[str] = None,
|
answer: Optional[str] = None,
|
||||||
index: Optional[int] = None,
|
index: Optional[int] = None,
|
||||||
format_: Optional[str] = None,
|
format_: Optional[str] = None,
|
||||||
) -> Union[List[Union[str, Tuple[str, str]]], bool, str]:
|
) -> Union[List[Union[str, Tuple[str, str]]], bool, str]:
|
||||||
"""
|
"""
|
||||||
添加或新增一个问答
|
添加或新增一个问答
|
||||||
@ -174,8 +200,8 @@ class WordBank(db.Model):
|
|||||||
problem=problem,
|
problem=problem,
|
||||||
answer=answer,
|
answer=answer,
|
||||||
format=format_,
|
format=format_,
|
||||||
create_time=datetime.now().date(),
|
create_time=datetime.now().replace(microsecond=0),
|
||||||
update_time=datetime.now().date(),
|
update_time=datetime.now().replace(microsecond=0),
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
elif type_ == "delete":
|
elif type_ == "delete":
|
||||||
@ -204,9 +230,39 @@ class WordBank(db.Model):
|
|||||||
& (cls.group_id == group_id)
|
& (cls.group_id == group_id)
|
||||||
).gino.status()
|
).gino.status()
|
||||||
return answer
|
return answer
|
||||||
|
elif type_ == "update":
|
||||||
|
new_format =format_
|
||||||
|
new_answer = answer
|
||||||
|
q = await q.with_for_update().gino.all()
|
||||||
|
if q:
|
||||||
|
path = DATA_PATH / "word_bank" / f"{group_id}"
|
||||||
|
if index is not None:
|
||||||
|
_q = [x.problem for x in q]
|
||||||
|
_q.sort()
|
||||||
|
prob = _q[index]
|
||||||
|
index = [x.problem for x in q].index(prob)
|
||||||
|
q = [q[index]]
|
||||||
|
else:
|
||||||
|
q = [q[0]]
|
||||||
|
for x in q:
|
||||||
|
format_ = x.format
|
||||||
|
if format_:
|
||||||
|
for sp in format_.split("<format>")[:-1]:
|
||||||
|
_, image_name = sp.split("<_s>")
|
||||||
|
if image_name.endswith("jpg"):
|
||||||
|
_path = path / image_name
|
||||||
|
if _path.exists():
|
||||||
|
_path.unlink()
|
||||||
|
await cls.update.values(answer=new_answer,
|
||||||
|
format=new_format,
|
||||||
|
update_time=datetime.now().replace(microsecond=0), ).where(
|
||||||
|
(cls.problem == problem)
|
||||||
|
& (cls.answer == x.answer)
|
||||||
|
& (cls.group_id == group_id)
|
||||||
|
).gino.status()
|
||||||
|
return True
|
||||||
elif type_ == "get":
|
elif type_ == "get":
|
||||||
q = await q.gino.all()
|
q = await q.gino.all()
|
||||||
if q:
|
if q:
|
||||||
return [(x.answer, x.format.split("<format>")[:-1]) for x in q]
|
return [(x.answer, x.format.split("<format>")[:-1]) for x in q]
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|||||||
@ -52,11 +52,12 @@ __plugin_settings__ = {
|
|||||||
data_dir = DATA_PATH / "word_bank"
|
data_dir = DATA_PATH / "word_bank"
|
||||||
data_dir.mkdir(parents=True, exist_ok=True)
|
data_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
add_word = on_command("添加词条", priority=5, block=True)
|
add_word = on_command("添加词条", priority=5, block=True)
|
||||||
|
|
||||||
delete_word = on_command("删除词条", priority=5, block=True)
|
delete_word = on_command("删除词条", priority=5, block=True)
|
||||||
|
|
||||||
|
update_word = on_command("修改词条", priority=5, block=True)
|
||||||
|
|
||||||
show_word = on_command("显示词条", aliases={"查看词条"}, priority=5, block=True)
|
show_word = on_command("显示词条", aliases={"查看词条"}, priority=5, block=True)
|
||||||
|
|
||||||
|
|
||||||
@ -75,10 +76,121 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State, arg: Message = C
|
|||||||
idx = 0
|
idx = 0
|
||||||
for n in bot.config.nickname:
|
for n in bot.config.nickname:
|
||||||
if n and problem.startswith(n):
|
if n and problem.startswith(n):
|
||||||
_problem = f"[_to_me|{n}]" + problem[len(n) :]
|
_problem = f"[_to_me|{n}]" + problem[len(n):]
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
_problem = problem
|
_problem = problem
|
||||||
|
_builder = await get__builder(event, _problem, answer, idx)
|
||||||
|
await _builder.save()
|
||||||
|
logger.info(f"已保存词条 问:{problem} 答:{msg}")
|
||||||
|
await add_word.send(f"已保存词条:{problem}")
|
||||||
|
|
||||||
|
|
||||||
|
@delete_word.handle()
|
||||||
|
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||||
|
msg = arg.extract_plain_text().strip()
|
||||||
|
if not msg:
|
||||||
|
await delete_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
|
||||||
|
index = None
|
||||||
|
_sp_msg = msg.split()
|
||||||
|
if len(_sp_msg) > 1:
|
||||||
|
if is_number(_sp_msg[-1]):
|
||||||
|
index = int(_sp_msg[-1])
|
||||||
|
msg = " ".join(_sp_msg[:-1])
|
||||||
|
problem = msg
|
||||||
|
if problem.startswith("id:"):
|
||||||
|
x = problem.split(":")[-1]
|
||||||
|
if not is_number(x) or int(x) < 0:
|
||||||
|
await delete_word.finish("id必须为数字且符合规范!")
|
||||||
|
p = await WordBank.get_group_all_problem(event.group_id)
|
||||||
|
if p:
|
||||||
|
problem = p[int(x)]
|
||||||
|
try:
|
||||||
|
if answer := await WordBank.delete_problem_answer(
|
||||||
|
event.user_id, event.group_id, problem, index
|
||||||
|
):
|
||||||
|
await delete_word.send(f"删除词条成功:{problem}\n回答:\n{answer}")
|
||||||
|
logger.info(
|
||||||
|
f"(USER {event.user_id}, GROUP "
|
||||||
|
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
|
||||||
|
f" 删除词条: {problem}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await delete_word.send(f"删除词条:{problem} 失败,可能该词条不存在")
|
||||||
|
except IndexError:
|
||||||
|
await delete_word.send("指定下标错误...请通过查看词条来确定..")
|
||||||
|
|
||||||
|
|
||||||
|
@update_word.handle()
|
||||||
|
async def _(bot: Bot,event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||||
|
msg = str(arg)
|
||||||
|
if not msg:
|
||||||
|
await update_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
|
||||||
|
index = None
|
||||||
|
new_answer = None
|
||||||
|
problem = None
|
||||||
|
_sp_msg = msg.split()
|
||||||
|
len_msg = len(_sp_msg)
|
||||||
|
if 1 < len_msg:
|
||||||
|
problem = "".join(_sp_msg[0])
|
||||||
|
if len_msg == 3:
|
||||||
|
if is_number(_sp_msg[1]):
|
||||||
|
index = int(_sp_msg[1])
|
||||||
|
new_answer = "".join(_sp_msg[2:])
|
||||||
|
else:
|
||||||
|
new_answer = "".join(_sp_msg[1:])
|
||||||
|
else:
|
||||||
|
await update_word.finish("此命令之后需要跟随修改内容")
|
||||||
|
idx = 0
|
||||||
|
for n in bot.config.nickname:
|
||||||
|
if n and problem.startswith(n):
|
||||||
|
_problem = f"[_to_me|{n}]" + problem[len(n):]
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
_problem = problem
|
||||||
|
_builder = await get__builder(event, _problem, new_answer, idx)
|
||||||
|
|
||||||
|
try:
|
||||||
|
if await _builder.update(index):
|
||||||
|
await update_word.send(f"修改词条成功:{problem}")
|
||||||
|
logger.info(
|
||||||
|
f"(USER {event.user_id}, GROUP "
|
||||||
|
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
|
||||||
|
f" 修改词条: {problem}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await update_word.send(f"修改词条:{problem} 失败,可能该词条不存在")
|
||||||
|
except IndexError:
|
||||||
|
await update_word.send("指定下标错误...请通过查看词条来确定..")
|
||||||
|
|
||||||
|
|
||||||
|
@show_word.handle()
|
||||||
|
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
|
||||||
|
msg = arg.extract_plain_text().strip()
|
||||||
|
if not msg:
|
||||||
|
_problem_list = await WordBank.get_group_all_problem(event.group_id)
|
||||||
|
if not _problem_list:
|
||||||
|
await show_word.finish("该群未收录任何词条..")
|
||||||
|
_problem_list = [f"\t{i}. {x}" for i, x in enumerate(_problem_list)]
|
||||||
|
await show_word.send(
|
||||||
|
image(
|
||||||
|
b64=(await text2image(
|
||||||
|
"该群已收录的词条:\n\n" + "\n".join(_problem_list),
|
||||||
|
padding=10,
|
||||||
|
color="#f9f6f2",
|
||||||
|
)).pic2bs4()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
_answer_list = await WordBank.get_group_all_answer(event.group_id, msg)
|
||||||
|
if not _answer_list:
|
||||||
|
await show_word.send("未收录该词条...")
|
||||||
|
else:
|
||||||
|
_answer_list = [f"{i}. {x}" for i, x in enumerate(_answer_list)]
|
||||||
|
await show_word.send(f"词条 {msg} 回答:\n" + "\n".join(_answer_list))
|
||||||
|
|
||||||
|
|
||||||
|
async def get__builder(event, _problem, answer, idx):
|
||||||
(data_dir / f"{event.group_id}").mkdir(exist_ok=True, parents=True)
|
(data_dir / f"{event.group_id}").mkdir(exist_ok=True, parents=True)
|
||||||
_builder = WordBankBuilder(event.user_id, event.group_id, _problem)
|
_builder = WordBankBuilder(event.user_id, event.group_id, _problem)
|
||||||
for at_ in get_message_at(event.json()):
|
for at_ in get_message_at(event.json()):
|
||||||
@ -108,67 +220,4 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State, arg: Message = C
|
|||||||
_builder.set_placeholder(idx, f"__placeholder_{rand}_{idx}.jpg")
|
_builder.set_placeholder(idx, f"__placeholder_{rand}_{idx}.jpg")
|
||||||
idx += 1
|
idx += 1
|
||||||
_builder.set_answer(answer)
|
_builder.set_answer(answer)
|
||||||
await _builder.save()
|
return _builder
|
||||||
logger.info(f"已保存词条 问:{problem} 答:{msg}")
|
|
||||||
await add_word.send(f"已保存词条:{problem}")
|
|
||||||
|
|
||||||
|
|
||||||
@delete_word.handle()
|
|
||||||
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
|
|
||||||
msg = arg.extract_plain_text().strip()
|
|
||||||
if not msg:
|
|
||||||
await delete_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
|
|
||||||
index = None
|
|
||||||
_sp_msg = msg.split()
|
|
||||||
if len(_sp_msg) > 1:
|
|
||||||
if is_number(_sp_msg[-1]):
|
|
||||||
index = int(_sp_msg[-1])
|
|
||||||
msg = " ".join(_sp_msg[:-1])
|
|
||||||
problem = msg
|
|
||||||
if problem.startswith("id:"):
|
|
||||||
x = problem.split(":")[-1]
|
|
||||||
if not is_number(x) or int(x) < 0:
|
|
||||||
await delete_word.finish("id必须为数字且符合规范!")
|
|
||||||
p = await WordBank.get_group_all_problem(event.group_id)
|
|
||||||
if p:
|
|
||||||
problem = p[int(x)]
|
|
||||||
try:
|
|
||||||
if answer := await WordBank.delete_problem_answer(
|
|
||||||
event.user_id, event.group_id, problem, index
|
|
||||||
):
|
|
||||||
await delete_word.send(f"删除词条成功:{problem}\n回答:\n{answer}")
|
|
||||||
logger.info(
|
|
||||||
f"(USER {event.user_id}, GROUP "
|
|
||||||
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
|
|
||||||
f" 删除词条: {problem}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
await delete_word.send(f"删除词条:{problem} 失败,可能该词条不存在")
|
|
||||||
except IndexError:
|
|
||||||
await delete_word.send("指定下标错误...请通过查看词条来确定..")
|
|
||||||
|
|
||||||
|
|
||||||
@show_word.handle()
|
|
||||||
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
|
|
||||||
msg = arg.extract_plain_text().strip()
|
|
||||||
if not msg:
|
|
||||||
_problem_list = await WordBank.get_group_all_problem(event.group_id)
|
|
||||||
if not _problem_list:
|
|
||||||
await show_word.finish("该群未收录任何词条..")
|
|
||||||
_problem_list = [f"\t{i}. {x}" for i, x in enumerate(_problem_list)]
|
|
||||||
await show_word.send(
|
|
||||||
image(
|
|
||||||
b64=(await text2image(
|
|
||||||
"该群已收录的词条:\n\n" + "\n".join(_problem_list),
|
|
||||||
padding=10,
|
|
||||||
color="#f9f6f2",
|
|
||||||
)).pic2bs4()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
_answer_list = await WordBank.get_group_all_answer(event.group_id, msg)
|
|
||||||
if not _answer_list:
|
|
||||||
await show_word.send("未收录该词条...")
|
|
||||||
else:
|
|
||||||
_answer_list = [f"{i}. {x}" for i, x in enumerate(_answer_list)]
|
|
||||||
await show_word.send(f"词条 {msg} 回答:\n" + "\n".join(_answer_list))
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user