From 412654d165ab4f4342687b26bededd915039103c Mon Sep 17 00:00:00 2001 From: webjoin111 <455457521@qq.com> Date: Tue, 11 Nov 2025 21:44:07 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(tag):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=A0=87=E7=AD=BE=E5=85=8B=E9=9A=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 `tag clone <源标签名> <新标签名>` 命令,用于复制现有标签。 - 【优化】在 `tag create`, `tag edit --add`, `tag edit --set` 命令中,自动去重传入的群组ID,避免重复关联。 --- .../builtin_plugins/superuser/tag_manage.py | 59 ++++++++++++- zhenxun/services/tags/manager.py | 86 +++++++++++++++++-- 2 files changed, 135 insertions(+), 10 deletions(-) diff --git a/zhenxun/builtin_plugins/superuser/tag_manage.py b/zhenxun/builtin_plugins/superuser/tag_manage.py index f934531b..5a0c056b 100644 --- a/zhenxun/builtin_plugins/superuser/tag_manage.py +++ b/zhenxun/builtin_plugins/superuser/tag_manage.py @@ -176,6 +176,16 @@ tag_cmd = on_alconna( help_text="删除标签", ), Subcommand("clear", help_text="清空所有标签"), + Subcommand( + "clone", + Args["source_name", str]["new_name", str], + Option("--add", Args["add_groups", MultiVar(str)]), + Option("--remove", Args["remove_groups", MultiVar(str)]), + Option("--as-dynamic", action=store_true), + Option("--desc", Args["description", str]), + Option("--mode", Args["mode", ["black", "white"]]), + help_text="克隆标签", + ), ), permission=SUPERUSER, priority=5, @@ -269,17 +279,24 @@ async def handle_create( ).finish() try: + gids_to_create = None + unique_gids_count = 0 + if group_ids.available: + unique_gids = list(dict.fromkeys(group_ids.result)) + gids_to_create = unique_gids + unique_gids_count = len(unique_gids) + tag = await tag_manager.create_tag( name=name.result, is_blacklist=blacklist.result, description=description.result if description.available else None, - group_ids=group_ids.result if group_ids.available else None, + group_ids=gids_to_create, tag_type=ttype, dynamic_rule=rule.result if rule.available else None, ) msg = f"标签 '{tag.name}' 创建成功!" if group_ids.available: - msg += f"\n已同时关联 {len(group_ids.result)} 个群组。" + msg += f"\n已同时关联 {unique_gids_count} 个群组。" await MessageUtils.build_message(msg).finish() except IntegrityError: await MessageUtils.build_message( @@ -411,3 +428,41 @@ async def handle_clear(): await MessageUtils.build_message(f"操作完成,已清空 {count} 个标签。").finish() else: await MessageUtils.build_message("操作已取消。").finish() + + +@tag_cmd.assign("clone") +async def handle_clone( + bot: Bot, + source_name: Match[str], + new_name: Match[str], + add_groups: Query[list[str] | None] = AlconnaQuery("clone.add.add_groups", None), + remove_groups: Query[list[str] | None] = AlconnaQuery( + "clone.remove.remove_groups", None + ), + as_dynamic: Query[bool] = AlconnaQuery("clone.as-dynamic.value", False), + description: Query[str | None] = AlconnaQuery("clone.desc.description", None), + mode: Query[str | None] = AlconnaQuery("clone.mode.mode", None), +): + try: + new_tag = await tag_manager.clone_tag( + source_name=source_name.result, + new_name=new_name.result, + bot=bot, + add_groups=add_groups.result, + remove_groups=remove_groups.result, + as_dynamic=as_dynamic.result, + description=description.result, + mode=mode.result, + ) + + tag_type_str = "动态" if new_tag.tag_type == "DYNAMIC" else "静态" + group_count = 0 + if new_tag.tag_type == "STATIC": + group_count = await new_tag.groups.all().count() + + msg = f"✅ 成功克隆标签!\n- 新标签: {new_tag.name}\n- 类型: {tag_type_str}" + if new_tag.tag_type == "STATIC": + msg += f" (含 {group_count} 个群组)" + await MessageUtils.build_message(msg).finish() + except (ValueError, IntegrityError) as e: + await MessageUtils.build_message(f"克隆失败: {e}").finish() diff --git a/zhenxun/services/tags/manager.py b/zhenxun/services/tags/manager.py index 8c914baa..e26d6b7e 100644 --- a/zhenxun/services/tags/manager.py +++ b/zhenxun/services/tags/manager.py @@ -156,8 +156,9 @@ class TagManager: dynamic_rule=dynamic_rule, ) if group_ids: + unique_group_ids = list(dict.fromkeys(group_ids)) await GroupTagLink.bulk_create( - [GroupTagLink(tag=tag, group_id=gid) for gid in group_ids] + [GroupTagLink(tag=tag, group_id=gid) for gid in unique_group_ids] ) return tag @@ -186,11 +187,12 @@ class TagManager: if tag.tag_type == "DYNAMIC": raise ValueError("不能向动态标签手动添加群组。") + unique_group_ids = list(dict.fromkeys(group_ids)) await GroupTagLink.bulk_create( - [GroupTagLink(tag=tag, group_id=gid) for gid in group_ids], + [GroupTagLink(tag=tag, group_id=gid) for gid in unique_group_ids], ignore_conflicts=True, ) - return len(group_ids) + return len(unique_group_ids) @invalidate_on_change async def remove_groups_from_tag(self, name: str, group_ids: list[str]) -> int: @@ -205,6 +207,72 @@ class TagManager: ).delete() return deleted_count + @invalidate_on_change + async def clone_tag( + self, + source_name: str, + new_name: str, + bot: Bot, + add_groups: list[str] | None = None, + remove_groups: list[str] | None = None, + as_dynamic: bool = False, + description: str | None = None, + mode: str | None = None, + ) -> GroupTag: + """ + 克隆一个标签,支持动态转静态、修改群组等。 + """ + source_tag = await GroupTag.get_or_none(name=source_name) + if not source_tag: + raise ValueError(f"源标签 '{source_name}' 不存在。") + + if await GroupTag.exists(name=new_name): + raise IntegrityError(f"目标标签 '{new_name}' 已存在。") + + tag_type = "STATIC" + group_ids_to_set: list[str] | None = None + dynamic_rule: str | dict | None = None + + if source_tag.tag_type == "STATIC": + if as_dynamic: + raise ValueError("不能将静态标签克隆为动态标签。") + group_ids_to_set = await GroupTagLink.filter(tag=source_tag).values_list( # type: ignore + "group_id", flat=True + ) + else: + if as_dynamic: + tag_type = "DYNAMIC" + dynamic_rule = source_tag.dynamic_rule + if add_groups or remove_groups: + raise ValueError( + "克隆为动态标签时,不支持 --add 或 --remove 操作。" + ) + else: + group_ids_to_set = await self.resolve_tag_to_group_ids( + source_name, bot=bot + ) + + if group_ids_to_set is not None: + final_group_set = set(group_ids_to_set) + if add_groups: + final_group_set.update(add_groups) + if remove_groups: + final_group_set.difference_update(remove_groups) + group_ids_to_set = list(final_group_set) + + is_blacklist = ( + (mode == "black") if mode is not None else source_tag.is_blacklist + ) + + return await self.create_tag( + name=new_name, + is_blacklist=is_blacklist, + description=description, + group_ids=group_ids_to_set, + tag_type=tag_type, + dynamic_rule=dynamic_rule, + ) + async def list_tags_with_counts(self) -> list[dict]: """列出所有标签及其关联的群组数量。""" tags = await GroupTag.all().prefetch_related("groups") @@ -514,11 +582,13 @@ class TagManager: raise ValueError("不能为动态标签设置静态群组列表。") async with in_transaction(): await GroupTagLink.filter(tag=tag).delete() - await GroupTagLink.bulk_create( - [GroupTagLink(tag=tag, group_id=gid) for gid in group_ids], - ignore_conflicts=True, - ) - return len(group_ids) + unique_group_ids = list(dict.fromkeys(group_ids)) + if unique_group_ids: + await GroupTagLink.bulk_create( + [GroupTagLink(tag=tag, group_id=gid) for gid in unique_group_ids], + ignore_conflicts=True, + ) + return len(unique_group_ids) @invalidate_on_change async def clear_all_tags(self) -> int: