From a04475abc1087d63fd453b444535ae5b1b70fb13 Mon Sep 17 00:00:00 2001 From: HibiKier <45528451+HibiKier@users.noreply.github.com> Date: Sat, 21 Sep 2024 23:07:00 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20=E9=87=8D=E6=9E=84qq=E7=BE=A4?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E5=A4=84=E7=90=86=20(#1643)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../builtin_plugins/platform/qq/exception.py | 11 + .../platform/qq/group_handle/__init__.py | 125 ++++++++ .../platform/qq/group_handle/data_source.py | 294 ++++++++++++++++++ zhenxun/utils/message.py | 25 ++ 4 files changed, 455 insertions(+) create mode 100644 zhenxun/builtin_plugins/platform/qq/exception.py create mode 100644 zhenxun/builtin_plugins/platform/qq/group_handle/__init__.py create mode 100644 zhenxun/builtin_plugins/platform/qq/group_handle/data_source.py diff --git a/zhenxun/builtin_plugins/platform/qq/exception.py b/zhenxun/builtin_plugins/platform/qq/exception.py new file mode 100644 index 00000000..87e14676 --- /dev/null +++ b/zhenxun/builtin_plugins/platform/qq/exception.py @@ -0,0 +1,11 @@ +class ForceAddGroupError(Exception): + """ + 强制拉群 + """ + + def __init__(self, info: str): + super().__init__(self) + self._info = info + + def get_info(self) -> str: + return self._info diff --git a/zhenxun/builtin_plugins/platform/qq/group_handle/__init__.py b/zhenxun/builtin_plugins/platform/qq/group_handle/__init__.py new file mode 100644 index 00000000..8e2ac040 --- /dev/null +++ b/zhenxun/builtin_plugins/platform/qq/group_handle/__init__.py @@ -0,0 +1,125 @@ +from nonebot.adapters import Bot +from nonebot import on_notice, on_request +from nonebot.plugin import PluginMetadata +from nonebot.adapters.onebot.v11 import ( + GroupDecreaseNoticeEvent, + GroupIncreaseNoticeEvent, +) +from nonebot.adapters.onebot.v12 import ( + GroupMemberDecreaseEvent, + GroupMemberIncreaseEvent, +) + +from zhenxun.utils.enum import PluginType +from zhenxun.utils.platform import PlatformUtils +from zhenxun.utils.common_utils import CommonUtils +from zhenxun.configs.config import Config, BotConfig +from zhenxun.models.group_console import GroupConsole +from zhenxun.configs.utils import Task, RegisterConfig, PluginExtraData + +from .data_source import GroupManager +from ..exception import ForceAddGroupError + +__plugin_meta__ = PluginMetadata( + name="QQ群事件处理", + description="群事件处理", + usage="", + extra=PluginExtraData( + author="HibiKier", + version="0.1", + plugin_type=PluginType.HIDDEN, + configs=[ + RegisterConfig( + module="invite_manager", + key="message", + value=f"请不要未经同意就拉{BotConfig.self_nickname}入群!告辞!", + help="强制拉群后进群回复的内容", + ), + RegisterConfig( + module="invite_manager", + key="flag", + value=True, + help="强制拉群后进群退出并回复内容", + default_value=True, + type=bool, + ), + RegisterConfig( + module="invite_manager", + key="welcome_msg_cd", + value=5, + help="群欢迎消息cd", + default_value=5, + type=int, + ), + RegisterConfig( + module="_task", + key="DEFAULT_GROUP_WELCOME", + value=True, + help="被动 进群欢迎 进群默认开关状态", + default_value=True, + type=bool, + ), + RegisterConfig( + module="_task", + key="DEFAULT_REFUND_GROUP_REMIND", + value=True, + help="被动 退群提醒 进群默认开关状态", + default_value=True, + type=bool, + ), + ], + tasks=[ + Task(module="group_welcome", name="进群欢迎"), + Task(module="refund_group_remind", name="退群提醒"), + ], + ).dict(), +) + + +base_config = Config.get("invite_manager") + + +limit_cd = base_config.get("welcome_msg_cd") + + +group_increase_handle = on_notice(priority=1, block=False) +"""群员增加处理""" +group_decrease_handle = on_notice(priority=1, block=False) +"""群员减少处理""" +add_group = on_request(priority=1, block=False) +"""加群同意请求""" + + +@group_increase_handle.handle() +async def _(bot: Bot, event: GroupIncreaseNoticeEvent | GroupMemberIncreaseEvent): + user_id = str(event.user_id) + group_id = str(event.group_id) + if user_id == bot.self_id: + """新成员为bot本身""" + group, _ = await GroupConsole.get_or_create( + group_id=group_id, channel_id__isnull=True + ) + if group.group_flag == 0: + try: + await GroupManager.add_bot(bot, str(event.operator_id), group_id, group) + except ForceAddGroupError as e: + await PlatformUtils.send_superuser(bot, e.get_info()) + else: + await GroupManager.add_user(bot, user_id, group_id) + + +@group_decrease_handle.handle() +async def _(bot: Bot, event: GroupDecreaseNoticeEvent | GroupMemberDecreaseEvent): + user_id = str(event.user_id) + group_id = str(event.group_id) + if event.sub_type == "kick_me": + """踢出Bot""" + await GroupManager.kick_bot(bot, user_id, group_id) + elif event.sub_type in ["leave", "kick"]: + result = await GroupManager.run_user( + bot, user_id, group_id, str(event.operator_id), event.sub_type + ) + if result and not await CommonUtils.task_is_block( + "refund_group_remind", group_id + ): + await group_decrease_handle.send(result) diff --git a/zhenxun/builtin_plugins/platform/qq/group_handle/data_source.py b/zhenxun/builtin_plugins/platform/qq/group_handle/data_source.py new file mode 100644 index 00000000..87b34c82 --- /dev/null +++ b/zhenxun/builtin_plugins/platform/qq/group_handle/data_source.py @@ -0,0 +1,294 @@ +import os +import re +import random +from pathlib import Path +from datetime import datetime + +import ujson as json +from nonebot.adapters import Bot +from nonebot_plugin_alconna import At + +from zhenxun.services.log import logger +from zhenxun.configs.config import Config +from zhenxun.utils.utils import FreqLimiter +from zhenxun.utils.message import MessageUtils +from zhenxun.models.fg_request import FgRequest +from zhenxun.models.level_user import LevelUser +from zhenxun.utils.enum import RequestHandleType +from zhenxun.utils.platform import PlatformUtils +from zhenxun.models.plugin_info import PluginInfo +from zhenxun.utils.common_utils import CommonUtils +from zhenxun.models.group_console import GroupConsole +from zhenxun.models.group_member_info import GroupInfoUser +from zhenxun.configs.path_config import DATA_PATH, IMAGE_PATH + +from ..exception import ForceAddGroupError + +base_config = Config.get("invite_manager") + +limit_cd = base_config.get("welcome_msg_cd") + +WELCOME_PATH = DATA_PATH / "welcome_message" / "qq" + +DEFAULT_IMAGE_PATH = IMAGE_PATH / "qxz" + + +class GroupManager: + _flmt = FreqLimiter(limit_cd) + + @classmethod + async def __handle_add_group( + cls, bot: Bot, group_id: str, group: GroupConsole | None + ): + """允许群组并设置群认证,默认群功能开关 + + 参数: + bot: Bot + group_id: 群组id + group: GroupConsole + """ + if group: + await GroupConsole.filter( + group_id=group_id, channel_id__isnull=True + ).update(group_flag=1) + else: + block_plugin = "" + if plugin_list := await PluginInfo.filter(default_status=False).all(): + for plugin in plugin_list: + block_plugin += f"{plugin.module}," + group_info = await bot.get_group_info(group_id=group_id) + await GroupConsole.create( + group_id=group_info["group_id"], + group_name=group_info["group_name"], + max_member_count=group_info["max_member_count"], + member_count=group_info["member_count"], + group_flag=1, + block_plugin=block_plugin, + platform="qq", + ) + + @classmethod + async def __refresh_level(cls, bot: Bot, group_id: str): + """刷新权限 + + 参数: + bot: Bot + group_id: 群组id + """ + admin_default_auth = Config.get_config("admin_bot_manage", "ADMIN_DEFAULT_AUTH") + member_list = await bot.get_group_member_list(group_id=group_id) + member_id_list = [str(user_info["user_id"]) for user_info in member_list] + flag2u = await LevelUser.filter( + user_id__in=member_id_list, group_id=group_id, group_flag=1 + ).values_list("user_id", flat=True) + # 即刻刷新权限 + for user_info in member_list: + user_id = user_info["user_id"] + role = user_info["role"] + if user_id in bot.config.superusers: + await LevelUser.set_level(user_id, user_info["group_id"], 9) + logger.debug( + "添加超级用户权限: 9", + "入群检测", + session=user_id, + group_id=user_info["group_id"], + ) + elif ( + admin_default_auth is not None + and role in ["owner", "admin"] + and user_id not in flag2u + ): + await LevelUser.set_level( + user_id, + user_info["group_id"], + admin_default_auth, + ) + logger.debug( + f"添加默认群管理员权限: {admin_default_auth}", + "入群检测", + session=user_id, + group_id=user_info["group_id"], + ) + + @classmethod + async def add_bot( + cls, bot: Bot, operator_id: str, group_id: str, group: GroupConsole | None + ): + """拉入bot + + 参数: + bot: Bot + operator_id: 操作者id + group_id: 群组id + group: GroupConsole + """ + if base_config.get("flag") and operator_id not in bot.config.superusers: + """退出群组""" + try: + if result_msg := base_config.get("message"): + await bot.send_group_msg(group_id=int(group_id), message=result_msg) + await bot.set_group_leave(group_id=int(group_id)) + logger.info( + "强制拉群或未有群信息,退出群聊成功", "入群检测", group_id=group_id + ) + await FgRequest.filter( + group_id=group_id, handle_type__isnull=True + ).update(handle_type=RequestHandleType.IGNORE) + except Exception as e: + logger.error( + "强制拉群或未有群信息,退出群聊失败", + "入群检测", + group_id=group_id, + e=e, + ) + raise ForceAddGroupError("强制拉群或未有群信息,退出群聊失败...") from e + await GroupConsole.filter(group_id=group_id).delete() + raise ForceAddGroupError(f"触发强制入群保护,已成功退出群聊 {group_id}...") + else: + await cls.__handle_add_group(bot, group_id, group) + """刷新群管理员权限""" + await cls.__refresh_level(bot, group_id) + + @classmethod + def __build_welcome_message(cls, user_id: str, path: Path) -> list[At | Path | str]: + """构造群欢迎消息 + + 参数: + user_id: 用户id + path: 群欢迎消息存储路径 + + 返回: + list[At | Path | str]: 消息列表 + """ + file = path / "text.json" + data = json.load(file.open(encoding="utf-8")) + message = data["message"] + msg_split = re.split(r"\[image:\d+\]", message) + msg_list = [] + if data["at"]: + msg_list.append(At(flag="user", target=user_id)) + for i, text in enumerate(msg_split): + msg_list.append(text) + img_file = path / f"{i}.png" + if img_file.exists(): + msg_list.append(img_file) + return msg_list + + @classmethod + async def __send_welcome_message(cls, user_id: str, group_id: str): + """发送群欢迎消息 + + 参数: + user_id: 用户id + group_id: 群组id + """ + cls._flmt.start_cd(group_id) + path = WELCOME_PATH / f"{group_id}" + file = path / "text.json" + if file.exists(): + msg_list = cls.__build_welcome_message(user_id, path) + logger.info("发送群欢迎消息...", "入群检测", group_id=group_id) + if msg_list: + await MessageUtils.build_message(msg_list).send() # type: ignore + else: + image = DEFAULT_IMAGE_PATH / random.choice( + os.listdir(DEFAULT_IMAGE_PATH) + ) + await MessageUtils.build_message( + [ + "新人快跑啊!!本群现状↓(快使用自定义!)", + image, + ] + ).send() + + @classmethod + async def add_user(cls, bot: Bot, user_id: str, group_id: str): + """拉入用户 + + 参数: + bot: Bot + user_id: 用户id + group_id: 群组id + """ + join_time = datetime.now() + user_info = await bot.get_group_member_info(group_id=group_id, user_id=user_id) + await GroupInfoUser.update_or_create( + user_id=str(user_info["user_id"]), + group_id=str(user_info["group_id"]), + defaults={"user_name": user_info["nickname"], "user_join_time": join_time}, + ) + logger.info(f"用户{user_info['user_id']} 所属{user_info['group_id']} 更新成功") + if not await CommonUtils.task_is_block( + "group_welcome", group_id + ) and cls._flmt.check(group_id): + await cls.__send_welcome_message(user_id, group_id) + + @classmethod + async def kick_bot(cls, bot: Bot, group_id: str, operator_id: str): + """踢出bot + + 参数: + bot: Bot + group_id: 群组id + operator_id: 操作员id + """ + if user := await GroupInfoUser.get_or_none( + user_id=operator_id, group_id=group_id + ): + operator_name = user.user_name + else: + operator_name = "None" + group = await GroupConsole.get_group(group_id) + group_name = group.group_name if group else "" + if group: + await group.delete() + await PlatformUtils.send_superuser( + bot, + f"****呜..一份踢出报告****\n" + f"我被 {operator_name}({operator_id})\n" + f"踢出了 {group_name}({group_id})\n" + f"日期:{str(datetime.now()).split('.')[0]}", + ) + + @classmethod + async def run_user( + cls, + bot: Bot, + user_id: str, + group_id: str, + operator_id: str, + sub_type: str, + ) -> str | None: + """踢出用户或用户离开 + + 参数: + bot: Bot + user_id: 用户id + group_id: 群组id + operator_id: 操作员id + sub_type: 类型 + + 返回: + str | None: 返回消息 + """ + if user := await GroupInfoUser.get_or_none(user_id=user_id, group_id=group_id): + user_name = user.user_name + else: + user_name = f"{user_id}" + if user: + await user.delete() + logger.info( + f"名称: {user_name} 退出群聊", + "group_decrease_handle", + session=user_id, + group_id=group_id, + ) + if sub_type == "kick": + operator = await bot.get_group_member_info( + user_id=int(operator_id), group_id=int(group_id) + ) + operator_name = operator["card"] or operator["nickname"] + return f"{user_name} 被 {operator_name} 送走了." + elif sub_type == "leave": + return f"{user_name}离开了我们..." + return None diff --git a/zhenxun/utils/message.py b/zhenxun/utils/message.py index f2fc9153..34296777 100644 --- a/zhenxun/utils/message.py +++ b/zhenxun/utils/message.py @@ -147,3 +147,28 @@ class MessageUtils: s = str(r_list) forward_data.append(s) return cls.custom_forward_msg(forward_data, uni) + + @classmethod + def template2alc(cls, msg_list: list[MessageSegment]) -> list: + """模板转alc + + 参数: + msg_list: 消息列表 + + 返回: + list: alc模板 + """ + forward_data = [] + for msg in msg_list: + if isinstance(msg, str): + forward_data.append(Text(msg)) + elif msg.type == "at": + if msg.data["qq"] == "0": + forward_data.append(AtAll()) + else: + forward_data.append(At(flag="user", target=msg.data["qq"])) + elif msg.type == "image": + forward_data.append(Image(url=msg.data["file"] or msg.data["url"])) + elif msg.type == "text" and msg.data["text"]: + forward_data.append(Text(msg.data["text"])) + return forward_data