From 31e4cadecb8eb4ecd998c06c1602a4a121e97289 Mon Sep 17 00:00:00 2001 From: HibiKier <775757368@qq.com> Date: Sun, 4 Dec 2022 16:46:33 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- basic_plugins/help/data_source.py | 4 +- basic_plugins/help/utils.py | 27 +++- basic_plugins/init_plugin_config/__init__.py | 2 + .../init_none_plugin_count_manager.py | 2 +- .../init_plugin_config/init_plugin_info.py | 121 ++++++++++++++++++ .../init_plugin_config/init_plugins_config.py | 119 ++++++----------- .../init_plugin_config/init_plugins_data.py | 74 ++++------- .../init_plugin_config/init_plugins_limit.py | 62 +++------ .../init_plugins_resources.py | 18 +-- .../init_plugins_settings.py | 85 ++---------- configs/utils/__init__.py | 4 + utils/manager/__init__.py | 24 ++-- utils/manager/models.py | 66 ++++++++-- utils/manager/plugin_data_manager.py | 35 +++++ utils/manager/plugins2block_manager.py | 39 ++++-- utils/manager/plugins2cd_manager.py | 38 ++++-- utils/manager/plugins2count_manager.py | 27 +++- utils/manager/plugins2settings_manager.py | 61 ++++++--- 18 files changed, 483 insertions(+), 325 deletions(-) create mode 100644 basic_plugins/init_plugin_config/init_plugin_info.py create mode 100644 utils/manager/plugin_data_manager.py diff --git a/basic_plugins/help/data_source.py b/basic_plugins/help/data_source.py index 1d4c4fb0..d3a14077 100755 --- a/basic_plugins/help/data_source.py +++ b/basic_plugins/help/data_source.py @@ -77,10 +77,8 @@ async def _create_help_img( plugins2settings_manager.get_plugin_data(matcher.plugin_name).plugin_type ) else: - try: + if hasattr(_module, "__plugin_type__"): plugin_type = _module.__getattribute__("__plugin_type__") - except AttributeError: - pass if len(plugin_type) > 1: try: text_type = int(plugin_type[1]) diff --git a/basic_plugins/help/utils.py b/basic_plugins/help/utils.py index 989756f4..24e659d4 100644 --- a/basic_plugins/help/utils.py +++ b/basic_plugins/help/utils.py @@ -1,12 +1,28 @@ -from typing import List, Tuple +from typing import List, Tuple, Dict, Optional, Union from configs.path_config import IMAGE_PATH from utils.image_utils import BuildImage +from pydantic import BaseModel import os import random - background_path = IMAGE_PATH / "background" / "help" / "simple_help" +# class PluginType(BaseModel): +# +# +# class Plugin(BaseModel): +# name: str +# plugin_type: PluginType # 插件内部类型,根据name [Hidden] [Admin] [SUPERUSER] +# usage: Optional[str] +# des: Optional[str] +# cmd: Optional[List[str]] +# task: Optional[Dict[str, str]] +# type: Optional[Tuple[str, int]] # 菜单类型 +# version: Optional[Union[str, int]] +# author: Optional[str] + + + async def build_help_image(image_group: List[List[BuildImage]], h: int): bk = None @@ -103,3 +119,10 @@ def group_image(image_list: List[BuildImage]) -> Tuple[List[List[BuildImage]], i max_w -= max([x.w for x in image_group[-1]]) image_group.pop(-1) return image_group, max(max_h + 250, max_w + 70) + + +# class HelpImageBuild: +# +# def __init__(self): +# self._data: Dict[str, List[]] = {} + diff --git a/basic_plugins/init_plugin_config/__init__.py b/basic_plugins/init_plugin_config/__init__.py index c9909bcf..4e5a5b15 100755 --- a/basic_plugins/init_plugin_config/__init__.py +++ b/basic_plugins/init_plugin_config/__init__.py @@ -5,6 +5,7 @@ from .init_plugins_data import init_plugins_data, plugins_manager from .init_none_plugin_count_manager import init_none_plugin_count_manager from .init_plugins_resources import init_plugins_resources from .init_plugins_settings import init_plugins_settings +from .init_plugin_info import init_plugin_info from .init_plugins_limit import ( init_plugins_block_limit, init_plugins_count_limit, @@ -34,6 +35,7 @@ async def _(): config_file = DATA_PATH / "configs" / "plugins2config.yaml" _flag = not config_file.exists() init() + init_plugin_info() init_plugins_settings() init_plugins_cd_limit() init_plugins_block_limit() diff --git a/basic_plugins/init_plugin_config/init_none_plugin_count_manager.py b/basic_plugins/init_plugin_config/init_none_plugin_count_manager.py index ac40d928..29d6b3a9 100755 --- a/basic_plugins/init_plugin_config/init_none_plugin_count_manager.py +++ b/basic_plugins/init_plugin_config/init_none_plugin_count_manager.py @@ -48,7 +48,7 @@ def init_none_plugin_count_manager(): logger.info( f"{module}:{plugin_name} 插件疑似已删除," f"加载{none_plugin_count_manager._max_count}次失败后将清除对应插件数据," - f"当前次数:{none_plugin_count_manager._data[module]}" + f"当前次数:{none_plugin_count_manager.get(module)}" ) else: none_plugin_count_manager.reset(module) diff --git a/basic_plugins/init_plugin_config/init_plugin_info.py b/basic_plugins/init_plugin_config/init_plugin_info.py new file mode 100644 index 00000000..c0f35cd6 --- /dev/null +++ b/basic_plugins/init_plugin_config/init_plugin_info.py @@ -0,0 +1,121 @@ +from types import ModuleType +from typing import Any + +from services import logger +from utils.manager import ( + plugin_data_manager, + plugins2settings_manager, + plugins2cd_manager, + plugins2block_manager, + plugins2count_manager, +) +from configs.config import Config +from utils.manager.models import ( + PluginType, + PluginSetting, + PluginCd, + PluginData, + PluginBlock, + PluginCount, +) +from utils.utils import get_matchers + + +def get_attr(module: ModuleType, name: str, default: Any = None) -> Any: + """ + 说明: + 获取属性 + 参数: + :param module: module + :param name: name + :param default: default + """ + if hasattr(module, name): + return getattr(module, name) + return default + + +def init_plugin_info(): + + for matcher in [x for x in get_matchers(True) if x.plugin]: + try: + plugin = matcher.plugin + metadata = plugin.metadata + if hasattr(plugin, "module"): + module = plugin.module + plugin_model = matcher.plugin_name + plugin_name = ( + metadata.name + if metadata and metadata.name + else get_attr(module, "__zx_plugin_name__", matcher.plugin_name) + ) + if "[Admin]" in plugin_name: + plugin_type = PluginType.ADMIN + plugin_name = plugin_name.replace("[Admin]", "") + elif "[Hidden]" in plugin_name: + plugin_type = PluginType.HIDDEN + plugin_name = plugin_name.replace("[Hidden]", "") + elif "[Superuser]" in plugin_name: + plugin_type = PluginType.SUPERUSER + plugin_name = plugin_name.replace("[Superuser]", "") + else: + plugin_type = PluginType.NORMAL + plugin_usage = ( + metadata.usage + if metadata and metadata.usage + else get_attr(module, "__plugin_usage__") + ) + plugin_des = ( + metadata.description + if metadata and metadata.description + else get_attr(module, "__plugin_des__") + ) + menu_type = get_attr(module, "__plugin_type__") or ("normal",) + plugin_setting = get_attr(module, "__plugin_settings__") + if plugin_setting: + plugin_setting = PluginSetting(**plugin_setting) + plugin_setting.plugin_type = menu_type + plugin_task = get_attr(module, "__plugin_task__") + plugin_version = get_attr(module, "__plugin_version__") + plugin_author = get_attr(module, "__plugin_author__") + plugin_cd = get_attr(module, "__plugin_cd_limit__") + if plugin_cd: + plugin_cd = PluginCd(**plugin_cd) + plugin_block = get_attr(module, "__plugin_block_limit__") + if plugin_block: + plugin_block = PluginBlock(**plugin_block) + plugin_count = get_attr(module, "__plugin_count_limit__") + if plugin_count: + plugin_count = PluginCount(**plugin_count) + plugin_resources = get_attr(module, "__plugin_resources__") + plugin_configs = get_attr(module, "__plugin_configs__") + if settings := plugins2settings_manager.get(plugin_model): + plugin_setting = settings + if plugin_cd_limit := plugins2cd_manager.get(plugin_model): + plugin_cd = plugin_cd_limit + if plugin_block_limit := plugins2block_manager.get(plugin_model): + plugin_block = plugin_block_limit + if plugin_count_limit := plugins2count_manager.get(plugin_model): + plugin_count = plugin_count_limit + if plugin_cfg := Config.get(plugin_model): + plugin_configs = plugin_cfg + plugin_data = PluginData( + model=plugin_model, + name=plugin_name.strip(), + plugin_type=plugin_type, + usage=plugin_usage, + des=plugin_des, + task=plugin_task, + menu_type=menu_type, + version=plugin_version, + author=plugin_author, + plugin_setting=plugin_setting, + plugin_cd=plugin_cd, + plugin_block=plugin_block, + plugin_count=plugin_count, + plugin_resources=plugin_resources, + plugin_configs=plugin_configs, + ) + plugin_data_manager.add_plugin_info(plugin_data) + except Exception as e: + logger.error(f"构造插件数据失败 {matcher.plugin_name} - {type(e)}:{e}") diff --git a/basic_plugins/init_plugin_config/init_plugins_config.py b/basic_plugins/init_plugin_config/init_plugins_config.py index 0a5c8bb7..eafe39c0 100755 --- a/basic_plugins/init_plugin_config/init_plugins_config.py +++ b/basic_plugins/init_plugin_config/init_plugins_config.py @@ -1,13 +1,10 @@ -import asyncio -from datetime import datetime, timedelta from pathlib import Path from ruamel.yaml import round_trip_load, round_trip_dump, YAML -from utils.manager import admin_manager, plugins_manager +from utils.manager import admin_manager, plugins_manager, plugin_data_manager from configs.config import Config from services.log import logger from utils.text_utils import prompt2cn from utils.utils import get_matchers -from utils.utils import scheduler from configs.path_config import DATA_PATH from ruamel import yaml @@ -20,67 +17,50 @@ def init_plugins_config(): 初始化插件数据配置 """ plugins2config_file = DATA_PATH / "configs" / "plugins2config.yaml" - plugins2config_file.parent.mkdir(parents=True, exist_ok=True) - _data = {} - if plugins2config_file.exists(): - _data = _yaml.load(open(plugins2config_file, "r", encoding="utf8")) + _data = Config.get_data() # 优先使用 metadata 数据 - for matcher in [matcher for matcher in get_matchers(True) if matcher.plugin and matcher.plugin.module]: - _plugin = matcher.plugin - metadata = _plugin.metadata - _module = _plugin.module - plugin_version = None - if metadata: - plugin_version = metadata.extra.get("version") - if not plugin_version and hasattr(_module, "__plugin_version__"): - plugin_version = _module.__getattribute__("__plugin_version__") - if metadata and metadata.config: - plugin_configs = {} - for key, value in metadata.config.__fields__.items(): - plugin_configs[key.upper()] = { - "value": value.default, - "default_value": value.default - } - else: - try: - plugin_configs = _module.__getattribute__("__plugin_configs__") - except AttributeError: - continue - # 插件配置版本更新或为Version为None或不在存储配置内,当使用metadata时,必定更新 - if isinstance(plugin_version, str) or ( - plugin_version is None - or ( - _data.get(matcher.plugin_name) - and _data[matcher.plugin_name].keys() != plugin_configs.keys() - ) - or plugin_version > plugins_manager.get(matcher.plugin_name).version - or matcher.plugin_name not in _data.keys() - ): - for key in plugin_configs: - if isinstance(plugin_configs[key], dict): + for matcher in get_matchers(True): + if plugin_data := plugin_data_manager.get(matcher.plugin_name): + # 插件配置版本更新或为Version为None或不在存储配置内,当使用metadata时,必定更新 + if plugin_data.plugin_configs and ( + isinstance(plugin_data.version, str) + or ( + plugin_data.version is None + or ( + _data.get(matcher.plugin_name) + and _data[matcher.plugin_name].keys() + != plugin_data.plugin_configs.keys() + ) + or plugin_data.version + > plugins_manager.get(matcher.plugin_name).version + or matcher.plugin_name not in _data.keys() + ) + ): + plugin_configs = plugin_data.plugin_configs + for key in plugin_configs: + if isinstance(plugin_data.plugin_configs[key], dict): + Config.add_plugin_config( + matcher.plugin_name, + key, + plugin_configs[key].get("value"), + help_=plugin_configs[key].get("help"), + default_value=plugin_configs[key].get("default_value"), + _override=True, + ) + else: + Config.add_plugin_config( + matcher.plugin_name, key, plugin_configs[key] + ) + elif plugin_configs := _data.get(matcher.plugin_name): + for key in plugin_configs: Config.add_plugin_config( matcher.plugin_name, key, - plugin_configs[key].get("value"), - help_=plugin_configs[key].get("help"), - default_value=plugin_configs[key].get("default_value"), + plugin_configs[key]["value"], + help_=plugin_configs[key]["help"], + default_value=plugin_configs[key]["default_value"], _override=True, ) - else: - Config.add_plugin_config( - matcher.plugin_name, key, plugin_configs[key] - ) - else: - plugin_configs = _data[matcher.plugin_name] - for key in plugin_configs: - Config.add_plugin_config( - matcher.plugin_name, - key, - plugin_configs[key]["value"], - help_=plugin_configs[key]["help"], - default_value=plugin_configs[key]["default_value"], - _override=True, - ) if not Config.is_empty(): Config.save() _data = round_trip_load(open(plugins2config_file, encoding="utf8")) @@ -92,26 +72,13 @@ def init_plugins_config(): _data[plugin].yaml_set_start_comment(plugin_name, indent=2) # 初始化未设置的管理员权限等级 for k, v in Config.get_admin_level_data(): - # try: admin_manager.set_admin_level(k, v) - # except KeyError as e: - # raise KeyError(f"{e} ****** 请检查是否有插件加载失败 ******") # 存完插件基本设置 with open(plugins2config_file, "w", encoding="utf8") as wf: round_trip_dump( _data, wf, indent=2, Dumper=yaml.RoundTripDumper, allow_unicode=True ) - user_config_file = Path() / "configs" / "config.yaml" - # if not user_config_file.exists(): _replace_config() - # else: - # logger.info('五分钟后将进行配置数据替换,请注意...') - # scheduler.add_job( - # _replace_config, - # "date", - # run_date=datetime.now() + timedelta(minutes=5), - # id=f"_replace_config" - # ) def _replace_config(): @@ -151,9 +118,7 @@ def _replace_config(): temp_file = Path() / "configs" / "temp_config.yaml" try: with open(temp_file, "w", encoding="utf8") as wf: - yaml.dump( - _tmp_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True - ) + yaml.dump(_tmp_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True) with open(temp_file, "r", encoding="utf8") as rf: _data = round_trip_load(rf) # 添加注释 @@ -187,9 +152,7 @@ def _replace_config(): rst += f'{k}: {Config[plugin][k]["help"]}' + "\n" _data[plugin].yaml_set_start_comment(rst[:-1], indent=2) with open(Path() / "configs" / "config.yaml", "w", encoding="utf8") as wf: - round_trip_dump( - _data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True - ) + round_trip_dump(_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True) except Exception as e: logger.error(f"生成简易配置注释错误 {type(e)}:{e}") if temp_file.exists(): diff --git a/basic_plugins/init_plugin_config/init_plugins_data.py b/basic_plugins/init_plugin_config/init_plugins_data.py index 37614bc2..e6244365 100755 --- a/basic_plugins/init_plugin_config/init_plugins_data.py +++ b/basic_plugins/init_plugin_config/init_plugins_data.py @@ -1,9 +1,8 @@ from ruamel.yaml import YAML -from utils.manager import plugins_manager +from utils.manager import plugins_manager, plugin_data_manager from utils.utils import get_matchers from services.log import logger -from configs.path_config import DATA_PATH try: import ujson as json @@ -18,13 +17,10 @@ def init_plugins_data(): """ 初始化插件数据信息 """ - plugin2data_file = DATA_PATH / "manager" / "plugin_manager.json" - plugin2data_file.parent.mkdir(parents=True, exist_ok=True) for matcher in get_matchers(True): _plugin = matcher.plugin if not _plugin: continue - metadata = _plugin.metadata try: _module = _plugin.module except AttributeError: @@ -35,49 +31,27 @@ def init_plugins_data(): else: plugins_manager[matcher.plugin_name].error = True else: - try: - plugin_version = None - if metadata: - plugin_version = metadata.extra.get("version") - if not plugin_version and hasattr(_module, "__plugin_version__"): - plugin_version = _module.__getattribute__("__plugin_version__") - if metadata: - plugin_name = metadata.name - else: - try: - plugin_name = _module.__getattribute__("__zx_plugin_name__") - except AttributeError: - plugin_name = matcher.plugin_name - plugin_author = None - if metadata: - plugin_author = metadata.extra.get('author') - if not plugin_author and hasattr(_module, "__plugin_author__"): - plugin_author = _module.__getattribute__("__plugin_author__") - if matcher.plugin_name in plugins_manager.keys(): - plugins_manager[matcher.plugin_name].error = False - if matcher.plugin_name not in plugins_manager.keys(): - plugins_manager.add_plugin_data( - matcher.plugin_name, - plugin_name=plugin_name, - author=plugin_author, - version=plugin_version, - ) - # metadata不检测version - elif isinstance(plugin_version, str) or plugins_manager[matcher.plugin_name].version is None or ( - plugin_version is not None - and plugin_version > float(plugins_manager[matcher.plugin_name].version) - ): - plugins_manager[matcher.plugin_name].plugin_name = plugin_name - plugins_manager[matcher.plugin_name].author = plugin_author - plugins_manager[matcher.plugin_name].version = plugin_version - # if matcher.plugin_name in _data.keys(): - # plugins_manager[matcher.plugin_name].error = _data[matcher.plugin_name]["error"] - # plugins_manager.set_module_data( - # matcher.plugin_name, "error", _data[matcher.plugin_name]["error"] - # ) - # plugins_manager.set_module_data( - # matcher.plugin_name, "plugin_name", _data[matcher.plugin_name]["plugin_name"] - # ) - except Exception as e: - logger.error(f"插件数据 {matcher.plugin_name} 加载发生错误 {type(e)}:{e}") + if plugin_data := plugin_data_manager.get(matcher.plugin_name): + try: + plugin_version = plugin_data.version + plugin_name = plugin_data.name + plugin_author = plugin_data.author + if matcher.plugin_name in plugins_manager.keys(): + plugins_manager[matcher.plugin_name].error = False + if matcher.plugin_name not in plugins_manager.keys(): + plugins_manager.add_plugin_data( + matcher.plugin_name, + plugin_name=plugin_name, + author=plugin_author, + version=plugin_version, + ) + elif isinstance(plugin_version, str) or plugins_manager[matcher.plugin_name].version is None or ( + plugin_version is not None + and plugin_version > float(plugins_manager[matcher.plugin_name].version) + ): + plugins_manager[matcher.plugin_name].plugin_name = plugin_name + plugins_manager[matcher.plugin_name].author = plugin_author + plugins_manager[matcher.plugin_name].version = plugin_version + except Exception as e: + logger.error(f"插件数据 {matcher.plugin_name} 加载发生错误 {type(e)}:{e}") plugins_manager.save() diff --git a/basic_plugins/init_plugin_config/init_plugins_limit.py b/basic_plugins/init_plugin_config/init_plugins_limit.py index a99d28a6..2de6c010 100755 --- a/basic_plugins/init_plugin_config/init_plugins_limit.py +++ b/basic_plugins/init_plugin_config/init_plugins_limit.py @@ -2,10 +2,10 @@ from utils.manager import ( plugins2cd_manager, plugins2block_manager, plugins2count_manager, + plugin_data_manager, ) from utils.utils import get_matchers from configs.path_config import DATA_PATH -import nonebot def init_plugins_cd_limit(): @@ -14,22 +14,16 @@ def init_plugins_cd_limit(): """ plugins2cd_file = DATA_PATH / "configs" / "plugins2cd.yaml" plugins2cd_file.parent.mkdir(exist_ok=True, parents=True) - _data = {} for matcher in get_matchers(True): - if not plugins2cd_manager.get_plugin_cd_data(matcher.plugin_name): - _plugin = nonebot.plugin.get_plugin(matcher.plugin_name) - try: - _module = _plugin.module - plugin_cd_limit = _module.__getattribute__("__plugin_cd_limit__") + if not plugins2cd_manager.get_plugin_cd_data(matcher.plugin_name) and ( + plugin_data := plugin_data_manager.get(matcher.plugin_name) + ): + if plugin_data.plugin_cd: plugins2cd_manager.add_cd_limit( - matcher.plugin_name, **plugin_cd_limit + matcher.plugin_name, plugin_data.plugin_cd ) - except AttributeError: - pass if not plugins2cd_manager.keys(): - plugins2cd_manager.add_cd_limit( - "这是一个示例" - ) + plugins2cd_manager.add_cd_limit("这是一个示例") plugins2cd_manager.save() plugins2cd_manager.reload_cd_limit() @@ -38,23 +32,16 @@ def init_plugins_block_limit(): """ 加载阻塞限制 """ - plugins2block_file = DATA_PATH / "configs" / "plugins2block.yaml" - plugins2block_file.parent.mkdir(exist_ok=True, parents=True) for matcher in get_matchers(True): - if not plugins2block_manager.get_plugin_block_data(matcher.plugin_name): - _plugin = matcher.plugin - try: - _module = _plugin.module - plugin_block_limit = _module.__getattribute__("__plugin_block_limit__") + if not plugins2block_manager.get_plugin_block_data(matcher.plugin_name) and ( + plugin_data := plugin_data_manager.get(matcher.plugin_name) + ): + if plugin_data.plugin_block: plugins2block_manager.add_block_limit( - matcher.plugin_name, **plugin_block_limit + matcher.plugin_name, plugin_data.plugin_block ) - except AttributeError: - pass if not plugins2block_manager.keys(): - plugins2block_manager.add_block_limit( - "这是一个示例" - ) + plugins2block_manager.add_block_limit("这是一个示例") plugins2block_manager.save() plugins2block_manager.reload_block_limit() @@ -63,24 +50,15 @@ def init_plugins_count_limit(): """ 加载次数限制 """ - plugins2count_file = DATA_PATH / "configs" / "plugins2count.yaml" - plugins2count_file.parent.mkdir(exist_ok=True, parents=True) - _data = {} - _matchers = get_matchers() - for matcher in _matchers: - if not plugins2count_manager.get_plugin_count_data(matcher.plugin_name): - _plugin = nonebot.plugin.get_plugin(matcher.plugin_name) - try: - _module = _plugin.module - plugin_count_limit = _module.__getattribute__("__plugin_count_limit__") + for matcher in get_matchers(True): + if not plugins2count_manager.get_plugin_count_data(matcher.plugin_name) and ( + plugin_data := plugin_data_manager.get(matcher.plugin_name) + ): + if plugin_data.plugin_count: plugins2count_manager.add_count_limit( - matcher.plugin_name, **plugin_count_limit + matcher.plugin_name, plugin_data.plugin_count ) - except AttributeError: - pass if not plugins2count_manager.keys(): - plugins2count_manager.add_count_limit( - "这是一个示例" - ) + plugins2count_manager.add_count_limit("这是一个示例") plugins2count_manager.save() plugins2count_manager.reload_count_limit() diff --git a/basic_plugins/init_plugin_config/init_plugins_resources.py b/basic_plugins/init_plugin_config/init_plugins_resources.py index b69bb59b..936d5038 100755 --- a/basic_plugins/init_plugin_config/init_plugins_resources.py +++ b/basic_plugins/init_plugin_config/init_plugins_resources.py @@ -1,29 +1,21 @@ -from utils.manager import resources_manager +from utils.manager import resources_manager, plugin_data_manager from utils.utils import get_matchers from services.log import logger from pathlib import Path -import nonebot def init_plugins_resources(): """ 资源文件路径的移动 """ - _tmp = [] - for matcher in get_matchers(): - if matcher.plugin_name not in _tmp: - _tmp.append(matcher.plugin_name) - _plugin = nonebot.plugin.get_plugin(matcher.plugin_name) + for matcher in get_matchers(True): + if plugin_data := plugin_data_manager.get(matcher.plugin_name): try: - _module = _plugin.module + _module = matcher.plugin.module except AttributeError: logger.warning(f"插件 {matcher.plugin_name} 加载失败...,资源控制未加载...") else: - try: - resources = _module.__getattribute__("__plugin_resources__") - except AttributeError: - pass - else: + if resources := plugin_data.plugin_resources: path = Path(_module.__getattribute__("__file__")).parent for resource in resources.keys(): resources_manager.add_resource( diff --git a/basic_plugins/init_plugin_config/init_plugins_settings.py b/basic_plugins/init_plugin_config/init_plugins_settings.py index 14b0e46b..aecb185b 100755 --- a/basic_plugins/init_plugin_config/init_plugins_settings.py +++ b/basic_plugins/init_plugin_config/init_plugins_settings.py @@ -1,5 +1,6 @@ -from utils.manager import plugins2settings_manager, admin_manager +from utils.manager import plugins2settings_manager, admin_manager, plugin_data_manager from services.log import logger +from utils.manager.models import PluginType from utils.utils import get_matchers import nonebot @@ -8,93 +9,33 @@ def init_plugins_settings(): """ 初始化插件设置,从插件中获取 __zx_plugin_name__,__plugin_cmd__,__plugin_settings__ """ - _tmp_module = {} for x in plugins2settings_manager.keys(): try: _plugin = nonebot.plugin.get_plugin(x) _module = _plugin.module - metadata = _plugin.metadata - plugin_name = ( - metadata.name - if metadata - else _module.__getattribute__("__zx_plugin_name__") - ) - _tmp_module[x] = plugin_name + _module.__getattribute__("__zx_plugin_name__") except (KeyError, AttributeError) as e: logger.warning(f"配置文件 模块:{x} 获取 plugin_name 失败...{e}") - _tmp_module[x] = "" - for matcher in [x for x in get_matchers(True) if x.plugin]: + for matcher in get_matchers(True): try: if matcher.plugin_name not in plugins2settings_manager.keys(): _plugin = matcher.plugin - metadata = _plugin.metadata try: _module = _plugin.module except AttributeError: logger.warning(f"插件 {matcher.plugin_name} 加载失败...,插件控制未加载.") else: - try: - plugin_name = metadata.name if metadata else _module.__getattribute__("__zx_plugin_name__") - # 管理员命令 - if "[admin]" in plugin_name.lower(): - level = 5 - cmd = None - if hasattr(_module, "__plugin_settings__"): - admin_settings = _module.__getattribute__( - "__plugin_settings__" + if plugin_data := plugin_data_manager.get(matcher.plugin_name): + if plugin_settings := plugin_data.plugin_setting: + # 管理员命令 + if plugin_data.plugin_type == PluginType.ADMIN: + admin_manager.add_admin_plugin_settings( + matcher.plugin_name, plugin_settings.cmd, plugin_settings.level ) - level = admin_settings.get("admin_level", 5) - cmd = admin_settings.get("cmd") - admin_manager.add_admin_plugin_settings( - matcher.plugin_name, cmd, level - ) - if ( - "[hidden]" in plugin_name.lower() - or "[admin]" in plugin_name.lower() - or "[superuser]" in plugin_name.lower() - or matcher.plugin_name in plugins2settings_manager.keys() - ): - continue - except AttributeError: - logger.warning( - f"获取插件 {matcher.plugin_name} __zx_plugin_name__ 失败...,插件控制未加载." - ) - else: - _tmp_module[matcher.plugin_name] = plugin_name - if hasattr(_module, "__plugin_settings__"): - plugin_settings = _module.__getattribute__( - "__plugin_settings__" - ) - else: - plugin_settings = {"cmd": [matcher.plugin_name, plugin_name]} - if plugin_settings.get("cost_gold") is None: - plugin_settings["cost_gold"] = 0 - if ( - plugin_settings.get("cmd") is not None - and plugin_name not in plugin_settings["cmd"] - ): - plugin_settings["cmd"].append(plugin_name) - if plugins2settings_manager.get( - matcher.plugin_name - ) and plugins2settings_manager[matcher.plugin_name].get( - "plugin_type" - ): - plugin_type = tuple( - plugins2settings_manager.get_plugin_data( - matcher.plugin_name - ).plugin_type - ) - else: - if hasattr(_module, "__plugin_type__"): - plugin_type = _module.__getattribute__("__plugin_type__") else: - plugin_type = ("normal",) - if plugin_settings and matcher.plugin_name: - plugins2settings_manager.add_plugin_settings( - matcher.plugin_name, - plugin_type=plugin_type, - **plugin_settings, - ) + plugins2settings_manager.add_plugin_settings( + matcher.plugin_name, plugin_settings + ) except Exception as e: logger.error(f'{matcher.plugin_name} 初始化 plugin_settings 发生错误 {type(e)}:{e}') plugins2settings_manager.save() diff --git a/configs/utils/__init__.py b/configs/utils/__init__.py index a91c8de5..10de0138 100644 --- a/configs/utils/__init__.py +++ b/configs/utils/__init__.py @@ -1,3 +1,4 @@ +import copy from typing import Optional, Any, Union from pathlib import Path from ruamel.yaml import YAML @@ -216,6 +217,9 @@ class ConfigsManager: """ return self._admin_level_data + def get_data(self): + return copy.deepcopy(self._data) + def is_empty(self) -> bool: return not bool(self._data) diff --git a/utils/manager/__init__.py b/utils/manager/__init__.py index 30ec973c..a3dc5ab0 100755 --- a/utils/manager/__init__.py +++ b/utils/manager/__init__.py @@ -1,6 +1,7 @@ from typing import Optional from .group_manager import GroupManager from .data_class import StaticData +from .plugin_data_manager import PluginDataManager from .withdraw_message_manager import WithdrawMessageManager from .plugins2cd_manager import Plugins2cdManager from .plugins2block_manager import Plugins2blockManager @@ -18,50 +19,53 @@ from configs.path_config import DATA_PATH admin_manager = AdminManager() # 群功能开关 | 群被动技能 | 群权限 管理 -group_manager: Optional[GroupManager] = GroupManager( +group_manager: GroupManager = GroupManager( DATA_PATH / "manager" / "group_manager.json" ) # 撤回消息管理 -withdraw_message_manager: Optional[WithdrawMessageManager] = WithdrawMessageManager() +withdraw_message_manager: WithdrawMessageManager = WithdrawMessageManager() # 插件管理 -plugins_manager: Optional[PluginsManager] = PluginsManager( +plugins_manager: PluginsManager = PluginsManager( DATA_PATH / "manager" / "plugins_manager.json" ) # 插件基本设置管理 -plugins2settings_manager: Optional[Plugins2settingsManager] = Plugins2settingsManager( +plugins2settings_manager: Plugins2settingsManager = Plugins2settingsManager( DATA_PATH / "configs" / "plugins2settings.yaml" ) # 插件命令 cd 管理 -plugins2cd_manager: Optional[Plugins2cdManager] = Plugins2cdManager( +plugins2cd_manager: Plugins2cdManager = Plugins2cdManager( DATA_PATH / "configs" / "plugins2cd.yaml" ) # 插件命令 阻塞 管理 -plugins2block_manager: Optional[Plugins2blockManager] = Plugins2blockManager( +plugins2block_manager: Plugins2blockManager = Plugins2blockManager( DATA_PATH / "configs" / "plugins2block.yaml" ) # 插件命令 每次次数限制 管理 -plugins2count_manager: Optional[Plugins2countManager] = Plugins2countManager( +plugins2count_manager: Plugins2countManager = Plugins2countManager( DATA_PATH / "configs" / "plugins2count.yaml" ) # 资源管理 -resources_manager: Optional[ResourcesManager] = ResourcesManager( +resources_manager: ResourcesManager = ResourcesManager( DATA_PATH / "manager" / "resources_manager.json" ) # 插件加载容忍管理 -none_plugin_count_manager: Optional[NonePluginCountManager] = NonePluginCountManager( +none_plugin_count_manager: NonePluginCountManager = NonePluginCountManager( DATA_PATH / "manager" / "none_plugin_count_manager.json" ) # 好友请求/群聊邀请 管理 -requests_manager: Optional[RequestManager] = RequestManager( +requests_manager: RequestManager = RequestManager( DATA_PATH / "manager" / "requests_manager.json" ) +# 全局插件数据 +plugin_data_manager: PluginDataManager = PluginDataManager() + diff --git a/utils/manager/models.py b/utils/manager/models.py index f2bac6a0..16655e4a 100644 --- a/utils/manager/models.py +++ b/utils/manager/models.py @@ -1,6 +1,8 @@ -from typing import List, Optional, Dict, Literal, Tuple, Union +from pathlib import Path +from typing import List, Optional, Dict, Literal, Tuple, Union, Any from pydantic import BaseModel from configs.config import Config +from enum import Enum class AdminSetting(BaseModel): @@ -8,7 +10,7 @@ class AdminSetting(BaseModel): 管理员设置 """ - level: int + level: int = 5 cmd: Optional[List[str]] @@ -40,9 +42,9 @@ class PluginBlock(BaseModel): 插件阻断 """ - status: bool # 限制状态 - check_type: Literal["private", "group", "all"] # 检查类型 - limit_type: Literal["user", "group"] # 监听对象 + status: bool = True # 限制状态 + check_type: Literal["private", "group", "all"] = "all" # 检查类型 + limit_type: Literal["user", "group"] = "user" # 监听对象 rst: Optional[str] # 阻断时回复 @@ -51,10 +53,10 @@ class PluginCd(BaseModel): 插件阻断 """ - cd: int # cd - status: bool # 限制状态 - check_type: Literal["private", "group", "all"] # 检查类型 - limit_type: Literal["user", "group"] # 监听对象 + cd: int = 5 # cd + status: bool = True # 限制状态 + check_type: Literal["private", "group", "all"] = "all" # 检查类型 + limit_type: Literal["user", "group"] = "user" # 监听对象 rst: Optional[str] # 阻断时回复 @@ -64,8 +66,8 @@ class PluginCount(BaseModel): """ max_count: int # 次数 - status: bool # 限制状态 - limit_type: Literal["user", "group"] # 监听对象 + status: bool = True # 限制状态 + limit_type: Literal["user", "group"] = "user" # 监听对象 rst: Optional[str] # 阻断时回复 @@ -93,3 +95,45 @@ class Plugin(BaseModel): block_type: Optional[str] = None # 关闭类型 author: Optional[str] = None # 作者 version: Optional[Union[int, str]] = None # 版本 + + +class PluginType(Enum): + """ + 插件类型 + """ + + NORMAL = "normal" + ADMIN = "admin" + HIDDEN = "hidden" + SUPERUSER = "superuser" + + +class PluginData(BaseModel): + model: str + name: str + plugin_type: PluginType # 插件内部类型,根据name [Hidden] [Admin] [SUPERUSER] + usage: Optional[str] + des: Optional[str] + task: Optional[Dict[str, str]] + menu_type: Tuple[Union[str, int], ...] = ("normal",) # 菜单类型 + version: Optional[Union[str, int]] + author: Optional[str] + plugin_setting: Optional[PluginSetting] + plugin_cd: Optional[PluginCd] + plugin_block: Optional[PluginBlock] + plugin_count: Optional[PluginCount] + plugin_resources: Optional[Dict[str, Union[str, Path]]] + plugin_configs: Optional[Dict[str, Dict[str, Any]]] + + class Config: + arbitrary_types_allowed = True + + def __eq__(self, other: "PluginData"): + return ( + isinstance(other, PluginData) + and self.name == other.name + and self.menu_type == other.menu_type + ) + + def __hash__(self): + return hash(self.name + self.menu_type[0]) diff --git a/utils/manager/plugin_data_manager.py b/utils/manager/plugin_data_manager.py new file mode 100644 index 00000000..989f5ed8 --- /dev/null +++ b/utils/manager/plugin_data_manager.py @@ -0,0 +1,35 @@ +from typing import Dict, Any + +from .models import PluginData + + +class PluginDataManager: + """ + 插件所有信息管理 + """ + + def __init__(self): + self._data: Dict[str, PluginData] = {} + + def add_plugin_info(self, info: PluginData): + """ + 说明: + 添加插件信息 + 参数: + :param info: PluginInfo + """ + if info.model in self._data.keys() and self._data[info.model] == info: + raise ValueError(f"PluginInfoManager {info.model}:{info.name} 插件名称及类型已存在") + self._data[info.model] = info + + def get(self, item: str, default: Any = None) -> PluginData: + return self._data.get(item, default) + + def __getitem__(self, item) -> PluginData: + return self._data.get(item) + + def __str__(self) -> str: + return str(self._data) + + + diff --git a/utils/manager/plugins2block_manager.py b/utils/manager/plugins2block_manager.py index 99962393..7312580f 100644 --- a/utils/manager/plugins2block_manager.py +++ b/utils/manager/plugins2block_manager.py @@ -1,4 +1,4 @@ -from typing import Optional, Dict, Literal, Union +from typing import Optional, Dict, Literal, Union, overload from utils.manager.data_class import StaticData from services.log import logger from utils.utils import UserBlockLimiter @@ -19,11 +19,25 @@ class Plugins2blockManager(StaticData): self._block_limiter: Dict[str, UserBlockLimiter] = {} self.__load_file() + @overload + def add_block_limit(self, plugin: str, plugin_block: PluginBlock): + ... + + @overload def add_block_limit( self, plugin: str, - *, - status: Optional[bool] = True, + status: bool = True, + check_type: Literal["private", "group", "all"] = "all", + limit_type: Literal["user", "group"] = "user", + rst: Optional[str] = None, + ): + ... + + def add_block_limit( + self, + plugin: str, + status: Union[bool, PluginBlock] = True, check_type: Literal["private", "group", "all"] = "all", limit_type: Literal["user", "group"] = "user", rst: Optional[str] = None, @@ -38,15 +52,18 @@ class Plugins2blockManager(StaticData): :param limit_type: 限制类型 监听对象,以user_id或group_id作为键来限制,'user':用户id,'group':群id :param rst: 回复的话,为空则不回复 """ - if check_type not in ["all", "group", "private"]: - raise ValueError( - f"{plugin} 添加block限制错误,‘check_type‘ 必须为 'private'/'group'/'all'" + if isinstance(status, PluginBlock): + self._data[plugin] = status + else: + if check_type not in ["all", "group", "private"]: + raise ValueError( + f"{plugin} 添加block限制错误,‘check_type‘ 必须为 'private'/'group'/'all'" + ) + if limit_type not in ["user", "group"]: + raise ValueError(f"{plugin} 添加block限制错误,‘limit_type‘ 必须为 'user'/'group'") + self._data[plugin] = PluginBlock( + status=status, check_type=check_type, limit_type=limit_type, rst=rst ) - if limit_type not in ["user", "group"]: - raise ValueError(f"{plugin} 添加block限制错误,‘limit_type‘ 必须为 'user'/'group'") - self._data[plugin] = PluginBlock( - status=status, check_type=check_type, limit_type=limit_type, rst=rst - ) def get_plugin_block_data(self, plugin: str) -> Optional[PluginBlock]: """ diff --git a/utils/manager/plugins2cd_manager.py b/utils/manager/plugins2cd_manager.py index 159b3ab5..b819939b 100644 --- a/utils/manager/plugins2cd_manager.py +++ b/utils/manager/plugins2cd_manager.py @@ -1,4 +1,4 @@ -from typing import Optional, Dict, Literal, Union +from typing import Optional, Dict, Literal, Union, overload from utils.manager.data_class import StaticData from utils.utils import FreqLimiter from services.log import logger @@ -19,11 +19,26 @@ class Plugins2cdManager(StaticData): self._freq_limiter: Dict[str, FreqLimiter] = {} self.__load_file() + @overload + def add_cd_limit(self, plugin: str, plugin_cd: PluginCd): + ... + + @overload def add_cd_limit( self, plugin: str, - *, - cd: Optional[int] = 5, + cd: Union[int, PluginCd] = 5, + status: Optional[bool] = True, + check_type: Literal["private", "group", "all"] = "all", + limit_type: Literal["user", "group"] = "user", + rst: Optional[str] = None, + ): + ... + + def add_cd_limit( + self, + plugin: str, + cd: Union[int, PluginCd] = 5, status: Optional[bool] = True, check_type: Literal["private", "group", "all"] = "all", limit_type: Literal["user", "group"] = "user", @@ -40,13 +55,16 @@ class Plugins2cdManager(StaticData): :param limit_type: 限制类型 监听对象,以user_id或group_id作为键来限制,'user':用户id,'group':群id :param rst: 回复的话,为空则不回复 """ - if check_type not in ["all", "group", "private"]: - raise ValueError( - f"{plugin} 添加cd限制错误,‘check_type‘ 必须为 'private'/'group'/'all'" - ) - if limit_type not in ["user", "group"]: - raise ValueError(f"{plugin} 添加cd限制错误,‘limit_type‘ 必须为 'user'/'group'") - self._data[plugin] = PluginCd(cd=cd, status=status, check_type=check_type, limit_type=limit_type, rst=rst) + if isinstance(cd, PluginCd): + self._data[plugin] = cd + else: + if check_type not in ["all", "group", "private"]: + raise ValueError( + f"{plugin} 添加cd限制错误,‘check_type‘ 必须为 'private'/'group'/'all'" + ) + if limit_type not in ["user", "group"]: + raise ValueError(f"{plugin} 添加cd限制错误,‘limit_type‘ 必须为 'user'/'group'") + self._data[plugin] = PluginCd(cd=cd, status=status, check_type=check_type, limit_type=limit_type, rst=rst) def get_plugin_cd_data(self, plugin: str) -> Optional[PluginCd]: """ diff --git a/utils/manager/plugins2count_manager.py b/utils/manager/plugins2count_manager.py index b7fd89c0..9b593f1e 100644 --- a/utils/manager/plugins2count_manager.py +++ b/utils/manager/plugins2count_manager.py @@ -1,4 +1,4 @@ -from typing import Optional, Dict, Literal, Union +from typing import Optional, Dict, Literal, Union, overload from utils.manager.data_class import StaticData from utils.utils import DailyNumberLimiter from services.log import logger @@ -19,14 +19,28 @@ class Plugins2countManager(StaticData): self._daily_limiter: Dict[str, DailyNumberLimiter] = {} self.__load_file() + @overload + def add_count_limit(self, plugin: str, plugin_count: PluginCount): + ... + + @overload def add_count_limit( self, plugin: str, - *, max_count: int = 5, status: Optional[bool] = True, limit_type: Literal["user", "group"] = "user", rst: Optional[str] = None, + ): + ... + + def add_count_limit( + self, + plugin: str, + max_count: Union[int, PluginCount] = 5, + status: Optional[bool] = True, + limit_type: Literal["user", "group"] = "user", + rst: Optional[str] = None, ): """ 说明: @@ -38,9 +52,12 @@ class Plugins2countManager(StaticData): :param limit_type: 限制类型 监听对象,以user_id或group_id作为键来限制,'user':用户id,'group':群id :param rst: 回复的话,为空则不回复 """ - if limit_type not in ["user", "group"]: - raise ValueError(f"{plugin} 添加count限制错误,‘limit_type‘ 必须为 'user'/'group'") - self._data[plugin] = PluginCount(max_count=max_count, status=status, limit_type=limit_type, rst=rst) + if isinstance(max_count, PluginCount): + self._data[plugin] = max_count + else: + if limit_type not in ["user", "group"]: + raise ValueError(f"{plugin} 添加count限制错误,‘limit_type‘ 必须为 'user'/'group'") + self._data[plugin] = PluginCount(max_count=max_count, status=status, limit_type=limit_type, rst=rst) def get_plugin_count_data(self, plugin: str) -> Optional[PluginCount]: """ diff --git a/utils/manager/plugins2settings_manager.py b/utils/manager/plugins2settings_manager.py index 291e945e..28443eba 100644 --- a/utils/manager/plugins2settings_manager.py +++ b/utils/manager/plugins2settings_manager.py @@ -1,9 +1,8 @@ -from typing import List, Optional, Union, Tuple, Dict +from typing import List, Optional, Union, Tuple, Dict, overload from utils.manager.data_class import StaticData from pathlib import Path from ruamel import yaml -from .models import PluginSetting - +from .models import PluginSetting, PluginType _yaml = yaml.YAML(typ="safe") @@ -17,10 +16,27 @@ class Plugins2settingsManager(StaticData): super().__init__(file, False) self.__load_file() + @overload + def add_plugin_settings(self, plugin: str, plugin_settings: PluginSetting): + ... + + @overload def add_plugin_settings( self, plugin: str, - cmd: Optional[List[str]] = None, + cmd: List[str] = None, + default_status: bool = True, + level: int = 5, + limit_superuser: bool = False, + plugin_type: Tuple[Union[str, int]] = ("normal",), + cost_gold: int = 0, + ): + ... + + def add_plugin_settings( + self, + plugin: str, + cmd: Union[List[str], PluginSetting] = None, default_status: bool = True, level: int = 5, limit_superuser: bool = False, @@ -39,14 +55,17 @@ class Plugins2settingsManager(StaticData): :param plugin_type: 插件类型 :param cost_gold: 需要消费的金币 """ - self._data[plugin] = PluginSetting( - cmd=cmd, - level=level, - default_status=default_status, - limit_superuser=limit_superuser, - plugin_type=plugin_type, - cost_gold=cost_gold, - ) + if isinstance(cmd, PluginSetting): + self._data[plugin] = cmd + else: + self._data[plugin] = PluginSetting( + cmd=cmd, + level=level, + default_status=default_status, + limit_superuser=limit_superuser, + plugin_type=plugin_type, + cost_gold=cost_gold, + ) def get_plugin_data(self, module: str) -> Optional[PluginSetting]: """ @@ -95,8 +114,16 @@ class Plugins2settingsManager(StaticData): path = Path(path) if path: with open(path, "w", encoding="utf8") as f: + self_dict = self.dict() + for key in self_dict.keys(): + if self_dict[key].get("plugin_type") and isinstance( + self_dict[key].get("plugin_type"), PluginType + ): + self_dict[key]["plugin_type"] = self_dict[key][ + "plugin_type" + ].value yaml.dump( - {"PluginSettings": self.dict()}, + {"PluginSettings": self_dict}, f, indent=2, Dumper=yaml.RoundTripDumper, @@ -127,7 +154,7 @@ class Plugins2settingsManager(StaticData): self._data: Dict[str, PluginSetting] = {} if self.file.exists(): with open(self.file, "r", encoding="utf8") as f: - temp = _yaml.load(f) - if "PluginSettings" in temp.keys(): - for k, v in temp["PluginSettings"].items(): - self._data[k] = PluginSetting.parse_obj(v) + if temp := _yaml.load(f): + if "PluginSettings" in temp.keys(): + for k, v in temp["PluginSettings"].items(): + self._data[k] = PluginSetting.parse_obj(v)