优化manager代码

This commit is contained in:
HibiKier 2022-11-21 20:43:41 +08:00
parent 49e38862af
commit cd66b7f5b8
41 changed files with 1974 additions and 1760 deletions

View File

@ -296,6 +296,10 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
## 更新
### 2022/11/21
* 优化manager, hook代码
### 2022/11/19
* 修改优化帮助图片生成逻辑

View File

@ -23,7 +23,7 @@ except ModuleNotFoundError:
import json
async def group_current_status(group_id: int) -> str:
def group_current_status(group_id: int) -> str:
"""
获取当前所有通知的开关
:param group_id: 群号
@ -31,17 +31,17 @@ async def group_current_status(group_id: int) -> str:
rst = "[被动技能 状态]\n"
_data = group_manager.get_task_data()
for task in _data.keys():
rst += f'{_data[task]}: {"" if await group_manager.check_group_task_status(group_id, task) else "×"}\n'
rst += f'{_data[task]}: {"" if group_manager.check_group_task_status(group_id, task) else "×"}\n'
return rst.strip()
custom_welcome_msg_json = (
Path() / "data" / "custom_welcome_msg" / "custom_welcome_msg.json"
Path() / "data" / "custom_welcome_msg" / "custom_welcome_msg.json"
)
async def custom_group_welcome(
msg: str, imgs: List[str], user_id: int, group_id: int
msg: str, imgs: List[str], user_id: int, group_id: int
) -> str:
"""
替换群欢迎消息
@ -104,20 +104,21 @@ async def change_group_switch(cmd: str, group_id: int, is_super: bool = False):
if cmd == "全部被动":
for task in task_data:
if status == "开启":
if not await group_manager.check_group_task_status(group_id, task):
await group_manager.open_group_task(group_id, task)
if not group_manager.check_group_task_status(group_id, task):
group_manager.open_group_task(group_id, task)
else:
if await group_manager.check_group_task_status(group_id, task):
await group_manager.close_group_task(group_id, task)
if group_manager.check_group_task_status(group_id, task):
group_manager.close_group_task(group_id, task)
if group_help_file.exists():
group_help_file.unlink()
return f"{status} 全部被动技能!"
if cmd == "全部功能":
for f in plugins2settings_manager.get_data():
if status == "开启":
group_manager.unblock_plugin(f, group_id)
group_manager.unblock_plugin(f, group_id, False)
else:
group_manager.block_plugin(f, group_id)
group_manager.block_plugin(f, group_id, False)
group_manager.save()
if group_help_file.exists():
group_help_file.unlink()
return f"{status} 全部功能!"
@ -129,18 +130,18 @@ async def change_group_switch(cmd: str, group_id: int, is_super: bool = False):
module = f"{module}:super"
if status == "开启":
if type_ == "task":
if await group_manager.check_group_task_status(group_id, module):
if group_manager.check_group_task_status(group_id, module):
return f"被动 {task_data[module]} 正处于开启状态!不要重复开启."
await group_manager.open_group_task(group_id, module)
group_manager.open_group_task(group_id, module)
else:
if group_manager.get_plugin_status(module, group_id):
return f"功能 {cmd} 正处于开启状态!不要重复开启."
group_manager.unblock_plugin(module, group_id)
else:
if type_ == "task":
if not await group_manager.check_group_task_status(group_id, module):
if not group_manager.check_group_task_status(group_id, module):
return f"被动 {task_data[module]} 正处于关闭状态!不要重复关闭."
await group_manager.close_group_task(group_id, module)
group_manager.close_group_task(group_id, module)
else:
if not group_manager.get_plugin_status(module, group_id):
return f"功能 {cmd} 正处于关闭状态!不要重复关闭."
@ -197,17 +198,17 @@ def _get_plugin_status() -> MessageSegment:
flag = plugins_manager.get_plugin_block_type(module)
flag = flag.upper() + " CLOSE" if flag else "OPEN"
try:
plugin_name = plugins_manager.get(module)["plugin_name"]
plugin_name = plugins_manager.get(module).plugin_name
if (
"[Hidden]" in plugin_name
or "[Admin]" in plugin_name
or "[Superuser]" in plugin_name
"[Hidden]" in plugin_name
or "[Admin]" in plugin_name
or "[Superuser]" in plugin_name
):
continue
rst += f"{plugin_name}"
except KeyError:
rst += f"{module}"
if plugins_manager.get(module)["error"]:
if plugins_manager.get(module).error:
rst += "[ERROR]"
rst += "\n"
flag_str += f"{flag}\n"
@ -238,12 +239,12 @@ async def update_member_info(group_id: int, remind_superuser: bool = False) -> b
async with db.transaction():
# 更新权限
if (
user_info["role"]
in [
"owner",
"admin",
]
and not await LevelUser.is_group_flag(user_info["user_id"], group_id)
user_info["role"]
in [
"owner",
"admin",
]
and not await LevelUser.is_group_flag(user_info["user_id"], group_id)
):
await LevelUser.set_level(
user_info["user_id"],
@ -272,10 +273,10 @@ async def update_member_info(group_id: int, remind_superuser: bool = False) -> b
"%Y-%m-%d %H:%M:%S",
)
if await GroupInfoUser.add_member_info(
user_info["user_id"],
user_info["group_id"],
nickname,
join_time,
user_info["user_id"],
user_info["group_id"],
nickname,
join_time,
):
_exist_member_list.append(int(user_info["user_id"]))
logger.info(f"用户{user_info['user_id']} 所属{user_info['group_id']} 更新成功")
@ -317,7 +318,4 @@ def set_group_bot_status(group_id: int, status: bool) -> str:
return "呜..醒来了..."
else:
group_manager.shutdown_group_bot_status(group_id)
# for x in group_manager.get_task_data():
# group_manager.close_group_task(group_id, x)
return "那我先睡觉了..."

View File

@ -2,18 +2,23 @@ from nonebot.adapters.onebot.v11 import Event
from utils.manager import group_manager, plugins2settings_manager
from utils.utils import get_message_text
from services.log import logger
import time
cmd = []
v = time.time()
def switch_rule(event: Event) -> bool:
"""
检测文本是否是关闭功能命令
:param event: pass
说明:
检测文本是否是关闭功能命令
参数:
:param event: pass
"""
global cmd
try:
if not cmd:
if not cmd or time.time() - v > 60 * 60:
cmd = ["关闭全部被动", "开启全部被动", "开启全部功能", "关闭全部功能"]
_data = group_manager.get_task_data()
for key in _data:
@ -22,10 +27,10 @@ def switch_rule(event: Event) -> bool:
cmd.append(f"开启 {_data[key]}")
cmd.append(f"关闭 {_data[key]}")
_data = plugins2settings_manager.get_data()
for key in _data:
for key in _data.keys():
try:
if isinstance(_data[key]["cmd"], list):
for x in _data[key]["cmd"]:
if isinstance(_data[key].cmd, list):
for x in _data[key].cmd:
cmd.append(f"开启{x}")
cmd.append(f"关闭{x}")
cmd.append(f"开启 {x}")

View File

@ -110,7 +110,7 @@ async def _():
@group_task_status.handle()
async def _(event: GroupMessageEvent):
await group_task_status.send(await group_current_status(event.group_id))
await group_task_status.send(group_current_status(event.group_id))
@group_status.handle()

View File

@ -16,9 +16,9 @@ admin_help_image = IMAGE_PATH / 'admin_help_img.png'
@driver.on_bot_connect
async def init_task(bot: Bot = None):
async def init_task():
if not group_manager.get_task_data():
await group_manager.init_group_task()
group_manager.load_task()
logger.info(f'已成功加载 {len(group_manager.get_task_data())} 个被动技能.')

View File

@ -44,7 +44,7 @@ async def _(bot: Bot, event: MessageEvent, arg: Message = CommandArg()):
gl = [
g["group_id"]
for g in gl
if await group_manager.check_group_task_status(g["group_id"], "broadcast")
if group_manager.check_group_task_status(g["group_id"], "broadcast")
]
g_cnt = len(gl)
cnt = 0

View File

