update v0.1.4.7

This commit is contained in:
HibiKier 2022-04-10 22:19:50 +08:00
parent 2dcac5f009
commit 0215abac30
14 changed files with 502 additions and 233 deletions

View File

@ -242,6 +242,12 @@ __Docker 最新版本由 [Sakuracio](https://github.com/Sakuracio) 提供__
## 更新
### 2022/4/10 \[v0.1.4.7]
* 新增消息记录模块
* 丰富处理请求操作提示
* web ui新增配置项修改
### 2022/4/9
* fix: 更新问题,戳一戳图片路径问题 [@pull/144](https://github.com/HibiKier/zhenxun_bot/pull/144)

View File

@ -1 +1 @@
__version__: v0.1.4.6
__version__: v0.1.4.7

View File

@ -0,0 +1,37 @@
from nonebot.adapters.onebot.v11 import GroupMessageEvent, MessageEvent
from models.chat_history import ChatHistory
from ._rule import rule
from configs.config import Config
from nonebot import on_message
Config.add_plugin_config(
"chat_history",
"FLAG",
True,
help_="是否开启消息自从存储",
name="消息存储",
default_value=True
)
chat_history = on_message(rule=rule)
# test = on_command("aa")
@chat_history.handle()
async def _(event: MessageEvent):
if isinstance(event, GroupMessageEvent):
await ChatHistory.add_chat_msg(event.user_id, event.group_id, str(event.get_message()))
else:
await ChatHistory.add_chat_msg(event.user_id, None, str(event.get_message()))
# @test.handle()
# async def _(event: MessageEvent):
# print(await ChatHistory.get_user_msg(event.user_id, "private"))
# print(await ChatHistory.get_user_msg_count(event.user_id, "private"))
# print(await ChatHistory.get_user_msg(event.user_id, "group"))
# print(await ChatHistory.get_user_msg_count(event.user_id, "group"))
# print(await ChatHistory.get_group_msg(event.group_id))
# print(await ChatHistory.get_group_msg_count(event.group_id))

View File

@ -0,0 +1,6 @@
from nonebot.adapters.onebot.v11 import Event, MessageEvent
from configs.config import Config
def rule(event: Event) -> bool:
return Config.get_config("chat_history", "FLAG") and isinstance(event, MessageEvent)

View File

@ -82,15 +82,16 @@ async def _(bot: Bot, cmd: Tuple[str, ...] = Command(), arg: Message = CommandAr
if is_number(id_):
id_ = int(id_)
if cmd[:2] == "同意":
if await requests_manager.approve(bot, id_, "private"):
await friend_handle.send("同意好友请求成功..")
else:
await friend_handle.send("同意好友请求失败可能是未找到此id的请求..")
flag = await requests_manager.approve(bot, id_, "private")
else:
if await requests_manager.refused(bot, id_, "private"):
await friend_handle.send("拒绝好友请求成功..")
else:
await friend_handle.send("拒绝好友请求失败可能是未找到此id的请求..")
flag = await requests_manager.refused(bot, id_, "private")
if flag == 1:
await friend_handle.send(f"{cmd[:2]}好友请求失败,该请求已失效..")
requests_manager.delete_request(id_, "private")
elif flag == 2:
await friend_handle.send(f"{cmd[:2]}好友请求失败未找到此id的请求..")
else:
await friend_handle.send(f"{cmd[:2]}好友请求成功!")
else:
await friend_handle.send("id必须为纯数字")
@ -99,12 +100,12 @@ async def _(bot: Bot, cmd: Tuple[str, ...] = Command(), arg: Message = CommandAr
async def _(bot: Bot, cmd: Tuple[str, ...] = Command(), arg: Message = CommandArg()):
cmd = cmd[0]
id_ = arg.extract_plain_text().strip()
flag = None
if is_number(id_):
id_ = int(id_)
if cmd[:2] == "同意":
rid = requests_manager.get_group_id(id_)
if rid:
await friend_handle.send("同意群聊请求成功..")
if await GroupInfo.get_group_info(rid):
await GroupInfo.set_group_flag(rid, 1)
else:
@ -116,14 +117,18 @@ async def _(bot: Bot, cmd: Tuple[str, ...] = Command(), arg: Message = CommandAr
group_info["member_count"],
1
)
await requests_manager.approve(bot, id_, "group")
flag = await requests_manager.approve(bot, id_, "group")
else:
await friend_handle.send("同意群聊请求失败,可能是未找到此id的请求..")
await friend_handle.send("同意群聊请求失败,未找到此id的请求..")
else:
if await requests_manager.refused(bot, id_, "group"):
await friend_handle.send("拒绝群聊请求成功..")
else:
await friend_handle.send("拒绝群聊请求失败可能是未找到此id的请求..")
flag = await requests_manager.refused(bot, id_, "group")
if flag == 1:
await friend_handle.send(f"{cmd[:2]}群聊请求失败,该请求已失效..")
requests_manager.delete_request(id_, "group")
elif flag == 2:
await friend_handle.send(f"{cmd[:2]}群聊请求失败未找到此id的请求..")
else:
await friend_handle.send(f"{cmd[:2]}群聊请求成功!")
else:
await friend_handle.send("id必须为纯数字")

View File

@ -11,14 +11,19 @@ class ConfigsManager:
def __init__(self, file: Path):
self._data: dict = {}
self._simple_data: dict = {}
self._admin_level_data = []
self._simple_file = Path() / "configs" / "config.yaml"
if file:
file.parent.mkdir(exist_ok=True, parents=True)
self.file = file
_yaml = YAML()
if file.exists():
_yaml = YAML()
with open(file, "r", encoding="utf8") as f:
self._data = _yaml.load(f)
if self._simple_file.exists():
with open(self._simple_file, "r", encoding="utf8") as f:
self._simple_data = _yaml.load(f)
def add_plugin_config(
self,
@ -71,6 +76,7 @@ class ConfigsManager:
"""
if module in self._data.keys():
del self._data[module]
self.save()
def set_config(self, module: str, key: str, value: str):
"""
@ -80,8 +86,10 @@ class ConfigsManager:
:param value:
"""
if module in self._data.keys():
if self._data[module].get(key) is not None:
if self._data[module].get(key) is not None and self._data[module][key] != value:
self._data[module][key]["value"] = value
self._simple_data[module][key] = value
self.save()
def set_help(self, module: str, key: str, help_: str):
"""
@ -93,6 +101,7 @@ class ConfigsManager:
if module in self._data.keys():
if self._data[module].get(key) is not None:
self._data[module][key]["help"] = help_
self.save()
def set_default_value(self, module: str, key: str, value: str):
"""
@ -104,6 +113,7 @@ class ConfigsManager:
if module in self._data.keys():
if self._data[module].get(key) is not None:
self._data[module][key]["default_value"] = value
self.save()
def get_config(self, module: str, key: str, default: Optional[Any] = None) -> Optional[Any]:
"""
@ -142,11 +152,17 @@ class ConfigsManager:
if key in self._data.keys():
return self._data[key]
def save(self, path: Union[str, Path] = None):
def save(self, path: Union[str, Path] = None, save_simple_data: bool = False):
"""
保存数据
:param path: 路径
:param save_simple_data: 同时保存至config.yaml
"""
if save_simple_data:
with open(self._simple_file, "w", encoding="utf8") as f:
yaml.dump(
self._simple_data, f, indent=2, Dumper=yaml.RoundTripDumper, allow_unicode=True
)
path = path if path else self.file
with open(path, "w", encoding="utf8") as f:
yaml.dump(

123
models/chat_history.py Normal file
View File

@ -0,0 +1,123 @@
from datetime import datetime, timedelta
from typing import List, Literal, Optional
from services.db_context import db
class ChatHistory(db.Model):
__tablename__ = "chat_history"
id = db.Column(db.Integer(), primary_key=True)
user_qq = db.Column(db.BigInteger(), nullable=False)
group_id = db.Column(db.BigInteger())
text = db.Column(db.Text())
create_time = db.Column(db.DateTime(timezone=True), nullable=False)
@classmethod
async def add_chat_msg(cls, user_qq: int, group_id: Optional[int], text: str):
await cls.create(
user_qq=user_qq, group_id=group_id, text=text, create_time=datetime.now()
)
@classmethod
async def get_user_msg(
cls,
uid: int,
msg_type: Optional[Literal["private", "group"]],
days: Optional[int] = None,
) -> List["ChatHistory"]:
"""
说明
获取用户消息
参数
:param uid: 用户qq
:param msg_type: 消息类型私聊或群聊
:param days: 限制日期
"""
return await cls._get_msg(uid, None, "user", msg_type, days).gino.all()
@classmethod
async def get_user_msg_count(
cls,
uid: int,
msg_type: Optional[Literal["private", "group"]],
days: Optional[int] = None,
) -> int:
"""
说明
获取用户消息数量
参数
:param uid: 用户qq
:param msg_type: 消息类型私聊或群聊
:param days: 限制日期
"""
return (await cls._get_msg(uid, None, "user", msg_type, days, True).gino.first())[0]
@classmethod
async def get_group_msg(
cls,
gid: int,
days: Optional[int] = None,
) -> List["ChatHistory"]:
"""
说明
获取群聊消息
参数
:param gid: 用户qq
:param days: 限制日期
"""
return await cls._get_msg(None, gid, "group", None, days).gino.all()
@classmethod
async def get_group_msg_count(
cls,
gid: int,
days: Optional[int] = None,
) -> List["ChatHistory"]:
"""
说明
获取群聊消息数量
参数
:param gid: 用户qq
:param days: 限制日期
"""
return (await cls._get_msg(None, gid, "group", None, days, True).gino.first())[0]
@classmethod
def _get_msg(
cls,
uid: Optional[int],
gid: Optional[int],
type_: Literal["user", "group"],
msg_type: Optional[Literal["private", "group"]],
days: Optional[int],
is_select_count: bool = False
):
"""
说明
获取消息查询query
参数
:param uid: 用户qq
:param gid: 群号
:param type_: 类型私聊或群聊
:param msg_type: 消息类型用户或群聊
:param days: 限制日期
"""
if is_select_count:
setattr(ChatHistory, 'count', db.func.count(cls.id).label('count'))
query = cls.select('count')
else:
query = cls.query
if type_ == "user":
query = query.where(cls.user_qq == uid)
if msg_type == "private":
query = query.where(cls.group_id == None)
elif msg_type == "group":
query = query.where(cls.group_id != None)
else:
query = query.where(cls.group_id == gid)
if days:
query = query.where(
cls.create_time >= datetime.now() - timedelta(days=days)
)
return query

View File

@ -76,6 +76,7 @@ async def _(bot: Bot, event: MessageEvent):
f"问题:{msg} ---- 回答:{result}"
)
if result:
result = str(result)
for t in Config.get_config("ai", "TEXT_FILTER"):
result = result.replace(t, "*")
await ai.finish(Message(result))

View File

@ -1,206 +1,3 @@
from utils.manager import (
plugins_manager,
group_manager,
plugins2settings_manager,
plugins2cd_manager,
plugins2block_manager,
plugins2count_manager,
requests_manager,
)
from ..auth import token_to_user, Depends, User
from utils.utils import get_matchers, get_bot
from models.group_info import GroupInfo
from pydantic.error_wrappers import ValidationError
from services.log import logger
from ..config import *
import nonebot
app = nonebot.get_app()
plugin_name_list = None
@app.get("/webui/plugins")
def _(type_: Optional[str], user: User = Depends(token_to_user)) -> Result:
"""
获取插件列表
:param type_: 类型 normal, superuser, hidden, admin
"""
global plugin_name_list
if not plugin_name_list:
plugin_name_list = [x.plugin_name for x in get_matchers()]
plugin_list = []
plugin_data = plugins_manager.get_data()
for model in plugin_data:
if model in plugin_name_list:
data = plugin_data.get(model)
data["model"] = model
plugin_name = data.get("plugin_name")
if (
(type_ == "hidden" and "[hidden]" not in plugin_name.lower())
or (type_ == "admin" and "[admin]" not in plugin_name.lower())
or (type_ == "superuser" and "[superuser]" not in plugin_name.lower())
):
continue
if type_ == "normal" and (
"[hidden]" in plugin_name.lower()
or "[admin]" in plugin_name.lower()
or "[superuser]" in plugin_name.lower()
):
continue
data = {"model": model}
if x := plugin_data.get(model):
if not x.get("status") and x.get("block_type") in [
"group",
"private",
"all",
]:
x["block_type"] = (
"群聊"
if x["block_type"] == "group"
else "私聊"
if x["block_type"] == "private"
else "全部"
)
data["plugin_manager"] = PluginManager(**x)
if x := plugins2settings_manager.get(model):
if x.get("cmd") and isinstance(x.get("cmd"), list):
x["cmd"] = ",".join(x["cmd"])
# if isinstance(x["plugin_type"], list):
# x["plugin_type"] = x["plugin_type"][0]
data["plugin_settings"] = PluginSettings(**x)
if x := plugins2cd_manager.get(model):
data["cd_limit"] = CdLimit(**x)
if x := plugins2block_manager.get(model):
data["block_limit"] = BlockLimit(**x)
if x := plugins2count_manager.get(model):
data["count_limit"] = CountLimit(**x)
# if x := resources_manager.get(model):
# data = dict(data, **x)
plugin_list.append(Plugin(**data))
return Result(code=200, data=plugin_list)
@app.post("/webui/plugins")
def _(plugin: Plugin, user: User = Depends(token_to_user)) -> Result:
"""
修改插件信息
:param plugin: 插件内容
"""
if plugin.plugin_settings:
for key, value in plugin.plugin_settings:
plugins2settings_manager.set_module_data(plugin.model, key, value)
if plugin.plugin_manager:
for key, value in plugin.plugin_manager:
plugins_manager.set_module_data(plugin.model, key, value)
return Result(code=200)
@app.get("/webui/group")
async def _(user: User = Depends(token_to_user)) -> Result:
"""
获取群信息
"""
group_list_result = []
group_info = {}
if bot := get_bot():
group_list = await bot.get_group_list()
for g in group_list:
group_info[g["group_id"]] = Group(**g)
group_data = group_manager.get_data()
for group_id in group_data["group_manager"]:
try:
task_list = []
data = group_data["group_manager"][group_id]
for tn, status in data["group_task_status"].items():
task_list.append(
Task(
**{
"name": tn,
"nameZh": group_manager.get_task_data().get(tn) or tn,
"status": status,
}
)
)
data["task"] = task_list
if x := group_info.get(int(group_id)):
data["group"] = x
else:
continue
try:
group_list_result.append(GroupResult(**data))
except ValidationError:
pass
except Exception as e:
logger.error(f"WEB_UI /webui/group 发生错误 {type(e)}{e}")
return Result(code=200, data=group_list_result)
@app.post("/webui/group")
async def _(group: GroupResult, user: User = Depends(token_to_user)) -> Result:
"""
修改群信息
"""
group_id = group.group.group_id
group_manager.set_group_level(group_id, group.level)
if group.status:
group_manager.turn_on_group_bot_status(group_id)
else:
group_manager.shutdown_group_bot_status(group_id)
return Result(code=200)
@app.get("/webui/request")
def _(type_: Optional[str], user: User = Depends(token_to_user)) -> Result:
req_data = requests_manager.get_data()
req_list = []
if type_ in ["group", "private"]:
req_data = req_data[type_]
for x in req_data:
req_data[x]["oid"] = x
req_list.append(RequestResult(**req_data[x]))
return Result(code=200, data=req_list)
@app.delete("/webui/request")
def _(type_: Optional[str], user: User = Depends(token_to_user)) -> Result:
"""
清空请求
:param type_: 类型
"""
requests_manager.clear(type_)
return Result(code=200)
@app.post("/webui/request")
async def _(parma: RequestParma, user: User = Depends(token_to_user)) -> Result:
"""
操作请求
:param parma: 参数
"""
result = "error"
if bot := get_bot():
if parma.handle == "approve":
if parma.type == "group":
rid = requests_manager.get_group_id(parma.id)
if await GroupInfo.get_group_info(rid):
await GroupInfo.set_group_flag(rid, 1)
else:
group_info = await bot.get_group_info(group_id=rid)
await GroupInfo.add_group_info(
rid,
group_info["group_name"],
group_info["max_member_count"],
group_info["member_count"],
1,
)
if await requests_manager.approve(bot, parma.id, parma.type):
result = "ok"
elif parma.handle == "refuse":
if await requests_manager.refused(bot, parma.id, parma.type):
result = "ok"
elif parma.handle == "delete":
requests_manager.delete_request(parma.id, parma.type)
result = "ok"
return Result(code=200, data=result)
from .group import *
from .plugins import *
from .request import *

View File

@ -0,0 +1,61 @@
from pydantic.error_wrappers import ValidationError
from services.log import logger
from utils.manager import group_manager
from utils.utils import get_bot
from ..auth import Depends, User, token_to_user
from ..config import *
@app.get("/webui/group")
async def _(user: User = Depends(token_to_user)) -> Result:
"""
获取群信息
"""
group_list_result = []
group_info = {}
if bot := get_bot():
group_list = await bot.get_group_list()
for g in group_list:
group_info[g["group_id"]] = Group(**g)
group_data = group_manager.get_data()
for group_id in group_data["group_manager"]:
try:
task_list = []
data = group_data["group_manager"][group_id]
for tn, status in data["group_task_status"].items():
task_list.append(
Task(
**{
"name": tn,
"nameZh": group_manager.get_task_data().get(tn) or tn,
"status": status,
}
)
)
data["task"] = task_list
if x := group_info.get(int(group_id)):
data["group"] = x
else:
continue
try:
group_list_result.append(GroupResult(**data))
except ValidationError:
pass
except Exception as e:
logger.error(f"WEB_UI /webui/group 发生错误 {type(e)}{e}")
return Result(code=200, data=group_list_result)
@app.post("/webui/group")
async def _(group: GroupResult, user: User = Depends(token_to_user)) -> Result:
"""
修改群信息
"""
group_id = group.group.group_id
group_manager.set_group_level(group_id, group.level)
if group.status:
group_manager.turn_on_group_bot_status(group_id)
else:
group_manager.shutdown_group_bot_status(group_id)
return Result(code=200)

View File

@ -0,0 +1,142 @@
from configs.config import Config
from services.log import logger
from utils.manager import (plugins2block_manager, plugins2cd_manager,
plugins2count_manager, plugins2settings_manager,
plugins_manager)
from utils.utils import get_matchers
from ..auth import Depends, User, token_to_user
from ..config import *
plugin_name_list = None
@app.get("/webui/plugins")
def _(type_: Optional[str], user: User = Depends(token_to_user)) -> Result:
"""
获取插件列表
:param type_: 类型 normal, superuser, hidden, admin
"""
global plugin_name_list
if not plugin_name_list:
plugin_name_list = [x.plugin_name for x in get_matchers()]
plugin_list = []
plugin_data = plugins_manager.get_data()
for model in plugin_data:
if model in plugin_name_list:
try:
data = plugin_data.get(model)
data["model"] = model
plugin_name = data.get("plugin_name")
if (
(type_ == "hidden" and "[hidden]" not in plugin_name.lower())
or (type_ == "admin" and "[admin]" not in plugin_name.lower())
or (
type_ == "superuser"
and "[superuser]" not in plugin_name.lower()
)
):
continue
if type_ == "normal" and (
"[hidden]" in plugin_name.lower()
or "[admin]" in plugin_name.lower()
or "[superuser]" in plugin_name.lower()
):
continue
data = {"model": model}
if x := plugin_data.get(model):
if not x.get("status") and x.get("block_type") in [
"group",
"private",
"all",
]:
x["block_type"] = (
"群聊"
if x["block_type"] == "group"
else "私聊"
if x["block_type"] == "private"
else "全部"
)
data["plugin_manager"] = PluginManager(**x)
if x := plugins2settings_manager.get(model):
if x.get("cmd") and isinstance(x.get("cmd"), list):
x["cmd"] = ",".join(x["cmd"])
data["plugin_settings"] = PluginSettings(**x)
if x := plugins2cd_manager.get(model):
data["cd_limit"] = CdLimit(**x)
if x := plugins2block_manager.get(model):
data["block_limit"] = BlockLimit(**x)
if x := plugins2count_manager.get(model):
data["count_limit"] = CountLimit(**x)
if x := Config.get(model):
id_ = 0
tmp = []
for key in x.keys():
tmp.append(
PluginConfig(
**{
"key": key,
"help_": x[key].get("help"),
"id": id_,
**x[key],
}
)
)
id_ += 1
data["plugin_config"] = tmp
plugin_list.append(Plugin(**data))
except Exception as e:
logger.error(
f"WEB_UI GET /webui/plugins model{model} 发生错误 {type(e)}{e}"
)
return Result(
code=500,
data=f"WEB_UI GET /webui/plugins model{model} 发生错误 {type(e)}{e}",
)
return Result(code=200, data=plugin_list)
@app.post("/webui/plugins")
def _(plugin: Plugin, user: User = Depends(token_to_user)) -> Result:
"""
修改插件信息
:param plugin: 插件内容
"""
try:
if plugin.plugin_config:
for c in plugin.plugin_config:
if str(c.value).lower() in ["true", "false"] and (
c.default_value is None or isinstance(c.default_value, bool)
):
c.value = True if str(c.value).lower() == "true" else False
elif isinstance(
Config.get_config(plugin.model, c.key, c.value), int
) or isinstance(c.default_value, int):
c.value = int(c.value)
elif isinstance(
Config.get_config(plugin.model, c.key, c.value), float
) or isinstance(c.default_value, float):
c.value = float(c.value)
elif isinstance(c.value, str) and (
isinstance(Config.get_config(plugin.model, c.key, c.value), list)
or isinstance(c.default_value, list)
):
c.value = c.value.split(",")
Config.set_config(plugin.model, c.key, c.value)
Config.save(None, True)
else:
if plugin.plugin_settings:
for key, value in plugin.plugin_settings:
plugins2settings_manager.set_module_data(plugin.model, key, value)
if plugin.plugin_manager:
for key, value in plugin.plugin_manager:
plugins_manager.set_module_data(plugin.model, key, value)
except Exception as e:
logger.error(
f"WEB_UI POST /webui/plugins model{plugin.model} 发生错误 {type(e)}{e}"
)
return Result(
code=500,
data=f"WEB_UI POST /webui/plugins model{plugin.model} 发生错误 {type(e)}{e}",
)
return Result(code=200)

View File

@ -0,0 +1,65 @@
from utils.manager import requests_manager
from ..auth import token_to_user, Depends, User
from utils.utils import get_bot
from models.group_info import GroupInfo
from ..config import *
@app.get("/webui/request")
def _(type_: Optional[str], user: User = Depends(token_to_user)) -> Result:
req_data = requests_manager.get_data()
req_list = []
if type_ in ["group", "private"]:
req_data = req_data[type_]
for x in req_data:
req_data[x]["oid"] = x
req_list.append(RequestResult(**req_data[x]))
req_list.reverse()
return Result(code=200, data=req_list)
@app.delete("/webui/request")
def _(type_: Optional[str], user: User = Depends(token_to_user)) -> Result:
"""
清空请求
:param type_: 类型
"""
requests_manager.clear(type_)
return Result(code=200)
@app.post("/webui/request")
async def _(parma: RequestParma, user: User = Depends(token_to_user)) -> Result:
"""
操作请求
:param parma: 参数
"""
result = "ok"
flag = 3
if bot := get_bot():
if parma.handle == "approve":
if parma.type == "group":
rid = requests_manager.get_group_id(parma.id)
if await GroupInfo.get_group_info(rid):
await GroupInfo.set_group_flag(rid, 1)
else:
group_info = await bot.get_group_info(group_id=rid)
await GroupInfo.add_group_info(
rid,
group_info["group_name"],
group_info["max_member_count"],
group_info["member_count"],
1,
)
flag = await requests_manager.approve(bot, parma.id, parma.type)
elif parma.handle == "refuse":
flag = await requests_manager.refused(bot, parma.id, parma.type)
elif parma.handle == "delete":
requests_manager.delete_request(parma.id, parma.type)
if parma.handle != "delete":
if flag == 1:
result = "该请求已失效"
requests_manager.delete_request(parma.id, parma.type)
elif flag == 2:
result = "未找到此Id"
return Result(code=200, data=result)

View File

@ -57,10 +57,19 @@ class PluginSettings(BaseModel):
plugin_type: Optional[List[Union[str, int]]] # 帮助类型
class PluginConfig(BaseModel):
id: int
key: str
value: Optional[Any]
help_: Optional[str]
default_value: Optional[Any]
class Plugin(BaseModel):
model: str # 模块
plugin_settings: Optional[PluginSettings]
plugin_manager: Optional[PluginManager]
plugin_config: Optional[List[PluginConfig]]
cd_limit: Optional[CdLimit]
block_limit: Optional[BlockLimit]
count_limit: Optional[CountLimit]

View File

@ -85,7 +85,7 @@ class RequestManager(StaticData):
return data["invite_group"]
return None
async def approve(self, bot: Bot, id_: int, type_: str) -> Optional[int]:
async def approve(self, bot: Bot, id_: int, type_: str) -> int:
"""
同意请求
:param bot: Bot
@ -232,7 +232,7 @@ class RequestManager(StaticData):
async def _set_add_request(
self, bot: Bot, id_: int, type_: str, approve: bool
) -> Optional[int]:
) -> int:
"""
处理请求
:param bot: Bot
@ -260,12 +260,13 @@ class RequestManager(StaticData):
f"同意{self._data[type_][id_]['nickname']}({self._data[type_][id_]['id']})"
f"{'好友' if type_ == 'private' else '入群'}请求失败了..."
)
return None
logger.info(
f"{'同意' if approve else '拒绝'}{self._data[type_][id_]['nickname']}({self._data[type_][id_]['id']})"
f"{'好友' if type_ == 'private' else '入群'}请求..."
)
return 1 # flag失效
else:
logger.info(
f"{'同意' if approve else '拒绝'}{self._data[type_][id_]['nickname']}({self._data[type_][id_]['id']})"
f"{'好友' if type_ == 'private' else '入群'}请求..."
)
del self._data[type_][id_]
self.save()
return rid
return None
return 2 # 未找到id