Merge pull request #1 from yajiwa/zhenxun_yajiwa

Update Modify the entry
This commit is contained in:
yajiwa 2022-04-01 23:56:57 +08:00 committed by GitHub
commit 07c16e5238
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 205 additions and 99 deletions

View File

@ -5,7 +5,6 @@ from typing import Union
class WordBankBuilder:
def __init__(self, user_id: int, group_id: int, problem: str):
self._data = {
"user_id": user_id,
"group_id": group_id,
@ -37,16 +36,13 @@ class WordBankBuilder:
placeholder = self._data.get("placeholder")
await WordBank.add_problem_answer(user_id, group_id, problem, answer, placeholder)
async def update(self, index):
user_id = self._data["user_id"]
group_id = self._data["group_id"]
problem = self._data["problem"]
answer = self._data["answer"]
placeholder = self._data.get("placeholder")
return await WordBank.update_problem_answer(user_id, group_id, problem, answer, index, placeholder)
def __str__(self):
return str(self._data)

View File

@ -21,12 +21,12 @@ class WordBank(db.Model):
@classmethod
async def add_problem_answer(
cls,
user_id: int,
group_id: Optional[int],
problem: str,
answer: str,
format_: Optional[List[Tuple[int, Union[int, str]]]],
cls,
user_id: int,
group_id: Optional[int],
problem: str,
answer: str,
format_: Optional[List[Tuple[int, Union[int, str]]]],
) -> bool:
"""
添加或新增一个问答
@ -47,7 +47,7 @@ class WordBank(db.Model):
@classmethod
async def delete_problem_answer(
cls, user_id: int, group_id: Optional[int], problem: str, index: Optional[int]
cls, user_id: int, group_id: Optional[int], problem: str, index: Optional[int]
) -> str:
"""
删除某问题一个或全部回答
@ -60,9 +60,35 @@ class WordBank(db.Model):
user_id, group_id, problem, "delete", index=index
)
@classmethod
async def update_problem_answer(
cls,
user_id: int,
group_id: Optional[int],
problem: str,
answer: str,
index: Optional[int],
format_: Optional[List[Tuple[int, Union[int, str]]]],
) -> str:
"""
修改某问题一个或全部回答
:param user_id: 用户id
:param group_id: 群号
:param problem: 问题
:param index: 回答下标
"""
_str = None
if format_:
_str = ""
for x, y in format_:
_str += f"{x}<_s>{y}<format>"
return await cls._problem_answer_handle(
user_id, group_id, problem, "update", answer=answer,index=index, format_=_str
)
@classmethod
async def get_problem_answer(
cls, user_id: int, group_id: Optional[int], problem: str
cls, user_id: int, group_id: Optional[int], problem: str
) -> List[str]:
"""
获取问题的所有回答
@ -135,15 +161,15 @@ class WordBank(db.Model):
@classmethod
async def _problem_answer_handle(
cls,
user_id: int,
group_id: Optional[int],
problem: str,
type_: str,
*,
answer: Optional[str] = None,
index: Optional[int] = None,
format_: Optional[str] = None,
cls,
user_id: int,
group_id: Optional[int],
problem: str,
type_: str,
*,
answer: Optional[str] = None,
index: Optional[int] = None,
format_: Optional[str] = None,
) -> Union[List[Union[str, Tuple[str, str]]], bool, str]:
"""
添加或新增一个问答
@ -174,8 +200,8 @@ class WordBank(db.Model):
problem=problem,
answer=answer,
format=format_,
create_time=datetime.now().date(),
update_time=datetime.now().date(),
create_time=datetime.now().replace(microsecond=0),
update_time=datetime.now().replace(microsecond=0),
)
return True
elif type_ == "delete":
@ -204,9 +230,39 @@ class WordBank(db.Model):
& (cls.group_id == group_id)
).gino.status()
return answer
elif type_ == "update":
new_format =format_
new_answer = answer
q = await q.with_for_update().gino.all()
if q:
path = DATA_PATH / "word_bank" / f"{group_id}"
if index is not None:
_q = [x.problem for x in q]
_q.sort()
prob = _q[index]
index = [x.problem for x in q].index(prob)
q = [q[index]]
else:
q = [q[0]]
for x in q:
format_ = x.format
if format_:
for sp in format_.split("<format>")[:-1]:
_, image_name = sp.split("<_s>")
if image_name.endswith("jpg"):
_path = path / image_name
if _path.exists():
_path.unlink()
await cls.update.values(answer=new_answer,
format=new_format,
update_time=datetime.now().replace(microsecond=0), ).where(
(cls.problem == problem)
& (cls.answer == x.answer)
& (cls.group_id == group_id)
).gino.status()
return True
elif type_ == "get":
q = await q.gino.all()
if q:
return [(x.answer, x.format.split("<format>")[:-1]) for x in q]
return False

