From f1354c626497626971bd2003c175126fcebc4a92 Mon Sep 17 00:00:00 2001 From: HibiKier <45528451+HibiKier@users.noreply.github.com> Date: Sat, 14 Sep 2024 14:23:19 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20=E4=BF=AE=E5=A4=8D=E7=BE=A4?= =?UTF-8?q?=E6=9D=83=E9=99=90=E4=B8=8E=E6=8F=92=E4=BB=B6=E7=AD=89=E7=BA=A7?= =?UTF-8?q?=E5=8C=B9=E9=85=8D=20(#1627)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../builtin_plugins/hooks/_auth_checker.py | 195 +++++++++--------- 1 file changed, 97 insertions(+), 98 deletions(-) diff --git a/zhenxun/builtin_plugins/hooks/_auth_checker.py b/zhenxun/builtin_plugins/hooks/_auth_checker.py index efffa236..df18e6a3 100644 --- a/zhenxun/builtin_plugins/hooks/_auth_checker.py +++ b/zhenxun/builtin_plugins/hooks/_auth_checker.py @@ -1,35 +1,34 @@ -from nonebot.adapters import Bot, Event -from nonebot.adapters.onebot.v11 import PokeNotifyEvent -from nonebot.exception import IgnoredException -from nonebot.matcher import Matcher -from nonebot_plugin_alconna import At, UniMsg -from nonebot_plugin_session import EventSession from pydantic import BaseModel +from nonebot.matcher import Matcher +from nonebot.adapters import Bot, Event +from nonebot_plugin_alconna import At, UniMsg +from nonebot.exception import IgnoredException from tortoise.exceptions import IntegrityError +from nonebot_plugin_session import EventSession +from nonebot.adapters.onebot.v11 import PokeNotifyEvent +from zhenxun.services.log import logger from zhenxun.configs.config import Config -from zhenxun.models.group_console import GroupConsole +from zhenxun.utils.message import MessageUtils from zhenxun.models.level_user import LevelUser from zhenxun.models.plugin_info import PluginInfo from zhenxun.models.plugin_limit import PluginLimit from zhenxun.models.user_console import UserConsole -from zhenxun.services.log import logger +from zhenxun.utils.exception import InsufficientGold +from zhenxun.models.group_console import GroupConsole +from zhenxun.utils.utils import FreqLimiter, CountLimiter, UserBlockLimiter from zhenxun.utils.enum import ( BlockType, GoldHandle, + PluginType, LimitWatchType, PluginLimitType, - PluginType, ) -from zhenxun.utils.exception import InsufficientGold -from zhenxun.utils.message import MessageUtils -from zhenxun.utils.utils import CountLimiter, FreqLimiter, UserBlockLimiter base_config = Config.get("hook") class Limit(BaseModel): - limit: PluginLimit limiter: FreqLimiter | UserBlockLimiter | CountLimiter @@ -38,12 +37,11 @@ class Limit(BaseModel): class LimitManage: + add_module = [] # noqa: RUF012 - add_module = [] - - cd_limit: dict[str, Limit] = {} - block_limit: dict[str, Limit] = {} - count_limit: dict[str, Limit] = {} + cd_limit: dict[str, Limit] = {} # noqa: RUF012 + block_limit: dict[str, Limit] = {} # noqa: RUF012 + count_limit: dict[str, Limit] = {} # noqa: RUF012 @classmethod def add_limit(cls, limit: PluginLimit): @@ -136,33 +134,34 @@ class LimitManage: 异常: IgnoredException: IgnoredException """ - if limit_model: - limit = limit_model.limit - limiter = limit_model.limiter - is_limit = ( - LimitWatchType.ALL - or (group_id and limit.watch_type == LimitWatchType.GROUP) - or (not group_id and limit.watch_type == LimitWatchType.USER) + if not limit_model: + return + limit = limit_model.limit + limiter = limit_model.limiter + is_limit = ( + LimitWatchType.ALL + or (group_id and limit.watch_type == LimitWatchType.GROUP) + or (not group_id and limit.watch_type == LimitWatchType.USER) + ) + key_type = user_id + if group_id and limit.watch_type == LimitWatchType.GROUP: + key_type = channel_id or group_id + if is_limit and not limiter.check(key_type): + if limit.result: + await MessageUtils.build_message(limit.result).send() + logger.debug( + f"{limit.module}({limit.limit_type}) 正在限制中...", + "HOOK", + session=session, ) - key_type = user_id - if group_id and limit.watch_type == LimitWatchType.GROUP: - key_type = channel_id or group_id - if is_limit and not limiter.check(key_type): - if limit.result: - await MessageUtils.build_message(limit.result).send() - logger.debug( - f"{limit.module}({limit.limit_type}) 正在限制中...", - "HOOK", - session=session, - ) - raise IgnoredException(f"{limit.module} 正在限制中...") - else: - if isinstance(limiter, FreqLimiter): - limiter.start_cd(key_type) - if isinstance(limiter, UserBlockLimiter): - limiter.set_true(key_type) - if isinstance(limiter, CountLimiter): - limiter.increase(key_type) + raise IgnoredException(f"{limit.module} 正在限制中...") + else: + if isinstance(limiter, FreqLimiter): + limiter.start_cd(key_type) + if isinstance(limiter, UserBlockLimiter): + limiter.set_true(key_type) + if isinstance(limiter, CountLimiter): + limiter.increase(key_type) class IsSuperuserException(Exception): @@ -196,11 +195,7 @@ class AuthChecker: return False if plugin.plugin_type == PluginType.DEPENDANT: return False - if not self._flmt_s.check(sid): - return False - if plugin.module == "ai": - return False - return True + return plugin.module != "ai" if self._flmt_s.check(sid) else False async def auth( self, @@ -255,7 +250,7 @@ class AuthChecker: await self.auth_limit(plugin, session) except IsSuperuserException: logger.debug( - f"超级用户或被ban跳过权限检测...", "HOOK", session=session + "超级用户或被ban跳过权限检测...", "HOOK", session=session ) except IgnoredException: is_ignore = True @@ -313,14 +308,13 @@ class AuthChecker: plugin: PluginInfo session: EventSession """ - user_id = session.id1 group_id = session.id3 channel_id = session.id2 - is_poke = isinstance(event, PokeNotifyEvent) if not group_id: group_id = channel_id channel_id = None - if user_id: + if user_id := session.id1: + is_poke = isinstance(event, PokeNotifyEvent) if group_id: sid = group_id or user_id if await GroupConsole.is_super_block_plugin( @@ -393,9 +387,8 @@ class AuthChecker: raise IgnoredException("该插件在私聊中已被禁用...") if not plugin.status and plugin.block_type == BlockType.ALL: """全局状态""" - if group_id: - if await GroupConsole.is_super_group(group_id): - raise IsSuperuserException() + if group_id and await GroupConsole.is_super_group(group_id): + raise IsSuperuserException() logger.debug( f"{plugin.name}({plugin.module}) 全局未开启此功能...", "HOOK", @@ -414,9 +407,8 @@ class AuthChecker: session: EventSession """ user_id = session.id1 - group_id = session.id3 or session.id2 if user_id and plugin.admin_level: - if group_id: + if group_id := session.id3 or session.id2: if not await LevelUser.check_level( user_id, group_id, plugin.admin_level ): @@ -426,7 +418,8 @@ class AuthChecker: await MessageUtils.build_message( [ At(flag="user", target=user_id), - f"你的权限不足喔,该功能需要的权限等级: {plugin.admin_level}", + f"你的权限不足喔," + f"该功能需要的权限等级: {plugin.admin_level}", ] ).send(reply_to=True) except Exception as e: @@ -439,22 +432,21 @@ class AuthChecker: session=session, ) raise IgnoredException("管理员权限不足...") - else: - if not await LevelUser.check_level(user_id, None, plugin.admin_level): - try: - await MessageUtils.build_message( - f"你的权限不足喔,该功能需要的权限等级: {plugin.admin_level}" - ).send() - except Exception as e: - logger.error( - "auth_admin 发送消息失败", "HOOK", session=session, e=e - ) - logger.debug( - f"{plugin.name}({plugin.module}) 管理员权限不足...", - "HOOK", - session=session, + elif not await LevelUser.check_level(user_id, None, plugin.admin_level): + try: + await MessageUtils.build_message( + f"你的权限不足喔,该功能需要的权限等级: {plugin.admin_level}" + ).send() + except Exception as e: + logger.error( + "auth_admin 发送消息失败", "HOOK", session=session, e=e ) - raise IgnoredException("权限不足") + logger.debug( + f"{plugin.name}({plugin.module}) 管理员权限不足...", + "HOOK", + session=session, + ) + raise IgnoredException("权限不足") async def auth_group( self, plugin: PluginInfo, session: EventSession, message: UniMsg @@ -466,29 +458,35 @@ class AuthChecker: session: EventSession message: UniMsg """ - if group_id := session.id3 or session.id2: - text = message.extract_plain_text() - group = await GroupConsole.get_group(group_id) - if not group: - """群不存在""" - raise IgnoredException("群不存在") - if group.level < 0: - """群权限小于0""" - logger.debug( - f"群黑名单, 群权限-1...", - "HOOK", - session=session, - ) - raise IgnoredException("群黑名单") - if not group.status: - """群休眠""" - if text.strip() != "醒来": - logger.debug( - f"群休眠状态...", - "HOOK", - session=session, - ) - raise IgnoredException("群休眠状态") + if not (group_id := session.id3 or session.id2): + return + text = message.extract_plain_text() + group = await GroupConsole.get_group(group_id) + if not group: + """群不存在""" + raise IgnoredException("群不存在") + if group.level < 0: + """群权限小于0""" + logger.debug( + "群黑名单, 群权限-1...", + "HOOK", + session=session, + ) + raise IgnoredException("群黑名单") + if not group.status: + """群休眠""" + if text.strip() != "醒来": + logger.debug("群休眠状态...", "HOOK", session=session) + raise IgnoredException("群休眠状态") + if plugin.level > group.level: + """插件等级大于群等级""" + logger.debug( + f"{plugin.name}({plugin.module}) 群等级限制.." + f"该功能需要的群等级: {plugin.level}..", + "HOOK", + session=session, + ) + raise IgnoredException(f"{plugin.name}({plugin.module}) 群等级限制...") async def auth_cost( self, user: UserConsole, plugin: PluginInfo, session: EventSession @@ -512,7 +510,8 @@ class AuthChecker: except Exception as e: logger.error("auth_cost 发送消息失败", "HOOK", session=session, e=e) logger.debug( - f"{plugin.name}({plugin.module}) 金币限制..该功能需要{plugin.cost_gold}金币..", + f"{plugin.name}({plugin.module}) 金币限制.." + f"该功能需要{plugin.cost_gold}金币..", "HOOK", session=session, )