mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 06:12:53 +08:00
54 lines
1.8 KiB
Python
54 lines
1.8 KiB
Python
import re
|
|
from typing import Any, Dict
|
|
|
|
from nonebot.adapters.onebot.v11 import Bot, Message, unescape
|
|
from nonebot.exception import MockApiException
|
|
|
|
from services.log import logger
|
|
from utils.manager import group_manager
|
|
|
|
|
|
@Bot.on_calling_api
async def _(bot: Bot, api: str, data: Dict[str, Any]) -> None:
    """Gate outgoing group messages that carry a ``[[_task|<name>]]`` marker.

    Registered through ``Bot.on_calling_api``. For group sends
    (``send_group_msg``, or ``send_msg`` with ``message_type == "group"``)
    whose text starts with ``[[_task|<name>]]`` where ``<name>`` is a key of
    ``group_manager.get_task_data()``:

    * if the group's level is negative or ``check_task_status`` reports the
      task as off for that group, ``MockApiException`` is raised;
    * otherwise the marker is removed and ``data["message"]`` is replaced
      in place with the cleaned ``Message``.

    Any other API call is left untouched.

    Args:
        bot: Bot instance performing the call (unused here).
        api: Name of the API being called.
        data: Call parameters; ``data["message"]`` may be rewritten.

    Raises:
        MockApiException: The tagged task is disabled for the target group.
    """
    # Check the API first: most API calls have no "message" field, and reading
    # it unconditionally only produced a KeyError and a spurious error log.
    is_group_send = api == "send_group_msg" or (
        api == "send_msg" and data.get("message_type") == "group"
    )
    if not is_group_send:
        return

    task = None
    group_id = None
    try:
        payload = data["message"]
        if isinstance(payload, str):
            text = payload.strip()
        else:
            # Non-str payloads are indexed by "text" and stringified, which
            # yields CQ-escaped text (presumably a Message; anything that does
            # not support this lands in the except below).
            text = str(payload["text"]).strip()
        # Match on the unescaped text so a marker rendered as
        # "&#91;&#91;_task|...&#93;&#93;" is still recognised. The capture is
        # non-greedy so a later "]]" on the same line cannot swallow the name.
        found = re.search(r"^\[\[_task\|(.*?)]]", unescape(text))
        if found and found.group(1) in group_manager.get_task_data().keys():
            task = found.group(1)
            group_id = data["group_id"]
    except Exception as e:
        # Best effort: a malformed payload must never block the API call.
        logger.error("TaskHook ERROR", "HOOK", e=e)
        return

    if not (task and group_id):
        return

    # Raised outside the try block on purpose so it is not swallowed above.
    if group_manager.get_group_level(
        group_id
    ) < 0 or not group_manager.check_task_status(task, group_id):
        logger.debug(f"被动技能 {task} 处于关闭状态")
        raise MockApiException(f"被动技能 {task} 处于关闭状态...")

    # str(Message) escapes "[" / "]" as "&#91;" / "&#93;", so strip the marker
    # in both its escaped and its plain spelling before rebuilding the message.
    msg = str(data["message"]).strip()
    msg = msg.replace(f"&#91;&#91;_task|{task}&#93;&#93;", "").replace(
        f"[[_task|{task}]]", ""
    )
    data["message"] = Message(msg)
|