适配metadata

This commit is contained in:
HibiKier 2022-07-31 17:30:29 +08:00
parent e97a043031
commit c089388323
11 changed files with 136 additions and 87 deletions

View File

@ -247,7 +247,17 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
## 更新
### 2022/7/24 \[v0.1.6.2]
### 2022/7/31
* 对nonebot.beta4中PluginMeta进行解析
* 查看订阅以图片形式发送
### 2022/7/30
* 替换了cos和bt的url [@pull/951](https://github.com/HibiKier/zhenxun_bot/pull/951)
* 发言记录统计添加日消息统计 [@pull/953](https://github.com/HibiKier/zhenxun_bot/pull/953)
### 2022/7/24
* 订阅up动态提供直链

View File

@ -44,7 +44,7 @@ async def _(bot: Bot, event: MessageEvent, state: T_State):
@simple_help.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State, arg: Message = CommandArg()):
async def _(bot: Bot, event: MessageEvent, arg: Message = CommandArg()):
msg = arg.extract_plain_text().strip()
is_super = False
if msg:

View File

@ -44,7 +44,7 @@ def _create_help_img(
:param help_image: 图片路径
:param simple_help_image: 简易帮助图片路径
"""
_matchers = get_matchers()
_matchers = get_matchers(True)
width = 0
matchers_data = {}
_des_tmp = {}
@ -55,15 +55,16 @@ def _create_help_img(
# 插件分类
for matcher in _matchers:
plugin_name = None
_plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
_plugin = matcher.plugin
metadata = _plugin.metadata
if not _plugin:
logger.warning(f"获取 功能:{matcher.plugin_name} 失败...")
continue
_module = _plugin.module
try:
plugin_name = _module.__getattribute__("__zx_plugin_name__")
plugin_name = metadata.name if metadata else _module.__getattribute__("__zx_plugin_name__")
try:
plugin_des = _module.__getattribute__("__plugin_des__")
plugin_des = metadata.description if metadata else _module.__getattribute__("__plugin_des__")
except AttributeError:
plugin_des = "_"
if (
@ -343,13 +344,14 @@ def get_plugin_help(msg: str, is_super: bool = False) -> Optional[str]:
if module:
try:
plugin = nonebot.plugin.get_plugin(module)
metadata = plugin.metadata
if plugin:
if is_super:
result = plugin.module.__getattribute__(
"__plugin_superuser_usage__"
)
else:
result = plugin.module.__getattribute__("__plugin_usage__")
result = metadata.usage if metadata else plugin.module.__getattribute__("__plugin_usage__")
if result:
width = 0
for x in result.split("\n"):

View File

@ -2,15 +2,12 @@ from nonebot.matcher import Matcher
from nonebot.message import run_preprocessor, IgnoredException
from nonebot.typing import T_State
from ._utils import status_message_manager
from utils.image_utils import text2image
from typing import Dict, Any
from nonebot.adapters.onebot.v11 import (
Bot,
MessageEvent,
PrivateMessageEvent,
GroupMessageEvent,
)
import re
# 为什么AI会自己和自己聊天
@ -41,11 +38,6 @@ async def _(matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State):
status_message_manager.delete(event.user_id)
raise IgnoredException("有命令就别说话了")
# @Bot.on_calling_api
# async def handle_api_call(bot: Bot, api: str, data: Dict[str, Any]):
# if api in ["send_msg", "send_group_msg", "send_private_msg"]:
# msg = str(data["message"])
# if (r := re.search("\[\[To_Img\|?(.*?)]]", msg)) or (r := re.search("&#91;&#91;To_Img\|?(.*?)&#91;&#91;")):

View File

@ -6,7 +6,7 @@ import re
@Bot.on_calling_api
async def handle_api_call(bot: Bot, api: str, data: Dict[str, Any]):
async def _(bot: Bot, api: str, data: Dict[str, Any]):
r = None
if (
(

View File

@ -21,21 +21,35 @@ def init_plugins_config(data_path):
_data = {}
if plugins2config_file.exists():
_data = _yaml.load(open(plugins2config_file, "r", encoding="utf8"))
_matchers = get_matchers()
_matchers = get_matchers(True)
# 优先使用 metadata 数据
for matcher in _matchers:
_plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
_plugin = matcher.plugin
metadata = _plugin.metadata
try:
_module = _plugin.module
except AttributeError:
continue
try:
plugin_version = _module.__getattribute__("__plugin_version__")
except AttributeError:
plugin_version = None
try:
plugin_configs = _module.__getattribute__("__plugin_configs__")
except AttributeError:
continue
plugin_version = None
if metadata:
plugin_version = metadata.extra.get("version")
if not plugin_version:
try:
plugin_version = _module.__getattribute__("__plugin_version__")
except AttributeError:
pass
if metadata and metadata.config:
plugin_configs = {}
for key, value in metadata.config.__fields__.items():
plugin_configs[key.upper()] = {
"value": value.default,
"default_value": value.default
}
else:
try:
plugin_configs = _module.__getattribute__("__plugin_configs__")
except AttributeError:
continue
# 插件配置版本更新或为Version为None或不在存储配置内
if (
plugin_version is None

View File

@ -22,9 +22,10 @@ def init_plugins_data(data_path):
_data = {}
if plugin2data_file.exists():
_data = json.load(open(plugin2data_file, "r", encoding="utf8"))
_matchers = get_matchers()
_matchers = get_matchers(True)
for matcher in _matchers:
_plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
_plugin = matcher.plugin
metadata = _plugin.metadata
try:
_module = _plugin.module
except AttributeError:
@ -40,18 +41,28 @@ def init_plugins_data(data_path):
matcher.plugin_name, "version", plugin_data.get("version")
)
else:
try:
plugin_version = _module.__getattribute__("__plugin_version__")
except AttributeError:
plugin_version = None
try:
plugin_name = _module.__getattribute__("__zx_plugin_name__")
except AttributeError:
plugin_name = matcher.plugin_name
plugin_version = None
if metadata:
plugin_version = metadata.extra.get("version")
if not plugin_version:
try:
plugin_version = _module.__getattribute__("__plugin_version__")
except AttributeError:
pass
if metadata:
plugin_name = metadata.name
else:
try:
plugin_name = _module.__getattribute__("__zx_plugin_name__")
except AttributeError:
plugin_name = matcher.plugin_name
plugin_author = None
if metadata:
plugin_author = metadata.extra.get('author')
try:
plugin_author = _module.__getattribute__("__plugin_author__")
except AttributeError:
plugin_author = None
pass
if matcher.plugin_name in plugins_manager.keys():
plugins_manager.set_module_data(matcher.plugin_name, "error", False)
if matcher.plugin_name not in plugins_manager.keys():

View File

@ -16,28 +16,33 @@ def init_plugins_settings(data_path: str):
"""
plugins2settings_file = data_path / "configs" / "plugins2settings.yaml"
plugins2settings_file.parent.mkdir(exist_ok=True, parents=True)
_matchers = get_matchers()
_matchers = get_matchers(True)
_tmp_module = {}
_tmp = []
for x in plugins2settings_manager.keys():
try:
_plugin = nonebot.plugin.get_plugin(x)
_module = _plugin.module
plugin_name = _module.__getattribute__("__zx_plugin_name__")
metadata = _plugin.metadata
plugin_name = metadata.name if metadata else _module.__getattribute__("__zx_plugin_name__")
_tmp_module[x] = plugin_name
except (KeyError, AttributeError) as e:
logger.warning(f"配置文件 模块:{x} 获取 plugin_name 失败...{e}")
_tmp_module[x] = ""
for matcher in _matchers:
if matcher.plugin_name not in plugins2settings_manager.keys():
_plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
_plugin = matcher.plugin
metadata = _plugin.metadata
try:
_module = _plugin.module
except AttributeError:
logger.warning(f"插件 {matcher.plugin_name} 加载失败...,插件控制未加载.")
else:
try:
plugin_name = _module.__getattribute__("__zx_plugin_name__")
if metadata:
plugin_name = metadata.name
else:
plugin_name = _module.__getattribute__("__zx_plugin_name__")
if "[admin]" in plugin_name.lower():
try:
admin_settings = _module.__getattribute__(
@ -66,43 +71,45 @@ def init_plugins_settings(data_path: str):
f"获取插件 {matcher.plugin_name} __zx_plugin_name__ 失败...,插件控制未加载."
)
else:
_tmp_module[matcher.plugin_name] = plugin_name
try:
_tmp_module[matcher.plugin_name] = plugin_name
plugin_settings = _module.__getattribute__(
"__plugin_settings__"
)
if plugin_settings.get('cost_gold') is None:
plugin_settings['cost_gold'] = 0
if (
plugin_settings.get("cmd") is not None
and plugin_name not in plugin_settings["cmd"]
):
plugin_settings["cmd"].append(plugin_name)
if plugins2settings_manager.get(
matcher.plugin_name
) and plugins2settings_manager[matcher.plugin_name].get(
"plugin_type"
):
plugin_type = tuple(
plugins2settings_manager.get_plugin_data(
matcher.plugin_name
)["plugin_type"]
)
else:
try:
plugin_type = _module.__getattribute__(
"__plugin_type__"
)
except AttributeError:
plugin_type = ("normal",)
if plugin_settings and matcher.plugin_name:
plugins2settings_manager.add_plugin_settings(
matcher.plugin_name,
plugin_type=plugin_type,
**plugin_settings,
)
except AttributeError:
pass
plugin_settings = {
"cmd": [matcher.plugin_name, plugin_name]
}
if not plugin_settings.get('cost_gold'):
plugin_settings['cost_gold'] = 0
if (
plugin_settings.get("cmd") is not None
and plugin_name not in plugin_settings["cmd"]
):
plugin_settings["cmd"].append(plugin_name)
if plugins2settings_manager.get(
matcher.plugin_name
) and plugins2settings_manager[matcher.plugin_name].get(
"plugin_type"
):
plugin_type = tuple(
plugins2settings_manager.get_plugin_data(
matcher.plugin_name
)["plugin_type"]
)
else:
try:
plugin_type = _module.__getattribute__(
"__plugin_type__"
)
except AttributeError:
plugin_type = ("normal",)
if plugin_settings and matcher.plugin_name:
plugins2settings_manager.add_plugin_settings(
matcher.plugin_name,
plugin_type=plugin_type,
**plugin_settings,
)
_tmp.append(matcher.plugin_name)
_tmp_data = {"PluginSettings": plugins2settings_manager.get_data()}
with open(plugins2settings_file, "w", encoding="utf8") as wf:

View File

@ -1,8 +1,9 @@
from nonebot import on_command
from nonebot import on_command, on_regex
from nonebot.typing import T_State
from nonebot.adapters.onebot.v11 import Bot, MessageEvent, GroupMessageEvent, Message
from utils.message_builder import at
from utils.image_utils import text2image
from utils.message_builder import image
from .data_source import (
add_live_sub,
delete_sub,
@ -16,10 +17,10 @@ from .data_source import (
from models.level_user import LevelUser
from configs.config import Config
from utils.utils import is_number, scheduler, get_bot
from typing import Optional
from typing import Optional, Tuple, Any
from services.log import logger
from nonebot import Driver
from nonebot.params import CommandArg, ArgStr
from nonebot.params import CommandArg, ArgStr, RegexGroup
import nonebot
__zx_plugin_name__ = "B站订阅"
@ -66,8 +67,8 @@ __plugin_configs__ = {
}
add_sub = on_command("添加订阅", priority=5, block=True)
del_sub = on_command("删除订阅", priority=5, block=True)
show_sub_info = on_command("查看订阅", priority=5, block=True)
del_sub = on_regex(r"^删除订阅(\d+)$", priority=5, block=True)
show_sub_info = on_regex("^查看订阅$", priority=5, block=True)
driver: Driver = nonebot.get_driver()
@ -156,10 +157,8 @@ async def _(
@del_sub.handle()
async def _(event: MessageEvent, arg: Message = CommandArg()):
msg = arg.extract_plain_text().strip()
if not is_number(msg):
await del_sub.finish("Id必须为数字", at_sender=True)
async def _(event: MessageEvent, reg_group: Tuple[Any, ...] = RegexGroup()):
msg = reg_group[0]
id_ = (
f"{event.user_id}:{event.group_id}"
if isinstance(event, GroupMessageEvent)
@ -207,7 +206,15 @@ async def _(event: MessageEvent):
live_rst = (
"该群目前没有任何订阅..." if isinstance(event, GroupMessageEvent) else "您目前没有任何订阅..."
)
await show_sub_info.send(live_rst + up_rst + season_rst)
await show_sub_info.send(
image(
b64=(
await text2image(
live_rst + up_rst + season_rst, padding=10, color="#f9f6f2"
)
).pic2bs4()
)
)
# 推送

View File

@ -37,7 +37,6 @@ class Plugins2settingsManager(StaticData):
limit_superuser: Optional[bool] = False,
plugin_type: Tuple[Union[str, int]] = ("normal",),
cost_gold: int = 0,
**kwargs
):
"""
添加一个插件设置

View File

@ -181,13 +181,20 @@ def get_bot() -> Optional[Bot]:
return None
def get_matchers() -> List[Type[Matcher]]:
def get_matchers(distinct: bool = False) -> List[Type[Matcher]]:
"""
获取所有插件
说明:
获取所有matcher
参数:
distinct: 去重
"""
_matchers = []
temp = []
for i in matchers.keys():
for matcher in matchers[i]:
if distinct and matcher.plugin_name in temp:
continue
temp.append(matcher.plugin_name)
_matchers.append(matcher)
return _matchers