mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 06:12:53 +08:00
147 lines
5.5 KiB
Python
147 lines
5.5 KiB
Python
from nonebot.matcher import Matcher
|
|
from nonebot.message import run_preprocessor, run_postprocessor, IgnoredException
|
|
from nonebot.adapters.cqhttp.exception import ActionFailed
|
|
from models.friend_user import FriendUser
|
|
from models.group_member_info import GroupInfoUser
|
|
from utils.message_builder import at
|
|
from .utils import status_message_manager, set_block_limit_false
|
|
from utils.manager import (
|
|
plugins2cd_manager,
|
|
plugins2block_manager,
|
|
plugins2count_manager,
|
|
)
|
|
from typing import Optional
|
|
from nonebot.typing import T_State
|
|
from nonebot.adapters.cqhttp import (
|
|
Bot,
|
|
Event,
|
|
MessageEvent,
|
|
PrivateMessageEvent,
|
|
GroupMessageEvent,
|
|
Message,
|
|
)
|
|
|
|
|
|
# 命令cd | 命令阻塞 | 命令次数
|
|
@run_preprocessor
|
|
async def _(matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State):
|
|
if not isinstance(event, MessageEvent) and matcher.module != "poke":
|
|
return
|
|
module = matcher.module
|
|
if (
|
|
isinstance(event, GroupMessageEvent)
|
|
and status_message_manager.get(event.group_id) is None
|
|
):
|
|
status_message_manager.delete(event.group_id)
|
|
# Count
|
|
if (
|
|
plugins2count_manager.check_plugin_count_status(module)
|
|
and event.user_id not in bot.config.superusers
|
|
):
|
|
plugin_count_data = plugins2count_manager.get_plugin_count_data(module)
|
|
limit_type = plugin_count_data["limit_type"]
|
|
rst = plugin_count_data["rst"]
|
|
count_type_ = event.user_id
|
|
if limit_type == "group" and isinstance(event, GroupMessageEvent):
|
|
count_type_ = event.group_id
|
|
if not plugins2count_manager.check(module, count_type_):
|
|
if rst:
|
|
rst = await init_rst(rst, event)
|
|
await send_msg(rst, bot, event)
|
|
raise IgnoredException(f"{module} count次数限制...")
|
|
else:
|
|
plugins2count_manager.increase(module, count_type_)
|
|
# Cd
|
|
if plugins2cd_manager.check_plugin_cd_status(module):
|
|
plugin_cd_data = plugins2cd_manager.get_plugin_cd_data(module)
|
|
check_type = plugin_cd_data["check_type"]
|
|
limit_type = plugin_cd_data["limit_type"]
|
|
rst = plugin_cd_data["rst"]
|
|
if (
|
|
(isinstance(event, PrivateMessageEvent) and check_type == "private")
|
|
or (isinstance(event, GroupMessageEvent) and check_type == "group")
|
|
or plugins2cd_manager.get_plugin_data(module).get("check_type") == "all"
|
|
):
|
|
cd_type_ = event.user_id
|
|
if limit_type == "group" and isinstance(event, GroupMessageEvent):
|
|
cd_type_ = event.group_id
|
|
if not plugins2cd_manager.check(module, cd_type_):
|
|
if rst:
|
|
rst = await init_rst(rst, event)
|
|
await send_msg(rst, bot, event)
|
|
raise IgnoredException(f"{module} 正在cd中...")
|
|
else:
|
|
plugins2cd_manager.start_cd(module, cd_type_)
|
|
# Block
|
|
if plugins2block_manager.check_plugin_block_status(module):
|
|
plugin_block_data = plugins2block_manager.get_plugin_block_data(module)
|
|
check_type = plugin_block_data["check_type"]
|
|
limit_type = plugin_block_data["limit_type"]
|
|
rst = plugin_block_data["rst"]
|
|
if (
|
|
(isinstance(event, PrivateMessageEvent) and check_type == "private")
|
|
or (isinstance(event, GroupMessageEvent) and check_type == "group")
|
|
or check_type == "all"
|
|
):
|
|
block_type_ = event.user_id
|
|
if limit_type == "group" and isinstance(event, GroupMessageEvent):
|
|
block_type_ = event.group_id
|
|
if plugins2block_manager.check(block_type_, module):
|
|
if rst:
|
|
rst = await init_rst(rst, event)
|
|
await send_msg(rst, bot, event)
|
|
raise IgnoredException(f"{event.user_id}正在调用{module}....")
|
|
else:
|
|
plugins2block_manager.set_true(block_type_, module)
|
|
|
|
|
|
async def send_msg(rst: str, bot: Bot, event: MessageEvent):
|
|
"""
|
|
发送信息
|
|
:param rst: pass
|
|
:param bot: pass
|
|
:param event: pass
|
|
"""
|
|
rst = await init_rst(rst, event)
|
|
try:
|
|
if isinstance(event, GroupMessageEvent):
|
|
status_message_manager.add(event.group_id)
|
|
await bot.send_group_msg(group_id=event.group_id, message=Message(rst))
|
|
else:
|
|
status_message_manager.add(event.user_id)
|
|
await bot.send_private_msg(user_id=event.user_id, message=Message(rst))
|
|
except ActionFailed:
|
|
pass
|
|
|
|
|
|
# 解除命令block阻塞
|
|
@run_postprocessor
|
|
async def _(
|
|
matcher: Matcher,
|
|
exception: Optional[Exception],
|
|
bot: Bot,
|
|
event: Event,
|
|
state: T_State,
|
|
):
|
|
if not isinstance(event, MessageEvent) and matcher.module != "poke":
|
|
return
|
|
module = matcher.module
|
|
set_block_limit_false(event, module)
|
|
|
|
|
|
async def init_rst(rst: str, event: MessageEvent):
|
|
if "[uname]" in rst:
|
|
uname = event.sender.card if event.sender.card else event.sender.nickname
|
|
rst = rst.replace("[uname]", uname)
|
|
if "[nickname]" in rst:
|
|
if isinstance(event, GroupMessageEvent):
|
|
nickname = await GroupInfoUser.get_group_member_nickname(
|
|
event.user_id, event.group_id
|
|
)
|
|
else:
|
|
nickname = await FriendUser.get_friend_nickname(event.user_id)
|
|
rst = rst.replace("[nickname]", nickname)
|
|
if "[at]" in rst and isinstance(event, GroupMessageEvent):
|
|
rst = rst.replace("[at]", str(at(event.user_id)))
|
|
return rst
|