diff --git a/zhenxun/plugins/fudu.py b/zhenxun/plugins/fudu.py index bb00565d..db1c0766 100644 --- a/zhenxun/plugins/fudu.py +++ b/zhenxun/plugins/fudu.py @@ -15,7 +15,7 @@ from zhenxun.models.task_info import TaskInfo from zhenxun.services.log import logger from zhenxun.utils.enum import PluginType from zhenxun.utils.http_utils import AsyncHttpx -from zhenxun.utils.image_utils import get_img_hash +from zhenxun.utils.image_utils import get_download_image_hash, get_img_hash from zhenxun.utils.rules import ensure_group __plugin_meta__ = PluginMetadata( @@ -91,7 +91,7 @@ class Fudu: _manage = Fudu() - + _matcher = on_message(rule=ensure_group, priority=999) @@ -115,7 +115,7 @@ async def _(message: UniMsg, event: Event, session: EventSession): if plain_text and plain_text.startswith(f"@可爱的{NICKNAME}"): await Text("复制粘贴的虚空艾特?").send(reply=True) if image_list: - img_hash = await get_fudu_img_hash(image_list[0], group_id) + img_hash = await get_download_image_hash(image_list[0], group_id) else: img_hash = "" add_msg = plain_text + "|-|" + img_hash @@ -147,26 +147,3 @@ async def _(message: UniMsg, event: Event, session: EventSession): rst = Text(plain_text) if rst: await rst.finish() - - -async def get_fudu_img_hash(url: str, group_id: str) -> str: - """下载图片获取哈希值 - - 参数: - url: 图片url - group_id: 群组id - - 返回: - str: 哈希值 - """ - try: - if await AsyncHttpx.download_file( - url, TEMP_PATH / f"compare_{group_id}_img.jpg" - ): - img_hash = get_img_hash(TEMP_PATH / f"compare_{group_id}_img.jpg") - return str(img_hash) - else: - logger.warning(f"复读下载图片失败...") - except Exception as e: - logger.warning(f"复读读取图片Hash出错 {type(e)}:{e}") - return "" diff --git a/zhenxun/plugins/mute/__init__.py b/zhenxun/plugins/mute/__init__.py new file mode 100644 index 00000000..eb35e275 --- /dev/null +++ b/zhenxun/plugins/mute/__init__.py @@ -0,0 +1,5 @@ +from pathlib import Path + +import nonebot + +nonebot.load_plugins(str(Path(__file__).parent.resolve())) diff --git a/zhenxun/plugins/mute/_data_source.py b/zhenxun/plugins/mute/_data_source.py new file mode 100644 index 00000000..7c03123e --- /dev/null +++ b/zhenxun/plugins/mute/_data_source.py @@ -0,0 +1,124 @@ +import time + +import ujson as json +from pydantic import BaseModel + +from zhenxun.configs.config import Config +from zhenxun.configs.path_config import DATA_PATH + +base_config = Config.get("mute") + + +class GroupData(BaseModel): + + count: int + """次数""" + time: int + """检测时长""" + duration: int + """禁言时长""" + message_data: dict = {} + """消息存储""" + + +class MuteManage: + + file = DATA_PATH / "group_mute_data.json" + + def __init__(self) -> None: + self._group_data: dict[str, GroupData] = {} + if self.file.exists(): + _data = json.load(open(self.file)) + for gid in _data: + self._group_data[gid] = GroupData( + count=_data[gid]["count"], + time=_data[gid]["time"], + duration=_data[gid]["duration"], + ) + + def get_group_data(self, group_id: str) -> GroupData: + """获取群组数据 + + 参数: + group_id: 群组id + + 返回: + GroupData: GroupData + """ + if group_id not in self._group_data: + self._group_data[group_id] = GroupData( + count=base_config.get("MUTE_DEFAULT_COUNT"), + time=base_config.get("MUTE_DEFAULT_TIME"), + duration=base_config.get("MUTE_DEFAULT_DURATION"), + ) + return self._group_data[group_id] + + def reset(self, user_id: str, group_id: str): + """重置用户检查次数 + + 参数: + user_id: 用户id + group_id: 群组id + """ + if group_data := self._group_data.get(group_id): + if user_id in group_data.message_data: + group_data.message_data[user_id]["count"] = 0 + + def save_data(self): + """保存数据""" + data = {} + for gid in self._group_data: + data[gid] = { + "count": self._group_data[gid].count, + "time": self._group_data[gid].time, + "duration": self._group_data[gid].duration, + } + with open(self.file, "w") as f: + json.dump(data, f, indent=4, ensure_ascii=False) + + def add_message(self, user_id: str, group_id: str, message: str) -> int: + """添加消息 + + 参数: + user_id: 用户id + group_id: 群组id + message: 消息内容 + + 返回: + int: 禁言时长 + """ + if group_id not in self._group_data: + self._group_data[group_id] = GroupData( + count=base_config.get("MUTE_DEFAULT_COUNT"), + time=base_config.get("MUTE_DEFAULT_TIME"), + duration=base_config.get("MUTE_DEFAULT_DURATION"), + ) + group_data = self._group_data[group_id] + if group_data.duration == 0: + return 0 + message_data = group_data.message_data + if not message_data.get(user_id): + message_data[user_id] = { + "time": time.time(), + "count": 1, + "message": message, + } + else: + if message.find(message_data[user_id]["message"]) != -1: + message_data[user_id]["count"] += 1 + else: + message_data[user_id]["time"] = time.time() + message_data[user_id]["count"] = 1 + message_data[user_id]["message"] = message + if time.time() - message_data[user_id]["time"] > group_data.time: + message_data[user_id]["time"] = time.time() + message_data[user_id]["count"] = 1 + if ( + message_data[user_id]["count"] > group_data.count + and time.time() - message_data[user_id]["time"] < group_data.time + ): + return group_data.duration + return 0 + + +mute_manage = MuteManage() diff --git a/zhenxun/plugins/mute/mute_message.py b/zhenxun/plugins/mute/mute_message.py new file mode 100644 index 00000000..401b2fc5 --- /dev/null +++ b/zhenxun/plugins/mute/mute_message.py @@ -0,0 +1,38 @@ +from nonebot import on_message +from nonebot.adapters import Bot +from nonebot_plugin_alconna import Image as alcImage +from nonebot_plugin_alconna import UniMsg +from nonebot_plugin_saa import Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import NICKNAME +from zhenxun.services.log import logger +from zhenxun.utils.image_utils import get_download_image_hash +from zhenxun.utils.platform import PlatformUtils + +from ._data_source import mute_manage + +_matcher = on_message(priority=1, block=False) + + +@_matcher.handle() +async def _(bot: Bot, session: EventSession, message: UniMsg): + group_id = session.id2 + if not session.id1 or not group_id: + return + plain_text = message.extract_plain_text() + image_list = [m.url for m in message if isinstance(m, alcImage) and m.url] + img_hash = "" + for url in image_list: + img_hash += await get_download_image_hash(url, "_mute_") + _message = plain_text + img_hash + if duration := mute_manage.add_message(session.id1, group_id, _message): + try: + await PlatformUtils.ban_user(bot, session.id1, group_id, duration) + await Text(f"检测到恶意刷屏,{NICKNAME}要把你关进小黑屋!").send( + at_sender=True + ) + mute_manage.reset(session.id1, group_id) + logger.info(f"检测刷屏 被禁言 {duration} 分钟", "禁言检查", session=session) + except Exception as e: + logger.error("禁言发送错误", "禁言检测", session=session, e=e) diff --git a/zhenxun/plugins/mute/mute_setting.py b/zhenxun/plugins/mute/mute_setting.py new file mode 100644 index 00000000..96237f4a --- /dev/null +++ b/zhenxun/plugins/mute/mute_setting.py @@ -0,0 +1,117 @@ +from nonebot.plugin import PluginMetadata +from nonebot_plugin_alconna import Alconna, Args, Arparma, Match, Option, on_alconna +from nonebot_plugin_saa import Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import NICKNAME +from zhenxun.configs.utils import PluginExtraData, RegisterConfig +from zhenxun.services.log import logger +from zhenxun.utils.enum import PluginType +from zhenxun.utils.rules import ensure_group + +from ._data_source import base_config, mute_manage + +__plugin_meta__ = PluginMetadata( + name="刷屏禁言", + description="刷屏禁言相关操作", + usage=""" + 刷屏禁言相关操作,需要 {NICKNAME} 有群管理员权限 + 指令: + 设置刷屏: 查看当前设置 + -c [count]: 检测最大次数 + -t [time]: 规定时间内 + -d [duration]: 禁言时长 + 示例: + 设置刷屏 -c 10: 设置最大次数为10 + 设置刷屏 -t 100 -d 20: 设置规定时间和禁言时长 + 设置刷屏 -d 10: 设置禁言时长为10 + * 即 X 秒内发送同样消息 N 次,禁言 M 分钟 * + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + menu_type="其他", + plugin_type=PluginType.ADMIN, + admin_level=base_config.get("MUTE_LEVEL", 5), + configs=[ + RegisterConfig( + key="MUTE_LEVEL", + value=5, + help="更改禁言设置的管理权限", + default_value=5, + type=int, + ), + RegisterConfig( + key="MUTE_DEFAULT_COUNT", + value=10, + help="刷屏禁言默认检测次数", + default_value=10, + type=int, + ), + RegisterConfig( + key="MUTE_DEFAULT_TIME", + value=7, + help="刷屏检测默认规定时间", + default_value=7, + type=int, + ), + RegisterConfig( + key="MUTE_DEFAULT_DURATION", + value=10, + help="刷屏检测默禁言时长(分钟)", + default_value=10, + type=int, + ), + ], + ).dict(), +) + + +_setting_matcher = on_alconna( + Alconna( + "刷屏设置", + Option("-t|--time", Args["time", int], help_text="检测时长"), + Option("-c|--count", Args["count", int], help_text="检测次数"), + Option("-d|--duration", Args["duration", int], help_text="禁言时长"), + ), + rule=ensure_group, + block=True, + priority=5, +) + + +@_setting_matcher.handle() +async def _( + session: EventSession, + arparma: Arparma, + time: Match[int], + count: Match[int], + duration: Match[int], +): + group_id = session.id2 + if not session.id1 or not group_id: + return + _time = time.result if time.available else None + _count = count.result if count.available else None + _duration = duration.result if duration.available else None + group_data = mute_manage.get_group_data(group_id) + if _time is None and _count is None and _duration is None: + await Text( + f"最大次数:{group_data.count} 次\n" + f"规定时间:{group_data.time} 秒\n" + f"禁言时长:{group_data.duration:.2f} 分钟\n" + f"【在规定时间内发送相同消息超过最大次数则禁言\n当禁言时长为0时关闭此功能】" + ).finish(reply=True) + if _time is not None: + group_data.time = _time + if _count is not None: + group_data.count = _count + if _duration is not None: + group_data.duration = _duration + await Text("设置成功!").send(reply=True) + logger.info( + f"设置禁言配置 time: {_time}, count: {_count}, duration: {_duration}", + arparma.header_result, + session=session, + ) + mute_manage.save_data() diff --git a/zhenxun/utils/image_utils.py b/zhenxun/utils/image_utils.py index dfd03d29..b44abcde 100644 --- a/zhenxun/utils/image_utils.py +++ b/zhenxun/utils/image_utils.py @@ -10,7 +10,9 @@ from imagehash import ImageHash from nonebot.utils import is_coroutine_callable from PIL import Image -from zhenxun.configs.path_config import IMAGE_PATH +from zhenxun.configs.path_config import IMAGE_PATH, TEMP_PATH +from zhenxun.services.log import logger +from zhenxun.utils.http_utils import AsyncHttpx from ._build_image import BuildImage, ColorAlias from ._build_mat import BuildMat, MatType @@ -381,3 +383,24 @@ def get_img_hash(image_file: str | Path) -> str: with open(image_file, "rb") as fp: hash_value = imagehash.average_hash(Image.open(fp)) return str(hash_value) + + +async def get_download_image_hash(url: str, mark: str) -> str: + """下载图片获取哈希值 + + 参数: + url: 图片url + mark: 随机标志符 + + 返回: + str: 哈希值 + """ + try: + if await AsyncHttpx.download_file( + url, TEMP_PATH / f"compare_download_{mark}_img.jpg" + ): + img_hash = get_img_hash(TEMP_PATH / f"compare_download_{mark}_img.jpg") + return str(img_hash) + except Exception as e: + logger.warning(f"下载读取图片Hash出错", e=e) + return "" diff --git a/zhenxun/utils/platform.py b/zhenxun/utils/platform.py index bb5470bf..ac4844e5 100644 --- a/zhenxun/utils/platform.py +++ b/zhenxun/utils/platform.py @@ -50,6 +50,23 @@ class UserData(BaseModel): class PlatformUtils: + @classmethod + async def ban_user(cls, bot: Bot, user_id: str, group_id: str, duration: int): + """禁言 + + 参数: + bot: Bot + user_id: 用户id + group_id: 群组id + duration: 禁言时长(分钟) + """ + if isinstance(bot, v11Bot): + await bot.set_group_ban( + group_id=int(group_id), + user_id=int(user_id), + duration=duration * 60, + ) + @classmethod async def send_superuser( cls,