feat: 禁言检测

This commit is contained in:
HibiKier 2024-05-29 02:01:26 +08:00
parent f73b40ebdb
commit 091ae93731
7 changed files with 328 additions and 27 deletions

View File

@ -15,7 +15,7 @@ from zhenxun.models.task_info import TaskInfo
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.image_utils import get_img_hash
from zhenxun.utils.image_utils import get_download_image_hash, get_img_hash
from zhenxun.utils.rules import ensure_group
__plugin_meta__ = PluginMetadata(
@ -91,7 +91,7 @@ class Fudu:
_manage = Fudu()
_matcher = on_message(rule=ensure_group, priority=999)
@ -115,7 +115,7 @@ async def _(message: UniMsg, event: Event, session: EventSession):
if plain_text and plain_text.startswith(f"@可爱的{NICKNAME}"):
await Text("复制粘贴的虚空艾特?").send(reply=True)
if image_list:
img_hash = await get_fudu_img_hash(image_list[0], group_id)
img_hash = await get_download_image_hash(image_list[0], group_id)
else:
img_hash = ""
add_msg = plain_text + "|-|" + img_hash
@ -147,26 +147,3 @@ async def _(message: UniMsg, event: Event, session: EventSession):
rst = Text(plain_text)
if rst:
await rst.finish()
async def get_fudu_img_hash(url: str, group_id: str) -> str:
"""下载图片获取哈希值
参数:
url: 图片url
group_id: 群组id
返回:
str: 哈希值
"""
try:
if await AsyncHttpx.download_file(
url, TEMP_PATH / f"compare_{group_id}_img.jpg"
):
img_hash = get_img_hash(TEMP_PATH / f"compare_{group_id}_img.jpg")
return str(img_hash)
else:
logger.warning(f"复读下载图片失败...")
except Exception as e:
logger.warning(f"复读读取图片Hash出错 {type(e)}{e}")
return ""

View File

@ -0,0 +1,5 @@
from pathlib import Path
import nonebot
nonebot.load_plugins(str(Path(__file__).parent.resolve()))

View File

@ -0,0 +1,124 @@
import time
import ujson as json
from pydantic import BaseModel
from zhenxun.configs.config import Config
from zhenxun.configs.path_config import DATA_PATH
base_config = Config.get("mute")
class GroupData(BaseModel):
count: int
"""次数"""
time: int
"""检测时长"""
duration: int
"""禁言时长"""
message_data: dict = {}
"""消息存储"""
class MuteManage:
file = DATA_PATH / "group_mute_data.json"
def __init__(self) -> None:
self._group_data: dict[str, GroupData] = {}
if self.file.exists():
_data = json.load(open(self.file))
for gid in _data:
self._group_data[gid] = GroupData(
count=_data[gid]["count"],
time=_data[gid]["time"],
duration=_data[gid]["duration"],
)
def get_group_data(self, group_id: str) -> GroupData:
"""获取群组数据
参数:
group_id: 群组id
返回:
GroupData: GroupData
"""
if group_id not in self._group_data:
self._group_data[group_id] = GroupData(
count=base_config.get("MUTE_DEFAULT_COUNT"),
time=base_config.get("MUTE_DEFAULT_TIME"),
duration=base_config.get("MUTE_DEFAULT_DURATION"),
)
return self._group_data[group_id]
def reset(self, user_id: str, group_id: str):
"""重置用户检查次数
参数:
user_id: 用户id
group_id: 群组id
"""
if group_data := self._group_data.get(group_id):
if user_id in group_data.message_data:
group_data.message_data[user_id]["count"] = 0
def save_data(self):
"""保存数据"""
data = {}
for gid in self._group_data:
data[gid] = {
"count": self._group_data[gid].count,
"time": self._group_data[gid].time,
"duration": self._group_data[gid].duration,
}
with open(self.file, "w") as f:
json.dump(data, f, indent=4, ensure_ascii=False)
def add_message(self, user_id: str, group_id: str, message: str) -> int:
"""添加消息
参数:
user_id: 用户id
group_id: 群组id
message: 消息内容
返回:
int: 禁言时长
"""
if group_id not in self._group_data:
self._group_data[group_id] = GroupData(
count=base_config.get("MUTE_DEFAULT_COUNT"),
time=base_config.get("MUTE_DEFAULT_TIME"),
duration=base_config.get("MUTE_DEFAULT_DURATION"),
)
group_data = self._group_data[group_id]
if group_data.duration == 0:
return 0
message_data = group_data.message_data
if not message_data.get(user_id):
message_data[user_id] = {
"time": time.time(),
"count": 1,
"message": message,
}
else:
if message.find(message_data[user_id]["message"]) != -1:
message_data[user_id]["count"] += 1
else:
message_data[user_id]["time"] = time.time()
message_data[user_id]["count"] = 1
message_data[user_id]["message"] = message
if time.time() - message_data[user_id]["time"] > group_data.time:
message_data[user_id]["time"] = time.time()
message_data[user_id]["count"] = 1
if (
message_data[user_id]["count"] > group_data.count
and time.time() - message_data[user_id]["time"] < group_data.time
):
return group_data.duration
return 0
mute_manage = MuteManage()

View File

@ -0,0 +1,38 @@
from nonebot import on_message
from nonebot.adapters import Bot
from nonebot_plugin_alconna import Image as alcImage
from nonebot_plugin_alconna import UniMsg
from nonebot_plugin_saa import Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import NICKNAME
from zhenxun.services.log import logger
from zhenxun.utils.image_utils import get_download_image_hash
from zhenxun.utils.platform import PlatformUtils
from ._data_source import mute_manage
_matcher = on_message(priority=1, block=False)
@_matcher.handle()
async def _(bot: Bot, session: EventSession, message: UniMsg):
group_id = session.id2
if not session.id1 or not group_id:
return
plain_text = message.extract_plain_text()
image_list = [m.url for m in message if isinstance(m, alcImage) and m.url]
img_hash = ""
for url in image_list:
img_hash += await get_download_image_hash(url, "_mute_")
_message = plain_text + img_hash
if duration := mute_manage.add_message(session.id1, group_id, _message):
try:
await PlatformUtils.ban_user(bot, session.id1, group_id, duration)
await Text(f"检测到恶意刷屏,{NICKNAME}要把你关进小黑屋!").send(
at_sender=True
)
mute_manage.reset(session.id1, group_id)
logger.info(f"检测刷屏 被禁言 {duration} 分钟", "禁言检查", session=session)
except Exception as e:
logger.error("禁言发送错误", "禁言检测", session=session, e=e)

View File

@ -0,0 +1,117 @@
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args, Arparma, Match, Option, on_alconna
from nonebot_plugin_saa import Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import NICKNAME
from zhenxun.configs.utils import PluginExtraData, RegisterConfig
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.rules import ensure_group
from ._data_source import base_config, mute_manage
__plugin_meta__ = PluginMetadata(
name="刷屏禁言",
description="刷屏禁言相关操作",
usage="""
刷屏禁言相关操作需要 {NICKNAME} 有群管理员权限
指令
设置刷屏: 查看当前设置
-c [count]: 检测最大次数
-t [time]: 规定时间内
-d [duration]: 禁言时长
示例:
设置刷屏 -c 10: 设置最大次数为10
设置刷屏 -t 100 -d 20: 设置规定时间和禁言时长
设置刷屏 -d 10: 设置禁言时长为10
* X 秒内发送同样消息 N 禁言 M 分钟 *
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
menu_type="其他",
plugin_type=PluginType.ADMIN,
admin_level=base_config.get("MUTE_LEVEL", 5),
configs=[
RegisterConfig(
key="MUTE_LEVEL",
value=5,
help="更改禁言设置的管理权限",
default_value=5,
type=int,
),
RegisterConfig(
key="MUTE_DEFAULT_COUNT",
value=10,
help="刷屏禁言默认检测次数",
default_value=10,
type=int,
),
RegisterConfig(
key="MUTE_DEFAULT_TIME",
value=7,
help="刷屏检测默认规定时间",
default_value=7,
type=int,
),
RegisterConfig(
key="MUTE_DEFAULT_DURATION",
value=10,
help="刷屏检测默禁言时长(分钟)",
default_value=10,
type=int,
),
],
).dict(),
)
_setting_matcher = on_alconna(
Alconna(
"刷屏设置",
Option("-t|--time", Args["time", int], help_text="检测时长"),
Option("-c|--count", Args["count", int], help_text="检测次数"),
Option("-d|--duration", Args["duration", int], help_text="禁言时长"),
),
rule=ensure_group,
block=True,
priority=5,
)
@_setting_matcher.handle()
async def _(
session: EventSession,
arparma: Arparma,
time: Match[int],
count: Match[int],
duration: Match[int],
):
group_id = session.id2
if not session.id1 or not group_id:
return
_time = time.result if time.available else None
_count = count.result if count.available else None
_duration = duration.result if duration.available else None
group_data = mute_manage.get_group_data(group_id)
if _time is None and _count is None and _duration is None:
await Text(
f"最大次数:{group_data.count}\n"
f"规定时间:{group_data.time}\n"
f"禁言时长:{group_data.duration:.2f} 分钟\n"
f"【在规定时间内发送相同消息超过最大次数则禁言\n当禁言时长为0时关闭此功能】"
).finish(reply=True)
if _time is not None:
group_data.time = _time
if _count is not None:
group_data.count = _count
if _duration is not None:
group_data.duration = _duration
await Text("设置成功!").send(reply=True)
logger.info(
f"设置禁言配置 time: {_time}, count: {_count}, duration: {_duration}",
arparma.header_result,
session=session,
)
mute_manage.save_data()

View File

@ -10,7 +10,9 @@ from imagehash import ImageHash
from nonebot.utils import is_coroutine_callable
from PIL import Image
from zhenxun.configs.path_config import IMAGE_PATH
from zhenxun.configs.path_config import IMAGE_PATH, TEMP_PATH
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from ._build_image import BuildImage, ColorAlias
from ._build_mat import BuildMat, MatType
@ -381,3 +383,24 @@ def get_img_hash(image_file: str | Path) -> str:
with open(image_file, "rb") as fp:
hash_value = imagehash.average_hash(Image.open(fp))
return str(hash_value)
async def get_download_image_hash(url: str, mark: str) -> str:
"""下载图片获取哈希值
参数:
url: 图片url
mark: 随机标志符
返回:
str: 哈希值
"""
try:
if await AsyncHttpx.download_file(
url, TEMP_PATH / f"compare_download_{mark}_img.jpg"
):
img_hash = get_img_hash(TEMP_PATH / f"compare_download_{mark}_img.jpg")
return str(img_hash)
except Exception as e:
logger.warning(f"下载读取图片Hash出错", e=e)
return ""

View File

@ -50,6 +50,23 @@ class UserData(BaseModel):
class PlatformUtils:
@classmethod
async def ban_user(cls, bot: Bot, user_id: str, group_id: str, duration: int):
"""禁言
参数:
bot: Bot
user_id: 用户id
group_id: 群组id
duration: 禁言时长(分钟)
"""
if isinstance(bot, v11Bot):
await bot.set_group_ban(
group_id=int(group_id),
user_id=int(user_id),
duration=duration * 60,
)
@classmethod
async def send_superuser(
cls,