@ -40,16 +40,14 @@ async def _create_help_img(
:param help_image: 图片路径
:param simple_help_image: 简易帮助图片路径
"""
_matchers = get_matchers(True)
width = 0
matchers_data = {}
_des_tmp = {}
_plugin_name_tmp = []
_tmp = []
tmp_img = BuildImage(0, 0, plain_text="1", font_size=24)
font_height = tmp_img.h
# 插件分类
for matcher in _matchers:
for matcher in get_matchers(True):
plugin_name = None
_plugin = matcher.plugin
if not _plugin:
@ -67,7 +65,6 @@ async def _create_help_img(
"[hidden]" in plugin_name.lower()
or "[admin]" in plugin_name.lower()
or "[superuser]" in plugin_name.lower()
or plugin_name in _plugin_name_tmp
or plugin_name == "帮助"
):
continue
@ -75,11 +72,9 @@ async def _create_help_img(
text_type = 0
if plugins2settings_manager.get(
matcher.plugin_name
) and plugins2settings_manager[matcher.plugin_name].get("plugin_type"):
) and plugins2settings_manager.get(matcher.plugin_name).plugin_type:
plugin_type = tuple(
plugins2settings_manager.get_plugin_data(matcher.plugin_name)[
"plugin_type"
]
plugins2settings_manager.get_plugin_data(matcher.plugin_name).plugin_type
)
else:
try:
@ -128,10 +123,7 @@ async def _create_help_img(
if plugin_des not in _des_tmp:
_des_tmp[plugin_des] = plugin_name
except AttributeError as e:
if plugin_name not in _plugin_name_tmp:
logger.warning(f"获取功能 {matcher.plugin_name}: {plugin_name} 设置失败...e{e}")
if plugin_name not in _plugin_name_tmp:
_plugin_name_tmp.append(plugin_name)
logger.warning(f"获取功能 {matcher.plugin_name}: {plugin_name} 设置失败...e{e}")
help_img_list = []
simple_help_img_list = []
types = list(matchers_data.keys())
@ -284,45 +276,13 @@ async def _create_help_img(
B.save(simple_help_image)
def get_max_width_or_paste(
simple_help_img_list: List[BuildImage], B: BuildImage = None, is_paste: bool = False
) -> Tuple[int, BuildImage]:
"""
获取最大宽度或直接贴图
:param simple_help_img_list: 简单帮助图片列表
:param B: 背景图
:param is_paste: 是否直接贴图
"""
current_width = 50
current_height = 180
max_width = simple_help_img_list[0].w
for i in range(len(simple_help_img_list)):
try:
if is_paste and B:
B.paste(simple_help_img_list[i], (current_width, current_height), True)
current_height += simple_help_img_list[i].h + 40
if current_height + simple_help_img_list[i + 1].h > B.h - 10:
current_height = 180
current_width += max_width + 30
max_width = 0
elif simple_help_img_list[i].w > max_width:
max_width = simple_help_img_list[i].w
except IndexError:
pass
if current_width > simple_help_img_list[0].w + 50:
current_width += simple_help_img_list[-1].w
return current_width, B
def get_plugin_help(msg: str, is_super: bool = False) -> Optional[str]:
"""
获取功能的帮助信息
:param msg: 功能cmd
:param is_super: 是否为超级用户
"""
module = plugins2settings_manager.get_plugin_module(msg)
if not module:
module = admin_manager.get_plugin_module(msg)
module = plugins2settings_manager.get_plugin_module(msg) or admin_manager.get_plugin_module(msg)
if module:
try:
plugin = nonebot.plugin.get_plugin(module)

View File

@ -1,5 +1,37 @@
from nonebot.adapters.onebot.v11 import GroupMessageEvent, PrivateMessageEvent
from utils.manager import plugins2block_manager, StaticData
from typing import Optional
import nonebot
from nonebot.adapters.onebot.v11 import (
GroupMessageEvent,
PrivateMessageEvent,
Bot,
Event,
MessageEvent,
Message,
PokeNotifyEvent,
)
from nonebot.exception import ActionFailed, IgnoredException
from nonebot.internal.matcher import Matcher
from models.bag_user import BagUser
from models.ban_user import BanUser
from models.friend_user import FriendUser
from models.group_member_info import GroupInfoUser
from models.level_user import LevelUser
from models.user_shop_gold_log import UserShopGoldLog
from utils.manager import (
plugins2block_manager,
StaticData,
plugins2settings_manager,
group_manager,
admin_manager,
plugins_manager,
plugins2cd_manager,
plugins2count_manager,
)
from utils.message_builder import at
from utils.utils import FreqLimiter
from configs.config import Config
import time
ignore_rst_module = ["ai", "poke", "dialogue"]
@ -8,7 +40,6 @@ other_limit_plugins = ["poke"]
class StatusMessageManager(StaticData):
def __init__(self):
super().__init__(None)
@ -39,8 +70,8 @@ def set_block_limit_false(event, module):
"""
if plugins2block_manager.check_plugin_block_status(module):
plugin_block_data = plugins2block_manager.get_plugin_block_data(module)
check_type = plugin_block_data["check_type"]
limit_type = plugin_block_data["limit_type"]
check_type = plugin_block_data.check_type
limit_type = plugin_block_data.limit_type
if not (
(isinstance(event, GroupMessageEvent) and check_type == "private")
or (isinstance(event, PrivateMessageEvent) and check_type == "group")
@ -50,3 +81,436 @@ def set_block_limit_false(event, module):
block_type_ = event.group_id
plugins2block_manager.set_false(block_type_, module)
async def send_msg(msg: str, bot: Bot, event: MessageEvent):
"""
说明:
发送信息
参数:
:param msg: pass
:param bot: pass
:param event: pass
"""
if "[uname]" in msg:
uname = event.sender.card or event.sender.nickname
msg = msg.replace("[uname]", uname)
if "[nickname]" in msg:
if isinstance(event, GroupMessageEvent):
nickname = await GroupInfoUser.get_group_member_nickname(
event.user_id, event.group_id
)
else:
nickname = await FriendUser.get_friend_nickname(event.user_id)
msg = msg.replace("[nickname]", nickname)
if "[at]" in msg and isinstance(event, GroupMessageEvent):
msg = msg.replace("[at]", str(at(event.user_id)))
try:
if isinstance(event, GroupMessageEvent):
status_message_manager.add(event.group_id)
await bot.send_group_msg(group_id=event.group_id, message=Message(msg))
else:
status_message_manager.add(event.user_id)
await bot.send_private_msg(user_id=event.user_id, message=Message(msg))
except ActionFailed:
pass
class ReturnException(Exception):
pass
class AuthChecker:
"""
权限检查
"""
checker: Optional["AuthChecker"] = None
def __init__(self):
self._flmt = FreqLimiter(Config.get_config("hook", "CHECK_NOTICE_INFO_CD"))
self._flmt_g = FreqLimiter(Config.get_config("hook", "CHECK_NOTICE_INFO_CD"))
self._flmt_s = FreqLimiter(Config.get_config("hook", "CHECK_NOTICE_INFO_CD"))
self._flmt_c = FreqLimiter(Config.get_config("hook", "CHECK_NOTICE_INFO_CD"))
def __new__(cls, *args, **kwargs):
if not cls.checker:
cls.checker = super().__new__(cls)
return cls.checker
async def auth(self, matcher: Matcher, bot: Bot, event: Event):
"""
说明:
权限检查
参数:
:param matcher: matcher
:param bot: bot
:param event: event
"""
try:
plugin_name = matcher.plugin_name
cost_gold = await self.auth_cost(plugin_name, bot, event)
await self.auth_basic(plugin_name, bot, event)
self.auth_group(plugin_name, bot, event)
await self.auth_admin(plugin_name, matcher, bot, event)
await self.auth_plugin(plugin_name, matcher, bot, event)
await self.auth_limit(plugin_name, bot, event)
if cost_gold and str(event.user_id) not in bot.config.superusers:
await BagUser.spend_gold(event.user_id, event.group_id, cost_gold)
except ReturnException:
return
async def auth_limit(self, plugin_name: str, bot: Bot, event: Event):
"""
说明:
插件限制
参数:
:param plugin_name: 模块名
:param bot: bot
:param event: event
"""
if plugins2cd_manager.check_plugin_cd_status(plugin_name):
plugin_cd_data = plugins2cd_manager.get_plugin_cd_data(plugin_name)
check_type = plugin_cd_data.check_type
limit_type = plugin_cd_data.limit_type
msg = plugin_cd_data.rst
if (
(isinstance(event, PrivateMessageEvent) and check_type == "private")
or (isinstance(event, GroupMessageEvent) and check_type == "group")
or plugins2cd_manager.get_plugin_data(plugin_name).check_type == "all"
):
cd_type_ = event.user_id
if limit_type == "group" and isinstance(event, GroupMessageEvent):
cd_type_ = event.group_id
if not plugins2cd_manager.check(plugin_name, cd_type_):
if msg:
await send_msg(msg, bot, event)
raise IgnoredException(f"{plugin_name} 正在cd中...")
else:
plugins2cd_manager.start_cd(plugin_name, cd_type_)
# Block
if plugins2block_manager.check_plugin_block_status(plugin_name):
plugin_block_data = plugins2block_manager.get_plugin_block_data(plugin_name)
check_type = plugin_block_data.check_type
limit_type = plugin_block_data.limit_type
msg = plugin_block_data.rst
if (
(isinstance(event, PrivateMessageEvent) and check_type == "private")
or (isinstance(event, GroupMessageEvent) and check_type == "group")
or check_type == "all"
):
block_type_ = event.user_id
if limit_type == "group" and isinstance(event, GroupMessageEvent):
block_type_ = event.group_id
if plugins2block_manager.check(block_type_, plugin_name):
if msg:
await send_msg(msg, bot, event)
raise IgnoredException(f"{event.user_id}正在调用{plugin_name}....")
else:
plugins2block_manager.set_true(block_type_, plugin_name)
# Count
if (
plugins2count_manager.check_plugin_count_status(plugin_name)
and event.user_id not in bot.config.superusers
):
plugin_count_data = plugins2count_manager.get_plugin_count_data(plugin_name)
limit_type = plugin_count_data.limit_type
msg = plugin_count_data.rst
count_type_ = event.user_id
if limit_type == "group" and isinstance(event, GroupMessageEvent):
count_type_ = event.group_id
if not plugins2count_manager.check(plugin_name, count_type_):
if msg:
await send_msg(msg, bot, event)
raise IgnoredException(f"{plugin_name} count次数限制...")
else:
plugins2count_manager.increase(plugin_name, count_type_)
async def auth_plugin(self, plugin_name: str, matcher: Matcher, bot: Bot, event: Event):
"""
说明:
插件状态
参数:
:param plugin_name: 模块名
:param matcher: matcher
:param bot: bot
:param event: event
"""
if plugin_name in plugins2settings_manager.keys() and matcher.priority not in [
1,
999,
]:
# 戳一戳单独判断
if (
isinstance(event, GroupMessageEvent)
or isinstance(event, PokeNotifyEvent)
or matcher.plugin_name in other_limit_plugins
):
if status_message_manager.get(event.group_id) is None:
status_message_manager.delete(event.group_id)
if plugins2settings_manager[
plugin_name
].level > group_manager.get_group_level(event.group_id):
try:
if (
self._flmt_g.check(event.user_id)
and plugin_name not in ignore_rst_module
):
self._flmt_g.start_cd(event.user_id)
await bot.send_group_msg(
group_id=event.group_id, message="群权限不足..."
)
except ActionFailed:
pass
if event.is_tome():
status_message_manager.add(event.group_id)
set_block_limit_false(event, plugin_name)
raise IgnoredException("群权限不足")
# 插件状态
if not group_manager.get_plugin_status(plugin_name, event.group_id):
try:
if plugin_name not in ignore_rst_module and self._flmt_s.check(
event.group_id
):
self._flmt_s.start_cd(event.group_id)
await bot.send_group_msg(
group_id=event.group_id, message="该群未开启此功能.."
)
except ActionFailed:
pass
if event.is_tome():
status_message_manager.add(event.group_id)
set_block_limit_false(event, plugin_name)
raise IgnoredException("未开启此功能...")
# 管理员禁用
if not group_manager.get_plugin_status(
f"{plugin_name}:super", event.group_id
):
try:
if (
self._flmt_s.check(event.group_id)
and plugin_name not in ignore_rst_module
):
self._flmt_s.start_cd(event.group_id)
await bot.send_group_msg(
group_id=event.group_id, message="管理员禁用了此群该功能..."
)
except ActionFailed:
pass
if event.is_tome():
status_message_manager.add(event.group_id)
set_block_limit_false(event, plugin_name)
raise IgnoredException("管理员禁用了此群该功能...")
# 群聊禁用
if not plugins_manager.get_plugin_status(
plugin_name, block_type="group"
):
try:
if (
self._flmt_c.check(event.group_id)
and plugin_name not in ignore_rst_module
):
self._flmt_c.start_cd(event.group_id)
await bot.send_group_msg(
group_id=event.group_id, message="该功能在群聊中已被禁用..."
)
except ActionFailed:
pass
if event.is_tome():
status_message_manager.add(event.group_id)
set_block_limit_false(event, plugin_name)
raise IgnoredException("该插件在群聊中已被禁用...")
else:
# 私聊禁用
if not plugins_manager.get_plugin_status(
plugin_name, block_type="private"
):
try:
if self._flmt_c.check(event.user_id):
self._flmt_c.start_cd(event.user_id)
await bot.send_private_msg(
user_id=event.user_id, message="该功能在私聊中已被禁用..."
)
except ActionFailed:
pass
if event.is_tome():
status_message_manager.add(event.user_id)
set_block_limit_false(event, plugin_name)
raise IgnoredException("该插件在私聊中已被禁用...")
# 维护
if not plugins_manager.get_plugin_status(plugin_name, block_type="all"):
if isinstance(
event, GroupMessageEvent
) and group_manager.check_group_is_white(event.group_id):
raise ReturnException()
try:
if isinstance(event, GroupMessageEvent):
if (
self._flmt_c.check(event.group_id)
and plugin_name not in ignore_rst_module
):
self._flmt_c.start_cd(event.group_id)
await bot.send_group_msg(
group_id=event.group_id, message="此功能正在维护..."
)
else:
await bot.send_private_msg(
user_id=event.user_id, message="此功能正在维护..."
)
except ActionFailed:
pass
if event.is_tome():
id_ = (
event.group_id
if isinstance(event, GroupMessageEvent)
else event.user_id
)
status_message_manager.add(id_)
set_block_limit_false(event, plugin_name)
raise IgnoredException("此功能正在维护...")
async def auth_admin(self, plugin_name: str, matcher: Matcher, bot: Bot, event: Event):
"""
说明:
管理员命令 个人权限
参数:
:param plugin_name: 模块名
:param matcher: matcher
:param bot: bot
:param event: event
"""
if plugin_name in admin_manager.keys() and matcher.priority not in [1, 999]:
if isinstance(event, GroupMessageEvent):
# 个人权限
if (
not await LevelUser.check_level(
event.user_id,
event.group_id,
admin_manager.get_plugin_level(plugin_name),
)
and admin_manager.get_plugin_level(plugin_name) > 0
):
try:
if self._flmt.check(event.user_id):
self._flmt.start_cd(event.user_id)
await bot.send_group_msg(
group_id=event.group_id,
message=f"{at(event.user_id)}你的权限不足喔,该功能需要的权限等级:"
f"{admin_manager.get_plugin_level(plugin_name)}",
)
except ActionFailed:
pass
set_block_limit_false(event, plugin_name)
if event.is_tome():
status_message_manager.add(event.group_id)
raise IgnoredException("权限不足")
else:
if not await LevelUser.check_level(
event.user_id, 0, admin_manager.get_plugin_level(plugin_name)
):
try:
await bot.send_private_msg(
user_id=event.user_id,
message=f"你的权限不足喔,该功能需要的权限等级:{admin_manager.get_plugin_level(plugin_name)}",
)
except ActionFailed:
pass
set_block_limit_false(event, plugin_name)
if event.is_tome():
status_message_manager.add(event.user_id)
raise IgnoredException("权限不足")
def auth_group(self, plugin_name: str, bot: Bot, event: Event):
"""
说明:
群黑名单检测 群总开关检测
参数:
:param plugin_name: 模块名
:param bot: bot
:param event: event
"""
if isinstance(event, GroupMessageEvent) or plugin_name in other_limit_plugins:
try:
if (
group_manager.get_group_level(event.group_id) < 0
and str(event.user_id) not in bot.config.superusers
):
raise IgnoredException("群黑名单")
if not group_manager.check_group_bot_status(event.group_id):
try:
if str(event.get_message()) != "醒来":
raise IgnoredException("功能总开关关闭状态")
except ValueError:
raise IgnoredException("功能总开关关闭状态")
except AttributeError:
pass
async def auth_basic(self, plugin_name: str, bot: Bot, event: Event):
"""
说明:
检测是否满足超级用户权限是否被ban等
参数:
:param plugin_name: 模块名
:param bot: bot
:param event: event
"""
ban = await BanUser.is_ban(event.user_id)
try:
if (
(
not isinstance(event, MessageEvent)
and plugin_name not in other_limit_plugins
)
or await BanUser.is_ban(event.user_id)
and str(event.user_id) not in bot.config.superusers
) or (
str(event.user_id) in bot.config.superusers
and plugins2settings_manager.get_plugin_data(plugin_name)
and not plugins2settings_manager.get_plugin_data(
plugin_name
).limit_superuser
):
raise ReturnException()
_plugin = nonebot.plugin.get_plugin(plugin_name)
_module = _plugin.module
_plugin_name = _module.__getattribute__("__zx_plugin_name__")
if (
"[superuser]" in _plugin_name.lower()
and str(event.user_id) in bot.config.superusers
):
raise ReturnException()
except AttributeError:
pass
async def auth_cost(self, plugin_name: str, bot: Bot, event: Event) -> int:
"""
说明:
检测是否满足金币条件
参数:
:param plugin_name: 模块名
:param bot: bot
:param event: event
"""
cost_gold = 0
if isinstance(event, GroupMessageEvent) and (
psm := plugins2settings_manager.get_plugin_data(plugin_name)
):
if psm.cost_gold > 0:
if (
await BagUser.get_gold(event.user_id, event.group_id)
< psm.cost_gold
):
await send_msg(f"金币不足..该功能需要{psm.cost_gold}金币..", bot, event)
raise IgnoredException(f"{plugin_name} 金币限制...")
# 当插件不阻塞超级用户时,超级用户提前扣除金币
if (
str(event.user_id) in bot.config.superusers
and not psm.limit_superuser
):
await BagUser.spend_gold(
event.user_id, event.group_id, psm.cost_gold
)
await UserShopGoldLog.add_shop_log(
event.user_id, event.group_id, 2, plugin_name, psm.cost_gold, 1
)
cost_gold = psm.cost_gold
return cost_gold

View File

@ -1,395 +1,36 @@
from nonebot.matcher import Matcher
from nonebot.message import run_preprocessor, run_postprocessor, IgnoredException
from models.friend_user import FriendUser
from models.group_member_info import GroupInfoUser
from models.bag_user import BagUser
from models.user_shop_gold_log import UserShopGoldLog
from utils.manager import (
plugins2settings_manager,
admin_manager,
group_manager,
plugins_manager,
plugins2cd_manager,
plugins2block_manager,
plugins2count_manager,
)
from ._utils import (
set_block_limit_false,
status_message_manager,
ignore_rst_module,
other_limit_plugins,
)
from nonebot.typing import T_State
from typing import Optional
from nonebot.adapters.onebot.v11 import (
Bot,
ActionFailed,
MessageEvent,
GroupMessageEvent,
PokeNotifyEvent,
PrivateMessageEvent,
Message,
Event,
)
from configs.config import Config
from models.ban_user import BanUser
from utils.utils import FreqLimiter
from utils.message_builder import at
from models.level_user import LevelUser
import nonebot
from nonebot.matcher import Matcher
from nonebot.message import run_preprocessor, run_postprocessor
from nonebot.typing import T_State
_flmt = FreqLimiter(Config.get_config("hook", "CHECK_NOTICE_INFO_CD"))
_flmt_g = FreqLimiter(Config.get_config("hook", "CHECK_NOTICE_INFO_CD"))
_flmt_s = FreqLimiter(Config.get_config("hook", "CHECK_NOTICE_INFO_CD"))
_flmt_c = FreqLimiter(Config.get_config("hook", "CHECK_NOTICE_INFO_CD"))
from ._utils import (
set_block_limit_false,
AuthChecker,
)
# 权限检测
# # 权限检测
@run_preprocessor
async def _(matcher: Matcher, bot: Bot, event: Event, state: T_State):
module = matcher.plugin_name
plugins2info_dict = plugins2settings_manager.get_data()
# 功能的金币检测 #######################################
# 功能的金币检测 #######################################
# 功能的金币检测 #######################################
cost_gold = 0
if isinstance(
event, GroupMessageEvent
) and plugins2settings_manager.get_plugin_data(module).get("cost_gold"):
cost_gold = plugins2settings_manager.get_plugin_data(module).get("cost_gold")
if await BagUser.get_gold(event.user_id, event.group_id) < cost_gold:
await send_msg(f"金币不足..该功能需要{cost_gold}金币..", bot, event)
raise IgnoredException(f"{module} 金币限制...")
await UserShopGoldLog.add_shop_log(event.user_id, event.group_id, 2, matcher.plugin_name, cost_gold, 1)
# 当插件不阻塞超级用户时,超级用户提前扣除金币
if (
str(event.user_id) in bot.config.superusers
and not plugins2info_dict[module]["limit_superuser"]
):
await BagUser.spend_gold(event.user_id, event.group_id, cost_gold)
try:
if (
(not isinstance(event, MessageEvent) and module not in other_limit_plugins)
or await BanUser.is_ban(event.user_id)
and str(event.user_id) not in bot.config.superusers
) or (
str(event.user_id) in bot.config.superusers
and plugins2info_dict.get(module)
and not plugins2info_dict[module]["limit_superuser"]
):
return
except AttributeError:
pass
# 超级用户命令
try:
_plugin = nonebot.plugin.get_plugin(module)
_module = _plugin.module
plugin_name = _module.__getattribute__("__zx_plugin_name__")
if (
"[superuser]" in plugin_name.lower()
and str(event.user_id) in bot.config.superusers
):
return
except AttributeError:
pass
# 群黑名单检测 群总开关检测
if (
isinstance(event, GroupMessageEvent)
or matcher.plugin_name in other_limit_plugins
):
try:
if (
group_manager.get_group_level(event.group_id) < 0
and str(event.user_id) not in bot.config.superusers
):
raise IgnoredException("群黑名单")
if not group_manager.check_group_bot_status(event.group_id):
try:
if str(event.get_message()) != "醒来":
raise IgnoredException("功能总开关关闭状态")
except ValueError:
raise IgnoredException("功能总开关关闭状态")
except AttributeError:
pass
if module in admin_manager.keys() and matcher.priority not in [1, 999]:
if isinstance(event, GroupMessageEvent):
# 个人权限
if (
not await LevelUser.check_level(
event.user_id,
event.group_id,
admin_manager.get_plugin_level(module),
)
and admin_manager.get_plugin_level(module) > 0
):
try:
if _flmt.check(event.user_id):
_flmt.start_cd(event.user_id)
await bot.send_group_msg(
group_id=event.group_id,
message=f"{at(event.user_id)}你的权限不足喔,该功能需要的权限等级:"
f"{admin_manager.get_plugin_level(module)}",
)
except ActionFailed:
pass
set_block_limit_false(event, module)
if event.is_tome():
status_message_manager.add(event.group_id)
raise IgnoredException("权限不足")
else:
if not await LevelUser.check_level(
event.user_id, 0, admin_manager.get_plugin_level(module)
):
try:
await bot.send_private_msg(
user_id=event.user_id,
message=f"你的权限不足喔,该功能需要的权限等级:{admin_manager.get_plugin_level(module)}",
)
except ActionFailed:
pass
set_block_limit_false(event, module)
if event.is_tome():
status_message_manager.add(event.user_id)
raise IgnoredException("权限不足")
if module in plugins2info_dict.keys() and matcher.priority not in [1, 999]:
# 戳一戳单独判断
if (
isinstance(event, GroupMessageEvent)
or isinstance(event, PokeNotifyEvent)
or matcher.plugin_name in other_limit_plugins
):
if status_message_manager.get(event.group_id) is None:
status_message_manager.delete(event.group_id)
if plugins2info_dict[module]["level"] > group_manager.get_group_level(
event.group_id
):
try:
if _flmt_g.check(event.user_id) and module not in ignore_rst_module:
_flmt_g.start_cd(event.user_id)
await bot.send_group_msg(
group_id=event.group_id, message="群权限不足..."
)
except ActionFailed:
pass
if event.is_tome():
status_message_manager.add(event.group_id)
set_block_limit_false(event, module)
raise IgnoredException("群权限不足")
# 插件状态
if not group_manager.get_plugin_status(module, event.group_id):
try:
if module not in ignore_rst_module and _flmt_s.check(
event.group_id
):
_flmt_s.start_cd(event.group_id)
await bot.send_group_msg(
group_id=event.group_id, message="该群未开启此功能.."
)
except ActionFailed:
pass
if event.is_tome():
status_message_manager.add(event.group_id)
set_block_limit_false(event, module)
raise IgnoredException("未开启此功能...")
# 管理员禁用
if not group_manager.get_plugin_status(f"{module}:super", event.group_id):
try:
if (
_flmt_s.check(event.group_id)
and module not in ignore_rst_module
):
_flmt_s.start_cd(event.group_id)
await bot.send_group_msg(
group_id=event.group_id, message="管理员禁用了此群该功能..."
)
except ActionFailed:
pass
if event.is_tome():
status_message_manager.add(event.group_id)
set_block_limit_false(event, module)
raise IgnoredException("管理员禁用了此群该功能...")
# 群聊禁用
if not plugins_manager.get_plugin_status(module, block_type="group"):
try:
if (
_flmt_c.check(event.group_id)
and module not in ignore_rst_module
):
_flmt_c.start_cd(event.group_id)
await bot.send_group_msg(
group_id=event.group_id, message="该功能在群聊中已被禁用..."
)
except ActionFailed:
pass
if event.is_tome():
status_message_manager.add(event.group_id)
set_block_limit_false(event, module)
raise IgnoredException("该插件在群聊中已被禁用...")
else:
# 私聊禁用
if not plugins_manager.get_plugin_status(module, block_type="private"):
try:
if _flmt_c.check(event.user_id):
_flmt_c.start_cd(event.user_id)
await bot.send_private_msg(
user_id=event.user_id, message="该功能在私聊中已被禁用..."
)
except ActionFailed:
pass
if event.is_tome():
status_message_manager.add(event.user_id)
set_block_limit_false(event, module)
raise IgnoredException("该插件在私聊中已被禁用...")
# 维护
if not plugins_manager.get_plugin_status(module, block_type="all"):
if isinstance(
event, GroupMessageEvent
) and group_manager.check_group_is_white(event.group_id):
return
try:
if isinstance(event, GroupMessageEvent):
if (
_flmt_c.check(event.group_id)
and module not in ignore_rst_module
):
_flmt_c.start_cd(event.group_id)
await bot.send_group_msg(
group_id=event.group_id, message="此功能正在维护..."
)
else:
await bot.send_private_msg(
user_id=event.user_id, message="此功能正在维护..."
)
except ActionFailed:
pass
if event.is_tome():
id_ = (
event.group_id
if isinstance(event, GroupMessageEvent)
else event.user_id
)
status_message_manager.add(id_)
set_block_limit_false(event, module)
raise IgnoredException("此功能正在维护...")
# 以下为限制检测 #######################################################
# 以下为限制检测 #######################################################
# 以下为限制检测 #######################################################
# 以下为限制检测 #######################################################
# 以下为限制检测 #######################################################
# 以下为限制检测 #######################################################
# Cd
if plugins2cd_manager.check_plugin_cd_status(module):
plugin_cd_data = plugins2cd_manager.get_plugin_cd_data(module)
check_type = plugin_cd_data["check_type"]
limit_type = plugin_cd_data["limit_type"]
rst = plugin_cd_data["rst"]
if (
(isinstance(event, PrivateMessageEvent) and check_type == "private")
or (isinstance(event, GroupMessageEvent) and check_type == "group")
or plugins2cd_manager.get_plugin_data(module).get("check_type") == "all"
):
cd_type_ = event.user_id
if limit_type == "group" and isinstance(event, GroupMessageEvent):
cd_type_ = event.group_id
if not plugins2cd_manager.check(module, cd_type_):
if rst:
rst = await init_rst(rst, event)
await send_msg(rst, bot, event)
raise IgnoredException(f"{module} 正在cd中...")
else:
plugins2cd_manager.start_cd(module, cd_type_)
# Block
if plugins2block_manager.check_plugin_block_status(module):
plugin_block_data = plugins2block_manager.get_plugin_block_data(module)
check_type = plugin_block_data["check_type"]
limit_type = plugin_block_data["limit_type"]
rst = plugin_block_data["rst"]
if (
(isinstance(event, PrivateMessageEvent) and check_type == "private")
or (isinstance(event, GroupMessageEvent) and check_type == "group")
or check_type == "all"
):
block_type_ = event.user_id
if limit_type == "group" and isinstance(event, GroupMessageEvent):
block_type_ = event.group_id
if plugins2block_manager.check(block_type_, module):
if rst:
rst = await init_rst(rst, event)
await send_msg(rst, bot, event)
raise IgnoredException(f"{event.user_id}正在调用{module}....")
else:
plugins2block_manager.set_true(block_type_, module)
# Count
if (
plugins2count_manager.check_plugin_count_status(module)
and event.user_id not in bot.config.superusers
):
plugin_count_data = plugins2count_manager.get_plugin_count_data(module)
limit_type = plugin_count_data["limit_type"]
rst = plugin_count_data["rst"]
count_type_ = event.user_id
if limit_type == "group" and isinstance(event, GroupMessageEvent):
count_type_ = event.group_id
if not plugins2count_manager.check(module, count_type_):
if rst:
rst = await init_rst(rst, event)
await send_msg(rst, bot, event)
raise IgnoredException(f"{module} count次数限制...")
else:
plugins2count_manager.increase(module, count_type_)
# 功能花费的金币 #######################################
# 功能花费的金币 #######################################
if cost_gold:
await BagUser.spend_gold(event.user_id, event.group_id, cost_gold)
async def send_msg(rst: str, bot: Bot, event: MessageEvent):
"""
发送信息
:param rst: pass
:param bot: pass
:param event: pass
"""
rst = await init_rst(rst, event)
try:
if isinstance(event, GroupMessageEvent):
status_message_manager.add(event.group_id)
await bot.send_group_msg(group_id=event.group_id, message=Message(rst))
else:
status_message_manager.add(event.user_id)
await bot.send_private_msg(user_id=event.user_id, message=Message(rst))
except ActionFailed:
pass
async def _(matcher: Matcher, bot: Bot, event: Event):
await AuthChecker().auth(matcher, bot, event)
# 解除命令block阻塞
@run_postprocessor
async def _(
matcher: Matcher,
exception: Optional[Exception],
bot: Bot,
event: Event,
state: T_State,
matcher: Matcher,
exception: Optional[Exception],
bot: Bot,
event: Event,
state: T_State,
):
if not isinstance(event, MessageEvent) and matcher.plugin_name != "poke":
return
module = matcher.plugin_name
set_block_limit_false(event, module)
async def init_rst(rst: str, event: MessageEvent):
if "[uname]" in rst:
uname = event.sender.card or event.sender.nickname
rst = rst.replace("[uname]", uname)
if "[nickname]" in rst:
if isinstance(event, GroupMessageEvent):
nickname = await GroupInfoUser.get_group_member_nickname(
event.user_id, event.group_id
)
else:
nickname = await FriendUser.get_friend_nickname(event.user_id)
rst = rst.replace("[nickname]", nickname)
if "[at]" in rst and isinstance(event, GroupMessageEvent):
rst = rst.replace("[at]", str(at(event.user_id)))
return rst

View File

@ -39,7 +39,7 @@ async def _(bot: Bot, api: str, data: Dict[str, Any]):
group_id = data["group_id"]
if group_manager.get_group_level(
group_id
) < 0 or not await group_manager.check_group_task_status(group_id, task):
) < 0 or not group_manager.check_group_task_status(group_id, task):
raise MockApiException(f"被动技能 {task} 处于关闭状态...")
else:
msg = str(data["message"]).strip()

View File

@ -1,3 +1,4 @@
from configs.path_config import DATA_PATH
from .init_group_manager import init_group_manager, group_manager
from .init_plugins_config import init_plugins_config
from .init_plugins_data import init_plugins_data, plugins_manager
@ -12,7 +13,6 @@ from .init_plugins_limit import (
from .init import init
from .check_plugin_status import check_plugin_status
from nonebot.adapters.onebot.v11 import Bot
from configs.path_config import DATA_PATH
from services.log import logger
from nonebot import Driver
import nonebot
@ -31,23 +31,21 @@ async def _():
"""
初始化数据
"""
_flag = False
config_file = DATA_PATH / "configs" / "plugins2config.yaml"
if not config_file.exists():
_flag = True
_flag = not config_file.exists()
init()
init_plugins_settings(DATA_PATH)
init_plugins_cd_limit(DATA_PATH)
init_plugins_block_limit(DATA_PATH)
init_plugins_count_limit(DATA_PATH)
init_plugins_data(DATA_PATH)
init_plugins_config(DATA_PATH)
init_plugins_settings()
init_plugins_cd_limit()
init_plugins_block_limit()
init_plugins_count_limit()
init_plugins_data()
init_plugins_config()
init_plugins_resources()
init_none_plugin_count_manager()
x = group_manager.get_super_old_data()
if x:
for key in x.keys():
plugins_manager.block_plugin(key, block_type=x[key])
# x = group_manager.get_super_old_data()
# if x:
# for key in x.keys():
# plugins_manager.block_plugin(key, block_type=x[key])
if _flag:
raise Exception("首次运行已在configs目录下生成配置文件config.yaml修改后重启即可...")
logger.info("初始化数据完成...")
@ -55,5 +53,5 @@ async def _():
@driver.on_bot_connect
async def _(bot: Bot):
await init_group_manager()
# await init_group_manager()
await check_plugin_status(bot)

View File

@ -6,13 +6,13 @@ async def check_plugin_status(bot: Bot):
"""
遍历查看插件加载情况
"""
rst = ""
msg = ""
for plugin in plugins_manager.keys():
data = plugins_manager.get(plugin)
if data.get("error") or data.get("error") is None:
rst += f'{plugin}:{data["plugin_name"]}\n'
if rst:
rst = "以下插件加载失败..\n" + rst
if not data.error:
msg += f'{plugin}:{data.plugin_name}\n'
if msg and bot.config.superusers:
msg = "以下插件加载失败..\n" + msg
await bot.send_private_msg(
user_id=int(list(bot.config.superusers)[0]), message=rst[:-1]
user_id=int(list(bot.config.superusers)[0]), message=msg.strip()
)

View File

@ -19,36 +19,44 @@ def init_none_plugin_count_manager():
"""
清除已删除插件数据
"""
modules = [x.plugin_name for x in get_matchers()]
modules = [x.plugin_name for x in get_matchers(True)]
plugins_manager_list = list(plugins_manager.keys())
for module in plugins_manager_list:
if module not in modules or none_plugin_count_manager.check(module):
try:
plugin_name = plugins_manager.get(module)["plugin_name"]
except (AttributeError, KeyError):
plugin_name = ""
if none_plugin_count_manager.check(module):
try:
if module not in modules or none_plugin_count_manager.check(module):
try:
plugins2settings_manager.delete(module)
plugins2settings_manager.save()
plugins2count_manager.delete(module)
plugins2count_manager.save()
plugins2cd_manager.delete(module)
plugins2cd_manager.save()
plugins2block_manager.delete(module)
plugins2block_manager.save()
plugins_manager.delete(module)
plugins_manager.save()
# resources_manager.remove_resource(module)
none_plugin_count_manager.delete(module)
logger.info(f"{module}:{plugin_name} 插件疑似已删除,清除对应插件数据...")
except Exception as e:
logger.exception(
f"{module}:{plugin_name} 插件疑似已删除,清除对应插件数据失败...{type(e)}{e}")
plugin_name = plugins_manager.get(module).plugin_name
except (AttributeError, KeyError):
plugin_name = ""
if none_plugin_count_manager.check(module):
try:
plugins2settings_manager.delete(module)
plugins2count_manager.delete(module)
plugins2cd_manager.delete(module)
plugins2block_manager.delete(module)
plugins_manager.delete(module)
plugins_manager.save()
# resources_manager.remove_resource(module)
none_plugin_count_manager.delete(module)
logger.info(f"{module}:{plugin_name} 插件疑似已删除,清除对应插件数据...")
except Exception as e:
logger.exception(
f"{module}:{plugin_name} 插件疑似已删除,清除对应插件数据失败...{type(e)}{e}"
)
else:
none_plugin_count_manager.add_count(module)
logger.info(
f"{module}:{plugin_name} 插件疑似已删除,"
f"加载{none_plugin_count_manager._max_count}次失败后将清除对应插件数据,"
f"当前次数:{none_plugin_count_manager._data[module]}"
)
else:
none_plugin_count_manager.add_count(module)
logger.info(
f"{module}:{plugin_name} 插件疑似已删除,加载{none_plugin_count_manager._max_count}次失败后将清除对应插件数据,当前次数:{none_plugin_count_manager._data[module]}")
else:
none_plugin_count_manager.reset(module)
none_plugin_count_manager.reset(module)
except Exception as e:
logger.error(f"清除插件数据错误 {type(e)}{e}")
plugins2settings_manager.save()
plugins2count_manager.save()
plugins2cd_manager.save()
plugins2block_manager.save()
plugins_manager.save()
none_plugin_count_manager.save()

View File

@ -8,40 +8,32 @@ from services.log import logger
from utils.text_utils import prompt2cn
from utils.utils import get_matchers
from utils.utils import scheduler
from configs.path_config import DATA_PATH
from ruamel import yaml
_yaml = YAML(typ="safe")
def init_plugins_config(data_path):
def init_plugins_config():
"""
初始化插件数据配置
"""
plugins2config_file = data_path / "configs" / "plugins2config.yaml"
plugins2config_file = DATA_PATH / "configs" / "plugins2config.yaml"
plugins2config_file.parent.mkdir(parents=True, exist_ok=True)
_data = {}
if plugins2config_file.exists():
_data = _yaml.load(open(plugins2config_file, "r", encoding="utf8"))
_matchers = get_matchers(True)
# 优先使用 metadata 数据
for matcher in _matchers:
for matcher in [matcher for matcher in get_matchers(True) if matcher.plugin and matcher.plugin.module]:
_plugin = matcher.plugin
if not _plugin:
continue
metadata = _plugin.metadata
try:
_module = _plugin.module
except AttributeError:
continue
_module = _plugin.module
plugin_version = None
if metadata:
plugin_version = metadata.extra.get("version")
if not plugin_version:
try:
plugin_version = _module.__getattribute__("__plugin_version__")
except AttributeError:
pass
if not plugin_version and hasattr(_module, "__plugin_version__"):
plugin_version = _module.__getattribute__("__plugin_version__")
if metadata and metadata.config:
plugin_configs = {}
for key, value in metadata.config.__fields__.items():
@ -61,7 +53,7 @@ def init_plugins_config(data_path):
_data.get(matcher.plugin_name)
and _data[matcher.plugin_name].keys() != plugin_configs.keys()
)
or plugin_version > plugins_manager.get(matcher.plugin_name)["version"]
or plugin_version > plugins_manager.get(matcher.plugin_name).version
or matcher.plugin_name not in _data.keys()
):
for key in plugin_configs:
@ -94,16 +86,16 @@ def init_plugins_config(data_path):
_data = round_trip_load(open(plugins2config_file, encoding="utf8"))
for plugin in _data.keys():
try:
plugin_name = plugins_manager.get(plugin)["plugin_name"]
plugin_name = plugins_manager.get(plugin).plugin_name
except (AttributeError, TypeError):
plugin_name = plugin
_data[plugin].yaml_set_start_comment(plugin_name, indent=2)
# 初始化未设置的管理员权限等级
for k, v in Config.get_admin_level_data():
try:
admin_manager.set_admin_level(k, v)
except KeyError as e:
raise KeyError(f"{e} ****** 请检查是否有插件加载失败 ******")
# try:
admin_manager.set_admin_level(k, v)
# except KeyError as e:
# raise KeyError(f"{e} ****** 请检查是否有插件加载失败 ******")
# 存完插件基本设置
with open(plugins2config_file, "w", encoding="utf8") as wf:
round_trip_dump(
@ -181,7 +173,7 @@ def _replace_config():
plugin_name = None
if not plugin_name:
try:
plugin_name = plugins_manager.get(plugin)["plugin_name"]
plugin_name = plugins_manager.get(plugin).plugin_name
except (AttributeError, TypeError):
plugin_name = plugin
plugin_name = (

View File

@ -3,6 +3,7 @@ from ruamel.yaml import YAML
from utils.manager import plugins_manager
from utils.utils import get_matchers
from services.log import logger
from configs.path_config import DATA_PATH
try:
import ujson as json
@ -13,17 +14,13 @@ except ModuleNotFoundError:
_yaml = YAML(typ="safe")
def init_plugins_data(data_path):
def init_plugins_data():
"""
初始化插件数据信息
"""
plugin2data_file = data_path / "manager" / "plugin_manager.json"
plugin2data_file = DATA_PATH / "manager" / "plugin_manager.json"
plugin2data_file.parent.mkdir(parents=True, exist_ok=True)
_data = {}
if plugin2data_file.exists():
_data = json.load(open(plugin2data_file, "r", encoding="utf8"))
_matchers = get_matchers(True)
for matcher in _matchers:
for matcher in get_matchers(True):
_plugin = matcher.plugin
if not _plugin:
continue
@ -31,27 +28,19 @@ def init_plugins_data(data_path):
try:
_module = _plugin.module
except AttributeError:
if matcher.plugin_name not in _data.keys():
if matcher.plugin_name not in plugins_manager.keys():
plugins_manager.add_plugin_data(
matcher.plugin_name, matcher.plugin_name, error=True
)
else:
plugins_manager.set_module_data(matcher.plugin_name, "error", True)
plugin_data = plugins_manager.get(matcher.plugin_name)
if plugin_data:
plugins_manager.set_module_data(
matcher.plugin_name, "version", plugin_data.get("version")
)
plugins_manager[matcher.plugin_name].error = True
else:
try:
plugin_version = None
if metadata:
plugin_version = metadata.extra.get("version")
if not plugin_version:
try:
plugin_version = _module.__getattribute__("__plugin_version__")
except AttributeError:
pass
if not plugin_version and hasattr(_module, "__plugin_version__"):
plugin_version = _module.__getattribute__("__plugin_version__")
if metadata:
plugin_name = metadata.name
else:
@ -62,12 +51,10 @@ def init_plugins_data(data_path):
plugin_author = None
if metadata:
plugin_author = metadata.extra.get('author')
try:
if not plugin_author and hasattr(_module, "__plugin_author__"):
plugin_author = _module.__getattribute__("__plugin_author__")
except AttributeError:
pass
if matcher.plugin_name in plugins_manager.keys():
plugins_manager.set_module_data(matcher.plugin_name, "error", False)
plugins_manager[matcher.plugin_name].error = False
if matcher.plugin_name not in plugins_manager.keys():
plugins_manager.add_plugin_data(
matcher.plugin_name,
@ -76,24 +63,21 @@ def init_plugins_data(data_path):
version=plugin_version,
)
# metadata不检测version
elif isinstance(plugin_version, str) or plugins_manager[matcher.plugin_name]["version"] is None or (
elif isinstance(plugin_version, str) or plugins_manager[matcher.plugin_name].version is None or (
plugin_version is not None
and plugin_version > float(plugins_manager[matcher.plugin_name]["version"])
and plugin_version > float(plugins_manager[matcher.plugin_name].version)
):
plugins_manager.set_module_data(
matcher.plugin_name, "plugin_name", plugin_name
)
plugins_manager.set_module_data(matcher.plugin_name, "author", plugin_author)
plugins_manager.set_module_data(
matcher.plugin_name, "version", plugin_version
)
if matcher.plugin_name in _data.keys():
plugins_manager.set_module_data(
matcher.plugin_name, "error", _data[matcher.plugin_name]["error"]
)
plugins_manager.set_module_data(
matcher.plugin_name, "plugin_name", _data[matcher.plugin_name]["plugin_name"]
)
plugins_manager[matcher.plugin_name].plugin_name = plugin_name
plugins_manager[matcher.plugin_name].author = plugin_author
plugins_manager[matcher.plugin_name].version = plugin_version
# if matcher.plugin_name in _data.keys():
# plugins_manager[matcher.plugin_name].error = _data[matcher.plugin_name]["error"]
# plugins_manager.set_module_data(
# matcher.plugin_name, "error", _data[matcher.plugin_name]["error"]
# )
# plugins_manager.set_module_data(
# matcher.plugin_name, "plugin_name", _data[matcher.plugin_name]["plugin_name"]
# )
except Exception as e:
logger.error(f"插件数据 {matcher.plugin_name} 加载发生错误 {type(e)}{e}")
plugins_manager.save()

View File

@ -1,27 +1,21 @@
from pathlib import Path
from ruamel.yaml import round_trip_load, round_trip_dump, YAML
from utils.manager import (
plugins2cd_manager,
plugins2block_manager,
plugins2count_manager,
)
from utils.utils import get_matchers
from ruamel import yaml
from configs.path_config import DATA_PATH
import nonebot
_yaml = YAML(typ="safe")
def init_plugins_cd_limit(data_path):
def init_plugins_cd_limit():
"""
加载 cd 限制
"""
plugins2cd_file = data_path / "configs" / "plugins2cd.yaml"
plugins2cd_file = DATA_PATH / "configs" / "plugins2cd.yaml"
plugins2cd_file.parent.mkdir(exist_ok=True, parents=True)
_data = {}
_matchers = get_matchers()
for matcher in _matchers:
for matcher in get_matchers(True):
if not plugins2cd_manager.get_plugin_cd_data(matcher.plugin_name):
_plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
try:
@ -36,42 +30,19 @@ def init_plugins_cd_limit(data_path):
plugins2cd_manager.add_cd_limit(
"这是一个示例"
)
_tmp_data = {"PluginCdLimit": plugins2cd_manager.get_data()}
with open(plugins2cd_file, "w", encoding="utf8") as wf:
yaml.dump(_tmp_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True)
_data = round_trip_load(open(plugins2cd_file, encoding="utf8"))
_data["PluginCdLimit"].yaml_set_start_comment(
"""# 需要cd的功能
# 自定义的功能需要cd也可以在此配置
# key模块名称
# cdcd 时长(秒)
# status此限制的开关状态
# check_type'private'/'group'/'all',限制私聊/群聊/全部
# limit_type监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
# 示例:'user'用户N秒内触发1次'group'群N秒内触发1次
# rst回复的话可以添加[at][uname][nickname]来对应艾特,用户群名称,昵称系统昵称
# rst 为 "" 或 None 时则不回复
# rst示例"[uname]你冲的太快了,[nickname]先生,请稍后再冲[at]"
# rst回复"老色批你冲的太快了,欧尼酱先生,请稍后再冲@老色批"
# 用户昵称↑ 昵称系统的昵称↑ 艾特用户↑""",
indent=2,
)
with open(plugins2cd_file, "w", encoding="utf8") as wf:
round_trip_dump(_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True)
plugins2cd_manager.save()
plugins2cd_manager.reload_cd_limit()
def init_plugins_block_limit(data_path):
def init_plugins_block_limit():
"""
加载阻塞限制
"""
plugins2block_file = data_path / "configs" / "plugins2block.yaml"
plugins2block_file = DATA_PATH / "configs" / "plugins2block.yaml"
plugins2block_file.parent.mkdir(exist_ok=True, parents=True)
_data = {}
_matchers = get_matchers()
for matcher in _matchers:
for matcher in get_matchers(True):
if not plugins2block_manager.get_plugin_block_data(matcher.plugin_name):
_plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
_plugin = matcher.plugin
try:
_module = _plugin.module
plugin_block_limit = _module.__getattribute__("__plugin_block_limit__")
@ -84,36 +55,15 @@ def init_plugins_block_limit(data_path):
plugins2block_manager.add_block_limit(
"这是一个示例"
)
_tmp_data = {"PluginBlockLimit": plugins2block_manager.get_data()}
with open(plugins2block_file, "w", encoding="utf8") as wf:
yaml.dump(_tmp_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True)
_data = round_trip_load(open(plugins2block_file, encoding="utf8"))
_data["PluginBlockLimit"].yaml_set_start_comment(
"""# 用户调用阻塞
# 即 当用户调用此功能还未结束时
# 用发送消息阻止用户重复调用此命令直到该命令结束
# key模块名称
# status此限制的开关状态
# check_type'private'/'group'/'all',限制私聊/群聊/全部
# limit_type监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
# 示例:'user':阻塞用户,'group':阻塞群聊
# rst回复的话可以添加[at][uname][nickname]来对应艾特,用户群名称,昵称系统昵称
# rst 为 "" 或 None 时则不回复
# rst示例"[uname]你冲的太快了,[nickname]先生,请稍后再冲[at]"
# rst回复"老色批你冲的太快了,欧尼酱先生,请稍后再冲@老色批"
# 用户昵称↑ 昵称系统的昵称↑ 艾特用户↑""",
indent=2,
)
with open(plugins2block_file, "w", encoding="utf8") as wf:
round_trip_dump(_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True)
plugins2block_manager.save()
plugins2block_manager.reload_block_limit()
def init_plugins_count_limit(data_path):
def init_plugins_count_limit():
"""
加载次数限制
"""
plugins2count_file = data_path / "configs" / "plugins2count.yaml"
plugins2count_file = DATA_PATH / "configs" / "plugins2count.yaml"
plugins2count_file.parent.mkdir(exist_ok=True, parents=True)
_data = {}
_matchers = get_matchers()
@ -132,26 +82,5 @@ def init_plugins_count_limit(data_path):
plugins2count_manager.add_count_limit(
"这是一个示例"
)
_tmp_data = {"PluginCountLimit": plugins2count_manager.get_data()}
with open(plugins2count_file, "w", encoding="utf8") as wf:
yaml.dump(_tmp_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True)
_data = round_trip_load(open(plugins2count_file, encoding="utf8"))
_data["PluginCountLimit"].yaml_set_start_comment(
"""# 命令每日次数限制
# 即 用户/群聊 每日可调用命令的次数 [数据内存存储,重启将会重置]
# 每日调用直到 00:00 刷新
# key模块名称
# max_count: 每日调用上限
# status此限制的开关状态
# limit_type监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
# 示例:'user':用户上限,'group':群聊上限
# rst回复的话可以添加[at][uname][nickname]来对应艾特,用户群名称,昵称系统昵称
# rst 为 "" 或 None 时则不回复
# rst示例"[uname]你冲的太快了,[nickname]先生,请稍后再冲[at]"
# rst回复"老色批你冲的太快了,欧尼酱先生,请稍后再冲@老色批"
# 用户昵称↑ 昵称系统的昵称↑ 艾特用户↑""",
indent=2,
)
with open(plugins2count_file, "w", encoding="utf8") as wf:
round_trip_dump(_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True)
plugins2count_manager.save()
plugins2count_manager.reload_count_limit()

View File

@ -1,24 +1,14 @@
from pathlib import Path
from ruamel.yaml import round_trip_load, round_trip_dump, YAML
from utils.manager import plugins2settings_manager, admin_manager
from services.log import logger
from utils.utils import get_matchers
from ruamel import yaml
import nonebot
_yaml = YAML(typ="safe")
def init_plugins_settings(data_path: str):
def init_plugins_settings():
"""
初始化插件设置从插件中获取 __zx_plugin_name____plugin_cmd____plugin_settings__
"""
plugins2settings_file = data_path / "configs" / "plugins2settings.yaml"
plugins2settings_file.parent.mkdir(exist_ok=True, parents=True)
_matchers = get_matchers(True)
_tmp_module = {}
_tmp = []
for x in plugins2settings_manager.keys():
try:
_plugin = nonebot.plugin.get_plugin(x)
@ -33,12 +23,10 @@ def init_plugins_settings(data_path: str):
except (KeyError, AttributeError) as e:
logger.warning(f"配置文件 模块:{x} 获取 plugin_name 失败...{e}")
_tmp_module[x] = ""
for matcher in _matchers:
for matcher in [x for x in get_matchers(True) if x.plugin]:
try:
if matcher.plugin_name not in plugins2settings_manager.keys():
_plugin = matcher.plugin
if not _plugin:
continue
metadata = _plugin.metadata
try:
_module = _plugin.module
@ -46,22 +34,17 @@ def init_plugins_settings(data_path: str):
logger.warning(f"插件 {matcher.plugin_name} 加载失败...,插件控制未加载.")
else:
try:
if metadata:
plugin_name = metadata.name
else:
plugin_name = _module.__getattribute__("__zx_plugin_name__")
plugin_name = metadata.name if metadata else _module.__getattribute__("__zx_plugin_name__")
# 管理员命令
if "[admin]" in plugin_name.lower():
try:
level = 5
cmd = None
if hasattr(_module, "__plugin_settings__"):
admin_settings = _module.__getattribute__(
"__plugin_settings__"
)
level = admin_settings["admin_level"]
level = admin_settings.get("admin_level", 5)
cmd = admin_settings.get("cmd")
except (AttributeError, KeyError):
level = 5
cmd = None
if level is None:
level = 5
admin_manager.add_admin_plugin_settings(
matcher.plugin_name, cmd, level
)
@ -73,19 +56,18 @@ def init_plugins_settings(data_path: str):
):
continue
except AttributeError:
if matcher.plugin_name not in _tmp:
logger.warning(
f"获取插件 {matcher.plugin_name} __zx_plugin_name__ 失败...,插件控制未加载."
)
logger.warning(
f"获取插件 {matcher.plugin_name} __zx_plugin_name__ 失败...,插件控制未加载."
)
else:
_tmp_module[matcher.plugin_name] = plugin_name
try:
if hasattr(_module, "__plugin_settings__"):
plugin_settings = _module.__getattribute__(
"__plugin_settings__"
)
except AttributeError:
else:
plugin_settings = {"cmd": [matcher.plugin_name, plugin_name]}
if not plugin_settings.get("cost_gold"):
if plugin_settings.get("cost_gold") is None:
plugin_settings["cost_gold"] = 0
if (
plugin_settings.get("cmd") is not None
@ -100,12 +82,12 @@ def init_plugins_settings(data_path: str):
plugin_type = tuple(
plugins2settings_manager.get_plugin_data(
matcher.plugin_name
)["plugin_type"]
).plugin_type
)
else:
try:
if hasattr(_module, "__plugin_type__"):
plugin_type = _module.__getattribute__("__plugin_type__")
except AttributeError:
else:
plugin_type = ("normal",)
if plugin_settings and matcher.plugin_name:
plugins2settings_manager.add_plugin_settings(
@ -115,26 +97,5 @@ def init_plugins_settings(data_path: str):
)
except Exception as e:
logger.error(f'{matcher.plugin_name} 初始化 plugin_settings 发生错误 {type(e)}{e}')
_tmp.append(matcher.plugin_name)
_tmp_data = {"PluginSettings": plugins2settings_manager.get_data()}
with open(plugins2settings_file, "w", encoding="utf8") as wf:
yaml.dump(_tmp_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True)
_data = round_trip_load(open(plugins2settings_file, encoding="utf8"))
_data["PluginSettings"].yaml_set_start_comment(
"""# 模块与对应命令和对应群权限
# 用于生成帮助图片 和 开关功能
# key模块名称
# level需要的群等级
# default_status加入群时功能的默认开关状态
# limit_superuser: 功能状态是否限制超级用户
# cmd: 关闭[cmd] 都会触发命令 关闭对应功能cmd列表第一个词为统计的功能名称
# plugin_type: 帮助类别 示例:('原神相关',) 或 ('原神相关', 1)1代表帮助命令列向排列否则为横向排列""",
indent=2,
)
for plugin in _data["PluginSettings"].keys():
_data["PluginSettings"][plugin].yaml_set_start_comment(
f"{plugin}{_tmp_module[plugin]}", indent=2
)
with open(plugins2settings_file, "w", encoding="utf8") as wf:
round_trip_dump(_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True)
plugins2settings_manager.save()
logger.info(f"已成功加载 {len(plugins2settings_manager.get_data())} 个非限制插件.")

View File

@ -122,8 +122,8 @@ async def _(event: GroupMessageEvent):
tmp = ""
data = plugins2settings_manager.get_data()
for module in data:
if data[module]["level"] > level:
plugin_name = data[module]["cmd"][0]
if data[module].level > level:
plugin_name = data[module].cmd[0]
if plugin_name == "pixiv":
plugin_name = "搜图 p站排行"
tmp += f"{plugin_name}\n"

View File

@ -67,7 +67,7 @@ async def _():
gl = [g["group_id"] for g in gl]
msg_list, code = await get_epic_free(bot, "Group")
for g in gl:
if await group_manager.check_group_task_status(g, "epic_free_game"):
if group_manager.check_group_task_status(g, "epic_free_game"):
try:
if msg_list and code == 200:
await bot.send_group_forward_msg(group_id=g, messages=msg_list)

View File

@ -71,5 +71,5 @@ async def _():
if alc_img:
mes = "[[_task|genshin_alc]]" + alc_img + "\n ※ 黄历数据来源于 genshin.pub"
for gid in gl:
if await group_manager.check_group_task_status(gid, "genshin_alc"):
if group_manager.check_group_task_status(gid, "genshin_alc"):
await bot.send_group_msg(group_id=int(gid), message="" + mes)

View File

@ -172,7 +172,7 @@ async def generate_statistics_img(
data: dict, arg: str, name: str, plugin: str, day_index: int
):
try:
plugin = plugins2settings_manager.get_plugin_data(plugin)['cmd'][0]
plugin = plugins2settings_manager.get_plugin_data(plugin).cmd[0]
except (KeyError, IndexError):
pass
bar_graph = None

View File

@ -107,8 +107,8 @@ async def _(
group_id = "total"
user_id = str(event.user_id)
plugin_name = plugins2settings_manager.get_plugin_data(module)
if plugin_name and plugin_name.get('cmd'):
plugin_name = plugin_name.get('cmd')[0]
if plugin_name and plugin_name.cmd:
plugin_name = plugin_name.cmd[0]
check_exists_key(group_id, user_id, plugin_name)
for data in [_prefix_count_dict, _prefix_user_count_dict]:
data["total_statistics"]["total"][plugin_name] += 1

View File

@ -23,11 +23,11 @@ gConfig.add_plugin_config("web-ui", "password", None, name="web-ui", help_="前
async def _(matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State):
flag = False
for module in plugins2settings_manager.keys():
if isinstance(plugins2settings_manager.get_plugin_data(module).get("cmd"), str):
if isinstance(plugins2settings_manager.get_plugin_data(module).cmd, str):
plugins2settings_manager.set_module_data(
module,
"cmd",
plugins2settings_manager.get_plugin_data(module).get("cmd").split(","),
plugins2settings_manager.get_plugin_data(module).cmd.split(","),
False
)
flag = True

View File

@ -61,8 +61,8 @@ def _(type_: Optional[str], user: User = Depends(token_to_user)) -> Result:
)
data["plugin_manager"] = PluginManager(**x)
if x := plugins2settings_manager.get(model):
if x.get("cmd") and isinstance(x.get("cmd"), list):
x["cmd"] = ",".join(x["cmd"])
if x.cmd and isinstance(x.cmd, list):
x.cmd = ",".join(x.cmd)
data["plugin_settings"] = PluginSettings(**x)
if x := plugins2cd_manager.get(model):
data["cd_limit"] = CdLimit(**x)

View File

@ -10,8 +10,6 @@
"poetry.lock",
"pyproject.toml"
],
"add_file": [
"resources/image/shop_icon"
],
"add_file": [],
"delete_file": []
}

View File

@ -14,6 +14,9 @@ from .requests_manager import RequestManager
from configs.path_config import DATA_PATH
# 管理员命令管理器
admin_manager = AdminManager()
# 群功能开关 | 群被动技能 | 群权限 管理
group_manager: Optional[GroupManager] = GroupManager(
DATA_PATH / "manager" / "group_manager.json"
@ -62,5 +65,3 @@ requests_manager: Optional[RequestManager] = RequestManager(
DATA_PATH / "manager" / "requests_manager.json"
)
# 管理员命令管理器
admin_manager = AdminManager()

64
utils/manager/admin_manager.py Executable file → Normal file
View File

@ -1,5 +1,6 @@
from .data_class import StaticData
from typing import List, Optional
from .models import AdminSetting
from utils.manager.data_class import StaticData
from typing import List, Optional, Dict
class AdminManager(StaticData):
@ -9,60 +10,71 @@ class AdminManager(StaticData):
def __init__(self):
super().__init__(None)
self._data: Dict[str, AdminSetting] = {}
def add_admin_plugin_settings(self, plugin: str, cmd: List[str], level: int):
"""
添加一个管理员命令
:param plugin: 模块
:param cmd: 别名
:param level: 等级
说明:
添加一个管理员命令
参数:
:param plugin: 模块
:param cmd: 别名
:param level: 等级
"""
self._data[plugin] = {
"level": level,
"cmd": cmd,
}
self._data[plugin] = AdminSetting(level=level, cmd=cmd)
def set_admin_level(self, plugin: str, level: int):
"""
设置管理员命令等级
:param plugin: 模块名
:param level: 权限等级
说明:
设置管理员命令等级
参数:
:param plugin: 模块名
:param level: 权限等级
"""
self._data[plugin]["level"] = level
if plugin in self._data.keys():
self._data[plugin].level = level
def remove_admin_plugin_settings(self, plugin: str):
"""
删除一个管理员命令
:param plugin: 模块名
说明:
删除一个管理员命令
参数:
:param plugin: 模块名
"""
if plugin in self._data.keys():
del self._data[plugin]
def check(self, plugin: str, level: int) -> bool:
"""
检查是否满足权限
:param plugin: 模块名
:param level: 权限等级
说明:
检查是否满足权限
参数:
:param plugin: 模块名
:param level: 权限等级
"""
if plugin in self._data.keys():
return level >= self._data[plugin]["level"]
return level >= self._data[plugin].level
return True
def get_plugin_level(self, plugin: str) -> int:
"""
获取插件等级
:param plugin: 模块名
说明:
获取插件等级
参数:
:param plugin: 模块名
"""
if plugin in self._data.keys():
return self._data[plugin]["level"]
return self._data[plugin].level
return 0
def get_plugin_module(self, cmd: str) -> Optional[str]:
"""
根据 cmd 获取功能 modules
:param cmd: 命令
说明:
根据 cmd 获取功能 modules
参数:
:param cmd: 命令
"""
for key in self._data.keys():
if self._data[key].get("cmd") and cmd in self._data[key]["cmd"]:
if self._data[key].cmd and cmd in self._data[key].cmd:
return key
return None

0
utils/manager/configs_manager.py Executable file → Normal file
View File

View File

@ -1,24 +1,29 @@
from typing import Union, Optional
from typing import Union, Optional, TypeVar, Generic, Dict, NoReturn, Any
from pathlib import Path
from ruamel.yaml import YAML
from ruamel import yaml
import ujson as json
import copy
from .models import *
_yaml = YAML(typ="safe")
class StaticData:
T = TypeVar("T", AdminSetting, BaseData, PluginBlock, PluginCd, PluginCount, PluginSetting, Plugin)
class StaticData(Generic[T]):
"""
静态数据共享类
"""
def __init__(self, file: Optional[Path]):
def __init__(self, file: Optional[Path], load_file: bool = True):
self._data: dict = {}
if file:
file.parent.mkdir(exist_ok=True, parents=True)
self.file = file
if file.exists():
if file.exists() and load_file:
with open(file, "r", encoding="utf8") as f:
if file.name.endswith("json"):
try:
@ -29,30 +34,39 @@ class StaticData:
elif file.name.endswith("yaml"):
self._data = _yaml.load(f)
def set(self, key, value):
def set(self, key, value) -> NoReturn:
self._data[key] = value
self.save()
def set_module_data(self, module, key, value, auto_save: bool = True):
def set_module_data(self, module, key, value, auto_save: bool = True) -> NoReturn:
if module in self._data.keys():
self._data[module][key] = value
if auto_save:
self.save()
def get(self, key):
def get(self, key) -> T:
return self._data.get(key)
def keys(self):
def keys(self) -> List[str]:
return self._data.keys()
def delete(self, key):
def delete(self, key) -> NoReturn:
if self._data.get(key) is not None:
del self._data[key]
def get_data(self) -> dict:
def get_data(self) -> Dict[str, T]:
return copy.deepcopy(self._data)
def save(self, path: Union[str, Path] = None):
def dict(self) -> Dict[str, Any]:
temp = {}
for k, v in self._data.items():
try:
temp[k] = v.dict()
except AttributeError:
temp[k] = copy.deepcopy(v)
return temp
def save(self, path: Union[str, Path] = None) -> NoReturn:
path = path or self.file
if isinstance(path, str):
path = Path(path)
@ -61,26 +75,26 @@ class StaticData:
if path.name.endswith("yaml"):
yaml.dump(self._data, f, indent=2, Dumper=yaml.RoundTripDumper, allow_unicode=True)
else:
json.dump(self._data, f, ensure_ascii=False, indent=4)
json.dump(self.dict(), f, ensure_ascii=False, indent=4)
def reload(self):
def reload(self) -> NoReturn:
if self.file.exists():
if self.file.name.endswith("json"):
self._data: dict = json.load(open(self.file, "r", encoding="utf8"))
elif self.file.name.endswith("yaml"):
self._data: dict = _yaml.load(open(self.file, "r", encoding="utf8"))
def is_exists(self):
def is_exists(self) -> bool:
return self.file.exists()
def is_empty(self):
def is_empty(self) -> bool:
return bool(len(self._data))
def __str__(self):
def __str__(self) -> str:
return str(self._data)
def __setitem__(self, key, value):
def __setitem__(self, key, value) -> NoReturn:
self._data[key] = value
def __getitem__(self, key):
def __getitem__(self, key) -> T:
return self._data[key]

730
utils/manager/group_manager.py Executable file → Normal file
View File

@ -1,378 +1,352 @@
from typing import Optional, List, Union, Dict
from pathlib import Path
from .data_class import StaticData
from utils.utils import get_matchers, get_bot
from configs.config import Config
import nonebot
Config.add_plugin_config(
"group_manager", "DEFAULT_GROUP_LEVEL", 5, help_="默认群权限", default_value=5
)
Config.add_plugin_config(
"group_manager", "DEFAULT_GROUP_BOT_STATUS", True, help_="默认进群总开关状态", default_value=True
)
class GroupManager(StaticData):
"""
群权限 | 功能 | 总开关 | 聊天时间 管理器
"""
def __init__(self, file: Path):
super().__init__(file)
if not self._data:
self._data = {
"super": {"white_group_list": []},
"group_manager": {},
}
self._task = {}
def block_plugin(self, module: str, group_id: int):
"""
说明:
锁定插件
参数:
:param module: 功能模块名
:param group_id: 群组None时为超级用户禁用
"""
self._set_plugin_status(module, "block", group_id)
def unblock_plugin(self, module: str, group_id: int):
"""
说明:
解锁插件
参数:
:param module: 功能模块名
:param group_id: 群组
"""
self._set_plugin_status(module, "unblock", group_id)
def turn_on_group_bot_status(self, group_id: int):
"""
说明:
开启群bot开关
参数:
:param group_id: 群号
"""
self._set_group_bot_status(group_id, True)
def shutdown_group_bot_status(self, group_id: int):
"""
说明:
关闭群bot开关
参数:
:param group_id: 群号
"""
self._set_group_bot_status(group_id, False)
def check_group_bot_status(self, group_id: int) -> bool:
"""
说明:
检查群聊bot总开关状态
参数:
:param group_id: 说明
"""
group_id = str(group_id)
if not self._data["group_manager"].get(group_id):
self._init_group(group_id)
if self._data["group_manager"][group_id].get("status") is None:
default_group_bot_status = Config.get_config("group_manager", "DEFAULT_GROUP_BOT_STATUS")
if default_group_bot_status:
default_group_bot_status = True
self._data["group_manager"][group_id]["status"] = default_group_bot_status
return self._data["group_manager"][group_id]["status"]
def set_group_level(self, group_id: int, level: int):
"""
说明:
设置群权限
参数:
:param group_id: 群组
:param level: 权限等级
"""
group_id = str(group_id)
if not self._data["group_manager"].get(group_id):
self._init_group(group_id)
self._data["group_manager"][group_id]["level"] = level
self.save()
def get_plugin_status(self, module: str, group_id: int) -> bool:
"""
说明:
获取插件状态
参数:
:param module: 功能模块名
:param group_id: 群组
"""
group_id = str(group_id) if group_id else group_id
if not self._data["group_manager"].get(group_id):
self._init_group(group_id)
return True
if module in self._data["group_manager"][group_id]["close_plugins"]:
return False
return True
def get_group_level(self, group_id: int) -> int:
"""
说明:
获取群等级
参数:
:param group_id: 群号
"""
group_id = str(group_id)
if not self._data["group_manager"].get(group_id):
self._init_group(group_id)
return self._data["group_manager"][group_id]["level"]
def check_group_is_white(self, group_id: int) -> bool:
"""
说明:
检测群聊是否在白名单
参数:
:param group_id: 群号
"""
return group_id in self._data["super"]["white_group_list"]
def add_group_white_list(self, group_id: int):
"""
说明:
将群聊加入白名单
参数:
:param group_id: 群号
"""
if group_id not in self._data["super"]["white_group_list"]:
self._data["super"]["white_group_list"].append(group_id)
def delete_group_white_list(self, group_id: int):
"""
说明:
将群聊从白名单中删除
参数:
:param group_id: 群号
"""
if group_id in self._data["super"]["white_group_list"]:
self._data["super"]["white_group_list"].remove(group_id)
def get_group_white_list(self) -> List[str]:
"""
说明:
获取所有群白名单
"""
return self._data["super"]["white_group_list"]
def delete_group(self, group_id: int):
"""
说明:
删除群配置
参数:
:param group_id: 群号
"""
if group_id in self._data["group_manager"]:
del self._data["group_manager"][str(group_id)]
if group_id in self._data["super"]["white_group_list"]:
self._data["super"]["white_group_list"].remove(group_id)
self.save()
async def open_group_task(self, group_id: int, task: str):
"""
说明:
开启群被动技能
参数:
:param group_id: 群号
:param task: 被动技能名称
"""
await self._set_group_task_status(group_id, task, True)
async def close_group_task(self, group_id: int, task: str):
"""
说明:
关闭群被动技能
参数:
:param group_id: 群号
:param task: 被动技能名称
"""
await self._set_group_task_status(group_id, task, False)
async def check_group_task_status(self, group_id: int, task: str) -> bool:
"""
说明:
查看群被动技能状态
参数:
:param group_id: 群号
:param task: 被动技能名称
"""
group_id = str(group_id)
if (
not self._data["group_manager"][group_id].get("group_task_status")
or self._data["group_manager"][group_id]["group_task_status"].get(task)
is None
):
await self.init_group_task(group_id)
return self._data["group_manager"][group_id]["group_task_status"][task]
def get_task_data(self) -> Dict[str, str]:
"""
说明:
获取所有被动任务
"""
return self._task
async def group_task_status(self, group_id: int) -> str:
"""
说明:
查看群被全部动技能状态
参数:
:param group_id: 群号
"""
x = "[群被动技能]:\n"
group_id = str(group_id)
if not self._data["group_manager"][group_id].get("group_task_status"):
await self.init_group_task(group_id)
for key in self._data["group_manager"][group_id]["group_task_status"].keys():
x += f'{self._task[key]}{"" if await self.check_group_task_status(int(group_id), key) else "×"}\n'
return x[:-1]
async def _set_group_task_status(self, group_id: int, task: str, status: bool):
"""
说明:
管理群被动技能状态
参数:
:param group_id: 群号
:param task: 被动技能
:param status: 状态
"""
group_id = str(group_id)
if not self._data["group_manager"].get(group_id):
self._init_group(group_id)
if (
not self._data["group_manager"][group_id].get("group_task_status")
or self._data["group_manager"][group_id]["group_task_status"].get(task)
is None
):
await self.init_group_task(group_id)
self._data["group_manager"][group_id]["group_task_status"][task] = status
self.save()
async def init_group_task(self, group_id: Optional[Union[int, str]] = None):
"""
说明:
初始化群聊 被动技能 状态
"""
if not self._task:
_m = []
for matcher in get_matchers():
if matcher.plugin_name not in _m:
_m.append(matcher.plugin_name)
_plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
try:
_module = _plugin.module
plugin_task = _module.__getattribute__("__plugin_task__")
for key in plugin_task.keys():
if key in self._task.keys():
raise ValueError(f"plugin_task{key} 已存在!")
self._task[key] = plugin_task[key]
except AttributeError:
pass
bot = get_bot()
if bot or group_id:
if group_id:
_group_list = [group_id]
else:
_group_list = [x["group_id"] for x in await bot.get_group_list()]
for group_id in _group_list:
group_id = str(group_id)
if not self._data["group_manager"].get(group_id):
self._init_group(group_id)
if not self._data["group_manager"][group_id].get("group_task_status"):
self._data["group_manager"][group_id]["group_task_status"] = {}
for task in self._task:
if (
self._data["group_manager"][group_id]["group_task_status"].get(
task
)
is None
):
self._data["group_manager"][group_id]["group_task_status"][
task
] = Config.get_config('_task', f'DEFAULT_{task}', default=True)
for task in list(
self._data["group_manager"][group_id]["group_task_status"]
):
if task not in self._task:
del self._data["group_manager"][group_id]["group_task_status"][
task
]
self.save()
def _set_plugin_status(
self,
module: str,
status: str,
group_id: int,
):
"""
说明:
设置功能开关状态
参数:
:param module: 功能模块名
:param status: 功能状态
:param group_id: 群组
"""
group_id = str(group_id) if group_id else group_id
if not self._data["group_manager"].get(group_id):
self._init_group(group_id)
if status == "block":
if module not in self._data["group_manager"][group_id]["close_plugins"]:
self._data["group_manager"][group_id]["close_plugins"].append(module)
else:
if module in self._data["group_manager"][group_id]["close_plugins"]:
self._data["group_manager"][group_id]["close_plugins"].remove(module)
self.save()
def _init_group(self, group_id: str):
"""
说明:
初始化群数据
参数:
:param group_id: 群号
"""
default_group_level = Config.get_config("group_manager", "DEFAULT_GROUP_LEVEL")
if default_group_level is None:
default_group_level = 5
default_group_bot_status = Config.get_config("group_manager", "DEFAULT_GROUP_BOT_STATUS")
if default_group_bot_status:
default_group_bot_status = True
if not self._data["group_manager"].get(group_id):
self._data["group_manager"][group_id] = {
"level": default_group_level,
"status": default_group_bot_status,
"close_plugins": [],
"group_task_status": {},
}
def _set_group_bot_status(self, group_id: Union[int, str], status: bool):
"""
说明:
设置群聊bot总开关
参数:
:param group_id: 群号
:param status: 开关状态
"""
group_id = str(group_id)
if not self._data["group_manager"].get(group_id):
self._init_group(group_id)
self._data["group_manager"][group_id]["status"] = status
self.save()
def get_super_old_data(self) -> Optional[dict]:
"""
说明:
获取旧数据平时使用请不要调用
"""
if self._data["super"].get("close_plugins"):
_x = self._data["super"].get("close_plugins")
del self._data["super"]["close_plugins"]
return _x
return None
from typing import List, Union, Dict, Callable
from pathlib import Path
from .models import BaseData, BaseGroup
from utils.manager.data_class import StaticData
from utils.utils import get_matchers, is_number
from configs.config import Config
import nonebot
import ujson as json
Config.add_plugin_config(
"group_manager", "DEFAULT_GROUP_LEVEL", 5, help_="默认群权限", default_value=5
)
Config.add_plugin_config(
"group_manager", "DEFAULT_GROUP_BOT_STATUS", True, help_="默认进群总开关状态", default_value=True
)
def init_group(func: Callable):
"""
说明:
初始化群数据
参数:
:param func: func
"""
def wrapper(*args, **kwargs):
self = args[0]
group_id = list(filter(lambda x: is_number(x), args[1:]))[0]
if self and group_id and not self._data.group_manager.get(str(group_id)):
self._data.group_manager[str(group_id)] = BaseGroup()
return func(*args, **kwargs)
return wrapper
def init_task(func: Callable):
"""
说明:
初始化群被动
参数:
:param func: func
"""
def wrapper(*args, **kwargs):
self = args[0]
group_id = str(args[1])
task = args[2] if len(args) > 1 else None
if group_id and task and self._data.group_manager[group_id].group_task_status.get(task) is None:
for task in self._data.task:
if self._data.group_manager[group_id].group_task_status.get(task) is None:
self._data.group_manager[group_id].group_task_status[task] = Config.get_config('_task', f'DEFAULT_{task}', default=True)
for task in list(self._data.group_manager[group_id].group_task_status):
if task not in self._data.task:
del self._data.group_manager[group_id].group_task_status[task]
self.save()
return func(*args, **kwargs)
return wrapper
class GroupManager(StaticData):
"""
群权限 | 功能 | 总开关 | 聊天时间 管理器
"""
def __init__(self, file: Path):
super().__init__(file, False)
self._data = BaseData.parse_file(file) if file.exists() else BaseData()
def block_plugin(self, module: str, group_id: int, is_save: bool = True):
"""
说明:
锁定插件
参数:
:param module: 功能模块名
:param group_id: 群组None时为超级用户禁用
:param is_save: 是否保存文件
"""
self._set_plugin_status(module, "block", group_id, is_save)
def unblock_plugin(self, module: str, group_id: int, is_save: bool = True):
"""
说明:
解锁插件
参数:
:param module: 功能模块名
:param group_id: 群组
:param is_save: 是否保存文件
"""
self._set_plugin_status(module, "unblock", group_id, is_save)
def turn_on_group_bot_status(self, group_id: int):
"""
说明:
开启群bot开关
参数:
:param group_id: 群号
"""
self._set_group_bot_status(group_id, True)
def shutdown_group_bot_status(self, group_id: int):
"""
说明:
关闭群bot开关
参数:
:param group_id: 群号
"""
self._set_group_bot_status(group_id, False)
@init_group
def check_group_bot_status(self, group_id: int) -> bool:
"""
说明:
检查群聊bot总开关状态
参数:
:param group_id: 说明
"""
return self._data.group_manager[str(group_id)].status
@init_group
def set_group_level(self, group_id: int, level: int):
"""
说明:
设置群权限
参数:
:param group_id: 群组
:param level: 权限等级
"""
self._data.group_manager[str(group_id)].level = level
self.save()
@init_group
def get_plugin_status(self, module: str, group_id: int) -> bool:
"""
说明:
获取插件状态
参数:
:param module: 功能模块名
:param group_id: 群组
"""
return module not in self._data.group_manager[str(group_id)].close_plugins
@init_group
def get_group_level(self, group_id: int) -> int:
"""
说明:
获取群等级
参数:
:param group_id: 群号
"""
return self._data.group_manager[str(group_id)].level
def check_group_is_white(self, group_id: int) -> bool:
"""
说明:
检测群聊是否在白名单
参数:
:param group_id: 群号
"""
return group_id in self._data.white_group
def add_group_white_list(self, group_id: int):
"""
说明:
将群聊加入白名单
参数:
:param group_id: 群号
"""
if group_id not in self._data.white_group:
self._data.white_group.append(group_id)
def delete_group_white_list(self, group_id: int):
"""
说明:
将群聊从白名单中删除
参数:
:param group_id: 群号
"""
if group_id in self._data.white_group:
self._data.white_group.remove(group_id)
def get_group_white_list(self) -> List[int]:
"""
说明:
获取所有群白名单
"""
return self._data.white_group
def load_task(self):
"""
说明:
加载被动技能
"""
for matcher in get_matchers(True):
_plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
try:
_module = _plugin.module
plugin_task = _module.__getattribute__("__plugin_task__")
for key in plugin_task.keys():
if key in self._data.task.keys():
raise ValueError(f"plugin_task{key} 已存在!")
self._data.task[key] = plugin_task[key]
except AttributeError:
pass
@init_group
def delete_group(self, group_id: int):
"""
说明:
删除群配置
参数:
:param group_id: 群号
"""
if group_id in self._data.white_group:
self._data.white_group.remove(group_id)
self.save()
def open_group_task(self, group_id: int, task: str):
"""
说明:
开启群被动技能
参数:
:param group_id: 群号
:param task: 被动技能名称
"""
self._set_group_group_task_status(group_id, task, True)
def close_group_task(self, group_id: int, task: str):
"""
说明:
关闭群被动技能
参数:
:param group_id: 群号
:param task: 被动技能名称
"""
self._set_group_group_task_status(group_id, task, False)
@init_task
def check_group_task_status(self, group_id: int, task: str) -> bool:
"""
说明:
查看群被动技能状态
参数:
:param group_id: 群号
:param task: 被动技能名称
"""
return self._data.group_manager[str(group_id)].group_task_status.get(task, False)
def get_task_data(self) -> Dict[str, str]:
"""
说明:
获取所有被动任务
"""
return self._data.task
@init_task
def group_group_task_status(self, group_id: int) -> str:
"""
说明:
查看群被全部动技能状态
参数:
:param group_id: 群号
"""
x = "[群被动技能]:\n"
group_id = str(group_id)
for key in self._data.group_manager[group_id].group_task_status.keys():
x += f'{self._data.task[key]}{"" if self.check_group_task_status(int(group_id), key) else "×"}\n'
return x[:-1]
@init_task
@init_group
def _set_group_group_task_status(self, group_id: int, task: str, status: bool):
"""
说明:
管理群被动技能状态
参数:
:param group_id: 群号
:param task: 被动技能
:param status: 状态
"""
self._data.group_manager[str(group_id)].group_task_status[task] = status
self.save()
@init_group
def _set_plugin_status(
self,
module: str,
status: str,
group_id: int,
is_save: bool
):
"""
说明:
设置功能开关状态
参数:
:param module: 功能模块名
:param status: 功能状态
:param group_id: 群组
:param is_save: 是否保存
"""
group_id = str(group_id) if group_id else group_id
if status == "block":
if module not in self._data.group_manager[group_id].close_plugins:
self._data.group_manager[group_id].close_plugins.append(module)
else:
if module in self._data.group_manager[group_id].close_plugins:
self._data.group_manager[group_id].close_plugins.remove(module)
if is_save:
self.save()
@init_group
def _set_group_bot_status(self, group_id: Union[int, str], status: bool):
"""
说明:
设置群聊bot总开关
参数:
:param group_id: 群号
:param status: 开关状态
"""
self._data.group_manager[str(group_id)].status = status
self.save()
def reload(self):
if self.file.exists():
t = self._data.task
self._data = BaseData.parse_file(self.file)
self._data.task = t
def save(self, path: Union[str, Path] = None):
"""
说明:
保存文件
参数:
:param path: 路径文件
"""
path = path or self.file
if isinstance(path, str):
path = Path(path)
if path:
dict_data = self._data.dict()
del dict_data["task"]
with open(path, "w", encoding="utf8") as f:
json.dump(dict_data, f, ensure_ascii=False, indent=4)
# def get_super_old_data(self) -> Optional[dict]:
# """
# 说明:
# 获取旧数据,平时使用请不要调用
# """
# if self._data["super"].get("close_plugins"):
# _x = self._data["super"].get("close_plugins")
# del self._data["super"]["close_plugins"]
# return _x
# return None

95
utils/manager/models.py Normal file
View File

@ -0,0 +1,95 @@
from typing import List, Optional, Dict, Literal, Tuple, Union
from pydantic import BaseModel
from configs.config import Config
class AdminSetting(BaseModel):
"""
管理员设置
"""
level: int
cmd: Optional[List[str]]
class BaseGroup(BaseModel):
"""
基础群聊信息
"""
level: int = Config.get_config("group_manager", "DEFAULT_GROUP_LEVEL") # 群等级
status: bool = Config.get_config(
"group_manager", "DEFAULT_GROUP_BOT_STATUS"
) # 总开关状态
close_plugins: List[str] = [] # 已关闭插件
group_task_status: Dict[str, bool] = {} # 被动状态
class BaseData(BaseModel):
"""
群基本信息
"""
white_group: List[int] = [] # 白名单
group_manager: Dict[str, BaseGroup] = {} # 群组管理
task: Dict[str, str] = {} # 被动任务 【英文:中文】
class PluginBlock(BaseModel):
"""
插件阻断
"""
status: bool # 限制状态
check_type: Literal["private", "group", "all"] # 检查类型
limit_type: Literal["user", "group"] # 监听对象
rst: Optional[str] # 阻断时回复
class PluginCd(BaseModel):
"""
插件阻断
"""
cd: int # cd
status: bool # 限制状态
check_type: Literal["private", "group", "all"] # 检查类型
limit_type: Literal["user", "group"] # 监听对象
rst: Optional[str] # 阻断时回复
class PluginCount(BaseModel):
"""
插件阻断
"""
max_count: int # 次数
status: bool # 限制状态
limit_type: Literal["user", "group"] # 监听对象
rst: Optional[str] # 阻断时回复
class PluginSetting(BaseModel):
"""
插件设置
"""
cmd: List[str] = [] # 命令 或 命令别名
default_status: bool = True # 默认开关状态
level: int = 5 # 功能权限等级
limit_superuser: bool = False # 功能状态是否限制超级用户
plugin_type: Tuple[Union[str, int], ...] = ("normal",) # 插件类型
cost_gold: int = 0 # 需要消费的金币
class Plugin(BaseModel):
"""
插件数据
"""
plugin_name: str # 模块名
status: Optional[bool] = True # 开关状态
error: Optional[bool] = False # 是否加载报错
block_type: Optional[str] = None # 关闭类型
author: Optional[str] = None # 作者
version: Optional[int] = None # 版本

94
utils/manager/none_plugin_count_manager.py Executable file → Normal file
View File

@ -1,47 +1,47 @@
from .data_class import StaticData
from typing import Optional
from pathlib import Path
class NonePluginCountManager(StaticData):
"""
插件加载容忍管理器当连续 max_count 次插件加载视为删除插件清楚数据
"""
def __init__(self, file: Optional[Path], max_count: int = 5):
"""
:param file: 存储路径
:param max_count: 容忍最大次数
"""
super().__init__(file)
if not self._data:
self._data = {}
self._max_count = max_count
def add_count(self, module: str, count: int = 1):
"""
添加次数
:param module: 模块
:param count: 次数无特殊情况均为 1
"""
if module not in self._data.keys():
self._data[module] = count
else:
self._data[module] += count
def reset(self, module: str):
"""
重置次数
:param module: 模块
"""
if module in self._data.keys():
self._data[module] = 0
def check(self, module: str):
"""
检查容忍次数是否到达最大值
:param module: 模块
"""
if module in self._data.keys():
return self._data[module] >= self._max_count
return False
from utils.manager.data_class import StaticData
from typing import Optional
from pathlib import Path
class NonePluginCountManager(StaticData):
"""
插件加载容忍管理器当连续 max_count 次插件加载视为删除插件清楚数据
"""
def __init__(self, file: Optional[Path], max_count: int = 5):
"""
:param file: 存储路径
:param max_count: 容忍最大次数
"""
super().__init__(file)
if not self._data:
self._data = {}
self._max_count = max_count
def add_count(self, module: str, count: int = 1):
"""
添加次数
:param module: 模块
:param count: 次数无特殊情况均为 1
"""
if module not in self._data.keys():
self._data[module] = count
else:
self._data[module] += count
def reset(self, module: str):
"""
重置次数
:param module: 模块
"""
if module in self._data.keys():
self._data[module] = 0
def check(self, module: str):
"""
检查容忍次数是否到达最大值
:param module: 模块
"""
if module in self._data.keys():
return self._data[module] >= self._max_count
return False

153
utils/manager/plugins2block_manager.py Executable file → Normal file
View File

@ -1,11 +1,12 @@
from typing import Optional, Dict
from .data_class import StaticData
from typing import Optional, Dict, Literal, Union
from utils.manager.data_class import StaticData
from services.log import logger
from utils.utils import UserBlockLimiter
from pathlib import Path
from ruamel.yaml import YAML
from ruamel import yaml
from .models import PluginBlock
yaml = YAML(typ="safe")
_yaml = yaml.YAML(typ="safe")
class Plugins2blockManager(StaticData):
@ -14,53 +15,45 @@ class Plugins2blockManager(StaticData):
"""
def __init__(self, file: Path):
self.file = file
super().__init__(None)
super().__init__(file, False)
self._block_limiter: Dict[str, UserBlockLimiter] = {}
if file.exists():
with open(file, "r", encoding="utf8") as f:
self._data = yaml.load(f)
if "PluginBlockLimit" in self._data.keys():
self._data = self._data["PluginBlockLimit"] or {}
self.__load_file()
def add_block_limit(
self,
plugin: str,
*,
status: Optional[bool] = True,
check_type: Optional[str] = "all",
limit_type: Optional[str] = "user",
check_type: Literal["private", "group", "all"] = "all",
limit_type: Literal["user", "group"] = "user",
rst: Optional[str] = None,
**kwargs # 用于接收额外实参
):
"""
添加插件调用 block 限制
:param plugin: 插件模块名称
:param status: 默认开关状态
:param check_type: 检查类型 'private'/'group'/'all'限制私聊/群聊/全部
:param limit_type: 限制类型 监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
:param rst: 回复的话为空则不回复
说明:
添加插件调用 block 限制
参数:
:param plugin: 插件模块名称
:param status: 默认开关状态
:param check_type: 检查类型 'private'/'group'/'all'限制私聊/群聊/全部
:param limit_type: 限制类型 监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
:param rst: 回复的话为空则不回复
"""
status = status or True
check_type = check_type or "all"
limit_type = limit_type or "user"
if check_type not in ["all", "group", "private"]:
raise ValueError(
f"{plugin} 添加block限制错误check_type 必须为 'private'/'group'/'all'"
)
if limit_type not in ["user", "group"]:
raise ValueError(f"{plugin} 添加block限制错误limit_type 必须为 'user'/'group'")
self._data[plugin] = {
"status": status,
"check_type": check_type,
"limit_type": limit_type,
"rst": rst,
}
self._data[plugin] = PluginBlock(
status=status, check_type=check_type, limit_type=limit_type, rst=rst
)
def get_plugin_block_data(self, plugin: str) -> Optional[dict]:
def get_plugin_block_data(self, plugin: str) -> Optional[PluginBlock]:
"""
获取插件block数据
:param plugin: 模块名
说明:
获取插件block数据
参数:
:param plugin: 模块名
"""
if self.check_plugin_block_status(plugin):
return self._data[plugin]
@ -68,16 +61,20 @@ class Plugins2blockManager(StaticData):
def check_plugin_block_status(self, plugin: str) -> bool:
"""
检测插件是否有 block
:param plugin: 模块名
说明:
检测插件是否有 block
参数:
:param plugin: 模块名
"""
return plugin in self._data.keys() and self._data[plugin]["status"]
return plugin in self._data.keys() and self._data[plugin].status
def check(self, id_: int, plugin: str) -> bool:
"""
检查 block
:param plugin: 模块名
:param id_: 限制 id
说明:
检查 block
参数:
:param id_: 限制 id
:param plugin: 模块名
"""
if self._block_limiter.get(plugin):
return self._block_limiter[plugin].check(id_)
@ -85,26 +82,30 @@ class Plugins2blockManager(StaticData):
def set_true(self, id_: int, plugin: str):
"""
对插件 block
:param plugin: 模块名
:param id_: 限制 id
说明:
对插件 block
参数:
:param id_: 限制 id
:param plugin: 模块名
"""
if self._block_limiter.get(plugin):
self._block_limiter[plugin].set_true(id_)
def set_false(self, id_: int, plugin: str):
"""
对插件 unblock
:param plugin: 模块名
:param id_: 限制 id
说明:
对插件 unblock
参数:
:param plugin: 模块名
:param id_: 限制 id
"""
if self._block_limiter.get(plugin):
self._block_limiter[plugin].set_false(id_)
def reload_block_limit(self):
"""
加载 block 限制器
:return:
说明:
加载 block 限制器
"""
for plugin in self._data:
if self.check_plugin_block_status(plugin):
@ -113,10 +114,62 @@ class Plugins2blockManager(StaticData):
def reload(self):
"""
重载本地数据
说明:
重载本地数据
"""
self.__load_file()
self.reload_block_limit()
def save(self, path: Union[str, Path] = None):
"""
说明:
保存文件
参数:
:param path: 文件路径
"""
path = path or self.file
if isinstance(path, str):
path = Path(path)
if path:
with open(path, "w", encoding="utf8") as f:
yaml.dump(
{"PluginBlockLimit": self.dict()},
f,
indent=2,
Dumper=yaml.RoundTripDumper,
allow_unicode=True,
)
_data = yaml.round_trip_load(open(path, encoding="utf8"))
_data["PluginBlockLimit"].yaml_set_start_comment(
"""# 用户调用阻塞
# 即 当用户调用此功能还未结束时
# 用发送消息阻止用户重复调用此命令直到该命令结束
# key模块名称
# status此限制的开关状态
# check_type'private'/'group'/'all',限制私聊/群聊/全部
# limit_type监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
# 示例:'user':阻塞用户,'group':阻塞群聊
# rst回复的话可以添加[at][uname][nickname]来对应艾特,用户群名称,昵称系统昵称
# rst 为 "" 或 None 时则不回复
# rst示例"[uname]你冲的太快了,[nickname]先生,请稍后再冲[at]"
# rst回复"老色批你冲的太快了,欧尼酱先生,请稍后再冲@老色批"
# 用户昵称↑ 昵称系统的昵称↑ 艾特用户↑""",
indent=2,
)
with open(path, "w", encoding="utf8") as wf:
yaml.round_trip_dump(
_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True
)
def __load_file(self):
"""
说明:
读取配置文件
"""
self._data: Dict[str, PluginBlock] = {}
if self.file.exists():
with open(self.file, "r", encoding="utf8") as f:
self._data: dict = yaml.load(f)
self._data = self._data["PluginBlockLimit"]
self.reload_block_limit()
temp = yaml.round_trip_load(f)
if "PluginBlockLimit" in temp.keys():
for k, v in temp["PluginBlockLimit"].items():
self._data[k] = PluginBlock.parse_obj(v)

167
utils/manager/plugins2cd_manager.py Executable file → Normal file
View File

@ -1,11 +1,12 @@
from typing import Optional, Dict
from .data_class import StaticData
from typing import Optional, Dict, Literal, Union
from utils.manager.data_class import StaticData
from utils.utils import FreqLimiter
from services.log import logger
from pathlib import Path
from ruamel.yaml import YAML
from ruamel import yaml
from .models import PluginCd
yaml = YAML(typ="safe")
_yaml = yaml.YAML(typ="safe")
class Plugins2cdManager(StaticData):
@ -14,16 +15,9 @@ class Plugins2cdManager(StaticData):
"""
def __init__(self, file: Path):
self.file = file
super().__init__(None)
super().__init__(file, False)
self._freq_limiter: Dict[str, FreqLimiter] = {}
if file.exists():
with open(file, "r", encoding="utf8") as f:
self._data = yaml.load(f)
if "PluginCdLimit" in self._data.keys():
self._data = (
self._data["PluginCdLimit"] if self._data["PluginCdLimit"] else {}
)
self.__load_file()
def add_cd_limit(
self,
@ -31,42 +25,35 @@ class Plugins2cdManager(StaticData):
*,
cd: Optional[int] = 5,
status: Optional[bool] = True,
check_type: Optional[str] = "all",
limit_type: Optional[str] = "user",
check_type: Literal["private", "group", "all"] = "all",
limit_type: Literal["user", "group"] = "user",
rst: Optional[str] = None,
**kwargs # 用于接收额外实参
):
"""
添加插件调用 cd 限制
:param plugin: 插件模块名称
:param cd: cd 时长
:param status: 默认开关状态
:param check_type: 检查类型 'private'/'group'/'all'限制私聊/群聊/全部
:param limit_type: 限制类型 监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
:param rst: 回复的话为空则不回复
说明:
添加插件调用 cd 限制
参数:
:param plugin: 插件模块名称
:param cd: cd 时长
:param status: 默认开关状态
:param check_type: 检查类型 'private'/'group'/'all'限制私聊/群聊/全部
:param limit_type: 限制类型 监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
:param rst: 回复的话为空则不回复
"""
cd = cd or 5
status = status or True
check_type = check_type or "all"
limit_type = limit_type or "user"
if check_type not in ["all", "group", "private"]:
raise ValueError(
f"{plugin} 添加cd限制错误check_type 必须为 'private'/'group'/'all'"
)
if limit_type not in ["user", "group"]:
raise ValueError(f"{plugin} 添加cd限制错误limit_type 必须为 'user'/'group'")
self._data[plugin] = {
"cd": cd,
"status": status,
"check_type": check_type,
"limit_type": limit_type,
"rst": rst,
}
self._data[plugin] = PluginCd(cd=cd, status=status, check_type=check_type, limit_type=limit_type, rst=rst)
def get_plugin_cd_data(self, plugin: str) -> Optional[dict]:
def get_plugin_cd_data(self, plugin: str) -> Optional[PluginCd]:
"""
获取插件cd数据
:param plugin: 模块名
说明:
获取插件cd数据
参数:
:param plugin: 模块名
"""
if self.check_plugin_cd_status(plugin):
return self._data[plugin]
@ -74,20 +61,24 @@ class Plugins2cdManager(StaticData):
def check_plugin_cd_status(self, plugin: str) -> bool:
"""
检测插件是否有 cd
:param plugin: 模块名
说明:
检测插件是否有 cd
参数:
:param plugin: 模块名
"""
return (
plugin in self._data.keys()
and self._data[plugin]["cd"] > 0
and self._data[plugin]["status"]
and self._data[plugin].cd > 0
and self._data[plugin].status
)
def check(self, plugin: str, id_: int) -> bool:
"""
检查 cd
:param plugin: 模块名
:param id_: 限制 id
说明:
检查 cd
参数:
:param plugin: 模块名
:param id_: 限制 id
"""
if self._freq_limiter.get(plugin):
return self._freq_limiter[plugin].check(id_)
@ -95,42 +86,96 @@ class Plugins2cdManager(StaticData):
def start_cd(self, plugin: str, id_: int, cd: int = 0):
"""
开始cd
:param plugin: 模块名
:param id_: cd 限制类型
:param cd: cd 时长
:return:
说明:
开始cd
参数:
:param plugin: 模块名
:param id_: cd 限制类型
:param cd: cd 时长
"""
if self._freq_limiter.get(plugin):
self._freq_limiter[plugin].start_cd(id_, cd)
def get_plugin_data(self, plugin: str) -> dict:
def get_plugin_data(self, plugin: str) -> Optional[PluginCd]:
"""
获取单个模块限制数据
:param plugin: 模块名
说明:
获取单个模块限制数据
参数:
:param plugin: 模块名
"""
if self._data.get(plugin) is not None:
if self._data.get(plugin):
return self._data.get(plugin)
return {}
def reload_cd_limit(self):
"""
加载 cd 限制器
:return:
说明:
加载 cd 限制器
"""
for plugin in self._data:
if self.check_plugin_cd_status(plugin):
self._freq_limiter[plugin] = FreqLimiter(
self.get_plugin_cd_data(plugin)["cd"]
self.get_plugin_cd_data(plugin).cd
)
logger.info(f"已成功加载 {len(self._freq_limiter)} 个Cd限制.")
def reload(self):
"""
重载本地数据
说明:
重载本地数据
"""
self.__load_file()
self.reload_cd_limit()
def save(self, path: Union[str, Path] = None):
"""
说明:
保存文件
参数:
:param path: 文件路径
"""
path = path or self.file
if isinstance(path, str):
path = Path(path)
if path:
with open(path, "w", encoding="utf8") as f:
yaml.dump(
{"PluginCdLimit": self.dict()},
f,
indent=2,
Dumper=yaml.RoundTripDumper,
allow_unicode=True,
)
_data = yaml.round_trip_load(open(path, encoding="utf8"))
_data["PluginCdLimit"].yaml_set_start_comment(
"""# 需要cd的功能
# 自定义的功能需要cd也可以在此配置
# key模块名称
# cdcd 时长(秒)
# status此限制的开关状态
# check_type'private'/'group'/'all',限制私聊/群聊/全部
# limit_type监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
# 示例:'user'用户N秒内触发1次'group'群N秒内触发1次
# rst回复的话可以添加[at][uname][nickname]来对应艾特,用户群名称,昵称系统昵称
# rst 为 "" 或 None 时则不回复
# rst示例"[uname]你冲的太快了,[nickname]先生,请稍后再冲[at]"
# rst回复"老色批你冲的太快了,欧尼酱先生,请稍后再冲@老色批"
# 用户昵称↑ 昵称系统的昵称↑ 艾特用户↑""",
indent=2,
)
with open(path, "w", encoding="utf8") as wf:
yaml.round_trip_dump(
_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True
)
def __load_file(self):
"""
说明:
读取配置文件
"""
self._data: Dict[str, PluginCd] = {}
if self.file.exists():
with open(self.file, "r", encoding="utf8") as f:
self._data: dict = yaml.load(f)
self._data = self._data["PluginCdLimit"]
self.reload_cd_limit()
temp = _yaml.load(f)
if "PluginCdLimit" in temp.keys():
for k, v in temp["PluginCdLimit"].items():
self._data[k] = PluginCd.parse_obj(v)

156
utils/manager/plugins2count_manager.py Executable file → Normal file
View File

@ -1,11 +1,12 @@
from typing import Optional, Dict
from .data_class import StaticData
from typing import Optional, Dict, Literal, Union
from utils.manager.data_class import StaticData
from utils.utils import DailyNumberLimiter
from services.log import logger
from pathlib import Path
from ruamel.yaml import YAML
from ruamel import yaml
from .models import PluginCount
yaml = YAML(typ="safe")
_yaml = yaml.YAML(typ="safe")
class Plugins2countManager(StaticData):
@ -14,16 +15,9 @@ class Plugins2countManager(StaticData):
"""
def __init__(self, file: Path):
self.file = file
super().__init__(None)
super().__init__(file, False)
self._daily_limiter: Dict[str, DailyNumberLimiter] = {}
if file.exists():
with open(file, "r", encoding="utf8") as f:
self._data = yaml.load(f)
if "PluginCountLimit" in self._data.keys():
self._data = (
self._data["PluginCountLimit"] if self._data["PluginCountLimit"] else {}
)
self.__load_file()
def add_count_limit(
self,
@ -31,64 +25,64 @@ class Plugins2countManager(StaticData):
*,
max_count: int = 5,
status: Optional[bool] = True,
limit_type: Optional[str] = "user",
limit_type: Literal["user", "group"] = "user",
rst: Optional[str] = None,
**kwargs # 用于接收额外实参
):
"""
添加插件调用 次数 限制
:param plugin: 插件模块名称
:param max_count: 最大次数限制
:param status: 默认开关状态
:param limit_type: 限制类型 监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
:param rst: 回复的话为空则不回复
说明:
添加插件调用 次数 限制
参数:
:param plugin: 插件模块名称
:param max_count: 最大次数限制
:param status: 默认开关状态
:param limit_type: 限制类型 监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
:param rst: 回复的话为空则不回复
"""
max_count = max_count or 5
status = status or True
limit_type = limit_type or "user"
if limit_type not in ["user", "group"]:
raise ValueError(f"{plugin} 添加count限制错误limit_type 必须为 'user'/'group'")
self._data[plugin] = {
"max_count": max_count,
"status": status,
"limit_type": limit_type,
"rst": rst,
}
self._data[plugin] = PluginCount(max_count=max_count, status=status, limit_type=limit_type, rst=rst)
def get_plugin_count_data(self, plugin: str) -> Optional[dict]:
def get_plugin_count_data(self, plugin: str) -> Optional[PluginCount]:
"""
获取插件次数数据
:param plugin: 模块名
说明:
获取插件次数数据
参数:
:param plugin: 模块名
"""
if self.check_plugin_count_status(plugin):
return self._data[plugin]
return None
def get_plugin_data(self, plugin: str) -> Optional[dict]:
def get_plugin_data(self, plugin: str) -> Optional[PluginCount]:
"""
获取单个模块限制数据
:param plugin: 模块名
说明:
获取单个模块限制数据
参数:
:param plugin: 模块名
"""
if self._data.get(plugin) is not None:
return self._data.get(plugin)
return None
def check_plugin_count_status(self, plugin: str) -> bool:
"""
检测插件是否有 次数 限制
:param plugin: 模块名
说明:
检测插件是否有 次数 限制
参数:
:param plugin: 模块名
"""
return (
plugin in self._data.keys()
and self._data[plugin]["status"]
and self._data[plugin]["max_count"] > 0
and self._data[plugin].status
and self._data[plugin].max_count > 0
)
def check(self, plugin: str, id_: int) -> bool:
"""
检查 count
:param plugin: 模块名
:param id_: 限制 id
说明:
检查 count
参数:
:param plugin: 模块名
:param id_: 限制 id
"""
if self._daily_limiter.get(plugin):
return self._daily_limiter[plugin].check(id_)
@ -96,24 +90,25 @@ class Plugins2countManager(StaticData):
def increase(self, plugin: str, id_: int, num: int = 1):
"""
增加次数
:param plugin: 模块名
:param id_: cd 限制类型
:param num: 增加次数
:return:
说明:
增加次数
参数:
:param plugin: 模块名
:param id_: cd 限制类型
:param num: 增加次数
"""
if self._daily_limiter.get(plugin):
self._daily_limiter[plugin].increase(id_, num)
def reload_count_limit(self):
"""
加载 cd 限制器
:return:
说明:
加载 cd 限制器
"""
for plugin in self._data:
if self.check_plugin_count_status(plugin):
self._daily_limiter[plugin] = DailyNumberLimiter(
self.get_plugin_count_data(plugin)["max_count"]
self.get_plugin_count_data(plugin).max_count
)
logger.info(f"已成功加载 {len(self._daily_limiter)} 个Count限制.")
@ -121,8 +116,59 @@ class Plugins2countManager(StaticData):
"""
重载本地数据
"""
self.__load_file()
self.reload_count_limit()
def save(self, path: Union[str, Path] = None):
"""
说明:
保存文件
参数:
:param path: 文件路径
"""
path = path or self.file
if isinstance(path, str):
path = Path(path)
if path:
with open(path, "w", encoding="utf8") as f:
yaml.dump(
{"PluginCountLimit": self.dict()},
f,
indent=2,
Dumper=yaml.RoundTripDumper,
allow_unicode=True,
)
_data = yaml.round_trip_load(open(path, encoding="utf8"))
_data["PluginCountLimit"].yaml_set_start_comment(
"""# 命令每日次数限制
# 即 用户/群聊 每日可调用命令的次数 [数据内存存储,重启将会重置]
# 每日调用直到 00:00 刷新
# key模块名称
# max_count: 每日调用上限
# status此限制的开关状态
# limit_type监听对象以user_id或group_id作为键来限制'user'用户id'group'群id
# 示例:'user':用户上限,'group':群聊上限
# rst回复的话可以添加[at][uname][nickname]来对应艾特,用户群名称,昵称系统昵称
# rst 为 "" 或 None 时则不回复
# rst示例"[uname]你冲的太快了,[nickname]先生,请稍后再冲[at]"
# rst回复"老色批你冲的太快了,欧尼酱先生,请稍后再冲@老色批"
# 用户昵称↑ 昵称系统的昵称↑ 艾特用户↑""",
indent=2,
)
with open(path, "w", encoding="utf8") as wf:
yaml.round_trip_dump(
_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True
)
def __load_file(self):
"""
说明:
读取配置文件
"""
self._data: Dict[str, PluginCount] = {}
if self.file.exists():
with open(self.file, "r", encoding="utf8") as f:
self._data: dict = yaml.load(f)
self._data = self._data["PluginCountLimit"]
self.reload_count_limit()
temp = _yaml.load(f)
if "PluginCountLimit" in temp.keys():
for k, v in temp["PluginCountLimit"].items():
self._data[k] = PluginCount.parse_obj(v)

143
utils/manager/plugins2settings_manager.py Executable file → Normal file
View File

@ -1,11 +1,11 @@
from typing import List, Optional, Union, Tuple
from .data_class import StaticData
from typing import List, Optional, Union, Tuple, Dict
from utils.manager.data_class import StaticData
from pathlib import Path
from ruamel.yaml import YAML
from ruamel import yaml
from .models import PluginSetting
_yaml = YAML(typ="safe")
_yaml = yaml.YAML(typ="safe")
class Plugins2settingsManager(StaticData):
@ -14,97 +14,120 @@ class Plugins2settingsManager(StaticData):
"""
def __init__(self, file: Path):
self.file = file
super().__init__(None)
if file.exists():
with open(file, "r", encoding="utf8") as f:
self._data = _yaml.load(f)
if self._data:
if "PluginSettings" in self._data.keys():
self._data = (
self._data["PluginSettings"] if self._data["PluginSettings"] else {}
)
for x in self._data.keys():
if self._data[x].get("cost_gold") is None:
self._data[x]["cost_gold"] = 0
super().__init__(file, False)
self.__load_file()
def add_plugin_settings(
self,
plugin: str,
cmd: Optional[List[str]] = None,
default_status: Optional[bool] = True,
level: Optional[int] = 5,
limit_superuser: Optional[bool] = False,
default_status: bool = True,
level: int = 5,
limit_superuser: bool = False,
plugin_type: Tuple[Union[str, int]] = ("normal",),
cost_gold: int = 0,
):
"""
添加一个插件设置
:param plugin: 插件模块名称
:param cmd: 命令 命令别名
:param default_status: 默认开关状态
:param level: 功能权限等级
:param limit_superuser: 功能状态是否限制超级用户
:param plugin_type: 插件类型
:param cost_gold: 需要消费的金币
说明:
添加一个插件设置
参数:
:param plugin: 插件模块名称
:param cmd: 命令 命令别名
:param default_status: 默认开关状态
:param level: 功能权限等级
:param limit_superuser: 功能状态是否限制超级用户
:param plugin_type: 插件类型
:param cost_gold: 需要消费的金币
"""
level = level or 5
cmd = cmd or []
cost_gold = cost_gold or 0
self._data[plugin] = {
"level": level if level is not None else 5,
"default_status": default_status if default_status is not None else True,
"limit_superuser": (
limit_superuser
if limit_superuser is not None
else False
),
"cmd": cmd,
"plugin_type": list(
plugin_type if plugin_type is not None else ("normal",)
),
"cost_gold": cost_gold,
}
self._data[plugin] = PluginSetting(
cmd=cmd,
level=level,
default_status=default_status,
limit_superuser=limit_superuser,
plugin_type=plugin_type,
cost_gold=cost_gold,
)
def get_plugin_data(self, module: str) -> dict:
def get_plugin_data(self, module: str) -> Optional[PluginSetting]:
"""
通过模块名获取数据
:param module: 模块名称
说明:
通过模块名获取数据
参数:
:param module: 模块名称
"""
if self._data.get(module) is not None:
return self._data.get(module)
return {}
return self._data.get(module)
def get_plugin_module(
self, cmd: str, is_all: bool = False
) -> Union[str, List[str]]:
"""
根据 cmd 获取功能 modules
:param cmd: 命令
:param is_all: 获取全部包含cmd的模块
说明:
根据 cmd 获取功能 modules
参数:
:param cmd: 命令
:param is_all: 获取全部包含cmd的模块
"""
keys = []
for key in self._data.keys():
if cmd in self._data[key]["cmd"]:
if cmd in self._data[key].cmd:
if is_all:
keys.append(key)
else:
return key
return keys
def reload(self):
"""
说明:
重载本地数据
"""
self.__load_file()
def save(self, path: Union[str, Path] = None):
"""
说明:
保存文件
参数:
:param path: 文件路径
"""
path = path or self.file
if isinstance(path, str):
path = Path(path)
if path:
with open(path, "w", encoding="utf8") as f:
yaml.dump({"PluginSettings": self._data}, f, indent=2, Dumper=yaml.RoundTripDumper, allow_unicode=True)
yaml.dump(
{"PluginSettings": self.dict()},
f,
indent=2,
Dumper=yaml.RoundTripDumper,
allow_unicode=True,
)
_data = yaml.round_trip_load(open(path, encoding="utf8"))
_data["PluginSettings"].yaml_set_start_comment(
"""# 模块与对应命令和对应群权限
# 用于生成帮助图片 和 开关功能
# key模块名称
# level需要的群等级
# default_status加入群时功能的默认开关状态
# limit_superuser: 功能状态是否限制超级用户
# cmd: 关闭[cmd] 都会触发命令 关闭对应功能cmd列表第一个词为统计的功能名称
# plugin_type: 帮助类别 示例:('原神相关',) 或 ('原神相关', 1)1代表帮助命令列向排列否则为横向排列""",
indent=2,
)
with open(path, "w", encoding="utf8") as wf:
yaml.round_trip_dump(
_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True
)
def reload(self):
def __load_file(self):
"""
重载本地数据
说明:
读取配置文件
"""
self._data: Dict[str, PluginSetting] = {}
if self.file.exists():
with open(self.file, "r", encoding="utf8") as f:
self._data: dict = _yaml.load(f)
self._data = self._data["PluginSettings"]
temp = _yaml.load(f)
if "PluginSettings" in temp.keys():
for k, v in temp["PluginSettings"].items():
self._data[k] = PluginSetting.parse_obj(v)

