🐛 恶意命令检测问题

This commit is contained in:
AkashiCoin 2024-09-02 11:52:44 +08:00
parent f92ede4cb8
commit 4cfd884916
3 changed files with 92 additions and 14 deletions

View File

@ -371,7 +371,7 @@ async def test_remove_plugin(
assert not (mock_base_path / "plugins" / "search_image" / "__init__.py").is_file() assert not (mock_base_path / "plugins" / "search_image" / "__init__.py").is_file()
async def test_plugin_not_exist( async def test_plugin_not_exist_add(
app: App, app: App,
mocker: MockerFixture, mocker: MockerFixture,
mocked_api: MockRouter, mocked_api: MockRouter,
@ -379,12 +379,12 @@ async def test_plugin_not_exist(
tmp_path: Path, tmp_path: Path,
) -> None: ) -> None:
""" """
测试插件不存在 测试插件不存在添加插件
""" """
from zhenxun.builtin_plugins.plugin_store import _matcher from zhenxun.builtin_plugins.plugin_store import _matcher
init_mocked_api(mocked_api=mocked_api) init_mocked_api(mocked_api=mocked_api)
plugin_id = 10 plugin_id = -1
async with app.test_matcher(_matcher) as ctx: async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx) bot = create_bot(ctx)
@ -412,13 +412,32 @@ async def test_plugin_not_exist(
bot=bot, bot=bot,
) )
async def test_plugin_not_exist_update(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试插件不存在更新插件
"""
from zhenxun.builtin_plugins.plugin_store import _matcher
init_mocked_api(mocked_api=mocked_api)
plugin_id = -1
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message = f"更新插件 {plugin_id}" raw_message = f"更新插件 {plugin_id}"
event: GroupMessageEvent = _v11_group_message_event( event: GroupMessageEvent = _v11_group_message_event(
message=raw_message, message=raw_message,
self_id=BotId.QQ_BOT, self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER, user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5, group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID, message_id=MessageId.MESSAGE_ID_2,
to_me=True, to_me=True,
) )
ctx.receive_event(bot=bot, event=event) ctx.receive_event(bot=bot, event=event)
@ -434,13 +453,69 @@ async def test_plugin_not_exist(
result=None, result=None,
bot=bot, bot=bot,
) )
async def test_plugin_not_exist_remove(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试插件不存在移除插件
"""
from zhenxun.builtin_plugins.plugin_store import _matcher
init_mocked_api(mocked_api=mocked_api)
plugin_id = -1
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message = f"移除插件 {plugin_id}"
event: GroupMessageEvent = _v11_group_message_event(
message=raw_message,
self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID_2,
to_me=True,
)
ctx.receive_event(bot=bot, event=event)
ctx.should_call_send(
event=event,
message=Message(message="插件ID不存在..."),
result=None,
bot=bot,
)
async def test_plugin_not_exist_search(
app: App,
mocker: MockerFixture,
mocked_api: MockRouter,
create_bot: Callable,
tmp_path: Path,
) -> None:
"""
测试插件不存在搜索插件
"""
from zhenxun.builtin_plugins.plugin_store import _matcher
init_mocked_api(mocked_api=mocked_api)
plugin_id = -1
async with app.test_matcher(_matcher) as ctx:
bot = create_bot(ctx)
bot: Bot = cast(Bot, bot)
raw_message = f"搜索插件 {plugin_id}" raw_message = f"搜索插件 {plugin_id}"
event: GroupMessageEvent = _v11_group_message_event( event: GroupMessageEvent = _v11_group_message_event(
message=raw_message, message=raw_message,
self_id=BotId.QQ_BOT, self_id=BotId.QQ_BOT,
user_id=UserId.SUPERUSER, user_id=UserId.SUPERUSER,
group_id=GroupId.GROUP_ID_LEVEL_5, group_id=GroupId.GROUP_ID_LEVEL_5,
message_id=MessageId.MESSAGE_ID, message_id=MessageId.MESSAGE_ID_3,
to_me=True, to_me=True,
) )
ctx.receive_event(bot=bot, event=event) ctx.receive_event(bot=bot, event=event)

View File

@ -18,3 +18,5 @@ class GroupId:
class MessageId: class MessageId:
MESSAGE_ID = 30001 MESSAGE_ID = 30001
MESSAGE_ID_2 = 30002 MESSAGE_ID_2 = 30002
MESSAGE_ID_3 = 30003
MESSAGE_ID_4 = 30004

View File

@ -2,19 +2,19 @@ import time
from collections import defaultdict from collections import defaultdict
from nonebot.adapters import Event from nonebot.adapters import Event
from nonebot.adapters.onebot.v11 import Bot
from nonebot.exception import IgnoredException
from nonebot.matcher import Matcher
from nonebot.message import run_preprocessor
from nonebot.typing import T_State from nonebot.typing import T_State
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import At from nonebot_plugin_alconna import At
from nonebot.adapters.onebot.v11 import Bot
from nonebot.message import run_preprocessor
from nonebot.exception import IgnoredException
from nonebot_plugin_session import EventSession from nonebot_plugin_session import EventSession
from zhenxun.configs.config import Config
from zhenxun.models.ban_console import BanConsole
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.configs.config import Config
from zhenxun.utils.enum import PluginType from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils
from zhenxun.models.ban_console import BanConsole
malicious_check_time = Config.get_config("hook", "MALICIOUS_CHECK_TIME") malicious_check_time = Config.get_config("hook", "MALICIOUS_CHECK_TIME")
malicious_ban_count = Config.get_config("hook", "MALICIOUS_BAN_COUNT") malicious_ban_count = Config.get_config("hook", "MALICIOUS_BAN_COUNT")
@ -36,12 +36,12 @@ class BanCheckLimiter:
self.default_check_time = default_check_time self.default_check_time = default_check_time
self.default_count = default_count self.default_count = default_count
def add(self, key: str | int | float): def add(self, key: str | float):
if self.mint[key] == 1: if self.mint[key] == 1:
self.mtime[key] = time.time() self.mtime[key] = time.time()
self.mint[key] += 1 self.mint[key] += 1
def check(self, key: str | int | float) -> bool: def check(self, key: str | float) -> bool:
if time.time() - self.mtime[key] > self.default_check_time: if time.time() - self.mtime[key] > self.default_check_time:
self.mtime[key] = time.time() self.mtime[key] = time.time()
self.mint[key] = 0 self.mint[key] = 0
@ -76,6 +76,7 @@ async def _(
PluginType.HIDDEN, PluginType.HIDDEN,
PluginType.DEPENDANT, PluginType.DEPENDANT,
PluginType.ADMIN, PluginType.ADMIN,
PluginType.SUPERUSER,
]: ]:
return return
else: else:
@ -101,7 +102,7 @@ async def _(
await MessageUtils.build_message( await MessageUtils.build_message(
[ [
At(flag="user", target=user_id), At(flag="user", target=user_id),
f"检测到恶意触发命令,您将被封禁 30 分钟", "检测到恶意触发命令,您将被封禁 30 分钟",
] ]
).send() ).send()
logger.debug( logger.debug(