zhenxun_bot/plugins/fudu.py
2021-08-17 23:17:08 +08:00

113 lines
3.7 KiB
Python

from nonebot.adapters.cqhttp.permission import GROUP
from configs.path_config import IMAGE_PATH
from utils.image_utils import get_img_hash
import random
from utils.message_builder import image
from nonebot import on_message
from utils.utils import get_message_text, get_message_imgs, get_local_proxy
from nonebot.typing import T_State
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
import aiohttp
import aiofiles
from collections import defaultdict
from asyncio.exceptions import TimeoutError
from configs.config import FUDU_PROBABILITY
class Fudu:
def __init__(self):
self.data = {}
def append(self, key, content):
self._create(key)
self.data[key]['data'].append(content)
def clear(self, key):
self._create(key)
self.data[key]['data'] = []
self.data[key]['is_repeater'] = False
def size(self, key) -> int:
self._create(key)
return len(self.data[key]['data'])
def check(self, key, content) -> bool:
self._create(key)
return self.data[key]['data'][0] == content
def get(self, key):
self._create(key)
return self.data[key]['data'][0]
def is_repeater(self, key):
self._create(key)
return self.data[key]['is_repeater']
def set_repeater(self, key):
self._create(key)
self.data[key]['is_repeater'] = True
def _create(self, key):
if self.data.get(key) is None:
self.data[key] = {'is_repeater': False, 'data': []}
_fudulist = Fudu()
fudu = on_message(permission=GROUP, priority=9)
@fudu.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
if event.is_tome() or state["_prefix"]["raw_command"]:
return
if get_message_text(event.json()):
if get_message_text(event.json()).find("@可爱的小真寻") != -1:
await fudu.finish("复制粘贴的虚空艾特?", at_sender=True)
imgs = get_message_imgs(event.json())
msg = get_message_text(event.json())
if not imgs and not msg:
return
if imgs:
img_hash = await get_fudu_img_hash(imgs[0], event.group_id)
else:
img_hash = ""
add_msg = msg + "|-|" + img_hash
if _fudulist.size(event.group_id) == 0:
_fudulist.append(event.group_id, add_msg)
elif _fudulist.check(event.group_id, add_msg):
_fudulist.append(event.group_id, add_msg)
else:
_fudulist.clear(event.group_id)
_fudulist.append(event.group_id, add_msg)
if _fudulist.size(event.group_id) > 2:
if random.random() < FUDU_PROBABILITY and not _fudulist.is_repeater(event.group_id):
if random.random() < 0.2:
await fudu.finish("打断施法!")
if imgs and msg:
rst = msg + image(f"compare_{event.group_id}_img.jpg", "temp")
elif imgs:
rst = image(f"compare_{event.group_id}_img.jpg", "temp")
elif msg:
rst = msg
else:
rst = ""
if rst:
await fudu.send(rst)
_fudulist.set_repeater(event.group_id)
async def get_fudu_img_hash(url, group_id):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, proxy=get_local_proxy(), timeout=5) as response:
async with aiofiles.open(
IMAGE_PATH + f"temp/compare_{group_id}_img.jpg", "wb"
) as f:
await f.write(await response.read())
img_hash = get_img_hash(IMAGE_PATH + f"temp/compare_{group_id}_img.jpg")
return str(img_hash)
except TimeoutError:
return ''