zhenxun_bot/zhenxun/builtin_plugins/hooks/chkdsk_hook.py
AkashiCoin 9eca6a97ca
🐛 修复插件商店检查插件更新问题 (#1597)
* 🐛 修复插件商店检查插件更新问题

* 🐛 恶意命令检测问题
2024-09-02 12:00:27 +08:00

115 lines
3.8 KiB
Python

import time
from collections import defaultdict
from nonebot.adapters import Event
from nonebot.typing import T_State
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import At
from nonebot.adapters.onebot.v11 import Bot
from nonebot.message import run_preprocessor
from nonebot.exception import IgnoredException
from nonebot_plugin_session import EventSession
from zhenxun.services.log import logger
from zhenxun.configs.config import Config
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
from zhenxun.models.ban_console import BanConsole
malicious_check_time = Config.get_config("hook", "MALICIOUS_CHECK_TIME")
malicious_ban_count = Config.get_config("hook", "MALICIOUS_BAN_COUNT")
if not malicious_check_time:
raise ValueError("模块: [hook], 配置项: [MALICIOUS_CHECK_TIME] 为空或小于0")
if not malicious_ban_count:
raise ValueError("模块: [hook], 配置项: [MALICIOUS_BAN_COUNT] 为空或小于0")
class BanCheckLimiter:
"""
恶意命令触发检测
"""
def __init__(self, default_check_time: float = 5, default_count: int = 4):
self.mint = defaultdict(int)
self.mtime = defaultdict(float)
self.default_check_time = default_check_time
self.default_count = default_count
def add(self, key: str | float):
if self.mint[key] == 1:
self.mtime[key] = time.time()
self.mint[key] += 1
def check(self, key: str | float) -> bool:
if time.time() - self.mtime[key] > self.default_check_time:
self.mtime[key] = time.time()
self.mint[key] = 0
return False
if (
self.mint[key] >= self.default_count
and time.time() - self.mtime[key] < self.default_check_time
):
self.mtime[key] = time.time()
self.mint[key] = 0
return True
return False
_blmt = BanCheckLimiter(
malicious_check_time,
malicious_ban_count,
)
# 恶意触发命令检测
@run_preprocessor
async def _(
matcher: Matcher, bot: Bot, session: EventSession, state: T_State, event: Event
):
module = None
if plugin := matcher.plugin:
module = plugin.module_name
if metadata := plugin.metadata:
extra = metadata.extra
if extra.get("plugin_type") in [
PluginType.HIDDEN,
PluginType.DEPENDANT,
PluginType.ADMIN,
PluginType.SUPERUSER,
]:
return
else:
return
if matcher.type == "notice":
return
user_id = session.id1
group_id = session.id3 or session.id2
malicious_ban_time = Config.get_config("hook", "MALICIOUS_BAN_TIME")
if not malicious_ban_time:
raise ValueError("模块: [hook], 配置项: [MALICIOUS_BAN_TIME] 为空或小于0")
if user_id:
if module:
if _blmt.check(f"{user_id}__{module}"):
await BanConsole.ban(
user_id, group_id, 9, malicious_ban_time * 60, bot.self_id
)
logger.info(
f"触发了恶意触发检测: {matcher.plugin_name}",
"HOOK",
session=session,
)
await MessageUtils.build_message(
[
At(flag="user", target=user_id),
"检测到恶意触发命令,您将被封禁 30 分钟",
]
).send()
logger.debug(
f"触发了恶意触发检测: {matcher.plugin_name}",
"HOOK",
session=session,
)
raise IgnoredException("检测到恶意触发命令")
_blmt.add(f"{user_id}__{module}")