View File

@ -27,11 +27,15 @@ usage
指令
添加词条问......添加问答词条可重复添加相同问题的不同回答
删除词条 [问题/下标] ?[下标]删除指定词条指定或全部回答
修改词条 [问题/下标] ?[下标/新回答] [新回答]修改指定词条指定回答默认修改为第一条
查看词条 ?[问题/下标]查看全部词条或对应词条回答
示例添加词条问谁是萝莉答是我
示例删除词条 谁是萝莉
示例删除词条 谁是萝莉 0
示例删除词条 id:0
示例修改词条 谁是萝莉 是你
示例修改词条 谁是萝莉 0 是你
示例修改词条 id:0 是你
示例查看词条
示例查看词条 谁是萝莉
示例查看词条 id:0
@ -40,23 +44,25 @@ __plugin_des__ = "自定义词条内容随机回复"
__plugin_cmd__ = [
"添加词条问...答..",
"删除词条 [问题/下标] ?[下标]",
"修改词条 [问题/下标] ?[下标/新回答] [新回答]",
"查看词条 ?[问题/下标]",
]
__plugin_version__ = 0.1
__plugin_author__ = "HibiKier"
__plugin_settings__ = {
"admin_level": Config.get_config("word_bank", "WORD_BANK_LEVEL"),
"cmd": ["词库问答", "添加词条", "删除词条", "查看词条"],
"cmd": ["词库问答", "添加词条", "删除词条", "修改词条", "查看词条"],
}
data_dir = DATA_PATH / "word_bank"
data_dir.mkdir(parents=True, exist_ok=True)
add_word = on_command("添加词条", priority=5, block=True)
delete_word = on_command("删除词条", priority=5, block=True)
update_word = on_command("修改词条", priority=5, block=True)
show_word = on_command("显示词条", aliases={"查看词条"}, priority=5, block=True)
@ -75,10 +81,121 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State, arg: Message = C
idx = 0
for n in bot.config.nickname:
if n and problem.startswith(n):
_problem = f"[_to_me|{n}]" + problem[len(n) :]
_problem = f"[_to_me|{n}]" + problem[len(n):]
break
else:
_problem = problem
_builder = await get__builder(event, _problem, answer, idx)
await _builder.save()
logger.info(f"已保存词条 问:{problem} 答:{msg}")
await add_word.send(f"已保存词条:{problem}")
@delete_word.handle()
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
msg = arg.extract_plain_text().strip()
if not msg:
await delete_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
index = None
_sp_msg = msg.split()
if len(_sp_msg) > 1:
if is_number(_sp_msg[-1]):
index = int(_sp_msg[-1])
msg = " ".join(_sp_msg[:-1])
problem = msg
if problem.startswith("id:"):
x = problem.split(":")[-1]
if not is_number(x) or int(x) < 0:
await delete_word.finish("id必须为数字且符合规范")
p = await WordBank.get_group_all_problem(event.group_id)
if p:
problem = p[int(x)]
try:
if answer := await WordBank.delete_problem_answer(
event.user_id, event.group_id, problem, index
):
await delete_word.send(f"删除词条成功:{problem}\n回答:\n{answer}")
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 删除词条: {problem}"
)
else:
await delete_word.send(f"删除词条:{problem} 失败,可能该词条不存在")
except IndexError:
await delete_word.send("指定下标错误...请通过查看词条来确定..")
@update_word.handle()
async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
msg = str(arg)
if not msg:
await update_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
index = None
new_answer = None
problem = None
_sp_msg = msg.split()
len_msg = len(_sp_msg)
if 1 < len_msg:
problem = "".join(_sp_msg[0])
if len_msg == 3:
if is_number(_sp_msg[1]):
index = int(_sp_msg[1])
new_answer = "".join(_sp_msg[2:])
else:
new_answer = "".join(_sp_msg[1:])
else:
await update_word.finish("此命令之后需要跟随修改内容")
idx = 0
for n in bot.config.nickname:
if n and problem.startswith(n):
_problem = f"[_to_me|{n}]" + problem[len(n):]
break
else:
_problem = problem
_builder = await get__builder(event, _problem, new_answer, idx)
try:
if await _builder.update(index):
await update_word.send(f"修改词条成功:{problem}")
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 修改词条: {problem}"
)
else:
await update_word.send(f"修改词条:{problem} 失败,可能该词条不存在")
except IndexError:
await update_word.send("指定下标错误...请通过查看词条来确定..")
@show_word.handle()
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
msg = arg.extract_plain_text().strip()
if not msg:
_problem_list = await WordBank.get_group_all_problem(event.group_id)
if not _problem_list:
await show_word.finish("该群未收录任何词条..")
_problem_list = [f"\t{i}. {x}" for i, x in enumerate(_problem_list)]
await show_word.send(
image(
b64=(await text2image(
"该群已收录的词条:\n\n" + "\n".join(_problem_list),
padding=10,
color="#f9f6f2",
)).pic2bs4()
)
)
else:
_answer_list = await WordBank.get_group_all_answer(event.group_id, msg)
if not _answer_list:
await show_word.send("未收录该词条...")
else:
_answer_list = [f"{i}. {x}" for i, x in enumerate(_answer_list)]
await show_word.send(f"词条 {msg} 回答:\n" + "\n".join(_answer_list))
async def get__builder(event, _problem, answer, idx):
(data_dir / f"{event.group_id}").mkdir(exist_ok=True, parents=True)
_builder = WordBankBuilder(event.user_id, event.group_id, _problem)
for at_ in get_message_at(event.json()):
@ -108,67 +225,4 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State, arg: Message = C
_builder.set_placeholder(idx, f"__placeholder_{rand}_{idx}.jpg")
idx += 1
_builder.set_answer(answer)
await _builder.save()
logger.info(f"已保存词条 问:{problem} 答:{msg}")
await add_word.send(f"已保存词条:{problem}")
@delete_word.handle()
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
msg = arg.extract_plain_text().strip()
if not msg:
await delete_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
index = None
_sp_msg = msg.split()
if len(_sp_msg) > 1:
if is_number(_sp_msg[-1]):
index = int(_sp_msg[-1])
msg = " ".join(_sp_msg[:-1])
problem = msg
if problem.startswith("id:"):
x = problem.split(":")[-1]
if not is_number(x) or int(x) < 0:
await delete_word.finish("id必须为数字且符合规范")
p = await WordBank.get_group_all_problem(event.group_id)
if p:
problem = p[int(x)]
try:
if answer := await WordBank.delete_problem_answer(
event.user_id, event.group_id, problem, index
):
await delete_word.send(f"删除词条成功:{problem}\n回答:\n{answer}")
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 删除词条: {problem}"
)
else:
await delete_word.send(f"删除词条:{problem} 失败,可能该词条不存在")
except IndexError:
await delete_word.send("指定下标错误...请通过查看词条来确定..")
@show_word.handle()
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
msg = arg.extract_plain_text().strip()
if not msg:
_problem_list = await WordBank.get_group_all_problem(event.group_id)
if not _problem_list:
await show_word.finish("该群未收录任何词条..")
_problem_list = [f"\t{i}. {x}" for i, x in enumerate(_problem_list)]
await show_word.send(
image(
b64=(await text2image(
"该群已收录的词条:\n\n" + "\n".join(_problem_list),
padding=10,
color="#f9f6f2",
)).pic2bs4()
)
)
else:
_answer_list = await WordBank.get_group_all_answer(event.group_id, msg)
if not _answer_list:
await show_word.send("未收录该词条...")
else:
_answer_list = [f"{i}. {x}" for i, x in enumerate(_answer_list)]
await show_word.send(f"词条 {msg} 回答:\n" + "\n".join(_answer_list))
return _builder