diff --git a/.vscode/settings.json b/.vscode/settings.json index b23be7bc..1a22fedc 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,6 +8,7 @@ "arclet", "Arparma", "displayname", + "flmt", "getbbox", "httpx", "kaiheila", diff --git a/poetry.lock b/poetry.lock index 58022200..0a13029c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -96,13 +96,13 @@ reference = "ali" [[package]] name = "arclet-alconna" -version = "1.7.42" +version = "1.7.44" description = "A High-performance, Generality, Humane Command Line Arguments Parser Library." optional = false python-versions = ">=3.8" files = [ - {file = "arclet_alconna-1.7.42-py3-none-any.whl", hash = "sha256:fa78944121d4afa4e2c0247a98967ddb1e76cf63b94c8c3f4f393c52f6d23e75"}, - {file = "arclet_alconna-1.7.42.tar.gz", hash = "sha256:a5a1cca37d0c3d58607ee22485e636fa0b01d40eb43194e542b2c3d6a5d2e70b"}, + {file = "arclet_alconna-1.7.44-py3-none-any.whl", hash = "sha256:e5751a2aa854b7b2c01cac87986ad11b397986a725c9536d5f9ff81a84e85614"}, + {file = "arclet_alconna-1.7.44.tar.gz", hash = "sha256:9c8a70a3f75e8358fa9c71befd3687c8c9781a19b1d28cb53cbe08fbc36cf720"}, ] [package.dependencies] @@ -1356,20 +1356,20 @@ reference = "ali" [[package]] name = "nonebot-plugin-alconna" -version = "0.36.3" +version = "0.37.1" description = "Alconna Adapter for Nonebot" optional = false python-versions = ">=3.8" files = [ - {file = "nonebot_plugin_alconna-0.36.3-py3-none-any.whl", hash = "sha256:8f26f96c711d3adadc538ebf40d51ba2249c18fe1689bf36baed0e4d1e05246a"}, - {file = "nonebot_plugin_alconna-0.36.3.tar.gz", hash = "sha256:ed8e4f2fd845d0c3d8becdd68678c203ee76109b9104a3b1c18f63525e85c6d4"}, + {file = "nonebot_plugin_alconna-0.37.1-py3-none-any.whl", hash = "sha256:fcc46f04ac89bf43730afebd97fa46e5910bc404a9e24cab7950da58be36246d"}, + {file = "nonebot_plugin_alconna-0.37.1.tar.gz", hash = "sha256:5e9989ee7debd79d61c97aa41c88aac5fe452cc9c47f2d48b829d81d26dfe130"}, ] [package.dependencies] -arclet-alconna = ">=1.7.42,<2.0.0" -arclet-alconna-tools = ">=0.6.11,<0.7.0" -nepattern = ">=0.5.14,<0.6.0" -nonebot2 = ">=2.1.0" +arclet-alconna = ">=1.7.44" +arclet-alconna-tools = ">=0.6.11" +nepattern = ">=0.5.15" +nonebot2 = ">=2.2.0" [package.source] type = "legacy" @@ -2982,4 +2982,4 @@ reference = "ali" [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "2e5c4963196533949601dff69762b6f5586056a8775419c2ee1aef0df91b016a" +content-hash = "858e616442c77d1a328e37af331056a7b870611b22247fcebfe5dbe41a3fd4f0" diff --git a/pyproject.toml b/pyproject.toml index 41f7f799..33e40319 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,6 @@ url = "https://mirrors.aliyun.com/pypi/simple/" [tool.poetry.dependencies] python = "^3.10" -nonebot-plugin-alconna = "^0.36.0" playwright = "^1.41.1" nonebot-adapter-onebot = "^2.3.1" nonebot-plugin-apscheduler = "^0.3.0" @@ -33,6 +32,7 @@ retrying = "^1.3.4" aiofiles = "^23.2.1" nonebot-plugin-htmlrender = "^0.3.0" nonebot-plugin-userinfo = "^0.1.3" +nonebot-plugin-alconna = "^0.37.1" [tool.poetry.dev-dependencies] diff --git a/zhenxun/builtin_plugins/admin/plugin_switch/__init__.py b/zhenxun/builtin_plugins/admin/plugin_switch/__init__.py index 42da314f..c0af3544 100644 --- a/zhenxun/builtin_plugins/admin/plugin_switch/__init__.py +++ b/zhenxun/builtin_plugins/admin/plugin_switch/__init__.py @@ -21,6 +21,7 @@ from zhenxun.utils.enum import BlockType, PluginType from zhenxun.utils.rules import admin_check, ensure_group from ._data_source import PluginManage, build_plugin, build_task +from .command import _group_status_matcher, _status_matcher base_config = Config.get("admin_bot_manage") @@ -53,60 +54,70 @@ __plugin_meta__ = PluginMetadata( ) -_status_matcher = on_alconna( - Alconna( - "switch", - Option("-t|--task", action=store_true, help_text="被动技能"), - Subcommand( - "open", - Args["name", str], - Option( - "-g|--group", - Args["group_id", str], - ), - ), - Subcommand( - "close", - Args["name", str], - Option( - "-t|--type", - Args["block_type", ["all", "a", "private", "p", "group", "g"]], - ), - Option( - "-g|--group", - Args["group_id", str], - ), - ), - ), - rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL"), - priority=5, - block=True, -) +# _status_matcher = on_alconna( +# Alconna( +# "switch", +# Option("-t|--task", action=store_true, help_text="被动技能"), +# Subcommand( +# "open", +# Args["name", str], +# Option( +# "-g|--group", +# Args["group_id", str], +# ), +# ), +# Subcommand( +# "close", +# Args["name", str], +# Option( +# "-t|--type", +# Args["block_type", ["all", "a", "private", "p", "group", "g"]], +# ), +# Option( +# "-g|--group", +# Args["group_id", str], +# ), +# ), +# ), +# rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL"), +# priority=5, +# block=True, +# ) -# TODO: shortcut +# # TODO: shortcut -_group_status_matcher = on_alconna( - Alconna("group-status", Args["status", ["sleep", "wake"]]), - rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL") & ensure_group, - priority=5, - block=True, -) +# _group_status_matcher = on_alconna( +# Alconna("group-status", Args["status", ["sleep", "wake"]]), +# rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL") & ensure_group, +# priority=5, +# block=True, +# ) @_status_matcher.assign("$main") async def _(bot: Bot, session: EventSession, arparma: Arparma): image = None - if arparma.find("task"): - image = await build_task(session.id3 or session.id2) - elif session.id1 in bot.config.superusers: + if session.id1 in bot.config.superusers: image = await build_plugin() if image: await Image(image.pic2bs4()).send(reply=True) - logger.info( - f"查看{'被动' if arparma.find('task') else '功能'}列表", - arparma.header_result, - session=session, - ) + logger.info( + f"查看功能列表", + arparma.header_result, + session=session, + ) + + +@_status_matcher.assign("task") +async def _(bot: Bot, session: EventSession, arparma: Arparma): + image = None + if image := await build_task(session.id3 or session.id2): + await Image(image.pic2bs4()).send(reply=True) + logger.info( + f"查看被动列表", + arparma.header_result, + session=session, + ) @_status_matcher.assign("open") @@ -122,13 +133,14 @@ async def _( await Text(result).send(reply=True) logger.info(f"开启功能 {name}", arparma.header_result, session=session) elif session.id1 in bot.config.superusers: - result = await PluginManage.superuser_block(name, None, group.result) + group_id = group.result if group.available else None + result = await PluginManage.superuser_block(name, None, group_id) await Text(result).send(reply=True) logger.info( f"超级用户开启功能 {name}", arparma.header_result, session=session, - target=group.result, + target=group_id, ) @@ -146,17 +158,39 @@ async def _( await Text(result).send(reply=True) logger.info(f"关闭功能 {name}", arparma.header_result, session=session) elif session.id1 in bot.config.superusers: + group_id = group.result if group.available else None _type = BlockType.ALL if block_type.available: if block_type.result in ["p", "private"]: - _type = BlockType.FRIEND + _type = BlockType.PRIVATE elif block_type.result in ["g", "group"]: _type = BlockType.GROUP - result = await PluginManage.superuser_block(name, _type, group.result) + result = await PluginManage.superuser_block(name, _type, group_id) await Text(result).send(reply=True) logger.info( f"超级用户关闭功能 {name}, 禁用类型: {_type}", arparma.header_result, session=session, - target=group.result, + target=group_id, ) + + +@_group_status_matcher.handle() +async def _( + bot: Bot, + session: EventSession, + arparma: Arparma, + status: str, +): + if gid := session.id3 or session.id2: + if status == "sleep": + await PluginManage.sleep(gid) + logger.info("进行休眠", arparma.header_result, session=session) + await Text("那我先睡觉了...").finish() + else: + if PluginManage.is_wake(gid): + await Text("我还醒着呢!").finish() + await PluginManage.wake(gid) + logger.info("醒来", arparma.header_result, session=session) + await Text("呜..醒来了...").finish() + return Text("群组id为空...").send() diff --git a/zhenxun/builtin_plugins/admin/plugin_switch/_data_source.py b/zhenxun/builtin_plugins/admin/plugin_switch/_data_source.py index bd7fd937..8137206f 100644 --- a/zhenxun/builtin_plugins/admin/plugin_switch/_data_source.py +++ b/zhenxun/builtin_plugins/admin/plugin_switch/_data_source.py @@ -119,7 +119,7 @@ async def build_task(group_id: str | None) -> BuildImage: task.name, "开启" if task.module not in group.block_task else "关闭", "开启" if task.status else "关闭", - task.run_time, + task.run_time or "-", ] ) else: @@ -129,7 +129,7 @@ async def build_task(group_id: str | None) -> BuildImage: task.module, task.name, "开启" if task.status else "关闭", - task.run_time, + task.run_time or "-", ] ) return await ImageTemplate.table_page( @@ -143,6 +143,20 @@ async def build_task(group_id: str | None) -> BuildImage: class PluginManage: + @classmethod + async def is_wake(cls, group_id: str) -> bool: + if c := await GroupConsole.get_or_none(group_id=group_id): + return c.status + return False + + @classmethod + async def sleep(cls, group_id: str): + await GroupConsole.filter(group_id=group_id).update(status=False) + + @classmethod + async def wake(cls, group_id: str): + await GroupConsole.filter(group_id=group_id).update(status=True) + @classmethod async def block(cls, module: str): await PluginInfo.filter(module=module).update(status=False) @@ -191,8 +205,13 @@ class PluginManage: 返回: str: 返回信息 """ + + if plugin_name.isdigit(): + plugin = await PluginInfo.get_or_none(id=int(plugin_name)) + else: + plugin = await PluginInfo.get_or_none(name=plugin_name) status_str = "开启" if status else "关闭" - if plugin := await PluginInfo.get_or_none(name=plugin_name): + if plugin: group, _ = await GroupConsole.get_or_create(group_id=group_id) if status: if plugin.module in group.block_plugin: @@ -200,12 +219,12 @@ class PluginManage: f"{plugin.module},", "" ) await group.save(update_fields=["block_plugin"]) - return f"已成功{status_str} {plugin_name} 功能!" + return f"已成功{status_str} {plugin.name} 功能!" else: if plugin.module not in group.block_plugin: group.block_plugin += f"{plugin.module}," await group.save(update_fields=["block_plugin"]) - return f"已成功{status_str} {plugin_name} 功能!" + return f"已成功{status_str} {plugin.name} 功能!" return f"该功能已经{status_str}了喔,不要重复{status_str}..." return "没有找到这个功能喔..." @@ -223,7 +242,11 @@ class PluginManage: 返回: str: 返回信息 """ - if plugin := await PluginInfo.get_or_none(name=plugin_name): + if plugin_name.isdigit(): + plugin = await PluginInfo.get_or_none(id=int(plugin_name)) + else: + plugin = await PluginInfo.get_or_none(name=plugin_name) + if plugin: if group_id: if group := await GroupConsole.get_or_none(group_id=group_id): if f"super:{plugin_name}," not in group.block_plugin: @@ -238,7 +261,7 @@ class PluginManage: plugin.status = not bool(block_type) await plugin.save(update_fields=["status", "block_type"]) if not block_type: - return f"已成功将 {plugin_name} 全局启用!" + return f"已成功将 {plugin.name} 全局启用!" else: - return f"已成功将 {plugin_name} 全局关闭!" + return f"已成功将 {plugin.name} 全局关闭!" return "没有找到这个功能喔..." diff --git a/zhenxun/builtin_plugins/admin/plugin_switch/command.py b/zhenxun/builtin_plugins/admin/plugin_switch/command.py new file mode 100644 index 00000000..ec0c8513 --- /dev/null +++ b/zhenxun/builtin_plugins/admin/plugin_switch/command.py @@ -0,0 +1,94 @@ +from nonebot.rule import to_me +from nonebot_plugin_alconna import ( + Alconna, + Args, + Option, + Subcommand, + on_alconna, + store_true, +) + +from zhenxun.utils.rules import admin_check, ensure_group + +_status_matcher = on_alconna( + Alconna( + "switch", + Option("-t|--task", action=store_true, help_text="被动技能"), + Subcommand( + "open", + Args["name", [str, int]], + Option( + "-g|--group", + Args["group_id", str], + ), + ), + Subcommand( + "close", + Args["name", [str, int]], + Option( + "-t|--type", + Args["block_type", ["all", "a", "private", "p", "group", "g"]], + ), + Option( + "-g|--group", + Args["group_id", str], + ), + ), + ), + rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL"), + priority=5, + block=True, +) + +# TODO: shortcut + +_group_status_matcher = on_alconna( + Alconna("group-status", Args["status", ["sleep", "wake"]]), + rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL") + & ensure_group + & to_me(), + priority=5, + block=True, +) + +_status_matcher.shortcut( + r"插件列表", + command="switch", + arguments=[], + prefix=True, +) + +_status_matcher.shortcut( + r"群被动状态", + command="switch", + arguments=["--task"], + prefix=True, +) + +_status_matcher.shortcut( + r"开启(?P.+)", + command="switch", + arguments=["open", "{name}"], + prefix=True, +) + +_status_matcher.shortcut( + r"关闭(?P.+)", + command="switch", + arguments=["close", "{name}"], + prefix=True, +) + +_group_status_matcher.shortcut( + r"醒来", + command="group-status", + arguments=["wake"], + prefix=True, +) + +_group_status_matcher.shortcut( + r"休息吧", + command="group-status", + arguments=["sleep"], + prefix=True, +) diff --git a/zhenxun/builtin_plugins/help/_utils.py b/zhenxun/builtin_plugins/help/_utils.py index b3f8d3a4..82aae886 100644 --- a/zhenxun/builtin_plugins/help/_utils.py +++ b/zhenxun/builtin_plugins/help/_utils.py @@ -84,7 +84,7 @@ class HelpImageBuild: sta = 2 if not group_id and plugin.block_type in [ BlockType.ALL, - BlockType.FRIEND, + BlockType.PRIVATE, ]: sta = 2 if group_id and ( diff --git a/zhenxun/builtin_plugins/hooks/_auth_checker.py b/zhenxun/builtin_plugins/hooks/_auth_checker.py new file mode 100644 index 00000000..03a33ebd --- /dev/null +++ b/zhenxun/builtin_plugins/hooks/_auth_checker.py @@ -0,0 +1,455 @@ +from typing import Dict +from unittest import result + +from nonebot.adapters import Bot, Event +from nonebot.exception import IgnoredException +from nonebot.matcher import Matcher +from nonebot_plugin_alconna import UniMsg +from nonebot_plugin_saa import Mention, MessageFactory, Text +from nonebot_plugin_session import EventSession +from pydantic import BaseModel + +from zhenxun.configs.config import Config +from zhenxun.models.group_console import GroupConsole +from zhenxun.models.level_user import LevelUser +from zhenxun.models.plugin_info import PluginInfo +from zhenxun.models.plugin_limit import PluginLimit +from zhenxun.models.user_console import UserConsole +from zhenxun.services.log import logger +from zhenxun.utils.enum import ( + BlockType, + GoldHandle, + LimitWatchType, + PluginLimitType, + PluginType, +) +from zhenxun.utils.utils import CountLimiter, FreqLimiter, UserBlockLimiter + + +class Limit(BaseModel): + + limit: PluginLimit + limiter: FreqLimiter | UserBlockLimiter | CountLimiter + + class Config: + arbitrary_types_allowed = True + + +class LimitManage: + + add_module = [] + + cd_limit: Dict[str, Limit] = {} + block_limit: Dict[str, Limit] = {} + count_limit: Dict[str, Limit] = {} + + @classmethod + def add_limit(cls, limit: PluginLimit): + """添加限制 + + 参数: + limit: PluginLimit + """ + if limit.module not in cls.add_module: + cls.add_module.append(limit.module) + if limit.limit_type == PluginLimitType.BLOCK: + cls.block_limit[limit.module] = Limit( + limit=limit, limiter=UserBlockLimiter() + ) + elif limit.limit_type == PluginLimitType.CD: + cls.cd_limit[limit.module] = Limit( + limit=limit, limiter=FreqLimiter(limit.cd) + ) + elif limit.limit_type == PluginLimitType.COUNT: + cls.count_limit[limit.module] = Limit( + limit=limit, limiter=CountLimiter(limit.max_count) + ) + + @classmethod + def unblock( + cls, module: str, user_id: str, group_id: str | None, channel_id: str | None + ): + """解除插件block + + 参数: + module: 模块名 + user_id: 用户id + group_id: 群组id + channel_id: 频道id + """ + if limit_model := cls.block_limit.get(module): + limit = limit_model.limit + limiter: UserBlockLimiter = limit_model.limiter # type: ignore + key_type = user_id + if group_id and limit.watch_type == LimitWatchType.GROUP: + key_type = channel_id or group_id + limiter.set_false(key_type) + + @classmethod + async def check( + cls, + module: str, + user_id: str, + group_id: str | None, + channel_id: str | None, + session: EventSession, + ): + """检测限制 + + 参数: + module: 模块名 + user_id: 用户id + group_id: 群组id + channel_id: 频道id + session: Session + + 异常: + IgnoredException: IgnoredException + """ + if limit_model := cls.cd_limit.get(module): + await cls.__check(limit_model, user_id, group_id, channel_id, session) + if limit_model := cls.block_limit.get(module): + await cls.__check(limit_model, user_id, group_id, channel_id, session) + if limit_model := cls.count_limit.get(module): + await cls.__check(limit_model, user_id, group_id, channel_id, session) + + @classmethod + async def __check( + cls, + limit_model: Limit, + user_id: str, + group_id: str | None, + channel_id: str | None, + session: EventSession, + ): + """检测限制 + + 参数: + limit_model: Limit + user_id: 用户id + group_id: 群组id + channel_id: 频道id + session: Session + + 异常: + IgnoredException: IgnoredException + """ + if limit_model: + limit = limit_model.limit + limiter = limit_model.limiter + is_limit = ( + LimitWatchType.ALL + or (group_id and limit.watch_type == LimitWatchType.GROUP) + or (not group_id and limit.watch_type == LimitWatchType.USER) + ) + key_type = user_id + if group_id and limit.watch_type == LimitWatchType.GROUP: + key_type = channel_id or group_id + if is_limit and limiter.check(key_type): + if limit.result: + await Text(limit.result).send() + logger.debug( + f"{limit.module}({limit.limit_type}) 正在限制中...", + "HOOK", + session=session, + ) + raise IgnoredException(f"{limit.module} 正在cd中...") + else: + if isinstance(limiter, FreqLimiter): + limiter.start_cd(key_type) + if isinstance(limiter, UserBlockLimiter): + limiter.set_true(key_type) + if isinstance(limiter, CountLimiter): + limiter.increase(key_type) + + +class IsSuperuserException(Exception): + pass + + +class AuthChecker: + """ + 权限检查 + """ + + def __init__(self): + check_notice_info_cd = Config.get_config("hook", "CHECK_NOTICE_INFO_CD") + if check_notice_info_cd is None or check_notice_info_cd < 0: + raise ValueError("模块: [hook], 配置项: [CHECK_NOTICE_INFO_CD] 为空或小于0") + self._flmt = FreqLimiter(check_notice_info_cd) + self._flmt_g = FreqLimiter(check_notice_info_cd) + self._flmt_s = FreqLimiter(check_notice_info_cd) + self._flmt_c = FreqLimiter(check_notice_info_cd) + + async def auth( + self, + matcher: Matcher, + bot: Bot, + session: EventSession, + message: UniMsg, + ): + """权限检查 + + 参数: + matcher: matcher + bot: bot + session: EventSession + message: UniMsg + """ + is_ignore = False + cost_gold = 0 + user_id = session.id1 + group_id = session.id3 + channel_id = session.id2 + if not group_id: + group_id = channel_id + channel_id = None + if user_id and matcher.plugin and (module := matcher.plugin.name): + user = await UserConsole.get_user(user_id, session.platform) + if plugin := await PluginInfo.get_or_none(module=module): + try: + cost_gold = await self.auth_cost(user, plugin, session) + if session.id1 in bot.config.superusers: + if plugin.plugin_type == PluginType.SUPERUSER: + raise IsSuperuserException() + if not plugin.limit_superuser: + cost_gold = 0 + raise IsSuperuserException() + await self.auth_group(plugin, session, message) + await self.auth_admin(plugin, session) + await self.auth_plugin(plugin, session) + await self.auth_limit(plugin, session) + except IsSuperuserException: + logger.debug( + f"超级用户或被ban跳过权限检测...", "HOOK", session=session + ) + except IgnoredException: + is_ignore = True + LimitManage.unblock( + matcher.plugin.name, user_id, group_id, channel_id + ) + if cost_gold and user_id: + """花费金币""" + await UserConsole.reduce_gold( + user_id, + cost_gold, + GoldHandle.PLUGIN, + matcher.plugin.name if matcher.plugin else "", + session.platform, + ) + logger.debug(f"调用功能花费金币: {cost_gold}", "HOOK", session=session) + if is_ignore: + raise IgnoredException("权限检测 ignore") + + async def auth_limit(self, plugin: PluginInfo, session: EventSession): + """插件限制 + + 参数: + plugin: PluginInfo + session: EventSession + """ + user_id = session.id1 + group_id = session.id3 + channel_id = session.id2 + if not group_id: + group_id = channel_id + channel_id = None + limit_list: list[PluginLimit] = await plugin.plugin_limit.all() # type: ignore + for limit in limit_list: + LimitManage.add_limit(limit) + if user_id: + await LimitManage.check( + plugin.module, user_id, group_id, channel_id, session + ) + + async def auth_plugin(self, plugin: PluginInfo, session: EventSession): + """插件状态 + + 参数: + plugin: PluginInfo + session: EventSession + """ + user_id = session.id1 + group_id = session.id3 + channel_id = session.id2 + if not group_id: + group_id = channel_id + channel_id = None + if user_id: + if group_id: + if await GroupConsole.is_block_plugin( + group_id, plugin.module, channel_id + ): + """群组插件状态""" + if self._flmt_s.check(group_id or user_id): + self._flmt_s.start_cd(group_id or user_id) + await Text("该群未开启此功能...").send(reply=True) + logger.debug( + f"{plugin.name}({plugin.module}) 未开启此功能...", + "HOOK", + session=session, + ) + raise IgnoredException("该群未开启此功能...") + if await GroupConsole.is_super_block_plugin( + group_id, plugin.module, channel_id + ): + """群组插件状态""" + if self._flmt_s.check(group_id or user_id): + self._flmt_s.start_cd(group_id or user_id) + await Text("超级管理员禁用了该群此功能...").send(reply=True) + logger.debug( + f"{plugin.name}({plugin.module}) 超级管理员禁用了该群此功能...", + "HOOK", + session=session, + ) + raise IgnoredException("超级管理员禁用了该群此功能...") + # 群聊禁用 + if not plugin.status and plugin.block_type == BlockType.GROUP: + try: + if self._flmt_c.check(group_id): + self._flmt_c.start_cd(group_id) + await Text("该功能在群聊中已被禁用...").send(reply=True) + except Exception: + pass + logger.debug( + f"{plugin.name}({plugin.module}) 该插件在群聊中已被禁用...", + "HOOK", + session=session, + ) + raise IgnoredException("该插件在群聊中已被禁用...") + else: + # 私聊禁用 + if not plugin.status and plugin.block_type == BlockType.PRIVATE: + try: + if self._flmt_c.check(user_id): + self._flmt_c.start_cd(user_id) + await Text("该功能在私聊中已被禁用...").send() + except Exception: + pass + logger.debug( + f"{plugin.name}({plugin.module}) 该插件在私聊中已被禁用...", + "HOOK", + session=session, + ) + raise IgnoredException("该插件在私聊中已被禁用...") + if not plugin.status and plugin.block_type == BlockType.ALL: + """全局状态""" + if group_id: + if await GroupConsole.is_super_group(group_id, channel_id): + raise IsSuperuserException() + if self._flmt_s.check(group_id or user_id): + self._flmt_s.start_cd(group_id or user_id) + await Text("全局未开启此功能...").send() + logger.debug( + f"{plugin.name}({plugin.module}) 全局未开启此功能...", + "HOOK", + session=session, + ) + raise IgnoredException("全局未开启此功能...") + + async def auth_admin(self, plugin: PluginInfo, session: EventSession): + """管理员命令 个人权限 + + 参数: + plugin: PluginInfo + session: EventSession + """ + user_id = session.id1 + group_id = session.id3 or session.id2 + if user_id and group_id and plugin.admin_level: + if group_id: + if await LevelUser.check_level(user_id, group_id, plugin.admin_level): + try: + if self._flmt.check(user_id): + self._flmt.start_cd(user_id) + await MessageFactory( + [ + Mention(user_id), + Text( + f"你的权限不足喔,该功能需要的权限等级: {plugin.admin_level}" + ), + ] + ).finish(reply=True) + except Exception: + pass + logger.debug( + f"{plugin.name}({plugin.module}) 管理员权限不足...", + "HOOK", + session=session, + ) + raise IgnoredException("管理员权限不足...") + else: + if not await LevelUser.check_level(user_id, "", plugin.admin_level): + try: + await Text( + f"你的权限不足喔,该功能需要的权限等级: {plugin.admin_level}" + ).finish() + except Exception: + pass + logger.debug( + f"{plugin.name}({plugin.module}) 管理员权限不足...", + "HOOK", + session=session, + ) + raise IgnoredException("权限不足") + + async def auth_group( + self, plugin: PluginInfo, session: EventSession, message: UniMsg + ): + """群黑名单检测 群总开关检测 + + 参数: + plugin: PluginInfo + session: EventSession + message: UniMsg + """ + if group_id := session.id3 or session.id2: + text = message.extract_plain_text() + group, _ = await GroupConsole.get_or_create(group_id=group_id) + if group.level < -1: + """群权限小于0""" + logger.debug( + f"{plugin.name}({plugin.module}) 群黑名单, 群权限-1...", + "HOOK", + session=session, + ) + raise IgnoredException("群黑名单") + if not group.status: + """群休眠""" + if text.strip() != "醒来": + logger.debug( + f"{plugin.name}({plugin.module}) 功能总开关关闭状态...", + "HOOK", + session=session, + ) + raise IgnoredException("功能总开关关闭状态") + + async def auth_cost( + self, user: UserConsole, plugin: PluginInfo, session: EventSession + ) -> int: + """检测是否满足金币条件 + + 参数: + user: UserConsole + plugin: PluginInfo + session: EventSession + + 返回: + int: 需要消耗的金币 + """ + if user.gold < plugin.cost_gold: + """插件消耗金币不足""" + try: + await Text(f"金币不足..该功能需要{plugin.cost_gold}金币..").send() + except Exception: + pass + logger.debug( + f"{plugin.name}({plugin.module}) 金币限制..该功能需要{plugin.cost_gold}金币..", + "HOOK", + session=session, + ) + raise IgnoredException(f"{plugin.name}({plugin.module}) 金币限制...") + return plugin.cost_gold + + +checker = AuthChecker() diff --git a/zhenxun/builtin_plugins/hooks/auth_hook.py b/zhenxun/builtin_plugins/hooks/auth_hook.py new file mode 100644 index 00000000..938f8222 --- /dev/null +++ b/zhenxun/builtin_plugins/hooks/auth_hook.py @@ -0,0 +1,35 @@ +from typing import Optional + +from nonebot.adapters.onebot.v11 import Bot, Event, MessageEvent +from nonebot.matcher import Matcher +from nonebot.message import run_postprocessor, run_preprocessor +from nonebot_plugin_alconna import UniMsg +from nonebot_plugin_session import EventSession + +from ._auth_checker import LimitManage, checker + + +# # 权限检测 +@run_preprocessor +async def _(matcher: Matcher, bot: Bot, session: EventSession, message: UniMsg): + await checker.auth(matcher, bot, session, message) + + +# 解除命令block阻塞 +@run_postprocessor +async def _( + matcher: Matcher, + exception: Optional[Exception], + bot: Bot, + event: Event, + session: EventSession, +): + user_id = session.id1 + group_id = session.id3 + channel_id = session.id2 + if not group_id: + group_id = channel_id + channel_id = None + if user_id and matcher.plugin: + module = matcher.plugin.name + LimitManage.unblock(module, user_id, group_id, channel_id) diff --git a/zhenxun/builtin_plugins/init/init_plugin.py b/zhenxun/builtin_plugins/init/init_plugin.py index e80fc8f6..1f8caf39 100644 --- a/zhenxun/builtin_plugins/init/init_plugin.py +++ b/zhenxun/builtin_plugins/init/init_plugin.py @@ -136,7 +136,7 @@ async def _(): create_list = [] update_list = [] for task in task_list: - if task.module not in module_list: + if task.module not in module_dict: create_list.append(task) else: task.id = module_dict[task.module] diff --git a/zhenxun/builtin_plugins/shop/_data_source.py b/zhenxun/builtin_plugins/shop/_data_source.py index 96bed3f0..6ecea8da 100644 --- a/zhenxun/builtin_plugins/shop/_data_source.py +++ b/zhenxun/builtin_plugins/shop/_data_source.py @@ -36,9 +36,7 @@ class ShopManage: goods = filter_goods[0] else: return "道具名称不存在..." - user, _ = await UserConsole.get_or_create( - user_id=user_id, defaults={"platform": platform} - ) + user = await UserConsole.get_user(user_id, platform) price = goods.goods_price * num * goods.goods_discount if user.gold < price: return "糟糕! 您的金币好像不太够哦..." @@ -77,9 +75,7 @@ class ShopManage: 返回: BuildImage | None: 道具背包图片 """ - user, _ = await UserConsole.get_or_create( - user_id=user_id, defaults={"platform": platform} - ) + user = await UserConsole.get_user(user_id, platform) if not user.props: return None result = await GoodsInfo.filter(uuid__in=user.props.keys()).all() @@ -113,9 +109,7 @@ class ShopManage: 返回: int: 金币数量 """ - user, _ = await UserConsole.get_or_create( - user_id=user_id, defaults={"platform": platform} - ) + user = await UserConsole.get_user(user_id, platform) return user.gold @classmethod diff --git a/zhenxun/builtin_plugins/sign_in/_data_source.py b/zhenxun/builtin_plugins/sign_in/_data_source.py index 07f3fcbe..6fe4ce76 100644 --- a/zhenxun/builtin_plugins/sign_in/_data_source.py +++ b/zhenxun/builtin_plugins/sign_in/_data_source.py @@ -94,13 +94,7 @@ class SignManage: if not session.id1: return None now = datetime.now(pytz.timezone("Asia/Shanghai")) - user_console, _ = await UserConsole.get_or_create( - user_id=session.id1, - defaults={ - "uid": await UserConsole.get_new_uid(), - "platform": session.platform, - }, - ) + user_console = await UserConsole.get_user(session.id1, session.platform) user, _ = await SignUser.get_or_create( user_id=session.id1, defaults={"user_console": user_console, "platform": session.platform}, @@ -112,7 +106,6 @@ class SignManage: or (new_log and now > new_log.create_time) or file_name in os.listdir(SIGN_TODAY_CARD_PATH) ): - user_console, _ = await UserConsole.get_or_create(user_id=session.id1) path = await get_card(user, nickname, -1, user_console.gold, "") else: path = await cls._handle_sign_in(user, nickname, session, is_view_card) diff --git a/zhenxun/builtin_plugins/sign_in/goods_register.py b/zhenxun/builtin_plugins/sign_in/goods_register.py index 3af6514f..0fc3b921 100644 --- a/zhenxun/builtin_plugins/sign_in/goods_register.py +++ b/zhenxun/builtin_plugins/sign_in/goods_register.py @@ -35,19 +35,14 @@ async def _(): **{"好感度双倍加持卡Ⅰ_prob": 0.1, "好感度双倍加持卡Ⅱ_prob": 0.2, "好感度双倍加持卡Ⅲ_prob": 0.3}, # type: ignore ) async def _(session: EventSession, user_id: int, group_id: int, prob: float): - user_console, _ = await UserConsole.get_or_create( - user_id=session.id1, - defaults={ - "uid": await UserConsole.get_new_uid(), - "platform": session.platform, - }, - ) - user, _ = await SignUser.get_or_create( - user_id=user_id, - defaults={"platform": session.platform, "user_console": user_console}, - ) - user.add_probability = Decimal(prob) - await user.save(update_fields=["add_probability"]) + if session.id1: + user_console = await UserConsole.get_user(session.id1, session.platform) + user, _ = await SignUser.get_or_create( + user_id=user_id, + defaults={"platform": session.platform, "user_console": user_console}, + ) + user.add_probability = Decimal(prob) + await user.save(update_fields=["add_probability"]) @shop_register( name="测试道具A", diff --git a/zhenxun/builtin_plugins/superuser/group_manage.py b/zhenxun/builtin_plugins/superuser/group_manage.py new file mode 100644 index 00000000..1e7c9f14 --- /dev/null +++ b/zhenxun/builtin_plugins/superuser/group_manage.py @@ -0,0 +1,154 @@ +from nonebot.adapters import Bot +from nonebot.adapters.onebot.v11 import Bot as v11Bot +from nonebot.params import Depends +from nonebot.permission import SUPERUSER +from nonebot.plugin import PluginMetadata +from nonebot.typing import T_State +from nonebot_plugin_alconna import ( + Alconna, + Args, + Arparma, + Match, + Option, + Subcommand, + on_alconna, + store_true, +) +from nonebot_plugin_saa import Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import NICKNAME +from zhenxun.configs.utils import PluginExtraData +from zhenxun.models.group_console import GroupConsole +from zhenxun.services.log import logger +from zhenxun.utils.enum import PluginType + +__plugin_meta__ = PluginMetadata( + name="管理群操作", + description="管理群操作", + usage=""" + 群权限 | 群白名单 | 退出群 操作 + 退群,添加/删除群白名单,添加/删除群认证,当在群组中这五个命令且没有指定群号时,默认指定当前群组 + 指令: + 退群 ?[group_id] + 修改群权限 [group_id] [等级] + 修改群权限 [等级]: 该命令仅在群组时生效,默认修改当前群组 + 添加群白名单 ?*[group_id] + 删除群白名单 ?*[group_id] + 添加群认证 ?*[group_id] + 删除群认证 ?*[group_id] + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + plugin_type=PluginType.SUPERUSER, + ).dict(), +) + + +_matcher = on_alconna( + Alconna( + "group-manage", + Subcommand( + "modify-level", Args["level", int]["group_id?", int], help_text="修改群权限" + ), + Subcommand( + "super-handle", + Option("--del", action=store_true, help_text="删除"), + Args["group_id", int], + help_text="添加/删除群白名单", + ), + Subcommand( + "auth-handle", + Option("--del", action=store_true, help_text="删除"), + Args["group_id", int], + help_text="添加群白名单", + ), + Subcommand("del-group", Args["group_id", int], help_text="退出群组"), + ), + permission=SUPERUSER, + priority=1, + block=True, +) + + +def CheckGroupId(): + """ + 检测群组id + """ + + async def dependency( + session: EventSession, + group_id: Match[int], + state: T_State, + ): + gid = session.id3 or session.id2 + if group_id.available: + gid = group_id.result + if not gid: + await Text("群组id不能为空...").finish() + state["group_id"] = gid + + return Depends(dependency) + + +@_matcher.assign("modify-level", parameterless=[]) +async def _(session: EventSession, arparma: Arparma, state: T_State, level: int): + gid = state["group_id"] + group, _ = await GroupConsole.get_or_create(group_id=gid) + old_level = group.level + group.level = level + await group.save(update_fields=["level"]) + await Text("群权限修改成功!").send(reply=True) + logger.info( + f"修改群权限: {old_level} -> {level}", + arparma.header_result, + session=session, + target=gid, + ) + + +@_matcher.assign("super-handle") +async def _(session: EventSession, arparma: Arparma, state: T_State): + gid = state["group_id"] + group = await GroupConsole.get_or_none(group_id=gid) + if not group: + await Text("群组信息不存在, 请更新群组信息...").finish() + s = "删除" if arparma.find("del") else "添加" + group.is_super = not arparma.find("del") + await group.save(update_fields=["is_super"]) + await Text(f"{s}群白名单成功!").send(reply=True) + logger.info(f"{s}群白名单", arparma.header_result, session=session, target=gid) + + +@_matcher.assign("auth-handle") +async def _(session: EventSession, arparma: Arparma, state: T_State): + gid = state["group_id"] + await GroupConsole.update_or_create( + group_id=gid, defaults={"group_flag": 0 if arparma.find("del") else 1} + ) + s = "删除" if arparma.find("del") else "添加" + await Text(f"{s}群认证成功!").send(reply=True) + logger.info(f"{s}群白名单", arparma.header_result, session=session, target=gid) + + +@_matcher.assign("del-group") +async def _(bot: Bot, session: EventSession, arparma: Arparma, group_id: int): + if isinstance(bot, v11Bot): + group_list = [g["group_id"] for g in await bot.get_group_list()] + if group_id not in group_list: + logger.debug("群组不存在", "退群", session=session, target=group_id) + await Text(f"{NICKNAME}未在该群组中...").finish() + try: + await bot.set_group_leave(group_id=group_id) + logger.info( + f"{NICKNAME}退出群组成功", "退群", session=session, target=group_id + ) + await Text(f"退出群组 {group_id} 成功!").send() + await GroupConsole.filter(group_id=group_id).delete() + except Exception as e: + logger.error(f"退出群组失败", "退群", session=session, target=group_id, e=e) + await Text(f"退出群组 {group_id} 失败...").send() + else: + # TODO: 其他平台的退群操作 + await Text(f"暂未支持退群操作...").send() diff --git a/zhenxun/models/group_console.py b/zhenxun/models/group_console.py index a0ff26fe..0e8858de 100644 --- a/zhenxun/models/group_console.py +++ b/zhenxun/models/group_console.py @@ -17,6 +17,14 @@ class GroupConsole(Model): """最大人数""" member_count = fields.IntField(default=0, description="当前人数") """当前人数""" + status = fields.BooleanField(default=True, description="群状态") + """群状态""" + level = fields.IntField(default=5, description="群权限") + """群权限""" + is_super = fields.BooleanField( + default=False, description="超级用户指定,可以使用全局关闭的功能" + ) + """超级用户指定群,可以使用全局关闭的功能""" group_flag = fields.IntField(default=0, description="群认证标记") """群认证标记""" block_plugin = fields.TextField(default="", description="禁用插件") @@ -31,6 +39,61 @@ class GroupConsole(Model): table_description = "群组信息表" unique_together = ("group_id", "channel_id") + @classmethod + async def is_super_group(cls, group_id: str, channel_id: str | None = None) -> bool: + """是否超级用户指定群 + + 参数: + group_id: 群组id + channel_id: 频道id. + + 返回: + bool: 是否超级用户指定群 + """ + if group := await cls.get_or_none(group_id=group_id): + return group.is_super + return False + + @classmethod + async def is_super_block_plugin( + cls, group_id: str, module: str, channel_id: str | None = None + ) -> bool: + """查看群组是否超级用户禁用功能 + + 参数: + group_id: 群组id + module: 模块名称 + channel_id: 频道id + + 返回: + bool: 是否禁用被动 + """ + return await cls.exists( + group_id=group_id, + channel_id=channel_id, + block_plugin__contains=f"super:{module},", + ) + + @classmethod + async def is_block_plugin( + cls, group_id: str, module: str, channel_id: str | None = None + ) -> bool: + """查看群组是否禁用功能 + + 参数: + group_id: 群组id + module: 模块名称 + channel_id: 频道id + + 返回: + bool: 是否禁用被动 + """ + return await cls.exists( + group_id=group_id, + channel_id=channel_id, + block_plugin__contains=f"{module},", + ) + @classmethod async def is_block_task( cls, group_id: str, task: str, channel_id: str | None = None diff --git a/zhenxun/models/plugin_limit.py b/zhenxun/models/plugin_limit.py index 9bf4259f..96538b64 100644 --- a/zhenxun/models/plugin_limit.py +++ b/zhenxun/models/plugin_limit.py @@ -26,7 +26,7 @@ class PluginLimit(Model): limit_type = fields.CharEnumField(PluginLimitType, description="限制类型") """限制类型""" watch_type = fields.CharEnumField(LimitWatchType, description="监听类型") - """限制类型""" + """监听类型""" status = fields.BooleanField(default=True, description="限制的开关状态") """限制的开关状态""" check_type = fields.CharEnumField( diff --git a/zhenxun/models/user_console.py b/zhenxun/models/user_console.py index 59b643f4..32695923 100644 --- a/zhenxun/models/user_console.py +++ b/zhenxun/models/user_console.py @@ -4,6 +4,7 @@ from tortoise import fields from zhenxun.services.db_context import Model from zhenxun.utils.enum import GoldHandle +from zhenxun.utils.exception import InsufficientGold from .user_gold_log import UserGoldLog @@ -32,7 +33,29 @@ class UserConsole(Model): table_description = "用户数据表" @classmethod - async def get_new_uid(cls): + async def get_user(cls, user_id: str, platform: str | None = None) -> "UserConsole": + """获取用户 + + 参数: + user_id: 用户id + platform: 平台. + + 返回: + UserConsole: UserConsole + """ + user, _ = await UserConsole.get_or_create( + user_id=user_id, + defaults={"platform": platform, "uid": await cls.get_new_uid()}, + ) + return user + + @classmethod + async def get_new_uid(cls) -> int: + """获取最新uid + + 返回: + int: 最新uid + """ if user := await cls.annotate().order_by("uid").first(): return user.uid + 1 return 1 @@ -58,6 +81,38 @@ class UserConsole(Model): user_id=user_id, gold=gold, handle=GoldHandle.GET, source=source ) + @classmethod + async def reduce_gold( + cls, + user_id: str, + gold: int, + handle: GoldHandle, + plugin_module: str, + platform: str | None = None, + ): + """消耗金币 + + 参数: + user_id: 用户id + gold: 金币 + handle: 金币处理 + plugin_name: 插件模块 + platform: 平台. + + 异常: + InsufficientGold: 金币不足 + """ + user, _ = await cls.get_or_create( + user_id=user_id, defaults={"platform": platform, "uid": cls.get_new_uid()} + ) + if user.gold < gold: + raise InsufficientGold() + user.gold -= gold + await user.save(update_fields=["gold"]) + await UserGoldLog.create( + user_id=user_id, gold=gold, handle=handle, source=plugin_module + ) + @classmethod async def add_props( cls, user_id: str, goods_uuid: str, num: int = 1, platform: str | None = None diff --git a/zhenxun/utils/_build_image.py b/zhenxun/utils/_build_image.py index bcb83dc7..757bc5bc 100644 --- a/zhenxun/utils/_build_image.py +++ b/zhenxun/utils/_build_image.py @@ -453,14 +453,24 @@ class BuildImage: return self def pic2bs4(self) -> str: - """ - BuildImage 转 base64 + """BuildImage 转 base64 + + 返回: + str: base64 """ buf = BytesIO() self.markImg.save(buf, format="PNG") base64_str = base64.b64encode(buf.getvalue()).decode() return "base64://" + base64_str + def pic2io(self) -> BytesIO: + """图片转 BytesIO + + 返回: + BytesIO: BytesIO + """ + return BytesIO(self.tobytes()) + def convert(self, type_: ModeType) -> Self: """ 修改图片类型 diff --git a/zhenxun/utils/enum.py b/zhenxun/utils/enum.py index 32270316..437c4f0d 100644 --- a/zhenxun/utils/enum.py +++ b/zhenxun/utils/enum.py @@ -10,6 +10,8 @@ class GoldHandle(StrEnum): """购买""" GET = "GET" """获取""" + PLUGIN = "PLUGIN" + """插件花费""" class PropHandle(StrEnum): @@ -40,7 +42,7 @@ class BlockType(StrEnum): 禁用状态 """ - FRIEND = "PRIVATE" + PRIVATE = "PRIVATE" GROUP = "GROUP" ALL = "ALL" @@ -72,6 +74,7 @@ class LimitWatchType(StrEnum): USER = "USER" GROUP = "GROUP" + ALL = "ALL" class RequestType(StrEnum): diff --git a/zhenxun/utils/exception.py b/zhenxun/utils/exception.py index 0998e776..4b1cbe17 100644 --- a/zhenxun/utils/exception.py +++ b/zhenxun/utils/exception.py @@ -1,14 +1,38 @@ class NotFoundError(Exception): + """ + 未发现 + """ + pass class GroupInfoNotFound(Exception): + """ + 群组未找到 + """ + pass class EmptyError(Exception): + """ + 空错误 + """ + pass class UserAndGroupIsNone(Exception): + """ + 用户和群组为空 + """ + + pass + + +class InsufficientGold(Exception): + """ + 金币不足 + """ + pass diff --git a/zhenxun/utils/rules.py b/zhenxun/utils/rules.py index f48c1106..0508f21f 100644 --- a/zhenxun/utils/rules.py +++ b/zhenxun/utils/rules.py @@ -27,7 +27,9 @@ def admin_check(a: int | str, key: str | None = None) -> Rule: if type(a) == str and key: level = Config.get_config(a, key) if level is not None: - return bool(LevelUser.check_level(session.id1, session.id2, int(level))) + return bool( + await LevelUser.check_level(session.id1, session.id2, int(level)) + ) return False return Rule(_rule) diff --git a/zhenxun/utils/utils.py b/zhenxun/utils/utils.py index 86e8c4e4..5fe35211 100644 --- a/zhenxun/utils/utils.py +++ b/zhenxun/utils/utils.py @@ -1,10 +1,12 @@ import os import time from collections import defaultdict +from datetime import datetime from pathlib import Path from typing import Any import httpx +import pytz from zhenxun.services.log import logger @@ -78,21 +80,31 @@ class ResourceDirManager: class CountLimiter: """ - 次数检测工具,检测调用次数是否超过设定值 + 每日调用命令次数限制 """ - def __init__(self, max_count: int): + tz = pytz.timezone("Asia/Shanghai") + + def __init__(self, max_num): + self.today = -1 self.count = defaultdict(int) - self.max_count = max_count + self.max = max_num - def add(self, key: Any): - self.count[key] += 1 + def check(self, key) -> bool: + day = datetime.now(self.tz).day + if day != self.today: + self.today = day + self.count.clear() + return bool(self.count[key] < self.max) - def check(self, key: Any) -> bool: - if self.count[key] >= self.max_count: - self.count[key] = 0 - return True - return False + def get_num(self, key): + return self.count[key] + + def increase(self, key, num=1): + self.count[key] += num + + def reset(self, key): + self.count[key] = 0 class UserBlockLimiter: