zhenxun_bot/plugins/mute.py

143 lines
5.5 KiB
Python
Raw Normal View History

2021-05-20 19:27:31 +08:00
from nonebot import on_message, on_command
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
from nonebot.adapters.cqhttp.permission import GROUP
2021-06-30 19:50:55 +08:00
from utils.utils import get_message_text, is_number, get_message_imgs, get_local_proxy
2021-05-20 19:27:31 +08:00
from nonebot.typing import T_State
import time
from nonebot.adapters.cqhttp.exception import ActionFailed
from configs.path_config import DATA_PATH, IMAGE_PATH
2021-07-30 21:21:51 +08:00
from utils.image_utils import get_img_hash
2021-05-20 19:27:31 +08:00
from services.log import logger
import aiohttp
import aiofiles
from configs.config import MUTE_DEFAULT_COUNT, MUTE_DEFAULT_TIME, MUTE_DEFAULT_DURATION
2021-07-30 21:21:51 +08:00
2021-05-20 19:27:31 +08:00
try:
import ujson as json
except ModuleNotFoundError:
import json
2021-07-30 21:21:51 +08:00
__plugin_name__ = "刷屏禁言"
2021-05-20 19:27:31 +08:00
2021-07-30 21:21:51 +08:00
__plugin_usage__ = "刷屏禁言检测"
2021-06-15 10:57:08 +08:00
2021-05-20 19:27:31 +08:00
mute = on_message(priority=1, block=False)
2021-07-30 21:21:51 +08:00
mute_setting = on_command(
"mute_setting",
aliases={"设置检测时间", "设置检测次数", "设置禁言时长", "刷屏检测设置"},
permission=GROUP,
block=True,
)
2021-05-20 19:27:31 +08:00
def get_data():
try:
2021-07-30 21:21:51 +08:00
with open(DATA_PATH + "group_mute_data.json", "r", encoding="utf8") as f:
2021-05-20 19:27:31 +08:00
data = json.load(f)
except (ValueError, FileNotFoundError):
data = {}
return data
def save_data():
global mute_data
2021-07-30 21:21:51 +08:00
with open(DATA_PATH + "group_mute_data.json", "w", encoding="utf8") as f:
2021-05-20 19:27:31 +08:00
json.dump(mute_data, f, indent=4)
async def download_img_and_hash(url, group_id):
async with aiohttp.ClientSession() as session:
async with session.get(url, proxy=get_local_proxy(), timeout=10) as response:
2021-07-30 21:21:51 +08:00
async with aiofiles.open(
IMAGE_PATH + f"temp/mute_{group_id}_img.jpg", "wb"
) as f:
2021-05-20 19:27:31 +08:00
await f.write(await response.read())
2021-07-30 21:21:51 +08:00
return str(get_img_hash(IMAGE_PATH + f"temp/mute_{group_id}_img.jpg"))
2021-05-20 19:27:31 +08:00
mute_dict = {}
mute_data = get_data()
@mute.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
group_id = str(event.group_id)
msg = get_message_text(event.json())
imgs = get_message_imgs(event.json())
2021-07-30 21:21:51 +08:00
img_hash = ""
2021-05-20 19:27:31 +08:00
for img in imgs:
img_hash += await download_img_and_hash(img, event.group_id)
msg += img_hash
if not mute_data.get(group_id):
mute_data[group_id] = {
2021-07-30 21:21:51 +08:00
"count": MUTE_DEFAULT_COUNT,
"time": MUTE_DEFAULT_TIME,
"duration": MUTE_DEFAULT_DURATION,
2021-05-20 19:27:31 +08:00
}
if not mute_dict.get(event.user_id):
2021-07-30 21:21:51 +08:00
mute_dict[event.user_id] = {"time": time.time(), "count": 1, "msg": msg}
2021-05-20 19:27:31 +08:00
else:
if state["_prefix"]["raw_command"] or not msg:
return
2021-07-30 21:21:51 +08:00
if msg and msg.find(mute_dict[event.user_id]["msg"]) != -1:
mute_dict[event.user_id]["count"] += 1
2021-05-20 19:27:31 +08:00
else:
2021-07-30 21:21:51 +08:00
mute_dict[event.user_id]["time"] = time.time()
mute_dict[event.user_id]["count"] = 1
mute_dict[event.user_id]["msg"] = msg
if time.time() - mute_dict[event.user_id]["time"] > mute_data[group_id]["time"]:
mute_dict[event.user_id]["time"] = time.time()
mute_dict[event.user_id]["count"] = 1
if (
mute_dict[event.user_id]["count"] > mute_data[group_id]["count"]
and time.time() - mute_dict[event.user_id]["time"]
< mute_data[group_id]["time"]
):
2021-05-20 19:27:31 +08:00
try:
2021-07-30 21:21:51 +08:00
if mute_data[group_id]["duration"] != 0:
await bot.set_group_ban(
group_id=event.group_id,
user_id=event.user_id,
duration=mute_data[group_id]["duration"],
)
await mute.send("检测到恶意刷屏,真寻要把你关进小黑屋!", at_sender=True)
mute_dict[event.user_id]["count"] = 0
logger.info(
f"USER {event.user_id} GROUP {event.group_id} "
f'检测刷屏 被禁言 {mute_data[group_id]["duration"] / 60} 分钟'
)
2021-05-20 19:27:31 +08:00
except ActionFailed:
pass
@mute_setting.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
group_id = str(event.group_id)
if not mute_data.get(group_id):
2021-07-30 21:21:51 +08:00
mute_data[group_id] = {"count": 10, "time": 7, "duration": 0}
2021-05-20 19:27:31 +08:00
msg = get_message_text(event.json())
2021-07-30 21:21:51 +08:00
if state["_prefix"]["raw_command"] == "刷屏检测设置":
await mute_setting.finish(
f'最大次数:{mute_data[group_id]["count"]}\n'
f'规定时间:{mute_data[group_id]["time"]}\n'
f'禁言时长:{mute_data[group_id]["duration"] / 60} 分钟\n'
f"【在规定时间内发送相同消息超过最大次数则禁言\n当禁言时长为0时关闭此功能】"
)
2021-05-20 19:27:31 +08:00
if not is_number(msg):
2021-07-30 21:21:51 +08:00
await mute.finish("设置的参数必须是数字啊!", at_sender=True)
if state["_prefix"]["raw_command"] == "设置检测时间":
mute_data[group_id]["time"] = int(msg)
msg += ""
if state["_prefix"]["raw_command"] == "设置检测次数":
mute_data[group_id]["count"] = int(msg)
msg += ""
if state["_prefix"]["raw_command"] == "设置禁言时长":
mute_data[group_id]["duration"] = int(msg) * 60
msg += " 分钟"
2021-05-20 19:27:31 +08:00
await mute_setting.send(f'刷屏检测:{state["_prefix"]["raw_command"]}{msg}')
2021-07-30 21:21:51 +08:00
logger.info(
f'USER {event.user_id} GROUP {group_id} {state["_prefix"]["raw_command"]}{msg}'
)
2021-05-20 19:27:31 +08:00
save_data()