优化代码

This commit is contained in:
HibiKier 2022-12-04 16:46:33 +08:00
parent 994484f3b0
commit 31e4cadecb
18 changed files with 483 additions and 325 deletions

View File

@ -77,10 +77,8 @@ async def _create_help_img(
plugins2settings_manager.get_plugin_data(matcher.plugin_name).plugin_type
)
else:
try:
if hasattr(_module, "__plugin_type__"):
plugin_type = _module.__getattribute__("__plugin_type__")
except AttributeError:
pass
if len(plugin_type) > 1:
try:
text_type = int(plugin_type[1])

View File

@ -1,12 +1,28 @@
from typing import List, Tuple
from typing import List, Tuple, Dict, Optional, Union
from configs.path_config import IMAGE_PATH
from utils.image_utils import BuildImage
from pydantic import BaseModel
import os
import random
background_path = IMAGE_PATH / "background" / "help" / "simple_help"
# class PluginType(BaseModel):
#
#
# class Plugin(BaseModel):
# name: str
# plugin_type: PluginType # 插件内部类型根据name [Hidden] [Admin] [SUPERUSER]
# usage: Optional[str]
# des: Optional[str]
# cmd: Optional[List[str]]
# task: Optional[Dict[str, str]]
# type: Optional[Tuple[str, int]] # 菜单类型
# version: Optional[Union[str, int]]
# author: Optional[str]
async def build_help_image(image_group: List[List[BuildImage]], h: int):
bk = None
@ -103,3 +119,10 @@ def group_image(image_list: List[BuildImage]) -> Tuple[List[List[BuildImage]], i
max_w -= max([x.w for x in image_group[-1]])
image_group.pop(-1)
return image_group, max(max_h + 250, max_w + 70)
# class HelpImageBuild:
#
# def __init__(self):
# self._data: Dict[str, List[]] = {}

View File

@ -5,6 +5,7 @@ from .init_plugins_data import init_plugins_data, plugins_manager
from .init_none_plugin_count_manager import init_none_plugin_count_manager
from .init_plugins_resources import init_plugins_resources
from .init_plugins_settings import init_plugins_settings
from .init_plugin_info import init_plugin_info
from .init_plugins_limit import (
init_plugins_block_limit,
init_plugins_count_limit,
@ -34,6 +35,7 @@ async def _():
config_file = DATA_PATH / "configs" / "plugins2config.yaml"
_flag = not config_file.exists()
init()
init_plugin_info()
init_plugins_settings()
init_plugins_cd_limit()
init_plugins_block_limit()

View File

@ -48,7 +48,7 @@ def init_none_plugin_count_manager():
logger.info(
f"{module}:{plugin_name} 插件疑似已删除,"
f"加载{none_plugin_count_manager._max_count}次失败后将清除对应插件数据,"
f"当前次数:{none_plugin_count_manager._data[module]}"
f"当前次数:{none_plugin_count_manager.get(module)}"
)
else:
none_plugin_count_manager.reset(module)

View File

@ -0,0 +1,121 @@
from types import ModuleType
from typing import Any
from services import logger
from utils.manager import (
plugin_data_manager,
plugins2settings_manager,
plugins2cd_manager,
plugins2block_manager,
plugins2count_manager,
)
from configs.config import Config
from utils.manager.models import (
PluginType,
PluginSetting,
PluginCd,
PluginData,
PluginBlock,
PluginCount,
)
from utils.utils import get_matchers
def get_attr(module: ModuleType, name: str, default: Any = None) -> Any:
"""
说明:
获取属性
参数:
:param module: module
:param name: name
:param default: default
"""
if hasattr(module, name):
return getattr(module, name)
return default
def init_plugin_info():
for matcher in [x for x in get_matchers(True) if x.plugin]:
try:
plugin = matcher.plugin
metadata = plugin.metadata
if hasattr(plugin, "module"):
module = plugin.module
plugin_model = matcher.plugin_name
plugin_name = (
metadata.name
if metadata and metadata.name
else get_attr(module, "__zx_plugin_name__", matcher.plugin_name)
)
if "[Admin]" in plugin_name:
plugin_type = PluginType.ADMIN
plugin_name = plugin_name.replace("[Admin]", "")
elif "[Hidden]" in plugin_name:
plugin_type = PluginType.HIDDEN
plugin_name = plugin_name.replace("[Hidden]", "")
elif "[Superuser]" in plugin_name:
plugin_type = PluginType.SUPERUSER
plugin_name = plugin_name.replace("[Superuser]", "")
else:
plugin_type = PluginType.NORMAL
plugin_usage = (
metadata.usage
if metadata and metadata.usage
else get_attr(module, "__plugin_usage__")
)
plugin_des = (
metadata.description
if metadata and metadata.description
else get_attr(module, "__plugin_des__")
)
menu_type = get_attr(module, "__plugin_type__") or ("normal",)
plugin_setting = get_attr(module, "__plugin_settings__")
if plugin_setting:
plugin_setting = PluginSetting(**plugin_setting)
plugin_setting.plugin_type = menu_type
plugin_task = get_attr(module, "__plugin_task__")
plugin_version = get_attr(module, "__plugin_version__")
plugin_author = get_attr(module, "__plugin_author__")
plugin_cd = get_attr(module, "__plugin_cd_limit__")
if plugin_cd:
plugin_cd = PluginCd(**plugin_cd)
plugin_block = get_attr(module, "__plugin_block_limit__")
if plugin_block:
plugin_block = PluginBlock(**plugin_block)
plugin_count = get_attr(module, "__plugin_count_limit__")
if plugin_count:
plugin_count = PluginCount(**plugin_count)
plugin_resources = get_attr(module, "__plugin_resources__")
plugin_configs = get_attr(module, "__plugin_configs__")
if settings := plugins2settings_manager.get(plugin_model):
plugin_setting = settings
if plugin_cd_limit := plugins2cd_manager.get(plugin_model):
plugin_cd = plugin_cd_limit
if plugin_block_limit := plugins2block_manager.get(plugin_model):
plugin_block = plugin_block_limit
if plugin_count_limit := plugins2count_manager.get(plugin_model):
plugin_count = plugin_count_limit
if plugin_cfg := Config.get(plugin_model):
plugin_configs = plugin_cfg
plugin_data = PluginData(
model=plugin_model,
name=plugin_name.strip(),
plugin_type=plugin_type,
usage=plugin_usage,
des=plugin_des,
task=plugin_task,
menu_type=menu_type,
version=plugin_version,
author=plugin_author,
plugin_setting=plugin_setting,
plugin_cd=plugin_cd,
plugin_block=plugin_block,
plugin_count=plugin_count,
plugin_resources=plugin_resources,
plugin_configs=plugin_configs,
)
plugin_data_manager.add_plugin_info(plugin_data)
except Exception as e:
logger.error(f"构造插件数据失败 {matcher.plugin_name} - {type(e)}{e}")

View File

@ -1,13 +1,10 @@
import asyncio
from datetime import datetime, timedelta
from pathlib import Path
from ruamel.yaml import round_trip_load, round_trip_dump, YAML
from utils.manager import admin_manager, plugins_manager
from utils.manager import admin_manager, plugins_manager, plugin_data_manager
from configs.config import Config
from services.log import logger
from utils.text_utils import prompt2cn
from utils.utils import get_matchers
from utils.utils import scheduler
from configs.path_config import DATA_PATH
from ruamel import yaml
@ -20,44 +17,28 @@ def init_plugins_config():
初始化插件数据配置
"""
plugins2config_file = DATA_PATH / "configs" / "plugins2config.yaml"
plugins2config_file.parent.mkdir(parents=True, exist_ok=True)
_data = {}
if plugins2config_file.exists():
_data = _yaml.load(open(plugins2config_file, "r", encoding="utf8"))
_data = Config.get_data()
# 优先使用 metadata 数据
for matcher in [matcher for matcher in get_matchers(True) if matcher.plugin and matcher.plugin.module]:
_plugin = matcher.plugin
metadata = _plugin.metadata
_module = _plugin.module
plugin_version = None
if metadata:
plugin_version = metadata.extra.get("version")
if not plugin_version and hasattr(_module, "__plugin_version__"):
plugin_version = _module.__getattribute__("__plugin_version__")
if metadata and metadata.config:
plugin_configs = {}
for key, value in metadata.config.__fields__.items():
plugin_configs[key.upper()] = {
"value": value.default,
"default_value": value.default
}
else:
try:
plugin_configs = _module.__getattribute__("__plugin_configs__")
except AttributeError:
continue
for matcher in get_matchers(True):
if plugin_data := plugin_data_manager.get(matcher.plugin_name):
# 插件配置版本更新或为Version为None或不在存储配置内当使用metadata时必定更新
if isinstance(plugin_version, str) or (
plugin_version is None
if plugin_data.plugin_configs and (
isinstance(plugin_data.version, str)
or (
plugin_data.version is None
or (
_data.get(matcher.plugin_name)
and _data[matcher.plugin_name].keys() != plugin_configs.keys()
and _data[matcher.plugin_name].keys()
!= plugin_data.plugin_configs.keys()
)
or plugin_version > plugins_manager.get(matcher.plugin_name).version
or plugin_data.version
> plugins_manager.get(matcher.plugin_name).version
or matcher.plugin_name not in _data.keys()
)
):
plugin_configs = plugin_data.plugin_configs
for key in plugin_configs:
if isinstance(plugin_configs[key], dict):
if isinstance(plugin_data.plugin_configs[key], dict):
Config.add_plugin_config(
matcher.plugin_name,
key,
@ -70,8 +51,7 @@ def init_plugins_config():
Config.add_plugin_config(
matcher.plugin_name, key, plugin_configs[key]
)
else:
plugin_configs = _data[matcher.plugin_name]
elif plugin_configs := _data.get(matcher.plugin_name):
for key in plugin_configs:
Config.add_plugin_config(
matcher.plugin_name,
@ -92,26 +72,13 @@ def init_plugins_config():
_data[plugin].yaml_set_start_comment(plugin_name, indent=2)
# 初始化未设置的管理员权限等级
for k, v in Config.get_admin_level_data():
# try:
admin_manager.set_admin_level(k, v)
# except KeyError as e:
# raise KeyError(f"{e} ****** 请检查是否有插件加载失败 ******")
# 存完插件基本设置
with open(plugins2config_file, "w", encoding="utf8") as wf:
round_trip_dump(
_data, wf, indent=2, Dumper=yaml.RoundTripDumper, allow_unicode=True
)
user_config_file = Path() / "configs" / "config.yaml"
# if not user_config_file.exists():
_replace_config()
# else:
# logger.info('五分钟后将进行配置数据替换,请注意...')
# scheduler.add_job(
# _replace_config,
# "date",
# run_date=datetime.now() + timedelta(minutes=5),
# id=f"_replace_config"
# )
def _replace_config():
@ -151,9 +118,7 @@ def _replace_config():
temp_file = Path() / "configs" / "temp_config.yaml"
try:
with open(temp_file, "w", encoding="utf8") as wf:
yaml.dump(
_tmp_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True
)
yaml.dump(_tmp_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True)
with open(temp_file, "r", encoding="utf8") as rf:
_data = round_trip_load(rf)
# 添加注释
@ -187,9 +152,7 @@ def _replace_config():
rst += f'{k}: {Config[plugin][k]["help"]}' + "\n"
_data[plugin].yaml_set_start_comment(rst[:-1], indent=2)
with open(Path() / "configs" / "config.yaml", "w", encoding="utf8") as wf:
round_trip_dump(
_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True
)
round_trip_dump(_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True)
except Exception as e:
logger.error(f"生成简易配置注释错误 {type(e)}{e}")
if temp_file.exists():

View File

@ -1,9 +1,8 @@
from ruamel.yaml import YAML
from utils.manager import plugins_manager
from utils.manager import plugins_manager, plugin_data_manager
from utils.utils import get_matchers
from services.log import logger
from configs.path_config import DATA_PATH
try:
import ujson as json
@ -18,13 +17,10 @@ def init_plugins_data():
"""
初始化插件数据信息
"""
plugin2data_file = DATA_PATH / "manager" / "plugin_manager.json"
plugin2data_file.parent.mkdir(parents=True, exist_ok=True)
for matcher in get_matchers(True):
_plugin = matcher.plugin
if not _plugin:
continue
metadata = _plugin.metadata
try:
_module = _plugin.module
except AttributeError:
@ -35,24 +31,11 @@ def init_plugins_data():
else:
plugins_manager[matcher.plugin_name].error = True
else:
if plugin_data := plugin_data_manager.get(matcher.plugin_name):
try:
plugin_version = None
if metadata:
plugin_version = metadata.extra.get("version")
if not plugin_version and hasattr(_module, "__plugin_version__"):
plugin_version = _module.__getattribute__("__plugin_version__")
if metadata:
plugin_name = metadata.name
else:
try:
plugin_name = _module.__getattribute__("__zx_plugin_name__")
except AttributeError:
plugin_name = matcher.plugin_name
plugin_author = None
if metadata:
plugin_author = metadata.extra.get('author')
if not plugin_author and hasattr(_module, "__plugin_author__"):
plugin_author = _module.__getattribute__("__plugin_author__")
plugin_version = plugin_data.version
plugin_name = plugin_data.name
plugin_author = plugin_data.author
if matcher.plugin_name in plugins_manager.keys():
plugins_manager[matcher.plugin_name].error = False
if matcher.plugin_name not in plugins_manager.keys():
@ -62,7 +45,6 @@ def init_plugins_data():
author=plugin_author,
version=plugin_version,
)
# metadata不检测version
elif isinstance(plugin_version, str) or plugins_manager[matcher.plugin_name].version is None or (
plugin_version is not None
and plugin_version > float(plugins_manager[matcher.plugin_name].version)
@ -70,14 +52,6 @@ def init_plugins_data():
plugins_manager[matcher.plugin_name].plugin_name = plugin_name
plugins_manager[matcher.plugin_name].author = plugin_author
plugins_manager[matcher.plugin_name].version = plugin_version
# if matcher.plugin_name in _data.keys():
# plugins_manager[matcher.plugin_name].error = _data[matcher.plugin_name]["error"]
# plugins_manager.set_module_data(
# matcher.plugin_name, "error", _data[matcher.plugin_name]["error"]
# )
# plugins_manager.set_module_data(
# matcher.plugin_name, "plugin_name", _data[matcher.plugin_name]["plugin_name"]
# )
except Exception as e:
logger.error(f"插件数据 {matcher.plugin_name} 加载发生错误 {type(e)}{e}")
plugins_manager.save()

View File

@ -2,10 +2,10 @@ from utils.manager import (
plugins2cd_manager,
plugins2block_manager,
plugins2count_manager,
plugin_data_manager,
)
from utils.utils import get_matchers
from configs.path_config import DATA_PATH
import nonebot
def init_plugins_cd_limit():
@ -14,22 +14,16 @@ def init_plugins_cd_limit():
"""
plugins2cd_file = DATA_PATH / "configs" / "plugins2cd.yaml"
plugins2cd_file.parent.mkdir(exist_ok=True, parents=True)
_data = {}
for matcher in get_matchers(True):
if not plugins2cd_manager.get_plugin_cd_data(matcher.plugin_name):
_plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
try:
_module = _plugin.module
plugin_cd_limit = _module.__getattribute__("__plugin_cd_limit__")
if not plugins2cd_manager.get_plugin_cd_data(matcher.plugin_name) and (
plugin_data := plugin_data_manager.get(matcher.plugin_name)
):
if plugin_data.plugin_cd:
plugins2cd_manager.add_cd_limit(
matcher.plugin_name, **plugin_cd_limit
matcher.plugin_name, plugin_data.plugin_cd
)
except AttributeError:
pass
if not plugins2cd_manager.keys():
plugins2cd_manager.add_cd_limit(
"这是一个示例"
)
plugins2cd_manager.add_cd_limit("这是一个示例")
plugins2cd_manager.save()
plugins2cd_manager.reload_cd_limit()
@ -38,23 +32,16 @@ def init_plugins_block_limit():
"""
加载阻塞限制
"""
plugins2block_file = DATA_PATH / "configs" / "plugins2block.yaml"
plugins2block_file.parent.mkdir(exist_ok=True, parents=True)
for matcher in get_matchers(True):
if not plugins2block_manager.get_plugin_block_data(matcher.plugin_name):
_plugin = matcher.plugin
try:
_module = _plugin.module
plugin_block_limit = _module.__getattribute__("__plugin_block_limit__")
if not plugins2block_manager.get_plugin_block_data(matcher.plugin_name) and (
plugin_data := plugin_data_manager.get(matcher.plugin_name)
):
if plugin_data.plugin_block:
plugins2block_manager.add_block_limit(
matcher.plugin_name, **plugin_block_limit
matcher.plugin_name, plugin_data.plugin_block
)
except AttributeError:
pass
if not plugins2block_manager.keys():
plugins2block_manager.add_block_limit(
"这是一个示例"
)
plugins2block_manager.add_block_limit("这是一个示例")
plugins2block_manager.save()
plugins2block_manager.reload_block_limit()
@ -63,24 +50,15 @@ def init_plugins_count_limit():
"""
加载次数限制
"""
plugins2count_file = DATA_PATH / "configs" / "plugins2count.yaml"
plugins2count_file.parent.mkdir(exist_ok=True, parents=True)
_data = {}
_matchers = get_matchers()
for matcher in _matchers:
if not plugins2count_manager.get_plugin_count_data(matcher.plugin_name):
_plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
try:
_module = _plugin.module
plugin_count_limit = _module.__getattribute__("__plugin_count_limit__")
for matcher in get_matchers(True):
if not plugins2count_manager.get_plugin_count_data(matcher.plugin_name) and (
plugin_data := plugin_data_manager.get(matcher.plugin_name)
):
if plugin_data.plugin_count:
plugins2count_manager.add_count_limit(
matcher.plugin_name, **plugin_count_limit
matcher.plugin_name, plugin_data.plugin_count
)
except AttributeError:
pass
if not plugins2count_manager.keys():
plugins2count_manager.add_count_limit(
"这是一个示例"
)
plugins2count_manager.add_count_limit("这是一个示例")
plugins2count_manager.save()
plugins2count_manager.reload_count_limit()

View File

@ -1,29 +1,21 @@
from utils.manager import resources_manager
from utils.manager import resources_manager, plugin_data_manager
from utils.utils import get_matchers
from services.log import logger
from pathlib import Path
import nonebot
def init_plugins_resources():
"""
资源文件路径的移动
"""
_tmp = []
for matcher in get_matchers():
if matcher.plugin_name not in _tmp:
_tmp.append(matcher.plugin_name)
_plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
for matcher in get_matchers(True):
if plugin_data := plugin_data_manager.get(matcher.plugin_name):
try:
_module = _plugin.module
_module = matcher.plugin.module
except AttributeError:
logger.warning(f"插件 {matcher.plugin_name} 加载失败...,资源控制未加载...")
else:
try:
resources = _module.__getattribute__("__plugin_resources__")
except AttributeError:
pass
else:
if resources := plugin_data.plugin_resources:
path = Path(_module.__getattribute__("__file__")).parent
for resource in resources.keys():
resources_manager.add_resource(

View File

@ -1,5 +1,6 @@
from utils.manager import plugins2settings_manager, admin_manager
from utils.manager import plugins2settings_manager, admin_manager, plugin_data_manager
from services.log import logger
from utils.manager.models import PluginType
from utils.utils import get_matchers
import nonebot
@ -8,92 +9,32 @@ def init_plugins_settings():
"""
初始化插件设置从插件中获取 __zx_plugin_name____plugin_cmd____plugin_settings__
"""
_tmp_module = {}
for x in plugins2settings_manager.keys():
try:
_plugin = nonebot.plugin.get_plugin(x)
_module = _plugin.module
metadata = _plugin.metadata
plugin_name = (
metadata.name
if metadata
else _module.__getattribute__("__zx_plugin_name__")
)
_tmp_module[x] = plugin_name
_module.__getattribute__("__zx_plugin_name__")
except (KeyError, AttributeError) as e:
logger.warning(f"配置文件 模块:{x} 获取 plugin_name 失败...{e}")
_tmp_module[x] = ""
for matcher in [x for x in get_matchers(True) if x.plugin]:
for matcher in get_matchers(True):
try:
if matcher.plugin_name not in plugins2settings_manager.keys():
_plugin = matcher.plugin
metadata = _plugin.metadata
try:
_module = _plugin.module
except AttributeError:
logger.warning(f"插件 {matcher.plugin_name} 加载失败...,插件控制未加载.")
else:
try:
plugin_name = metadata.name if metadata else _module.__getattribute__("__zx_plugin_name__")
if plugin_data := plugin_data_manager.get(matcher.plugin_name):
if plugin_settings := plugin_data.plugin_setting:
# 管理员命令
if "[admin]" in plugin_name.lower():
level = 5
cmd = None
if hasattr(_module, "__plugin_settings__"):
admin_settings = _module.__getattribute__(
"__plugin_settings__"
)
level = admin_settings.get("admin_level", 5)
cmd = admin_settings.get("cmd")
if plugin_data.plugin_type == PluginType.ADMIN:
admin_manager.add_admin_plugin_settings(
matcher.plugin_name, cmd, level
)
if (
"[hidden]" in plugin_name.lower()
or "[admin]" in plugin_name.lower()
or "[superuser]" in plugin_name.lower()
or matcher.plugin_name in plugins2settings_manager.keys()
):
continue
except AttributeError:
logger.warning(
f"获取插件 {matcher.plugin_name} __zx_plugin_name__ 失败...,插件控制未加载."
matcher.plugin_name, plugin_settings.cmd, plugin_settings.level
)
else:
_tmp_module[matcher.plugin_name] = plugin_name
if hasattr(_module, "__plugin_settings__"):
plugin_settings = _module.__getattribute__(
"__plugin_settings__"
)
else:
plugin_settings = {"cmd": [matcher.plugin_name, plugin_name]}
if plugin_settings.get("cost_gold") is None:
plugin_settings["cost_gold"] = 0
if (
plugin_settings.get("cmd") is not None
and plugin_name not in plugin_settings["cmd"]
):
plugin_settings["cmd"].append(plugin_name)
if plugins2settings_manager.get(
matcher.plugin_name
) and plugins2settings_manager[matcher.plugin_name].get(
"plugin_type"
):
plugin_type = tuple(
plugins2settings_manager.get_plugin_data(
matcher.plugin_name
).plugin_type
)
else:
if hasattr(_module, "__plugin_type__"):
plugin_type = _module.__getattribute__("__plugin_type__")
else:
plugin_type = ("normal",)
if plugin_settings and matcher.plugin_name:
plugins2settings_manager.add_plugin_settings(
matcher.plugin_name,
plugin_type=plugin_type,
**plugin_settings,
matcher.plugin_name, plugin_settings
)
except Exception as e:
logger.error(f'{matcher.plugin_name} 初始化 plugin_settings 发生错误 {type(e)}{e}')

View File

@ -1,3 +1,4 @@
import copy
from typing import Optional, Any, Union
from pathlib import Path
from ruamel.yaml import YAML
@ -216,6 +217,9 @@ class ConfigsManager:
"""
return self._admin_level_data
def get_data(self):
return copy.deepcopy(self._data)
def is_empty(self) -> bool:
return not bool(self._data)

View File

@ -1,6 +1,7 @@
from typing import Optional
from .group_manager import GroupManager
from .data_class import StaticData
from .plugin_data_manager import PluginDataManager
from .withdraw_message_manager import WithdrawMessageManager
from .plugins2cd_manager import Plugins2cdManager
from .plugins2block_manager import Plugins2blockManager
@ -18,50 +19,53 @@ from configs.path_config import DATA_PATH
admin_manager = AdminManager()
# 群功能开关 | 群被动技能 | 群权限 管理
group_manager: Optional[GroupManager] = GroupManager(
group_manager: GroupManager = GroupManager(
DATA_PATH / "manager" / "group_manager.json"
)
# 撤回消息管理
withdraw_message_manager: Optional[WithdrawMessageManager] = WithdrawMessageManager()
withdraw_message_manager: WithdrawMessageManager = WithdrawMessageManager()
# 插件管理
plugins_manager: Optional[PluginsManager] = PluginsManager(
plugins_manager: PluginsManager = PluginsManager(
DATA_PATH / "manager" / "plugins_manager.json"
)
# 插件基本设置管理
plugins2settings_manager: Optional[Plugins2settingsManager] = Plugins2settingsManager(
plugins2settings_manager: Plugins2settingsManager = Plugins2settingsManager(
DATA_PATH / "configs" / "plugins2settings.yaml"
)
# 插件命令 cd 管理
plugins2cd_manager: Optional[Plugins2cdManager] = Plugins2cdManager(
plugins2cd_manager: Plugins2cdManager = Plugins2cdManager(
DATA_PATH / "configs" / "plugins2cd.yaml"
)
# 插件命令 阻塞 管理
plugins2block_manager: Optional[Plugins2blockManager] = Plugins2blockManager(
plugins2block_manager: Plugins2blockManager = Plugins2blockManager(
DATA_PATH / "configs" / "plugins2block.yaml"
)
# 插件命令 每次次数限制 管理
plugins2count_manager: Optional[Plugins2countManager] = Plugins2countManager(
plugins2count_manager: Plugins2countManager = Plugins2countManager(
DATA_PATH / "configs" / "plugins2count.yaml"
)
# 资源管理
resources_manager: Optional[ResourcesManager] = ResourcesManager(
resources_manager: ResourcesManager = ResourcesManager(
DATA_PATH / "manager" / "resources_manager.json"
)
# 插件加载容忍管理
none_plugin_count_manager: Optional[NonePluginCountManager] = NonePluginCountManager(
none_plugin_count_manager: NonePluginCountManager = NonePluginCountManager(
DATA_PATH / "manager" / "none_plugin_count_manager.json"
)
# 好友请求/群聊邀请 管理
requests_manager: Optional[RequestManager] = RequestManager(
requests_manager: RequestManager = RequestManager(
DATA_PATH / "manager" / "requests_manager.json"
)
# 全局插件数据
plugin_data_manager: PluginDataManager = PluginDataManager()

View File

@ -1,6 +1,8 @@
from typing import List, Optional, Dict, Literal, Tuple, Union
from pathlib import Path
from typing import List, Optional, Dict, Literal, Tuple, Union, Any
from pydantic import BaseModel
from configs.config import Config
from enum import Enum
class AdminSetting(BaseModel):
@ -8,7 +10,7 @@ class AdminSetting(BaseModel):
管理员设置
"""
level: int
level: int = 5
cmd: Optional[List[str]]
@ -40,9 +42,9 @@ class PluginBlock(BaseModel):
插件阻断
"""
status: bool # 限制状态
check_type: Literal["private", "group", "all"] # 检查类型
limit_type: Literal["user", "group"] # 监听对象
status: bool = True # 限制状态
check_type: Literal["private", "group", "all"] = "all" # 检查类型
limit_type: Literal["user", "group"] = "user" # 监听对象
rst: Optional[str] # 阻断时回复
@ -51,10 +53,10 @@ class PluginCd(BaseModel):
插件阻断
"""
cd: int # cd
status: bool # 限制状态
check_type: Literal["private", "group", "all"] # 检查类型
limit_type: Literal["user", "group"] # 监听对象
cd: int = 5 # cd
status: bool = True # 限制状态
check_type: Literal["private", "group", "all"] = "all" # 检查类型
limit_type: Literal["user", "group"] = "user" # 监听对象
rst: Optional[str] # 阻断时回复
@ -64,8 +66,8 @@ class PluginCount(BaseModel):
"""
max_count: int # 次数
status: bool # 限制状态
limit_type: Literal["user", "group"] # 监听对象
status: bool = True # 限制状态
limit_type: Literal["user", "group"] = "user" # 监听对象
rst: Optional[str] # 阻断时回复
@ -93,3 +95,45 @@ class Plugin(BaseModel):
block_type: Optional[str] = None # 关闭类型
author: Optional[str] = None # 作者
version: Optional[Union[int, str]] = None # 版本
class PluginType(Enum):
"""
插件类型
"""
NORMAL = "normal"
ADMIN = "admin"
HIDDEN = "hidden"
SUPERUSER = "superuser"
class PluginData(BaseModel):
model: str
name: str
plugin_type: PluginType # 插件内部类型根据name [Hidden] [Admin] [SUPERUSER]
usage: Optional[str]
des: Optional[str]
task: Optional[Dict[str, str]]
menu_type: Tuple[Union[str, int], ...] = ("normal",) # 菜单类型
version: Optional[Union[str, int]]
author: Optional[str]
plugin_setting: Optional[PluginSetting]
plugin_cd: Optional[PluginCd]
plugin_block: Optional[PluginBlock]
plugin_count: Optional[PluginCount]
plugin_resources: Optional[Dict[str, Union[str, Path]]]
plugin_configs: Optional[Dict[str, Dict[str, Any]]]
class Config:
arbitrary_types_allowed = True
def __eq__(self, other: "PluginData"):
return (
isinstance(other, PluginData)
and self.name == other.name
and self.menu_type == other.menu_type
)
def __hash__(self):
return hash(self.name + self.menu_type[0])

View File

@ -0,0 +1,35 @@
from typing import Dict, Any
from .models import PluginData
class PluginDataManager:
"""
插件所有信息管理
"""
def __init__(self):
self._data: Dict[str, PluginData] = {}
def add_plugin_info(self, info: PluginData):
"""
说明:
添加插件信息
参数:
:param info: PluginInfo
"""
if info.model in self._data.keys() and self._data[info.model] == info:
raise ValueError(f"PluginInfoManager {info.model}:{info.name} 插件名称及类型已存在")
self._data[info.model] = info
def get(self, item: str, default: Any = None) -> PluginData:
return self._data.get(item, default)
def __getitem__(self, item) -> PluginData:
return self._data.get(item)
def __str__(self) -> str:
return str(self._data)

View File

@ -1,4 +1,4 @@
from typing import Optional, Dict, Literal, Union
from typing import Optional, Dict, Literal, Union, overload
from utils.manager.data_class import StaticData
from services.log import logger
from utils.utils import UserBlockLimiter
@ -19,11 +19,25 @@ class Plugins2blockManager(StaticData):
self._block_limiter: Dict[str, UserBlockLimiter] = {}
self.__load_file()
@overload
def add_block_limit(self, plugin: str, plugin_block: PluginBlock):
...
@overload
def add_block_limit(
self,
plugin: str,
*,
status: Optional[bool] = True,
status: bool = True,
check_type: Literal["private", "group", "all"] = "all",
limit_type: Literal["user", "group"] = "user",
rst: Optional[str] = None,
):
...
def add_block_limit(
self,
plugin: str,
status: Union[bool, PluginBlock] = True,
check_type: Literal["private", "group", "all"] = "all",
limit_type: Literal["user", "group"] = "user",
rst: Optional[str] = None,
@ -38,6 +52,9 @@ class Plugins2blockManager(StaticData):
:param limit_type: 限制类型 监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
:param rst: 回复的话为空则不回复
"""
if isinstance(status, PluginBlock):
self._data[plugin] = status
else:
if check_type not in ["all", "group", "private"]:
raise ValueError(
f"{plugin} 添加block限制错误check_type 必须为 'private'/'group'/'all'"

View File

@ -1,4 +1,4 @@
from typing import Optional, Dict, Literal, Union
from typing import Optional, Dict, Literal, Union, overload
from utils.manager.data_class import StaticData
from utils.utils import FreqLimiter
from services.log import logger
@ -19,11 +19,26 @@ class Plugins2cdManager(StaticData):
self._freq_limiter: Dict[str, FreqLimiter] = {}
self.__load_file()
@overload
def add_cd_limit(self, plugin: str, plugin_cd: PluginCd):
...
@overload
def add_cd_limit(
self,
plugin: str,
*,
cd: Optional[int] = 5,
cd: Union[int, PluginCd] = 5,
status: Optional[bool] = True,
check_type: Literal["private", "group", "all"] = "all",
limit_type: Literal["user", "group"] = "user",
rst: Optional[str] = None,
):
...
def add_cd_limit(
self,
plugin: str,
cd: Union[int, PluginCd] = 5,
status: Optional[bool] = True,
check_type: Literal["private", "group", "all"] = "all",
limit_type: Literal["user", "group"] = "user",
@ -40,6 +55,9 @@ class Plugins2cdManager(StaticData):
:param limit_type: 限制类型 监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
:param rst: 回复的话为空则不回复
"""
if isinstance(cd, PluginCd):
self._data[plugin] = cd
else:
if check_type not in ["all", "group", "private"]:
raise ValueError(
f"{plugin} 添加cd限制错误check_type 必须为 'private'/'group'/'all'"

View File

@ -1,4 +1,4 @@
from typing import Optional, Dict, Literal, Union
from typing import Optional, Dict, Literal, Union, overload
from utils.manager.data_class import StaticData
from utils.utils import DailyNumberLimiter
from services.log import logger
@ -19,14 +19,28 @@ class Plugins2countManager(StaticData):
self._daily_limiter: Dict[str, DailyNumberLimiter] = {}
self.__load_file()
@overload
def add_count_limit(self, plugin: str, plugin_count: PluginCount):
...
@overload
def add_count_limit(
self,
plugin: str,
*,
max_count: int = 5,
status: Optional[bool] = True,
limit_type: Literal["user", "group"] = "user",
rst: Optional[str] = None,
):
...
def add_count_limit(
self,
plugin: str,
max_count: Union[int, PluginCount] = 5,
status: Optional[bool] = True,
limit_type: Literal["user", "group"] = "user",
rst: Optional[str] = None,
):
"""
说明:
@ -38,6 +52,9 @@ class Plugins2countManager(StaticData):
:param limit_type: 限制类型 监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
:param rst: 回复的话为空则不回复
"""
if isinstance(max_count, PluginCount):
self._data[plugin] = max_count
else:
if limit_type not in ["user", "group"]:
raise ValueError(f"{plugin} 添加count限制错误limit_type 必须为 'user'/'group'")
self._data[plugin] = PluginCount(max_count=max_count, status=status, limit_type=limit_type, rst=rst)

View File

@ -1,9 +1,8 @@
from typing import List, Optional, Union, Tuple, Dict
from typing import List, Optional, Union, Tuple, Dict, overload
from utils.manager.data_class import StaticData
from pathlib import Path
from ruamel import yaml
from .models import PluginSetting
from .models import PluginSetting, PluginType
_yaml = yaml.YAML(typ="safe")
@ -17,10 +16,27 @@ class Plugins2settingsManager(StaticData):
super().__init__(file, False)
self.__load_file()
@overload
def add_plugin_settings(self, plugin: str, plugin_settings: PluginSetting):
...
@overload
def add_plugin_settings(
self,
plugin: str,
cmd: Optional[List[str]] = None,
cmd: List[str] = None,
default_status: bool = True,
level: int = 5,
limit_superuser: bool = False,
plugin_type: Tuple[Union[str, int]] = ("normal",),
cost_gold: int = 0,
):
...
def add_plugin_settings(
self,
plugin: str,
cmd: Union[List[str], PluginSetting] = None,
default_status: bool = True,
level: int = 5,
limit_superuser: bool = False,
@ -39,6 +55,9 @@ class Plugins2settingsManager(StaticData):
:param plugin_type: 插件类型
:param cost_gold: 需要消费的金币
"""
if isinstance(cmd, PluginSetting):
self._data[plugin] = cmd
else:
self._data[plugin] = PluginSetting(
cmd=cmd,
level=level,
@ -95,8 +114,16 @@ class Plugins2settingsManager(StaticData):
path = Path(path)
if path:
with open(path, "w", encoding="utf8") as f:
self_dict = self.dict()
for key in self_dict.keys():
if self_dict[key].get("plugin_type") and isinstance(
self_dict[key].get("plugin_type"), PluginType
):
self_dict[key]["plugin_type"] = self_dict[key][
"plugin_type"
].value
yaml.dump(
{"PluginSettings": self.dict()},
{"PluginSettings": self_dict},
f,
indent=2,
Dumper=yaml.RoundTripDumper,
@ -127,7 +154,7 @@ class Plugins2settingsManager(StaticData):
self._data: Dict[str, PluginSetting] = {}
if self.file.exists():
with open(self.file, "r", encoding="utf8") as f:
temp = _yaml.load(f)
if temp := _yaml.load(f):
if "PluginSettings" in temp.keys():
for k, v in temp["PluginSettings"].items():
self._data[k] = PluginSetting.parse_obj(v)