feat(tag): 添加标签克隆功能

- 新增 `tag clone <源标签名> <新标签名>` 命令,用于复制现有标签。
- 【优化】在 `tag create`, `tag edit --add`, `tag edit --set` 命令中,自动去重传入的群组ID,避免重复关联。
This commit is contained in:
webjoin111 2025-11-11 21:44:07 +08:00
parent 2581e335af
commit 412654d165
2 changed files with 135 additions and 10 deletions

View File

@ -176,6 +176,16 @@ tag_cmd = on_alconna(
help_text="删除标签",
),
Subcommand("clear", help_text="清空所有标签"),
Subcommand(
"clone",
Args["source_name", str]["new_name", str],
Option("--add", Args["add_groups", MultiVar(str)]),
Option("--remove", Args["remove_groups", MultiVar(str)]),
Option("--as-dynamic", action=store_true),
Option("--desc", Args["description", str]),
Option("--mode", Args["mode", ["black", "white"]]),
help_text="克隆标签",
),
),
permission=SUPERUSER,
priority=5,
@ -269,17 +279,24 @@ async def handle_create(
).finish()
try:
gids_to_create = None
unique_gids_count = 0
if group_ids.available:
unique_gids = list(dict.fromkeys(group_ids.result))
gids_to_create = unique_gids
unique_gids_count = len(unique_gids)
tag = await tag_manager.create_tag(
name=name.result,
is_blacklist=blacklist.result,
description=description.result if description.available else None,
group_ids=group_ids.result if group_ids.available else None,
group_ids=gids_to_create,
tag_type=ttype,
dynamic_rule=rule.result if rule.available else None,
)
msg = f"标签 '{tag.name}' 创建成功!"
if group_ids.available:
msg += f"\n已同时关联 {len(group_ids.result)} 个群组。"
msg += f"\n已同时关联 {unique_gids_count} 个群组。"
await MessageUtils.build_message(msg).finish()
except IntegrityError:
await MessageUtils.build_message(
@ -411,3 +428,41 @@ async def handle_clear():
await MessageUtils.build_message(f"操作完成,已清空 {count} 个标签。").finish()
else:
await MessageUtils.build_message("操作已取消。").finish()
@tag_cmd.assign("clone")
async def handle_clone(
bot: Bot,
source_name: Match[str],
new_name: Match[str],
add_groups: Query[list[str] | None] = AlconnaQuery("clone.add.add_groups", None),
remove_groups: Query[list[str] | None] = AlconnaQuery(
"clone.remove.remove_groups", None
),
as_dynamic: Query[bool] = AlconnaQuery("clone.as-dynamic.value", False),
description: Query[str | None] = AlconnaQuery("clone.desc.description", None),
mode: Query[str | None] = AlconnaQuery("clone.mode.mode", None),
):
try:
new_tag = await tag_manager.clone_tag(
source_name=source_name.result,
new_name=new_name.result,
bot=bot,
add_groups=add_groups.result,
remove_groups=remove_groups.result,
as_dynamic=as_dynamic.result,
description=description.result,
mode=mode.result,
)
tag_type_str = "动态" if new_tag.tag_type == "DYNAMIC" else "静态"
group_count = 0
if new_tag.tag_type == "STATIC":
group_count = await new_tag.groups.all().count()
msg = f"✅ 成功克隆标签!\n- 新标签: {new_tag.name}\n- 类型: {tag_type_str}"
if new_tag.tag_type == "STATIC":
msg += f" (含 {group_count} 个群组)"
await MessageUtils.build_message(msg).finish()
except (ValueError, IntegrityError) as e:
await MessageUtils.build_message(f"克隆失败: {e}").finish()

View File

@ -156,8 +156,9 @@ class TagManager:
dynamic_rule=dynamic_rule,
)
if group_ids:
unique_group_ids = list(dict.fromkeys(group_ids))
await GroupTagLink.bulk_create(
[GroupTagLink(tag=tag, group_id=gid) for gid in group_ids]
[GroupTagLink(tag=tag, group_id=gid) for gid in unique_group_ids]
)
return tag
@ -186,11 +187,12 @@ class TagManager:
if tag.tag_type == "DYNAMIC":
raise ValueError("不能向动态标签手动添加群组。")
unique_group_ids = list(dict.fromkeys(group_ids))
await GroupTagLink.bulk_create(
[GroupTagLink(tag=tag, group_id=gid) for gid in group_ids],
[GroupTagLink(tag=tag, group_id=gid) for gid in unique_group_ids],
ignore_conflicts=True,
)
return len(group_ids)
return len(unique_group_ids)
@invalidate_on_change
async def remove_groups_from_tag(self, name: str, group_ids: list[str]) -> int:
@ -205,6 +207,72 @@ class TagManager:
).delete()
return deleted_count
@invalidate_on_change
async def clone_tag(
self,
source_name: str,
new_name: str,
bot: Bot,
add_groups: list[str] | None = None,
remove_groups: list[str] | None = None,
as_dynamic: bool = False,
description: str | None = None,
mode: str | None = None,
) -> GroupTag:
"""
克隆一个标签支持动态转静态修改群组等
"""
source_tag = await GroupTag.get_or_none(name=source_name)
if not source_tag:
raise ValueError(f"源标签 '{source_name}' 不存在。")
if await GroupTag.exists(name=new_name):
raise IntegrityError(f"目标标签 '{new_name}' 已存在。")
tag_type = "STATIC"
group_ids_to_set: list[str] | None = None
dynamic_rule: str | dict | None = None
if source_tag.tag_type == "STATIC":
if as_dynamic:
raise ValueError("不能将静态标签克隆为动态标签。")
group_ids_to_set = await GroupTagLink.filter(tag=source_tag).values_list( # type: ignore
"group_id", flat=True
)
else:
if as_dynamic:
tag_type = "DYNAMIC"
dynamic_rule = source_tag.dynamic_rule
if add_groups or remove_groups:
raise ValueError(
"克隆为动态标签时,不支持 --add 或 --remove 操作。"
)
else:
group_ids_to_set = await self.resolve_tag_to_group_ids(
source_name, bot=bot
)
if group_ids_to_set is not None:
final_group_set = set(group_ids_to_set)
if add_groups:
final_group_set.update(add_groups)
if remove_groups:
final_group_set.difference_update(remove_groups)
group_ids_to_set = list(final_group_set)
is_blacklist = (
(mode == "black") if mode is not None else source_tag.is_blacklist
)
return await self.create_tag(
name=new_name,
is_blacklist=is_blacklist,
description=description,
group_ids=group_ids_to_set,
tag_type=tag_type,
dynamic_rule=dynamic_rule,
)
async def list_tags_with_counts(self) -> list[dict]:
"""列出所有标签及其关联的群组数量。"""
tags = await GroupTag.all().prefetch_related("groups")
@ -514,11 +582,13 @@ class TagManager:
raise ValueError("不能为动态标签设置静态群组列表。")
async with in_transaction():
await GroupTagLink.filter(tag=tag).delete()
await GroupTagLink.bulk_create(
[GroupTagLink(tag=tag, group_id=gid) for gid in group_ids],
ignore_conflicts=True,
)
return len(group_ids)
unique_group_ids = list(dict.fromkeys(group_ids))
if unique_group_ids:
await GroupTagLink.bulk_create(
[GroupTagLink(tag=tag, group_id=gid) for gid in unique_group_ids],
ignore_conflicts=True,
)
return len(unique_group_ids)
@invalidate_on_change
async def clear_all_tags(self) -> int: