# zhenxun_bot/utils/manager/group_manager.py
# (353 lines, 10 KiB, Python; history entry dated 2022-11-21 20:43:41 +08:00)

from functools import wraps
from pathlib import Path
from typing import List, Union, Dict, Callable, Optional

import nonebot
import ujson as json

from .models import BaseData, BaseGroup
from utils.manager.data_class import StaticData
from utils.utils import get_matchers, is_number
from configs.config import Config
# Register the "group_manager" plugin settings at import time.

# Permission level setting for groups (registered value and default are both 5).
Config.add_plugin_config(
    "group_manager",
    "DEFAULT_GROUP_LEVEL",
    5,
    help_="默认群权限",
    default_value=5,
)

# Bot master-switch setting for groups (registered value and default are both True).
Config.add_plugin_config(
    "group_manager",
    "DEFAULT_GROUP_BOT_STATUS",
    True,
    help_="默认进群总开关状态",
    default_value=True,
)
def init_group(func: Callable):
    """
    Description:
        Decorator that makes sure a group record exists before the wrapped
        method runs.
    Params:
        :param func: method to wrap; its first positional argument must be
            the manager instance (``self``)

    The group id is taken from the ``group_id`` keyword argument when given,
    otherwise from the first positional argument (after ``self``) for which
    ``is_number`` is true. When no group id can be found the wrapped method
    is simply called without any initialisation.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        self = args[0]
        # An explicit keyword wins; the positional scan is only a fallback.
        group_id = kwargs.get("group_id")
        if group_id is None:
            # next(..., None) avoids the IndexError the old ``[...][0]``
            # raised when no positional argument looked like a number.
            group_id = next((arg for arg in args[1:] if is_number(arg)), None)
        if self and group_id and not self._data.group_manager.get(str(group_id)):
            self._data.group_manager[str(group_id)] = BaseGroup()
        return func(*args, **kwargs)

    return wrapper
def init_task(func: Callable):
    """
    Description:
        Decorator that initialises a group's passive-task switches before the
        wrapped method runs.
    Params:
        :param func: method to wrap; called as ``func(self, group_id[, task, ...])``

    When a ``task`` is supplied and the group has no stored status for it,
    every task registered in ``self._data.task`` that lacks a status gets the
    configured ``DEFAULT_<task>`` value (``True`` when not configured), stale
    entries for tasks that are no longer registered are dropped, and the data
    is saved. When no ``task`` is supplied nothing is synchronised.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        self = args[0]
        # Support positional and keyword call styles. The task is the third
        # positional argument, so it only exists when len(args) > 2 (the old
        # ``> 1`` check raised IndexError for two-argument calls).
        raw_group_id = args[1] if len(args) > 1 else kwargs.get("group_id")
        task = args[2] if len(args) > 2 else kwargs.get("task")
        group_id = "" if raw_group_id is None else str(raw_group_id)
        if group_id:
            groups = self._data.group_manager
            # Create the record on demand so the lookups below cannot raise
            # KeyError for a group that has not been seen yet.
            if not groups.get(group_id):
                groups[group_id] = BaseGroup()
            task_status = groups[group_id].group_task_status
            if task and task_status.get(task) is None:
                for name in self._data.task:
                    if task_status.get(name) is None:
                        task_status[name] = Config.get_config(
                            "_task", f"DEFAULT_{name}", default=True
                        )
                # Iterate over a copy: entries are deleted while looping.
                for name in list(task_status):
                    if name not in self._data.task:
                        del task_status[name]
                self.save()
        return func(*args, **kwargs)

    return wrapper
class GroupManager(StaticData):
    """
    Manager for per-group permission level, plugin switches, the bot master
    switch and passive-task switches.

    All state lives in ``self._data`` (a ``BaseData``); this class reads and
    writes its ``group_manager`` (records keyed by the group id as a string),
    ``white_group`` and ``task`` members.
    """

    def __init__(self, file: Path):
        """
        Description:
            Load the manager data from ``file`` when it exists, otherwise
            start from an empty ``BaseData``.
        Params:
            :param file: path of the data file
        """
        # NOTE(review): the second argument presumably tells StaticData not
        # to load the file itself - confirm against StaticData.
        super().__init__(file, False)
        self._data = BaseData.parse_file(file) if file.exists() else BaseData()

    def block_plugin(self, module: str, group_id: int, is_save: bool = True) -> None:
        """
        Description:
            Disable a plugin for a group.
        Params:
            :param module: plugin module name
            :param group_id: group id
            :param is_save: whether to write the data file afterwards

        NOTE(review): the original comment said a ``None`` group means a
        superuser-level block, but the implementation looks the group up in
        ``group_manager``, so a falsy ``group_id`` is not handled here.
        """
        self._set_plugin_status(module, "block", group_id, is_save)

    def unblock_plugin(self, module: str, group_id: int, is_save: bool = True) -> None:
        """
        Description:
            Re-enable a plugin for a group.
        Params:
            :param module: plugin module name
            :param group_id: group id
            :param is_save: whether to write the data file afterwards
        """
        self._set_plugin_status(module, "unblock", group_id, is_save)

    def turn_on_group_bot_status(self, group_id: int) -> None:
        """
        Description:
            Turn the bot master switch on for a group.
        Params:
            :param group_id: group id
        """
        self._set_group_bot_status(group_id, True)

    def shutdown_group_bot_status(self, group_id: int) -> None:
        """
        Description:
            Turn the bot master switch off for a group.
        Params:
            :param group_id: group id
        """
        self._set_group_bot_status(group_id, False)

    @init_group
    def check_group_bot_status(self, group_id: int) -> bool:
        """
        Description:
            Return the bot master switch state of a group.
        Params:
            :param group_id: group id
        """
        return self._data.group_manager[str(group_id)].status

    @init_group
    def set_group_level(self, group_id: int, level: int) -> None:
        """
        Description:
            Set the permission level of a group and save.
        Params:
            :param group_id: group id
            :param level: permission level
        """
        self._data.group_manager[str(group_id)].level = level
        self.save()

    @init_group
    def get_plugin_status(self, module: str, group_id: int) -> bool:
        """
        Description:
            Return ``True`` when the plugin is not in the group's list of
            closed plugins.
        Params:
            :param module: plugin module name
            :param group_id: group id
        """
        return module not in self._data.group_manager[str(group_id)].close_plugins

    @init_group
    def get_group_level(self, group_id: int) -> int:
        """
        Description:
            Return the permission level of a group.
        Params:
            :param group_id: group id
        """
        return self._data.group_manager[str(group_id)].level

    def check_group_is_white(self, group_id: int) -> bool:
        """
        Description:
            Return whether the group is on the white list.
        Params:
            :param group_id: group id
        """
        return group_id in self._data.white_group

    def add_group_white_list(self, group_id: int) -> None:
        """
        Description:
            Add a group to the white list (no-op when already present).
            The data file is not written by this method.
        Params:
            :param group_id: group id
        """
        if group_id not in self._data.white_group:
            self._data.white_group.append(group_id)

    def delete_group_white_list(self, group_id: int) -> None:
        """
        Description:
            Remove a group from the white list (no-op when absent).
            The data file is not written by this method.
        Params:
            :param group_id: group id
        """
        if group_id in self._data.white_group:
            self._data.white_group.remove(group_id)

    def get_group_white_list(self) -> List[int]:
        """
        Description:
            Return the white list. The internal list object itself is
            returned, not a copy.
        """
        return self._data.white_group

    def load_task(self) -> None:
        """
        Description:
            Collect the ``__plugin_task__`` mapping of every plugin reported
            by ``get_matchers(True)`` into ``self._data.task``.
        Raises:
            ValueError: when a task key is registered more than once
        """
        for matcher in get_matchers(True):
            plugin = nonebot.plugin.get_plugin(matcher.plugin_name)
            try:
                plugin_task = plugin.module.__plugin_task__
            except AttributeError:
                # Plugin lookup yielded nothing usable, or the plugin module
                # declares no passive tasks: skip it.
                continue
            for key, value in plugin_task.items():
                if key in self._data.task:
                    raise ValueError(f"plugin_task{key} 已存在!")
                self._data.task[key] = value

    @init_group
    def delete_group(self, group_id: int) -> None:
        """
        Description:
            Remove the group from the white list and save.
        Params:
            :param group_id: group id

        NOTE(review): despite the name, the group's record in
        ``group_manager`` is kept (and ``init_group`` even creates one when
        missing) - confirm whether the record should be removed as well.
        """
        if group_id in self._data.white_group:
            self._data.white_group.remove(group_id)
        self.save()

    def open_group_task(self, group_id: int, task: str) -> None:
        """
        Description:
            Enable a passive task for a group.
        Params:
            :param group_id: group id
            :param task: passive task key
        """
        self._set_group_group_task_status(group_id, task, True)

    def close_group_task(self, group_id: int, task: str) -> None:
        """
        Description:
            Disable a passive task for a group.
        Params:
            :param group_id: group id
            :param task: passive task key
        """
        self._set_group_group_task_status(group_id, task, False)

    # init_group must be the outer decorator: it creates the group record
    # that init_task (and the method body) index into.
    @init_group
    @init_task
    def check_group_task_status(self, group_id: int, task: str) -> bool:
        """
        Description:
            Return the state of a passive task for a group; ``False`` when
            the group has no entry for the task.
        Params:
            :param group_id: group id
            :param task: passive task key
        """
        return self._data.group_manager[str(group_id)].group_task_status.get(task, False)

    def get_task_data(self) -> Dict[str, str]:
        """
        Description:
            Return the mapping of all registered passive tasks.
        """
        return self._data.task

    # Only init_group is applied here: this method has no ``task`` argument,
    # and per-task initialisation happens through check_group_task_status.
    @init_group
    def group_group_task_status(self, group_id: int) -> str:
        """
        Description:
            Build a text overview of every registered passive task for a
            group, one task per line; disabled tasks are suffixed with "×".
        Params:
            :param group_id: group id
        """
        lines = ["[群被动技能]:"]
        # Walk the registered tasks (not the stored statuses) so stale stored
        # keys cannot cause a KeyError and missing ones get initialised by
        # check_group_task_status.
        for key, name in self._data.task.items():
            mark = "" if self.check_group_task_status(int(group_id), key) else "×"
            lines.append(f"{name}{mark}")
        return "\n".join(lines)

    # init_group first (outermost) so the group record exists before
    # init_task reads it.
    @init_group
    @init_task
    def _set_group_group_task_status(self, group_id: int, task: str, status: bool) -> None:
        """
        Description:
            Set the state of a passive task for a group and save.
        Params:
            :param group_id: group id
            :param task: passive task key
            :param status: new state
        """
        self._data.group_manager[str(group_id)].group_task_status[task] = status
        self.save()

    @init_group
    def _set_plugin_status(
        self,
        module: str,
        status: str,
        group_id: int,
        is_save: bool,
    ) -> None:
        """
        Description:
            Add the plugin to, or remove it from, the group's list of closed
            plugins.
        Params:
            :param module: plugin module name
            :param status: ``"block"`` closes the plugin; any other value
                re-opens it
            :param group_id: group id
            :param is_save: whether to write the data file afterwards
        """
        # A falsy group_id is passed through unchanged, as before; the lookup
        # below then fails unless such a key exists.
        group_key = str(group_id) if group_id else group_id
        closed = self._data.group_manager[group_key].close_plugins
        if status == "block":
            if module not in closed:
                closed.append(module)
        elif module in closed:
            closed.remove(module)
        if is_save:
            self.save()

    @init_group
    def _set_group_bot_status(self, group_id: Union[int, str], status: bool) -> None:
        """
        Description:
            Set the bot master switch of a group and save.
        Params:
            :param group_id: group id
            :param status: new switch state
        """
        self._data.group_manager[str(group_id)].status = status
        self.save()

    def reload(self) -> None:
        """
        Description:
            Re-read the data file when it exists. The in-memory task mapping
            is carried over because it is never written to the file.
        """
        if self.file.exists():
            tasks = self._data.task
            self._data = BaseData.parse_file(self.file)
            self._data.task = tasks

    def save(self, path: Optional[Union[str, Path]] = None) -> None:
        """
        Description:
            Write the data as JSON, leaving out the ``task`` mapping.
        Params:
            :param path: target file; defaults to ``self.file``
        """
        path = path or self.file
        if isinstance(path, str):
            path = Path(path)
        if path:
            dict_data = self._data.dict()
            # Tasks are rebuilt from the plugins on load, so never persisted.
            del dict_data["task"]
            with open(path, "w", encoding="utf8") as f:
                json.dump(dict_data, f, ensure_ascii=False, indent=4)
# def get_super_old_data(self) -> Optional[dict]:
# """
# 说明:
# 获取旧数据,平时使用请不要调用
# """
# if self._data["super"].get("close_plugins"):
# _x = self._data["super"].get("close_plugins")
# del self._data["super"]["close_plugins"]
# return _x
# return None