123
utils/manager/plugins_manager.py Executable file → Normal file
View File

@ -1,7 +1,33 @@
from typing import Optional
from typing import Optional, Dict, Callable, Union
from pathlib import Path
from .data_class import StaticData
from . import group_manager
from utils.manager.data_class import StaticData
from utils.manager import group_manager
from .models import Plugin
def init_plugin(func: Callable):
"""
说明:
初始化群数据
参数:
:param func: func
"""
def wrapper(*args, **kwargs):
self = args[0]
module = args[1]
if module not in self._data.keys():
self._data[module] = Plugin(
plugin_name=module,
status=True,
error=False,
block_type=None,
author=None,
version=None,
)
return func(*args, **kwargs)
return wrapper
class PluginsManager(StaticData):
@ -10,9 +36,10 @@ class PluginsManager(StaticData):
"""
def __init__(self, file: Path):
self._data: Dict[str, Plugin]
super().__init__(file)
if not self._data:
self._data = {}
for k, v in self._data.items():
self._data[k] = Plugin.parse_obj(v)
def add_plugin_data(
self,
@ -26,23 +53,25 @@ class PluginsManager(StaticData):
version: Optional[int] = None,
):
"""
添加插件数据
:param module: 模块名称
:param plugin_name: 插件名称
:param status: 插件开关状态
:param error: 加载状态
:param block_type: 限制类型
:param author: 作者
:param version: 版本
说明:
添加插件数据
参数:
:param module: 模块名称
:param plugin_name: 插件名称
:param status: 插件开关状态
:param error: 加载状态
:param block_type: 限制类型
:param author: 作者
:param version: 版本
"""
self._data[module] = {
"plugin_name": plugin_name,
"status": status,
"error": error,
"block_type": block_type,
"author": author,
"version": version,
}
self._data[module] = Plugin(
plugin_name=plugin_name,
status=status,
error=error,
block_type=block_type,
author=author,
version=version,
)
def block_plugin(
self, module: str, group_id: Optional[int] = None, block_type: str = "all"
@ -67,9 +96,7 @@ class PluginsManager(StaticData):
"""
self._set_plugin_status(module, "unblock", group_id)
def get_plugin_status(
self, module: str, block_type: str = "all"
) -> bool:
def get_plugin_status(self, module: str, block_type: str = "all") -> bool:
"""
说明:
获取插件状态
@ -78,13 +105,12 @@ class PluginsManager(StaticData):
:param block_type: 限制类型
"""
if module in self._data.keys():
if self._data[module]["block_type"] == "all" and block_type == "all":
if self._data[module].block_type == "all" and block_type == "all":
return False
else:
return not self._data[module]["block_type"] == block_type
return not self._data[module].block_type == block_type
return True
def get_plugin_block_type(self, module: str) -> str:
def get_plugin_block_type(self, module: str) -> Optional[str]:
"""
说明:
获取功能限制类型
@ -92,18 +118,19 @@ class PluginsManager(StaticData):
:param module: 模块名称
"""
if module in self._data.keys():
return self._data[module]["block_type"]
return ""
return self._data[module].block_type
@init_plugin
def get_plugin_error_status(self, module: str) -> bool:
"""
插件是否成功加载
:param module: 模块名称
说明:
插件是否成功加载
参数:
:param module: 模块名称
"""
if module not in self._data.keys():
self.init_plugin(module)
return self._data[module]["error"]
return self._data[module].error
@init_plugin
def _set_plugin_status(
self,
module: str,
@ -120,7 +147,6 @@ class PluginsManager(StaticData):
:param group_id: 群组
:param block_type: 限制类型
"""
group_id = str(group_id) if group_id else group_id
if module:
if group_id:
if status == "block":
@ -128,28 +154,11 @@ class PluginsManager(StaticData):
else:
group_manager.unblock_plugin(f"{module}:super", int(group_id))
else:
if module not in self._data.keys():
self.init_plugin(module)
if status == "block":
self._data[module]["status"] = False
self._data[module]["block_type"] = block_type
self._data[module].status = False
self._data[module].block_type = block_type
else:
if module in self._data.keys():
self._data[module]["status"] = True
self._data[module]["block_type"] = None
self._data[module].status = True
self._data[module].block_type = None
self.save()
def init_plugin(self, module: str):
"""
初始化插件数据
:param module: 模块名称
"""
if module not in self._data.keys():
self._data[module] = {
"plugin_name": module,
"status": True,
"error": False,
"block_type": None,
"author": None,
"version": None,
}

542
utils/manager/requests_manager.py Executable file → Normal file
View File

@ -1,271 +1,271 @@
from utils.manager.data_class import StaticData
from nonebot.adapters.onebot.v11 import Bot, ActionFailed
from services.log import logger
from typing import Optional, Literal
from utils.image_utils import BuildImage
from utils.utils import get_user_avatar
from pathlib import Path
from io import BytesIO
class RequestManager(StaticData):
"""
好友请求/邀请请求 管理
"""
def __init__(self, file: Optional[Path]):
super().__init__(file)
if not self._data:
self._data = {"private": {}, "group": {}}
def add_request(
self,
id_: int,
type_: str,
flag: str,
*,
nickname: Optional[str] = None,
level: Optional[int] = None,
sex: Optional[str] = None,
age: Optional[str] = None,
from_: Optional[str] = "",
comment: Optional[str] = None,
invite_group: Optional[int] = None,
group_name: Optional[str] = None,
):
"""
添加一个请求
:param id_: id用户id或群id
:param type_: 类型private group
:param flag: event.flag
:param nickname: 用户昵称
:param level: 等级
:param sex: 性别
:param age: 年龄
:param from_: 请求来自
:param comment: 附加消息
:param invite_group: 邀请群聊
:param group_name: 群聊名称
"""
self._data[type_][str(len(self._data[type_].keys()))] = {
"id": id_,
"flag": flag,
"nickname": nickname,
"level": level,
"sex": sex,
"age": age,
"from": from_,
"comment": comment,
"invite_group": invite_group,
"group_name": group_name,
}
self.save()
def remove_request(self, type_: str, id_: int):
"""
删除一个请求数据
:param type_: 类型
:param id_: iduser_id group_id
"""
for x in self._data[type_].keys():
if self._data[type_][x].get("id") == id_:
del self._data[type_][x]
break
self.save()
def get_group_id(self, id_: int) -> Optional[int]:
"""
通过id获取群号
:param id_: id
"""
data = self._data["group"].get(str(id_))
if data:
return data["invite_group"]
return None
async def approve(self, bot: Bot, id_: int, type_: str) -> int:
"""
同意请求
:param bot: Bot
:param id_: id
:param type_: 类型private group
"""
return await self._set_add_request(bot, id_, type_, True)
async def refused(self, bot: Bot, id_: int, type_: str) -> Optional[int]:
"""
拒绝请求
:param bot: Bot
:param id_: id
:param type_: 类型private group
"""
return await self._set_add_request(bot, id_, type_, False)
def clear(self, type_: Optional[str] = None): # type_: Optional[Literal["group", "private"]] = None
"""
清空所有请求信息无视请求
:param type_: 类型
"""
if type_:
self._data[type_] = {}
else:
self._data = {"private": {}, "group": {}}
self.save()
def delete_request(self, id_: int, type_: str): # type_: Literal["group", "private"]
"""
删除请求
:param id_: id
:param type_: 类型
"""
id_ = str(id_)
if self._data[type_].get(id_):
del self._data[type_][id_]
self.save()
def set_group_name(self, group_name: str, group_id: int):
"""
设置群聊名称
:param group_name: 名称
:param group_id: id
"""
for id_ in self._data["group"].keys():
if self._data["group"][id_]["invite_group"] == group_id:
self._data["group"][id_]["group_name"] = group_name
break
self.save()
async def show(self, type_: str) -> Optional[str]:
"""
请求可视化
"""
data = self._data[type_]
if not data:
return None
img_list = []
id_list = list(data.keys())
id_list.reverse()
for id_ in id_list:
age = data[id_]["age"]
nickname = data[id_]["nickname"]
comment = data[id_]["comment"] if type_ == "private" else ""
from_ = data[id_]["from"]
sex = data[id_]["sex"]
ava = BuildImage(
80, 80, background=BytesIO(await get_user_avatar(data[id_]["id"]))
)
ava.circle()
age_bk = BuildImage(
len(str(age)) * 6 + 6,
15,
color="#04CAF7" if sex == "male" else "#F983C1",
)
age_bk.text((3, 1), f"{age}", fill=(255, 255, 255))
x = BuildImage(
90, 32, font_size=15, color="#EEEFF4", font="HYWenHei-85W.ttf"
)
x.text((0, 0), "同意/拒绝", center_type="center")
x.circle_corner(10)
A = BuildImage(500, 100, font_size=24, font="msyh.ttf")
A.paste(ava, (15, 0), alpha=True, center_type="by_height")
A.text((120, 15), nickname)
A.paste(age_bk, (120, 50), True)
A.paste(
BuildImage(
200,
0,
font_size=12,
plain_text=f"对方留言:{comment}",
font_color=(140, 140, 143),
),
(120 + age_bk.w + 10, 49),
True,
)
if type_ == "private":
A.paste(
BuildImage(
200,
0,
font_size=12,
plain_text=f"来源:{from_}",
font_color=(140, 140, 143),
),
(120, 70),
True,
)
else:
A.paste(
BuildImage(
200,
0,
font_size=12,
plain_text=f"邀请你加入:{data[id_]['group_name']}({data[id_]['invite_group']})",
font_color=(140, 140, 143),
),
(120, 70),
True,
)
A.paste(x, (380, 35), True)
A.paste(
BuildImage(
0,
0,
plain_text=f"id{id_}",
font_size=13,
font_color=(140, 140, 143),
),
(400, 10),
True,
)
img_list.append(A)
A = BuildImage(500, len(img_list) * 100, 500, 100)
for img in img_list:
A.paste(img)
bk = BuildImage(A.w, A.h + 50, color="#F8F9FB", font_size=20)
bk.paste(A, (0, 50))
bk.text(
(15, 13), "好友请求" if type_ == "private" else "群聊请求", fill=(140, 140, 143)
)
return bk.pic2bs4()
async def _set_add_request(
self, bot: Bot, id_: int, type_: str, approve: bool
) -> int:
"""
处理请求
:param bot: Bot
:param id_: id
:param type_: 类型private group
:param approve: 是否同意
"""
id_ = str(id_)
if id_ in self._data[type_].keys():
try:
if type_ == "private":
await bot.set_friend_add_request(
flag=self._data[type_][id_]["flag"], approve=approve
)
rid = self._data[type_][id_]["id"]
else:
await bot.set_group_add_request(
flag=self._data[type_][id_]["flag"],
sub_type="invite",
approve=approve,
)
rid = self._data[type_][id_]["invite_group"]
except ActionFailed:
logger.info(
f"同意{self._data[type_][id_]['nickname']}({self._data[type_][id_]['id']})"
f"{'好友' if type_ == 'private' else '入群'}请求失败了..."
)
return 1 # flag失效
else:
logger.info(
f"{'同意' if approve else '拒绝'}{self._data[type_][id_]['nickname']}({self._data[type_][id_]['id']})"
f"{'好友' if type_ == 'private' else '入群'}请求..."
)
del self._data[type_][id_]
self.save()
return rid
return 2 # 未找到id
from utils.manager.data_class import StaticData
from nonebot.adapters.onebot.v11 import Bot, ActionFailed
from services.log import logger
from typing import Optional
from utils.image_utils import BuildImage
from utils.utils import get_user_avatar
from pathlib import Path
from io import BytesIO
class RequestManager(StaticData):
"""
好友请求/邀请请求 管理
"""
def __init__(self, file: Optional[Path]):
super().__init__(file)
if not self._data:
self._data = {"private": {}, "group": {}}
def add_request(
self,
id_: int,
type_: str,
flag: str,
*,
nickname: Optional[str] = None,
level: Optional[int] = None,
sex: Optional[str] = None,
age: Optional[str] = None,
from_: Optional[str] = "",
comment: Optional[str] = None,
invite_group: Optional[int] = None,
group_name: Optional[str] = None,
):
"""
添加一个请求
:param id_: id用户id或群id
:param type_: 类型private group
:param flag: event.flag
:param nickname: 用户昵称
:param level: 等级
:param sex: 性别
:param age: 年龄
:param from_: 请求来自
:param comment: 附加消息
:param invite_group: 邀请群聊
:param group_name: 群聊名称
"""
self._data[type_][str(len(self._data[type_].keys()))] = {
"id": id_,
"flag": flag,
"nickname": nickname,
"level": level,
"sex": sex,
"age": age,
"from": from_,
"comment": comment,
"invite_group": invite_group,
"group_name": group_name,
}
self.save()
def remove_request(self, type_: str, id_: int):
"""
删除一个请求数据
:param type_: 类型
:param id_: iduser_id group_id
"""
for x in self._data[type_].keys():
if self._data[type_][x].get("id") == id_:
del self._data[type_][x]
break
self.save()
def get_group_id(self, id_: int) -> Optional[int]:
"""
通过id获取群号
:param id_: id
"""
data = self._data["group"].get(str(id_))
if data:
return data["invite_group"]
return None
async def approve(self, bot: Bot, id_: int, type_: str) -> int:
"""
同意请求
:param bot: Bot
:param id_: id
:param type_: 类型private group
"""
return await self._set_add_request(bot, id_, type_, True)
async def refused(self, bot: Bot, id_: int, type_: str) -> Optional[int]:
"""
拒绝请求
:param bot: Bot
:param id_: id
:param type_: 类型private group
"""
return await self._set_add_request(bot, id_, type_, False)
def clear(self, type_: Optional[str] = None): # type_: Optional[Literal["group", "private"]] = None
"""
清空所有请求信息无视请求
:param type_: 类型
"""
if type_:
self._data[type_] = {}
else:
self._data = {"private": {}, "group": {}}
self.save()
def delete_request(self, id_: int, type_: str): # type_: Literal["group", "private"]
"""
删除请求
:param id_: id
:param type_: 类型
"""
id_ = str(id_)
if self._data[type_].get(id_):
del self._data[type_][id_]
self.save()
def set_group_name(self, group_name: str, group_id: int):
"""
设置群聊名称
:param group_name: 名称
:param group_id: id
"""
for id_ in self._data["group"].keys():
if self._data["group"][id_]["invite_group"] == group_id:
self._data["group"][id_]["group_name"] = group_name
break
self.save()
async def show(self, type_: str) -> Optional[str]:
"""
请求可视化
"""
data = self._data[type_]
if not data:
return None
img_list = []
id_list = list(data.keys())
id_list.reverse()
for id_ in id_list:
age = data[id_]["age"]
nickname = data[id_]["nickname"]
comment = data[id_]["comment"] if type_ == "private" else ""
from_ = data[id_]["from"]
sex = data[id_]["sex"]
ava = BuildImage(
80, 80, background=BytesIO(await get_user_avatar(data[id_]["id"]))
)
ava.circle()
age_bk = BuildImage(
len(str(age)) * 6 + 6,
15,
color="#04CAF7" if sex == "male" else "#F983C1",
)
age_bk.text((3, 1), f"{age}", fill=(255, 255, 255))
x = BuildImage(
90, 32, font_size=15, color="#EEEFF4", font="HYWenHei-85W.ttf"
)
x.text((0, 0), "同意/拒绝", center_type="center")
x.circle_corner(10)
A = BuildImage(500, 100, font_size=24, font="msyh.ttf")
A.paste(ava, (15, 0), alpha=True, center_type="by_height")
A.text((120, 15), nickname)
A.paste(age_bk, (120, 50), True)
A.paste(
BuildImage(
200,
0,
font_size=12,
plain_text=f"对方留言:{comment}",
font_color=(140, 140, 143),
),
(120 + age_bk.w + 10, 49),
True,
)
if type_ == "private":
A.paste(
BuildImage(
200,
0,
font_size=12,
plain_text=f"来源:{from_}",
font_color=(140, 140, 143),
),
(120, 70),
True,
)
else:
A.paste(
BuildImage(
200,
0,
font_size=12,
plain_text=f"邀请你加入:{data[id_]['group_name']}({data[id_]['invite_group']})",
font_color=(140, 140, 143),
),
(120, 70),
True,
)
A.paste(x, (380, 35), True)
A.paste(
BuildImage(
0,
0,
plain_text=f"id{id_}",
font_size=13,
font_color=(140, 140, 143),
),
(400, 10),
True,
)
img_list.append(A)
A = BuildImage(500, len(img_list) * 100, 500, 100)
for img in img_list:
A.paste(img)
bk = BuildImage(A.w, A.h + 50, color="#F8F9FB", font_size=20)
bk.paste(A, (0, 50))
bk.text(
(15, 13), "好友请求" if type_ == "private" else "群聊请求", fill=(140, 140, 143)
)
return bk.pic2bs4()
async def _set_add_request(
self, bot: Bot, id_: int, type_: str, approve: bool
) -> int:
"""
处理请求
:param bot: Bot
:param id_: id
:param type_: 类型private group
:param approve: 是否同意
"""
id_ = str(id_)
if id_ in self._data[type_].keys():
try:
if type_ == "private":
await bot.set_friend_add_request(
flag=self._data[type_][id_]["flag"], approve=approve
)
rid = self._data[type_][id_]["id"]
else:
await bot.set_group_add_request(
flag=self._data[type_][id_]["flag"],
sub_type="invite",
approve=approve,
)
rid = self._data[type_][id_]["invite_group"]
except ActionFailed:
logger.info(
f"同意{self._data[type_][id_]['nickname']}({self._data[type_][id_]['id']})"
f"{'好友' if type_ == 'private' else '入群'}请求失败了..."
)
return 1 # flag失效
else:
logger.info(
f"{'同意' if approve else '拒绝'}{self._data[type_][id_]['nickname']}({self._data[type_][id_]['id']})"
f"{'好友' if type_ == 'private' else '入群'}请求..."
)
del self._data[type_][id_]
self.save()
return rid
return 2 # 未找到id

2
utils/manager/resources_manager.py Executable file → Normal file
View File

@ -1,7 +1,7 @@
from typing import Union, List, Optional
from configs.path_config import IMAGE_PATH, DATA_PATH, RECORD_PATH, TEXT_PATH, FONT_PATH, LOG_PATH
from .data_class import StaticData
from utils.manager.data_class import StaticData
from pathlib import Path
from ruamel.yaml import YAML
from services.log import logger

0
utils/manager/withdraw_message_manager.py Executable file → Normal file
View File

View File

@ -450,4 +450,4 @@ def change_img_md5(path_file: Union[str, Path]) -> bool:
return True
except Exception as e:
logger.warning(f"改变图片MD5发生错误 {type(e)}{e} Path{path_file}")
return False
return False