🐛 修复群权限与插件等级匹配 (#1627)

This commit is contained in:
HibiKier 2024-09-14 14:23:19 +08:00 committed by GitHub
parent 029a731fb9
commit f1354c6264
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,35 +1,34 @@
from nonebot.adapters import Bot, Event
from nonebot.adapters.onebot.v11 import PokeNotifyEvent
from nonebot.exception import IgnoredException
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import At, UniMsg
from nonebot_plugin_session import EventSession
from pydantic import BaseModel from pydantic import BaseModel
from nonebot.matcher import Matcher
from nonebot.adapters import Bot, Event
from nonebot_plugin_alconna import At, UniMsg
from nonebot.exception import IgnoredException
from tortoise.exceptions import IntegrityError from tortoise.exceptions import IntegrityError
from nonebot_plugin_session import EventSession
from nonebot.adapters.onebot.v11 import PokeNotifyEvent
from zhenxun.services.log import logger
from zhenxun.configs.config import Config from zhenxun.configs.config import Config
from zhenxun.models.group_console import GroupConsole from zhenxun.utils.message import MessageUtils
from zhenxun.models.level_user import LevelUser from zhenxun.models.level_user import LevelUser
from zhenxun.models.plugin_info import PluginInfo from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.plugin_limit import PluginLimit from zhenxun.models.plugin_limit import PluginLimit
from zhenxun.models.user_console import UserConsole from zhenxun.models.user_console import UserConsole
from zhenxun.services.log import logger from zhenxun.utils.exception import InsufficientGold
from zhenxun.models.group_console import GroupConsole
from zhenxun.utils.utils import FreqLimiter, CountLimiter, UserBlockLimiter
from zhenxun.utils.enum import ( from zhenxun.utils.enum import (
BlockType, BlockType,
GoldHandle, GoldHandle,
PluginType,
LimitWatchType, LimitWatchType,
PluginLimitType, PluginLimitType,
PluginType,
) )
from zhenxun.utils.exception import InsufficientGold
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.utils import CountLimiter, FreqLimiter, UserBlockLimiter
base_config = Config.get("hook") base_config = Config.get("hook")
class Limit(BaseModel): class Limit(BaseModel):
limit: PluginLimit limit: PluginLimit
limiter: FreqLimiter | UserBlockLimiter | CountLimiter limiter: FreqLimiter | UserBlockLimiter | CountLimiter
@ -38,12 +37,11 @@ class Limit(BaseModel):
class LimitManage: class LimitManage:
add_module = [] # noqa: RUF012
add_module = [] cd_limit: dict[str, Limit] = {} # noqa: RUF012
block_limit: dict[str, Limit] = {} # noqa: RUF012
cd_limit: dict[str, Limit] = {} count_limit: dict[str, Limit] = {} # noqa: RUF012
block_limit: dict[str, Limit] = {}
count_limit: dict[str, Limit] = {}
@classmethod @classmethod
def add_limit(cls, limit: PluginLimit): def add_limit(cls, limit: PluginLimit):
@ -136,33 +134,34 @@ class LimitManage:
异常: 异常:
IgnoredException: IgnoredException IgnoredException: IgnoredException
""" """
if limit_model: if not limit_model:
limit = limit_model.limit return
limiter = limit_model.limiter limit = limit_model.limit
is_limit = ( limiter = limit_model.limiter
LimitWatchType.ALL is_limit = (
or (group_id and limit.watch_type == LimitWatchType.GROUP) LimitWatchType.ALL
or (not group_id and limit.watch_type == LimitWatchType.USER) or (group_id and limit.watch_type == LimitWatchType.GROUP)
or (not group_id and limit.watch_type == LimitWatchType.USER)
)
key_type = user_id
if group_id and limit.watch_type == LimitWatchType.GROUP:
key_type = channel_id or group_id
if is_limit and not limiter.check(key_type):
if limit.result:
await MessageUtils.build_message(limit.result).send()
logger.debug(
f"{limit.module}({limit.limit_type}) 正在限制中...",
"HOOK",
session=session,
) )
key_type = user_id raise IgnoredException(f"{limit.module} 正在限制中...")
if group_id and limit.watch_type == LimitWatchType.GROUP: else:
key_type = channel_id or group_id if isinstance(limiter, FreqLimiter):
if is_limit and not limiter.check(key_type): limiter.start_cd(key_type)
if limit.result: if isinstance(limiter, UserBlockLimiter):
await MessageUtils.build_message(limit.result).send() limiter.set_true(key_type)
logger.debug( if isinstance(limiter, CountLimiter):
f"{limit.module}({limit.limit_type}) 正在限制中...", limiter.increase(key_type)
"HOOK",
session=session,
)
raise IgnoredException(f"{limit.module} 正在限制中...")
else:
if isinstance(limiter, FreqLimiter):
limiter.start_cd(key_type)
if isinstance(limiter, UserBlockLimiter):
limiter.set_true(key_type)
if isinstance(limiter, CountLimiter):
limiter.increase(key_type)
class IsSuperuserException(Exception): class IsSuperuserException(Exception):
@ -196,11 +195,7 @@ class AuthChecker:
return False return False
if plugin.plugin_type == PluginType.DEPENDANT: if plugin.plugin_type == PluginType.DEPENDANT:
return False return False
if not self._flmt_s.check(sid): return plugin.module != "ai" if self._flmt_s.check(sid) else False
return False
if plugin.module == "ai":
return False
return True
async def auth( async def auth(
self, self,
@ -255,7 +250,7 @@ class AuthChecker:
await self.auth_limit(plugin, session) await self.auth_limit(plugin, session)
except IsSuperuserException: except IsSuperuserException:
logger.debug( logger.debug(
f"超级用户或被ban跳过权限检测...", "HOOK", session=session "超级用户或被ban跳过权限检测...", "HOOK", session=session
) )
except IgnoredException: except IgnoredException:
is_ignore = True is_ignore = True
@ -313,14 +308,13 @@ class AuthChecker:
plugin: PluginInfo plugin: PluginInfo
session: EventSession session: EventSession
""" """
user_id = session.id1
group_id = session.id3 group_id = session.id3
channel_id = session.id2 channel_id = session.id2
is_poke = isinstance(event, PokeNotifyEvent)
if not group_id: if not group_id:
group_id = channel_id group_id = channel_id
channel_id = None channel_id = None
if user_id: if user_id := session.id1:
is_poke = isinstance(event, PokeNotifyEvent)
if group_id: if group_id:
sid = group_id or user_id sid = group_id or user_id
if await GroupConsole.is_super_block_plugin( if await GroupConsole.is_super_block_plugin(
@ -393,9 +387,8 @@ class AuthChecker:
raise IgnoredException("该插件在私聊中已被禁用...") raise IgnoredException("该插件在私聊中已被禁用...")
if not plugin.status and plugin.block_type == BlockType.ALL: if not plugin.status and plugin.block_type == BlockType.ALL:
"""全局状态""" """全局状态"""
if group_id: if group_id and await GroupConsole.is_super_group(group_id):
if await GroupConsole.is_super_group(group_id): raise IsSuperuserException()
raise IsSuperuserException()
logger.debug( logger.debug(
f"{plugin.name}({plugin.module}) 全局未开启此功能...", f"{plugin.name}({plugin.module}) 全局未开启此功能...",
"HOOK", "HOOK",
@ -414,9 +407,8 @@ class AuthChecker:
session: EventSession session: EventSession
""" """
user_id = session.id1 user_id = session.id1
group_id = session.id3 or session.id2
if user_id and plugin.admin_level: if user_id and plugin.admin_level:
if group_id: if group_id := session.id3 or session.id2:
if not await LevelUser.check_level( if not await LevelUser.check_level(
user_id, group_id, plugin.admin_level user_id, group_id, plugin.admin_level
): ):
@ -426,7 +418,8 @@ class AuthChecker:
await MessageUtils.build_message( await MessageUtils.build_message(
[ [
At(flag="user", target=user_id), At(flag="user", target=user_id),
f"你的权限不足喔,该功能需要的权限等级: {plugin.admin_level}", f"你的权限不足喔,"
f"该功能需要的权限等级: {plugin.admin_level}",
] ]
).send(reply_to=True) ).send(reply_to=True)
except Exception as e: except Exception as e:
@ -439,22 +432,21 @@ class AuthChecker:
session=session, session=session,
) )
raise IgnoredException("管理员权限不足...") raise IgnoredException("管理员权限不足...")
else: elif not await LevelUser.check_level(user_id, None, plugin.admin_level):
if not await LevelUser.check_level(user_id, None, plugin.admin_level): try:
try: await MessageUtils.build_message(
await MessageUtils.build_message( f"你的权限不足喔,该功能需要的权限等级: {plugin.admin_level}"
f"你的权限不足喔,该功能需要的权限等级: {plugin.admin_level}" ).send()
).send() except Exception as e:
except Exception as e: logger.error(
logger.error( "auth_admin 发送消息失败", "HOOK", session=session, e=e
"auth_admin 发送消息失败", "HOOK", session=session, e=e
)
logger.debug(
f"{plugin.name}({plugin.module}) 管理员权限不足...",
"HOOK",
session=session,
) )
raise IgnoredException("权限不足") logger.debug(
f"{plugin.name}({plugin.module}) 管理员权限不足...",
"HOOK",
session=session,
)
raise IgnoredException("权限不足")
async def auth_group( async def auth_group(
self, plugin: PluginInfo, session: EventSession, message: UniMsg self, plugin: PluginInfo, session: EventSession, message: UniMsg
@ -466,29 +458,35 @@ class AuthChecker:
session: EventSession session: EventSession
message: UniMsg message: UniMsg
""" """
if group_id := session.id3 or session.id2: if not (group_id := session.id3 or session.id2):
text = message.extract_plain_text() return
group = await GroupConsole.get_group(group_id) text = message.extract_plain_text()
if not group: group = await GroupConsole.get_group(group_id)
"""群不存在""" if not group:
raise IgnoredException("群不存在") """群不存在"""
if group.level < 0: raise IgnoredException("群不存在")
"""群权限小于0""" if group.level < 0:
logger.debug( """群权限小于0"""
f"群黑名单, 群权限-1...", logger.debug(
"HOOK", "群黑名单, 群权限-1...",
session=session, "HOOK",
) session=session,
raise IgnoredException("群黑名单") )
if not group.status: raise IgnoredException("群黑名单")
"""群休眠""" if not group.status:
if text.strip() != "醒来": """群休眠"""
logger.debug( if text.strip() != "醒来":
f"群休眠状态...", logger.debug("群休眠状态...", "HOOK", session=session)
"HOOK", raise IgnoredException("群休眠状态")
session=session, if plugin.level > group.level:
) """插件等级大于群等级"""
raise IgnoredException("群休眠状态") logger.debug(
f"{plugin.name}({plugin.module}) 群等级限制.."
f"该功能需要的群等级: {plugin.level}..",
"HOOK",
session=session,
)
raise IgnoredException(f"{plugin.name}({plugin.module}) 群等级限制...")
async def auth_cost( async def auth_cost(
self, user: UserConsole, plugin: PluginInfo, session: EventSession self, user: UserConsole, plugin: PluginInfo, session: EventSession
@ -512,7 +510,8 @@ class AuthChecker:
except Exception as e: except Exception as e:
logger.error("auth_cost 发送消息失败", "HOOK", session=session, e=e) logger.error("auth_cost 发送消息失败", "HOOK", session=session, e=e)
logger.debug( logger.debug(
f"{plugin.name}({plugin.module}) 金币限制..该功能需要{plugin.cost_gold}金币..", f"{plugin.name}({plugin.module}) 金币限制.."
f"该功能需要{plugin.cost_gold}金币..",
"HOOK", "HOOK",
session=session, session=session,
) )