mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 06:12:53 +08:00
Add files via upload
This commit is contained in:
parent
9cac07b5ec
commit
405fde91d4
40
plugins/group_welcome_msg.py
Normal file
40
plugins/group_welcome_msg.py
Normal file
@ -0,0 +1,40 @@
|
||||
from nonebot import on_command
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
|
||||
from nonebot.adapters.cqhttp.permission import GROUP
|
||||
from configs.path_config import DATA_PATH
|
||||
from util.init_result import image
|
||||
import os
|
||||
from pathlib import Path
|
||||
try:
|
||||
import ujson as json
|
||||
except ModuleNotFoundError:
|
||||
import json
|
||||
|
||||
view_custom_welcome = on_command('群欢迎消息', aliases={'查看群欢迎消息', '查看当前群欢迎消息'}, permission=GROUP, priority=5, block=True)
|
||||
|
||||
|
||||
@view_custom_welcome.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
img = ''
|
||||
msg = ''
|
||||
if os.path.exists(DATA_PATH + f'custom_welcome_msg/{event.group_id}.jpg'):
|
||||
img = image(abspath=DATA_PATH + f'custom_welcome_msg/{event.group_id}.jpg')
|
||||
custom_welcome_msg_json = Path() / "data" / "custom_welcome_msg" / "custom_welcome_msg.json"
|
||||
if custom_welcome_msg_json.exists():
|
||||
data = json.load(open(custom_welcome_msg_json, 'r'))
|
||||
if data.get(str(event.group_id)):
|
||||
msg = data[str(event.group_id)]
|
||||
if msg.find('[at]') != -1:
|
||||
msg = msg.replace('[at]', '')
|
||||
if img or msg:
|
||||
await view_custom_welcome.finish(msg + img, at_sender=True)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
152
plugins/hook.py
Normal file
152
plugins/hook.py
Normal file
@ -0,0 +1,152 @@
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.message import run_preprocessor, IgnoredException
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.adapters.cqhttp import Bot, MessageEvent, PrivateMessageEvent, GroupMessageEvent
|
||||
from configs.config import BAN_RESULT, admin_plugins_auth, MALICIOUS_BAN_TIME, MALICIOUS_CHECK_TIME, MALICIOUS_BAN_COUNT
|
||||
from models.ban_user import BanUser
|
||||
from util.utils import is_number, static_flmt, BanCheckLimiter
|
||||
from util.init_result import at
|
||||
from services.log import logger
|
||||
from models.level_user import LevelUser
|
||||
try:
|
||||
import ujson as json
|
||||
except ModuleNotFoundError:
|
||||
import json
|
||||
|
||||
|
||||
# 检查是否被ban
|
||||
@run_preprocessor
|
||||
async def _(matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State):
|
||||
if matcher.type == 'message' and matcher.priority not in [1, 9]:
|
||||
if await BanUser.isban(event.user_id) and str(event.user_id) not in bot.config.superusers:
|
||||
time = await BanUser.check_ban_time(event.user_id)
|
||||
if is_number(time):
|
||||
time = abs(int(time))
|
||||
if time < 60:
|
||||
time = str(time) + ' 秒'
|
||||
else:
|
||||
time = str(int(time / 60)) + ' 分'
|
||||
else:
|
||||
time = str(time) + ' 分'
|
||||
if event.message_type == 'group':
|
||||
if not static_flmt.check(event.user_id):
|
||||
raise IgnoredException('用户处于黑名单中')
|
||||
static_flmt.start_cd(event.user_id)
|
||||
if matcher.priority != 9:
|
||||
await bot.send_group_msg(group_id=event.group_id,
|
||||
message=at(event.user_id) + BAN_RESULT + f' 在..在 {time}后才会理你喔')
|
||||
else:
|
||||
if not static_flmt.check(event.user_id):
|
||||
raise IgnoredException('用户处于黑名单中')
|
||||
static_flmt.start_cd(event.user_id)
|
||||
if matcher.priority != 9:
|
||||
await bot.send_private_msg(user_id=event.user_id,
|
||||
message=at(event.user_id) + BAN_RESULT + f' 在..在 {time}后才会理你喔')
|
||||
raise IgnoredException('用户处于黑名单中')
|
||||
|
||||
|
||||
_blmt = BanCheckLimiter(MALICIOUS_BAN_TIME, MALICIOUS_BAN_COUNT)
|
||||
|
||||
|
||||
# 恶意触发命令检测
|
||||
@run_preprocessor
|
||||
async def _(matcher: Matcher, bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
if matcher.type == 'message' and matcher.priority not in [1, 9]:
|
||||
if state["_prefix"]["raw_command"]:
|
||||
# print(state["_prefix"]["raw_command"])
|
||||
if _blmt.check(f'{event.user_id}{state["_prefix"]["raw_command"]}'):
|
||||
if await BanUser.ban(event.user_id, 9, MALICIOUS_BAN_TIME * 60):
|
||||
logger.info(f'USER {event.user_id} 触发了恶意触发检测')
|
||||
# await update_img.finish('检测到恶意触发命令,您将被封禁 30 分钟', at_sender=True)
|
||||
if event.message_type == 'group':
|
||||
if not static_flmt.check(event.user_id):
|
||||
return
|
||||
static_flmt.start_cd(event.user_id)
|
||||
await bot.send_group_msg(group_id=event.group_id,
|
||||
message=at(event.user_id) + '检测到恶意触发命令,您将被封禁 30 分钟')
|
||||
else:
|
||||
if not static_flmt.check(event.user_id):
|
||||
return
|
||||
static_flmt.start_cd(event.user_id)
|
||||
await bot.send_private_msg(user_id=event.user_id,
|
||||
message=at(event.user_id) + '检测到恶意触发命令,您将被封禁 30 分钟')
|
||||
raise IgnoredException('检测到恶意触发命令')
|
||||
_blmt.add(f'{event.user_id}{state["_prefix"]["raw_command"]}')
|
||||
|
||||
|
||||
# 权限检测
|
||||
@run_preprocessor
|
||||
async def _(matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State):
|
||||
if await BanUser.isban(event.user_id):
|
||||
return
|
||||
if matcher.module in admin_plugins_auth.keys() and matcher.priority not in [1, 9]:
|
||||
if event.message_type == 'group':
|
||||
if not await LevelUser.check_level(event.user_id, event.group_id, admin_plugins_auth[matcher.module]):
|
||||
await bot.send_group_msg(group_id=event.group_id,
|
||||
message=f'{at(event.user_id)}你的权限不足喔,该功能需要的权限等级:'
|
||||
f'{admin_plugins_auth[matcher.module]}')
|
||||
raise IgnoredException('权限不足')
|
||||
else:
|
||||
if not await LevelUser.check_level(event.user_id, 0, admin_plugins_auth[matcher.module]):
|
||||
await bot.send_private_msg(user_id=event.user_id,
|
||||
message=f'你的权限不足喔,该功能需要的权限等级:{admin_plugins_auth[matcher.module]}')
|
||||
raise IgnoredException('权限不足')
|
||||
|
||||
|
||||
# 为什么AI会自己和自己聊天
|
||||
@run_preprocessor
|
||||
async def _(matcher: Matcher, bot: Bot, event: PrivateMessageEvent, state: T_State):
|
||||
if matcher.type == 'message' and event.user_id == int(bot.self_id):
|
||||
raise IgnoredException('为什么AI会自己和自己聊天')
|
||||
|
||||
|
||||
# 有命令就别说话了
|
||||
@run_preprocessor
|
||||
async def _(matcher: Matcher, bot: Bot, event: MessageEvent, state: T_State):
|
||||
if matcher.type == 'message':
|
||||
if state["_prefix"]["raw_command"] and matcher.module == 'ai':
|
||||
raise IgnoredException('有命令就别说话了')
|
||||
|
||||
|
||||
# _ulmt = UserExistLimiter()
|
||||
#
|
||||
#
|
||||
# # 是否有命令正在处理
|
||||
# @run_preprocessor
|
||||
# async def _(matcher: Matcher, bot: Bot, event: Event, state: T_State):
|
||||
# if matcher.module == 'admin_bot_manage':
|
||||
# return
|
||||
# if event.user_id == bot.self_id:
|
||||
# raise IgnoredException('自己和自己聊天?')
|
||||
# print(state["_prefix"]["raw_command"])
|
||||
# print(matcher.type)
|
||||
# if (event.is_tome() and (state["_prefix"]["raw_command"] and matcher.type == 'message')
|
||||
# and matcher.module == 'ai') or not static_group_dict[event.group_id]['总开关']:
|
||||
# raise IgnoredException('Ai给爷爬?')
|
||||
# # if matcher.module in ['ai']:
|
||||
# # raise IgnoredException('Ai给爷爬?')
|
||||
# if matcher.type == 'message' and matcher.priority not in [1, 8, 9]:
|
||||
# if _ulmt.check(event.user_id):
|
||||
# if event.message_type == 'group':
|
||||
# await bot.send_group_msg(group_id=event.group_id, message=at(event.user_id) + '您有命令正在处理,等一哈!')
|
||||
# else:
|
||||
# await bot.send_private_msg(user_id=event.user_id, message='您有命令正在处理,等一哈!')
|
||||
# raise IgnoredException('有命令正在处理')
|
||||
# _ulmt.set_True(event.user_id)
|
||||
#
|
||||
#
|
||||
# # 结束正在处理的命令
|
||||
# @run_postprocessor
|
||||
# async def _(matcher: Matcher, bot: Bot, event: Event, state: T_State):
|
||||
# _ulmt.set_False(event.user_id)
|
||||
#
|
||||
#
|
||||
# # 结束正在处理的命令
|
||||
# @event_preprocessor
|
||||
# async def _(bot: Bot, event: Event, state: T_State):
|
||||
# try:
|
||||
# _ulmt.set_False(event.user_id)
|
||||
# except AttributeError:
|
||||
# pass
|
||||
|
||||
|
||||
35
plugins/jitang.py
Normal file
35
plugins/jitang.py
Normal file
@ -0,0 +1,35 @@
|
||||
from nonebot import on_command
|
||||
from util.user_agent import get_user_agent
|
||||
from bs4 import BeautifulSoup
|
||||
from services.log import logger
|
||||
from nonebot.adapters.cqhttp import Bot, Event
|
||||
from nonebot.typing import T_State
|
||||
import aiohttp
|
||||
|
||||
|
||||
__plugin_name__ = '鸡汤'
|
||||
__plugin_usage__ = '用法: 要喝一点鸡汤吗?'
|
||||
|
||||
|
||||
url = "https://new.toodo.fun/funs/content?type=du"
|
||||
|
||||
|
||||
jitang = on_command("鸡汤", aliases={"毒鸡汤"}, priority=5, block=True)
|
||||
|
||||
|
||||
@jitang.handle()
|
||||
async def _(bot: Bot, event: Event, state: T_State):
|
||||
if str(event.get_message()) in ['帮助']:
|
||||
await jitang.finish(__plugin_usage__)
|
||||
try:
|
||||
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
|
||||
async with session.get(url, timeout=7) as response:
|
||||
soup = BeautifulSoup(await response.text(), 'lxml')
|
||||
result = (soup.find_all('h3', {'class': 'text-center'}))[0].text
|
||||
await jitang.send(result)
|
||||
logger.info(
|
||||
f"(USER {event.user_id}, GROUP {event.group_id if event.message_type != 'private' else 'private'})"
|
||||
f" 发送鸡汤:" + result)
|
||||
except Exception as e:
|
||||
await jitang.send("出错啦!再试一次吧!", at_sender=True)
|
||||
logger.info(f'鸡汤error e:{e}')
|
||||
129
plugins/mute.py
Normal file
129
plugins/mute.py
Normal file
@ -0,0 +1,129 @@
|
||||
from nonebot import on_message, on_command
|
||||
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
|
||||
from nonebot.adapters.cqhttp.permission import GROUP
|
||||
from util.utils import get_message_text, is_number, get_message_imgs, get_local_proxy
|
||||
from nonebot.typing import T_State
|
||||
import time
|
||||
from nonebot.adapters.cqhttp.exception import ActionFailed
|
||||
from configs.path_config import DATA_PATH, IMAGE_PATH
|
||||
from util.img_utils import get_img_hash
|
||||
from services.log import logger
|
||||
import aiohttp
|
||||
import aiofiles
|
||||
from configs.config import MUTE_DEFAULT_COUNT, MUTE_DEFAULT_TIME, MUTE_DEFAULT_DURATION
|
||||
try:
|
||||
import ujson as json
|
||||
except ModuleNotFoundError:
|
||||
import json
|
||||
|
||||
__plugin_name__ = '刷屏禁言'
|
||||
|
||||
|
||||
mute = on_message(priority=1, block=False)
|
||||
mute_setting = on_command('mute_setting', aliases={'设置检测时间', '设置检测次数', '设置禁言时长', '刷屏检测设置'}, permission=GROUP, block=True)
|
||||
|
||||
|
||||
def get_data():
|
||||
try:
|
||||
with open(DATA_PATH + 'group_mute_data.json', 'r', encoding='utf8') as f:
|
||||
data = json.load(f)
|
||||
except (ValueError, FileNotFoundError):
|
||||
data = {}
|
||||
return data
|
||||
|
||||
|
||||
def save_data():
|
||||
global mute_data
|
||||
with open(DATA_PATH + 'group_mute_data.json', 'w', encoding='utf8') as f:
|
||||
json.dump(mute_data, f, indent=4)
|
||||
|
||||
|
||||
async def download_img_and_hash(url, group_id):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, proxy=get_local_proxy(), timeout=10) as response:
|
||||
async with aiofiles.open(IMAGE_PATH + f'temp/mute_{group_id}_img.jpg', 'wb') as f:
|
||||
await f.write(await response.read())
|
||||
return str(get_img_hash(IMAGE_PATH + f'temp/mute_{group_id}_img.jpg'))
|
||||
|
||||
|
||||
mute_dict = {}
|
||||
mute_data = get_data()
|
||||
|
||||
|
||||
@mute.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
group_id = str(event.group_id)
|
||||
msg = get_message_text(event.json())
|
||||
imgs = get_message_imgs(event.json())
|
||||
img_hash = ''
|
||||
for img in imgs:
|
||||
img_hash += await download_img_and_hash(img, event.group_id)
|
||||
msg += img_hash
|
||||
if not mute_data.get(group_id):
|
||||
mute_data[group_id] = {
|
||||
'count': MUTE_DEFAULT_COUNT,
|
||||
'time': MUTE_DEFAULT_TIME,
|
||||
'duration': MUTE_DEFAULT_DURATION
|
||||
}
|
||||
if not mute_dict.get(event.user_id):
|
||||
mute_dict[event.user_id] = {
|
||||
'time': time.time(),
|
||||
'count': 1,
|
||||
'msg': msg
|
||||
}
|
||||
else:
|
||||
if state["_prefix"]["raw_command"] or not msg:
|
||||
return
|
||||
if msg and msg.find(mute_dict[event.user_id]['msg']) != -1:
|
||||
mute_dict[event.user_id]['count'] += 1
|
||||
else:
|
||||
mute_dict[event.user_id]['time'] = time.time()
|
||||
mute_dict[event.user_id]['count'] = 1
|
||||
mute_dict[event.user_id]['msg'] = msg
|
||||
if time.time() - mute_dict[event.user_id]['time'] > mute_data[group_id]['time']:
|
||||
mute_dict[event.user_id]['time'] = time.time()
|
||||
mute_dict[event.user_id]['count'] = 1
|
||||
if mute_dict[event.user_id]['count'] > mute_data[group_id]['count'] and\
|
||||
time.time() - mute_dict[event.user_id]['time'] < mute_data[group_id]['time']:
|
||||
try:
|
||||
if mute_data[group_id]['duration'] != 0:
|
||||
await bot.set_group_ban(group_id=event.group_id, user_id=event.user_id,
|
||||
duration=mute_data[group_id]['duration'])
|
||||
await mute.send('检测到恶意刷屏,真寻要把你关进小黑屋!', at_sender=True)
|
||||
mute_dict[event.user_id]['count'] = 0
|
||||
logger.info(f'USER {event.user_id} GROUP {event.group_id} '
|
||||
f'检测刷屏 被禁言 {mute_data[group_id]["duration"] / 60} 分钟')
|
||||
except ActionFailed:
|
||||
pass
|
||||
|
||||
|
||||
@mute_setting.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
group_id = str(event.group_id)
|
||||
if not mute_data.get(group_id):
|
||||
mute_data[group_id] = {
|
||||
'count': 10,
|
||||
'time': 7,
|
||||
'duration': 0
|
||||
}
|
||||
msg = get_message_text(event.json())
|
||||
if state["_prefix"]["raw_command"] == '刷屏检测设置':
|
||||
await mute_setting.finish(f'最大次数:{mute_data[group_id]["count"]} 次\n'
|
||||
f'规定时间:{mute_data[group_id]["time"]} 秒\n'
|
||||
f'禁言时长:{mute_data[group_id]["duration"] / 60} 分钟\n'
|
||||
f'【在规定时间内发送相同消息超过最大次数则禁言\n当禁言时长为0时关闭此功能】')
|
||||
if not is_number(msg):
|
||||
await mute.finish('设置的参数必须是数字啊!', at_sender=True)
|
||||
if state["_prefix"]["raw_command"] == '设置检测时间':
|
||||
mute_data[group_id]['time'] = int(msg)
|
||||
msg += '秒'
|
||||
if state["_prefix"]["raw_command"] == '设置检测次数':
|
||||
mute_data[group_id]['count'] = int(msg)
|
||||
msg += ' 次'
|
||||
if state["_prefix"]["raw_command"] == '设置禁言时长':
|
||||
mute_data[group_id]['duration'] = int(msg) * 60
|
||||
msg += ' 分钟'
|
||||
await mute_setting.send(f'刷屏检测:{state["_prefix"]["raw_command"]}为 {msg}')
|
||||
logger.info(f'USER {event.user_id} GROUP {group_id} {state["_prefix"]["raw_command"]}:{msg}')
|
||||
save_data()
|
||||
|
||||
40
plugins/my_info.py
Normal file
40
plugins/my_info.py
Normal file
@ -0,0 +1,40 @@
|
||||
from nonebot import on_command
|
||||
from nonebot.adapters.cqhttp.permission import GROUP
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
|
||||
from models.group_member_info import GroupInfoUser
|
||||
from datetime import timedelta
|
||||
from models.level_user import LevelUser
|
||||
|
||||
__plugin_name__ = '更新群组成员列表 [Hidden]'
|
||||
__plugin_usage__ = (
|
||||
'用法:\n'
|
||||
'更新群员的信息'
|
||||
)
|
||||
|
||||
|
||||
get_my_group_info = on_command("我的信息", permission=GROUP, priority=1, block=True)
|
||||
my_level = on_command('我的权限', permission=GROUP, priority=5, block=True)
|
||||
|
||||
|
||||
@get_my_group_info.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
result = await get_member_info(event.user_id, event.group_id)
|
||||
await get_my_group_info.finish(result)
|
||||
|
||||
|
||||
async def get_member_info(user_qq: int, group_id: int) -> str:
|
||||
user = await GroupInfoUser.select_member_info(user_qq, group_id)
|
||||
if user is None:
|
||||
return "该群员不在列表中,请更新群成员信息"
|
||||
result = ""
|
||||
result += "昵称:" + user.user_name + "\n"
|
||||
result += "加群时间:" + str(user.user_join_time.date() + timedelta(hours=8))
|
||||
return result
|
||||
|
||||
|
||||
@my_level.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
if (level := await LevelUser.get_user_level(event.user_id, event.group_id)) == -1:
|
||||
await my_level.finish('您目前没有任何权限了,硬要说的话就是0吧~', at_sender=True)
|
||||
await my_level.finish(f'您目前的权限等级:{level}', at_sender=True)
|
||||
168
plugins/nickname.py
Normal file
168
plugins/nickname.py
Normal file
@ -0,0 +1,168 @@
|
||||
from nonebot import on_command
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent, PrivateMessageEvent
|
||||
from nonebot.rule import to_me
|
||||
from util.utils import get_message_text
|
||||
from models.group_member_info import GroupInfoUser
|
||||
from models.friend_user import FriendUser
|
||||
import random
|
||||
from models.ban_user import BanUser
|
||||
from services.log import logger
|
||||
|
||||
nickname = on_command('nickname',
|
||||
aliases={'以后叫我', '以后请叫我', '称呼我', '以后请称呼我', '以后称呼我', '叫我', '请叫我'},
|
||||
rule=to_me(), priority=5, block=True)
|
||||
|
||||
my_nickname = on_command('my_name', aliases={'我叫什么', '我是谁', '我的名字'}, rule=to_me(), priority=5, block=True)
|
||||
|
||||
|
||||
cancel_nickname = on_command('取消昵称', rule=to_me(), priority=5, block=True)
|
||||
|
||||
|
||||
@nickname.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
msg = get_message_text(event.json())
|
||||
if not msg:
|
||||
await nickname.finish('叫你空白?叫你虚空?叫你无名??', at_sender=True)
|
||||
if len(msg) > 10:
|
||||
await nickname.finish('昵称可不能超过10个字!', at_sender=True)
|
||||
if await GroupInfoUser.set_group_member_nickname(event.user_id, event.group_id, msg):
|
||||
if len(msg) < 5:
|
||||
if random.random() < 0.5:
|
||||
msg = "~".join(msg)
|
||||
await nickname.send(random.choice([
|
||||
'好啦好啦,我知道啦,{},以后就这么叫你吧',
|
||||
'嗯嗯,小真寻记住你的昵称了哦,{}',
|
||||
'好突然,突然要叫你昵称什么的...{}..',
|
||||
'小真寻会好好记住的{}的,放心吧',
|
||||
'好..好.,那窝以后就叫你{}了.'
|
||||
]).format(msg))
|
||||
logger.info(f'USER {event.user_id} GROUP {event.group_id} 设置群昵称 {msg}')
|
||||
else:
|
||||
await nickname.send('设置昵称失败,请更新群组成员信息!', at_sender=True)
|
||||
logger.warning(f'USER {event.user_id} GROUP {event.group_id} 设置群昵称 {msg} 失败')
|
||||
|
||||
|
||||
@nickname.handle()
|
||||
async def _(bot: Bot, event: PrivateMessageEvent, state: T_State):
|
||||
msg = get_message_text(event.json())
|
||||
if not msg:
|
||||
await nickname.finish('叫你空白?叫你虚空?叫你无名??', at_sender=True)
|
||||
if len(msg) > 10:
|
||||
await nickname.finish('不要超过10个字!', at_sender=True)
|
||||
if await FriendUser.set_friend_nickname(event.user_id, msg):
|
||||
await nickname.send(random.choice([
|
||||
'好啦好啦,我知道啦,{},以后就这么叫你吧',
|
||||
'嗯嗯,小真寻记住你的昵称了哦,{}',
|
||||
'好突然,突然要叫你昵称什么的...{}..',
|
||||
'小真寻会好好记住的{}的,放心吧',
|
||||
'好..好.,那窝以后就叫你{}了.'
|
||||
]).format(msg))
|
||||
logger.info(f'USER {event.user_id} 设置昵称 {msg}')
|
||||
else:
|
||||
await nickname.send('设置昵称失败了,明天再来试一试!或联系管理员更新好友!', at_sender=True)
|
||||
logger.warning(f'USER {event.user_id} 设置昵称 {msg} 失败')
|
||||
|
||||
|
||||
@my_nickname.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
try:
|
||||
nickname = await GroupInfoUser.get_group_member_nickname(event.user_id, event.group_id)
|
||||
except AttributeError:
|
||||
nickname = ''
|
||||
if nickname:
|
||||
await my_nickname.send(random.choice([
|
||||
'我肯定记得你啊,你是{}啊',
|
||||
'我不会忘记你的,你也不要忘记我!{}',
|
||||
'哼哼,真寻记忆力可是很好的,{}',
|
||||
'嗯?你是失忆了嘛...{}..',
|
||||
'不要小看真寻的记忆力啊!笨蛋{}!QAQ',
|
||||
'哎?{}..怎么了吗..突然这样问..'
|
||||
]).format(nickname))
|
||||
else:
|
||||
try:
|
||||
nickname = (await GroupInfoUser.select_member_info(event.user_id, event.group_id)).user_name
|
||||
except AttributeError:
|
||||
nickname = event.user_id
|
||||
await my_nickname.finish(random.choice([
|
||||
'对不起,我只记得一串神秘代码..{}\n【群管理快更新群组成员信息!】'
|
||||
]).format(nickname))
|
||||
await my_nickname.send(random.choice([
|
||||
'没..没有昵称嘛,{}',
|
||||
'啊,你是{}啊,我想叫你的昵称!',
|
||||
'是{}啊,有什么事吗?',
|
||||
'你是{}?'
|
||||
]).format(nickname))
|
||||
|
||||
|
||||
@my_nickname.handle()
|
||||
async def _(bot: Bot, event: PrivateMessageEvent, state: T_State):
|
||||
nickname = await FriendUser.get_friend_nickname(event.user_id)
|
||||
if nickname:
|
||||
await my_nickname.send(random.choice([
|
||||
'我肯定记得你啊,你是{}啊',
|
||||
'我不会忘记你的,你也不要忘记我!{}',
|
||||
'哼哼,真寻记忆力可是很好的,{}',
|
||||
'嗯?你是失忆了嘛...{}..',
|
||||
'不要小看真寻的记忆力啊!笨蛋{}!QAQ',
|
||||
'哎?{}..怎么了吗..突然这样问..'
|
||||
]).format(nickname))
|
||||
else:
|
||||
nickname = await FriendUser.get_user_name(event.user_id)
|
||||
if nickname:
|
||||
await my_nickname.send(random.choice([
|
||||
'没..没有昵称嘛,{}',
|
||||
'啊,你是{}啊,我想叫你的昵称!',
|
||||
'是{}啊,有什么事吗?',
|
||||
'你是{}?'
|
||||
]).format(nickname))
|
||||
else:
|
||||
nickname = event.user_id
|
||||
await my_nickname.finish(random.choice([
|
||||
'对不起,我只记得一串神秘代码..{}\n【火速联系管理员更新好友列表!】'
|
||||
]).format(nickname))
|
||||
|
||||
|
||||
@cancel_nickname.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
nickname = await GroupInfoUser.get_group_member_nickname(event.user_id, event.group_id)
|
||||
if nickname:
|
||||
await cancel_nickname.send(random.choice([
|
||||
'呜..真寻睡一觉就会忘记的..和梦一样..{}',
|
||||
'窝知道了..{}..',
|
||||
'是真寻哪里做的不好嘛..好吧..晚安{}',
|
||||
'呃,{},下次我绝对绝对绝对不会再忘记你!',
|
||||
'可..可恶!{}!太可恶了!呜'
|
||||
]).format(nickname))
|
||||
await GroupInfoUser.set_group_member_nickname(event.user_id, event.group_id, '')
|
||||
await BanUser.ban(event.user_id, 9, 60)
|
||||
else:
|
||||
await cancel_nickname.send('你在做梦吗?你没有昵称啊', at_sender=True)
|
||||
|
||||
|
||||
@cancel_nickname.handle()
|
||||
async def _(bot: Bot, event: PrivateMessageEvent, state: T_State):
|
||||
nickname = await FriendUser.get_user_name(event.user_id)
|
||||
if nickname:
|
||||
await cancel_nickname.send(random.choice([
|
||||
'呜..真寻睡一觉就会忘记的..和梦一样..{}',
|
||||
'窝知道了..{}..',
|
||||
'是真寻哪里做的不好嘛..好吧..晚安{}',
|
||||
'呃,{},下次我绝对绝对绝对不会再忘记你!',
|
||||
'可..可恶!{}!太可恶了!呜'
|
||||
]).format(nickname))
|
||||
await FriendUser.get_user_name(event.user_id)
|
||||
await BanUser.ban(event.user_id, 9, 60)
|
||||
else:
|
||||
await cancel_nickname.send('你在做梦吗?你没有昵称啊', at_sender=True)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
75
plugins/parse_bilibili_json.py
Normal file
75
plugins/parse_bilibili_json.py
Normal file
@ -0,0 +1,75 @@
|
||||
from nonebot import on_message
|
||||
from services.log import logger
|
||||
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
|
||||
from nonebot.typing import T_State
|
||||
from util.utils import get_message_json, get_local_proxy
|
||||
import json
|
||||
from util.user_agent import get_user_agent
|
||||
from nonebot.adapters.cqhttp.permission import GROUP
|
||||
from bilibili_api import video
|
||||
from util.init_result import image
|
||||
from models.group_remind import GroupRemind
|
||||
from nonebot.adapters.cqhttp.exception import ActionFailed
|
||||
import time
|
||||
import aiohttp
|
||||
import bilibili_api
|
||||
if get_local_proxy():
|
||||
proxy_ip_port = get_local_proxy().split("//", maxsplit=1)[1]
|
||||
bilibili_api.request_settings["proxies"] = {'http': f'http://{proxy_ip_port}', 'https': f'https://{proxy_ip_port}'}
|
||||
|
||||
|
||||
parse_bilibili_json = on_message(priority=1, permission=GROUP, block=False)
|
||||
|
||||
|
||||
@parse_bilibili_json.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
if await GroupRemind.get_status(event.group_id, 'blpar') and get_message_json(event.json()):
|
||||
data = json.loads(get_message_json(event.json())['data'])
|
||||
if data:
|
||||
if data.get('desc') == '哔哩哔哩':
|
||||
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
|
||||
async with session.get(data['meta']['detail_1']['qqdocurl'], proxy=get_local_proxy(), timeout=7) as response:
|
||||
url = str(response.url).split("?")[0]
|
||||
print(url)
|
||||
bvid = url.split('/')[-1]
|
||||
vd_info = video.get_video_info(bvid=bvid)
|
||||
aid = vd_info['aid']
|
||||
title = vd_info['title']
|
||||
author = vd_info['owner']['name']
|
||||
reply = vd_info['stat']['reply'] # 回复
|
||||
favorite = vd_info['stat']['favorite'] # 收藏
|
||||
coin = vd_info['stat']['coin'] # 投币
|
||||
# like = vd_info['stat']['like'] # 点赞
|
||||
# danmu = vd_info['stat']['danmaku'] # 弹幕
|
||||
date = time.strftime("%Y-%m-%d", time.localtime(vd_info['ctime']))
|
||||
try:
|
||||
await parse_bilibili_json.send(
|
||||
image(vd_info["pic"]) +
|
||||
f'\nav{aid}\n标题:{title}\n'
|
||||
f'UP:{author}\n'
|
||||
f'上传日期:{date}\n'
|
||||
f'回复:{reply},收藏:{favorite},投币:{coin}\n'
|
||||
f'{url}'
|
||||
)
|
||||
except ActionFailed:
|
||||
logger.warning(f'{event.group_id} 发送bilibili解析失败')
|
||||
logger.info(f'USER {event.user_id} GROUP {event.group_id} 解析bilibili转发 {url}')
|
||||
else:
|
||||
return
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
29
plugins/quotations.py
Normal file
29
plugins/quotations.py
Normal file
@ -0,0 +1,29 @@
|
||||
from nonebot import on_command
|
||||
import random
|
||||
from util.utils import get_lines
|
||||
from configs.path_config import TXT_PATH
|
||||
from services.log import logger
|
||||
from nonebot.adapters.cqhttp import Bot, MessageEvent
|
||||
from nonebot.typing import T_State
|
||||
|
||||
|
||||
__plugin_name__ = '语录'
|
||||
__plugin_usage__ = '用法: 二次元语录给你力量'
|
||||
|
||||
|
||||
lines = get_lines(TXT_PATH + "yulu.txt")
|
||||
|
||||
|
||||
quotations = on_command("语录", aliases={'二次元', '二次元语录'}, priority=5, block=True)
|
||||
|
||||
|
||||
@quotations.handle()
|
||||
async def _(bot: Bot, event: MessageEvent, state: T_State):
|
||||
if str(event.get_message()) in ['帮助']:
|
||||
await quotations.finish(__plugin_usage__)
|
||||
result = random.choice(lines)
|
||||
logger.info(
|
||||
f"(USER {event.user_id}, GROUP {event.group_id if event.message_type != 'private' else 'private'}) 发送语录:"
|
||||
+ result[:-1])
|
||||
await quotations.finish(result[:-1])
|
||||
|
||||
36
plugins/server_ip.py
Normal file
36
plugins/server_ip.py
Normal file
@ -0,0 +1,36 @@
|
||||
from nonebot import on_command
|
||||
from nonebot.adapters.cqhttp import Bot, PrivateMessageEvent, GroupMessageEvent
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.rule import to_me
|
||||
|
||||
__plugin_name__ = '服务器'
|
||||
__plugin_usage__ = '用法: 无'
|
||||
|
||||
|
||||
server_ip = on_command("服务器", aliases={'ip'}, rule=to_me(), priority=5, block=True)
|
||||
|
||||
|
||||
@server_ip.handle()
|
||||
async def _(bot: Bot, event: PrivateMessageEvent, state: T_State):
|
||||
await server_ip.finish("|* 请不要发给其他人! *|\n"
|
||||
"\t121.40.195.22\n"
|
||||
"|* 请不要发给其他人! *|\n"
|
||||
"csgo ~号键控制台输入 connect 121.40.195.22 进入服务器\n"
|
||||
"然后再公屏输入 !diy 来使用皮肤(英文感叹号,注意)\n"
|
||||
"【说不定可以凑到内战噢,欢迎~{娱乐为主,别骂人球球了}】", at_sender=True)
|
||||
|
||||
|
||||
@server_ip.handle()
|
||||
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
if event.group_id == 698279647:
|
||||
await server_ip.finish("嗨呀!当前服务器地址是:"
|
||||
"\ncsgo:\n\tay: 121.40.195.22\n\twzt: 101.200.199.143\n我的世界:\n\t47.111.1.220:25565")
|
||||
elif event.group_id == 1046451860:
|
||||
await server_ip.finish("嗨呀!当前服务器地址是:\n121.40.195.22\n !diy")
|
||||
else:
|
||||
await server_ip.finish("不好意思呀,小真寻不能为你使用此功能,因为服务器IP是真寻的小秘密呀!", at_sender=True)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
143
plugins/statistics_hook.py
Normal file
143
plugins/statistics_hook.py
Normal file
@ -0,0 +1,143 @@
|
||||
from configs.path_config import DATA_PATH
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.message import run_postprocessor
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
|
||||
from datetime import datetime
|
||||
from configs.config import plugins2name_dict
|
||||
from util.utils import scheduler
|
||||
from nonebot.typing import Optional
|
||||
try:
|
||||
import ujson as json
|
||||
except ModuleNotFoundError:
|
||||
import json
|
||||
|
||||
|
||||
try:
|
||||
with open(DATA_PATH + '_prefix_count.json', 'r', encoding='utf8') as f:
|
||||
_prefix_count_dict = json.load(f)
|
||||
except (FileNotFoundError, ValueError):
|
||||
_prefix_count_dict = {
|
||||
'total_statistics': {
|
||||
'total': {},
|
||||
},
|
||||
'day_statistics': {
|
||||
'total': {},
|
||||
},
|
||||
'week_statistics': {
|
||||
'total': {},
|
||||
},
|
||||
'month_statistics': {
|
||||
'total': {},
|
||||
},
|
||||
'start_time': str(datetime.now().date())
|
||||
}
|
||||
|
||||
|
||||
# 添加命令次数
|
||||
@run_postprocessor
|
||||
async def _(matcher: Matcher, exception: Optional[Exception], bot: Bot, event: GroupMessageEvent, state: T_State):
|
||||
if matcher.type == 'message' and matcher.priority not in [1, 9]:
|
||||
model = matcher.module
|
||||
# print(f'model --> {model}')
|
||||
for plugin in plugins2name_dict:
|
||||
if plugin == model:
|
||||
# print(f'plugin --> {plugin}')
|
||||
try:
|
||||
group_id = str(event.group_id)
|
||||
except AttributeError:
|
||||
group_id = 'total'
|
||||
plugin_name = plugins2name_dict[plugin][0]
|
||||
check_exists_key(group_id, plugin_name)
|
||||
_prefix_count_dict['total_statistics']['total'][plugin_name] += 1
|
||||
_prefix_count_dict['day_statistics']['total'][plugin_name] += 1
|
||||
_prefix_count_dict['week_statistics']['total'][plugin_name] += 1
|
||||
_prefix_count_dict['month_statistics']['total'][plugin_name] += 1
|
||||
if group_id != 'total':
|
||||
_prefix_count_dict['total_statistics'][group_id][plugin_name] += 1
|
||||
_prefix_count_dict['day_statistics'][group_id][plugin_name] += 1
|
||||
_prefix_count_dict['week_statistics'][group_id][plugin_name] += 1
|
||||
_prefix_count_dict['month_statistics'][group_id][plugin_name] += 1
|
||||
with open(DATA_PATH + '_prefix_count.json', 'w', encoding='utf8') as f:
|
||||
json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)
|
||||
break
|
||||
|
||||
|
||||
def check_exists_key(group_id, plugin_name):
|
||||
if not _prefix_count_dict['total_statistics']['total'].get(plugin_name):
|
||||
_prefix_count_dict['total_statistics']['total'][plugin_name] = 0
|
||||
if not _prefix_count_dict['day_statistics']['total'].get(plugin_name):
|
||||
_prefix_count_dict['day_statistics']['total'][plugin_name] = 0
|
||||
if not _prefix_count_dict['week_statistics']['total'].get(plugin_name):
|
||||
_prefix_count_dict['week_statistics']['total'][plugin_name] = 0
|
||||
if not _prefix_count_dict['month_statistics']['total'].get(plugin_name):
|
||||
_prefix_count_dict['month_statistics']['total'][plugin_name] = 0
|
||||
if not _prefix_count_dict['total_statistics'].get(group_id):
|
||||
_prefix_count_dict['total_statistics'][group_id] = {
|
||||
plugin_name: 0,
|
||||
}
|
||||
elif not _prefix_count_dict['total_statistics'][group_id].get(plugin_name):
|
||||
_prefix_count_dict['total_statistics'][group_id][plugin_name] = 0
|
||||
if not _prefix_count_dict['day_statistics'].get(group_id):
|
||||
_prefix_count_dict['day_statistics'][group_id] = {
|
||||
plugin_name: 0,
|
||||
}
|
||||
elif not _prefix_count_dict['day_statistics'][group_id].get(plugin_name):
|
||||
_prefix_count_dict['day_statistics'][group_id][plugin_name] = 0
|
||||
if not _prefix_count_dict['week_statistics'].get(group_id):
|
||||
_prefix_count_dict['week_statistics'][group_id] = {
|
||||
plugin_name: 0,
|
||||
}
|
||||
elif not _prefix_count_dict['week_statistics'][group_id].get(plugin_name):
|
||||
_prefix_count_dict['week_statistics'][group_id][plugin_name] = 0
|
||||
if not _prefix_count_dict['month_statistics'].get(group_id):
|
||||
_prefix_count_dict['month_statistics'][group_id] = {
|
||||
plugin_name: 0,
|
||||
}
|
||||
elif not _prefix_count_dict['month_statistics'][group_id].get(plugin_name):
|
||||
_prefix_count_dict['month_statistics'][group_id][plugin_name] = 0
|
||||
|
||||
|
||||
# 天
|
||||
@scheduler.scheduled_job(
|
||||
'cron',
|
||||
hour=0,
|
||||
minute=1,
|
||||
)
|
||||
async def _():
|
||||
for group_id in _prefix_count_dict['day_statistics'].keys():
|
||||
for key in _prefix_count_dict['day_statistics'][group_id].keys():
|
||||
_prefix_count_dict['day_statistics'][group_id][key] = 0
|
||||
with open(DATA_PATH + '_prefix_count.json', 'w', encoding='utf8') as f:
|
||||
json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)
|
||||
|
||||
|
||||
# 早上好
|
||||
@scheduler.scheduled_job(
|
||||
'cron',
|
||||
day_of_week="mon",
|
||||
hour=0,
|
||||
minute=1,
|
||||
)
|
||||
async def _():
|
||||
for group_id in _prefix_count_dict['week_statistics'].keys():
|
||||
for key in _prefix_count_dict['week_statistics'][group_id].keys():
|
||||
_prefix_count_dict['week_statistics'][group_id][key] = 0
|
||||
with open(DATA_PATH + '_prefix_count.json', 'w', encoding='utf8') as f:
|
||||
json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)
|
||||
|
||||
|
||||
# 早上好
|
||||
@scheduler.scheduled_job(
|
||||
'cron',
|
||||
day=1,
|
||||
hour=0,
|
||||
minute=1,
|
||||
)
|
||||
async def _():
|
||||
for group_id in _prefix_count_dict['month_statistics'].keys():
|
||||
for key in _prefix_count_dict['month_statistics'][group_id].keys():
|
||||
_prefix_count_dict['month_statistics'][group_id][key] = 0
|
||||
with open(DATA_PATH + '_prefix_count.json', 'w', encoding='utf8') as f:
|
||||
json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)
|
||||
|
||||
36
plugins/test.py
Normal file
36
plugins/test.py
Normal file
@ -0,0 +1,36 @@
|
||||
from nonebot.rule import to_me
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent, MessageEvent
|
||||
from nonebot import on_command, on_keyword
|
||||
from nonebot.plugin import MatcherGroup
|
||||
from nonebot.adapters.cqhttp.event import GroupRequestEvent
|
||||
import nonebot
|
||||
from models.open_cases_user import OpenCasesUser
|
||||
from tzlocal import get_localzone
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from util.utils import get_bot, get_message_imgs, get_local_proxy
|
||||
from util.init_result import *
|
||||
from nonebot.adapters.cqhttp.message import MessageSegment
|
||||
import requests
|
||||
import aiohttp
|
||||
from models.bag_user import UserBag
|
||||
from nonebot.adapters.cqhttp.message import Message
|
||||
import asyncio
|
||||
from models.group_member_info import GroupInfoUser
|
||||
|
||||
# erm = on_command('异世相遇,尽享美味', aliases={'异世相遇 尽享美味', '异世相遇,尽享美味'}, priority=5, block=True)
|
||||
|
||||
matcher = on_keyword({"test"})
|
||||
|
||||
|
||||
@matcher.handle()
|
||||
async def _(bot: Bot, event: MessageEvent, state: T_State):
|
||||
for i in range(1001, len(os.listdir(IMAGE_PATH + 'setu/'))):
|
||||
await matcher.send(f"id:{i}" + image(f'{i}.jpg', 'setu'))
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
|
||||
# @erm.handle()
|
||||
# async def first_receive(bot: Bot, event: Event, state: T_State):
|
||||
# print(record('erm'))
|
||||
# await matcher.send(record('erm'))
|
||||
23
plugins/update_info.py
Normal file
23
plugins/update_info.py
Normal file
@ -0,0 +1,23 @@
|
||||
from nonebot import on_command
|
||||
from nonebot.adapters.cqhttp import Bot, Event
|
||||
from nonebot.typing import T_State
|
||||
from util.init_result import image
|
||||
|
||||
__plugin_name__ = '更新信息'
|
||||
|
||||
|
||||
update_info = on_command("更新信息", priority=5, block=True)
|
||||
|
||||
|
||||
@update_info.handle()
|
||||
async def _(bot: Bot, event: Event, state: T_State):
|
||||
img = image('update_info.png')
|
||||
if img:
|
||||
await update_info.finish(image('update_info.png'))
|
||||
else:
|
||||
await update_info.finish('目前没有更新信息哦')
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
250
plugins/update_pic.py
Normal file
250
plugins/update_pic.py
Normal file
@ -0,0 +1,250 @@
|
||||
from nonebot import on_command
|
||||
from PIL import Image, ImageFilter
|
||||
from util.init_result import image
|
||||
from configs.path_config import IMAGE_PATH
|
||||
from services.log import logger
|
||||
from nonebot.rule import to_me
|
||||
from nonebot.adapters.cqhttp import Bot, MessageEvent
|
||||
from nonebot.typing import T_State
|
||||
from util.utils import get_message_imgs
|
||||
import aiofiles
|
||||
import aiohttp
|
||||
from util.utils import is_number, get_message_text
|
||||
from util.img_utils import CreateImg, pic2b64
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
__plugin_name__ = '图片'
|
||||
__plugin_usage__ = '''图片用法(目前):
|
||||
可以输入 指定操作 或 序号 来进行选择
|
||||
1.修改尺寸 宽(x) 高(y) 图片(在文字后跟图片即可)
|
||||
示例:小真寻修改图片 修改尺寸 200 300 图片(在文字后跟图片即可)
|
||||
2.等比压缩 比例(x) 图片(在文字后跟图片即可)
|
||||
示例:小真寻修改图片 等比压缩 2 图片(在文字后跟图片即可)
|
||||
3.旋转图片 角度(x) 图片(在文字后跟图片即可)
|
||||
示例:小真寻修改图片 旋转图片 45 图片(在文字后跟图片即可)
|
||||
4.水平翻转
|
||||
示例:小真寻修改图片 水平翻转 图片
|
||||
5.铅笔滤镜
|
||||
示例:小真寻修改图片 铅笔滤镜 图片
|
||||
6.模糊效果
|
||||
示例:小真寻修改图片 模糊效果 图片
|
||||
7.锐化效果
|
||||
示例:小真寻修改图片 锐化效果 图片
|
||||
8.高斯模糊
|
||||
示例:小真寻修改图片 高斯模糊 图片
|
||||
9.边缘检测
|
||||
示例:小真寻修改图片 边缘检测 图片
|
||||
10.底色替换 颜色 颜色 图片 (不专业)支持:(红,蓝)->(红,蓝,白,绿,黄)
|
||||
示例:小真寻修改图片 底色替换 蓝色 红色 图片'''
|
||||
|
||||
# IMAGE_LOCAL = IMAGE_PATH + "temp/{}_update.png"
|
||||
method_flag = ''
|
||||
|
||||
update_img = on_command("修改图片", aliases={'图片', '操作图片', '改图'}, priority=5, rule=to_me(), block=True)
|
||||
|
||||
method_list = ['修改尺寸', '等比压缩', '旋转图片', '水平翻转', '铅笔滤镜', '模糊效果', '锐化效果', '高斯模糊', '边缘检测', '底色替换']
|
||||
method_str = ''
|
||||
method_oper = []
|
||||
for i in range(len(method_list)):
|
||||
method_str += f'\n{i + 1}.{method_list[i]}'
|
||||
method_oper.append(method_list[i])
|
||||
method_oper.append(str(i + 1))
|
||||
|
||||
update_img_help = CreateImg(960, 700, font_size=24)
|
||||
update_img_help.text((10, 10), __plugin_usage__)
|
||||
update_img_help.save(IMAGE_PATH + 'update_img_help.png')
|
||||
|
||||
|
||||
@update_img.args_parser
|
||||
async def _(bot: Bot, event: MessageEvent, state: T_State):
|
||||
global method_flag
|
||||
if str(event.get_message()) in ['取消', '算了']:
|
||||
await update_img.finish("已取消操作..", at_sender=True)
|
||||
if state["_current_key"] in ['method']:
|
||||
if str(event.get_message()) not in method_oper:
|
||||
await update_img.reject(f"操作不正确,请重新输入!{method_str}")
|
||||
state[state["_current_key"]] = str(event.get_message())
|
||||
method_flag = str(event.get_message())
|
||||
if method_flag in ['1', '修改尺寸']:
|
||||
if state["_current_key"] == 'x':
|
||||
if not is_number(str(event.get_message())):
|
||||
await update_img.reject("宽度不正确!请重新输入数字...")
|
||||
state[state["_current_key"]] = str(event.get_message())
|
||||
if state["_current_key"] == 'y':
|
||||
if not is_number(str(event.get_message())):
|
||||
await update_img.reject("长度不正确!请重新输入数字...")
|
||||
state[state["_current_key"]] = str(event.get_message())
|
||||
elif method_flag in ['2', '等比压缩', '3', '旋转图片']:
|
||||
if state["_current_key"] == 'x':
|
||||
if not is_number(str(event.get_message())):
|
||||
await update_img.reject("比率不正确!请重新输入数字...")
|
||||
state[state["_current_key"]] = str(event.get_message())
|
||||
state['y'] = ''
|
||||
elif method_flag in ['4', '水平翻转', '5', '铅笔滤镜', '6', '模糊效果', '7', '锐化效果', '8', '高斯模糊', '9', '边缘检测']:
|
||||
state['x'] = ''
|
||||
state['y'] = ''
|
||||
elif method_flag in ['10', '底色替换']:
|
||||
if state["_current_key"] == 'x':
|
||||
if str(event.get_message()) not in ['红色', '蓝色', '红', '蓝']:
|
||||
await update_img.reject('请输入支持的被替换的底色:\n红色 蓝色')
|
||||
state['x'] = str(event.get_message())
|
||||
if state["_current_key"] == 'y':
|
||||
if str(event.get_message()) not in ['红色', '白色', '蓝色', '绿色', '黄色', '红', '白', '蓝', '绿', '黄']:
|
||||
await update_img.reject('请输入支持的替换的底色:\n红色 蓝色 白色 绿色')
|
||||
state['y'] = str(event.get_message())
|
||||
if state["_current_key"] == 'imgs':
|
||||
if not get_message_imgs(event.json()):
|
||||
await update_img.reject("没图?没图?没图?来图速来!")
|
||||
state[state["_current_key"]] = get_message_imgs(event.json())
|
||||
|
||||
|
||||
@update_img.handle()
|
||||
async def _(bot: Bot, event: MessageEvent, state: T_State):
|
||||
if str(event.get_message()) in ['帮助']:
|
||||
await update_img.finish(image('update_img_help.png'))
|
||||
raw_arg = get_message_text(event.json())
|
||||
img_list = get_message_imgs(event.json())
|
||||
if raw_arg:
|
||||
args = raw_arg.split("[")[0].split(" ")
|
||||
print(args)
|
||||
state['method'] = args[0]
|
||||
if len(args) == 2:
|
||||
if args[0] in ["等比压缩", '旋转图片']:
|
||||
if is_number(args[1]):
|
||||
state['x'] = args[1]
|
||||
state['y'] = ''
|
||||
elif len(args) > 2:
|
||||
if args[0] in ["修改尺寸"]:
|
||||
if is_number(args[1]):
|
||||
state['x'] = args[1]
|
||||
if is_number(args[2]):
|
||||
state['y'] = args[2]
|
||||
if args[0] in ['底色替换']:
|
||||
if args[1] in ['红色', '蓝色', '蓝', '红']:
|
||||
state['x'] = args[1]
|
||||
if args[2] in ['红色', '白色', '蓝色', '绿色', '黄色', '红', '白', '蓝', '绿', '黄']:
|
||||
state['y'] = args[2]
|
||||
if args[0] in ['水平翻转', '铅笔滤镜', '模糊效果', '锐化效果', '高斯模糊', '边缘检测']:
|
||||
state['x'] = ''
|
||||
state['y'] = ''
|
||||
if img_list:
|
||||
state['imgs'] = img_list
|
||||
|
||||
|
||||
@update_img.got("method", prompt=f"要使用图片的什么操作呢?{method_str}")
|
||||
@update_img.got("x", prompt="[宽度? 比率? 旋转角度? 底色?]")
|
||||
@update_img.got("y", prompt="[长度? 0 0 底色?]")
|
||||
@update_img.got("imgs", prompt="图呢图呢图呢图呢?GKD!")
|
||||
async def _(bot: Bot, event: MessageEvent, state: T_State):
|
||||
method = state['method']
|
||||
x = state['x'] if state['x'] else ''
|
||||
y = state['y'] if state['y'] else ''
|
||||
img_list = state['imgs']
|
||||
if is_number(x):
|
||||
x = float(x)
|
||||
if is_number(y):
|
||||
y = int(y)
|
||||
index = 0
|
||||
result = ''
|
||||
async with aiohttp.ClientSession() as session:
|
||||
for img_url in img_list:
|
||||
async with session.get(img_url, timeout=7) as response:
|
||||
if response.status == 200:
|
||||
async with aiofiles.open(IMAGE_PATH + f'temp/{event.user_id}_{index}_update.png', 'wb') as f:
|
||||
await f.write(await response.read())
|
||||
index += 1
|
||||
else:
|
||||
logger.warning(f"USER {event.user_id} GROUP "
|
||||
f"{event.group_id if event.message_type != 'private' else 'private'} "
|
||||
f"使用 {method} 时下载图片超时")
|
||||
await update_img.finish("获取图片超时了...", at_sender=True)
|
||||
if index == 0:
|
||||
return
|
||||
if method in ['修改尺寸', '1']:
|
||||
for i in range(index):
|
||||
img = Image.open(IMAGE_PATH + f'temp/{event.user_id}_{i}_update.png')
|
||||
img = img.convert('RGB')
|
||||
img = img.resize((int(x), int(y)), Image.ANTIALIAS)
|
||||
result += image(b64=pic2b64(img))
|
||||
await update_img.finish(result, at_sender=True)
|
||||
if method in ['等比压缩', '2']:
|
||||
for i in range(index):
|
||||
img = Image.open(IMAGE_PATH + f'temp/{event.user_id}_{i}_update.png')
|
||||
width, height = img.size
|
||||
img = img.convert('RGB')
|
||||
if width * x < 8000 and height * x < 8000:
|
||||
img = img.resize((int(x * width), int(x * height)))
|
||||
result += image(b64=pic2b64(img))
|
||||
else:
|
||||
await update_img.finish('小真寻不支持图片压缩后宽或高大于8000的存在!!')
|
||||
if method in ['旋转图片', '3']:
|
||||
for i in range(index):
|
||||
img = Image.open(IMAGE_PATH + f'temp/{event.user_id}_{i}_update.png')
|
||||
img = img.rotate(x)
|
||||
result += image(b64=pic2b64(img))
|
||||
if method in ['水平翻转', '4']:
|
||||
for i in range(index):
|
||||
img = Image.open(IMAGE_PATH + f'temp/{event.user_id}_{i}_update.png')
|
||||
img = img.transpose(Image.FLIP_LEFT_RIGHT)
|
||||
result += image(b64=pic2b64(img))
|
||||
if method in ['铅笔滤镜', '5']:
|
||||
for i in range(index):
|
||||
img = Image.open(IMAGE_PATH + f'temp/{event.user_id}_{i}_update.png').filter(ImageFilter.CONTOUR)
|
||||
result += image(b64=pic2b64(img))
|
||||
if method in ['模糊效果', '6']:
|
||||
for i in range(index):
|
||||
img = Image.open(IMAGE_PATH + f'temp/{event.user_id}_{i}_update.png').filter(ImageFilter.BLUR)
|
||||
result += image(b64=pic2b64(img))
|
||||
if method in ['锐化效果', '7']:
|
||||
for i in range(index):
|
||||
img = Image.open(IMAGE_PATH + f'temp/{event.user_id}_{i}_update.png').filter(ImageFilter.EDGE_ENHANCE)
|
||||
result += image(b64=pic2b64(img))
|
||||
if method in ['高斯模糊', '8']:
|
||||
for i in range(index):
|
||||
img = Image.open(IMAGE_PATH + f'temp/{event.user_id}_{i}_update.png').filter(ImageFilter.GaussianBlur)
|
||||
result += image(b64=pic2b64(img))
|
||||
if method in ['边缘检测', '9']:
|
||||
for i in range(index):
|
||||
img = Image.open(IMAGE_PATH + f'temp/{event.user_id}_{i}_update.png').filter(ImageFilter.FIND_EDGES)
|
||||
result += image(b64=pic2b64(img))
|
||||
if method in ['底色替换', '10']:
|
||||
if x in ['蓝色', '蓝']:
|
||||
lower = np.array([90, 70, 70])
|
||||
upper = np.array([110, 255, 255])
|
||||
if x in ['红色', '红']:
|
||||
lower = np.array([0, 135, 135])
|
||||
upper = np.array([180, 245, 230])
|
||||
if y in ['蓝色', '蓝']:
|
||||
color = (255, 0, 0)
|
||||
if y in ['红色', '红']:
|
||||
color = (0, 0, 255)
|
||||
if y in ['白色', '白']:
|
||||
color = (255, 255, 255)
|
||||
if y in ['绿色', '绿']:
|
||||
color = (0, 255, 0)
|
||||
if y in ['黄色', '黄']:
|
||||
color = (0, 255, 255)
|
||||
for k in range(index):
|
||||
img = cv2.imread(IMAGE_PATH + f'temp/{event.user_id}_{k}_update.png')
|
||||
img = cv2.resize(img, None, fx=0.3, fy=0.3)
|
||||
rows, cols, channels = img.shape
|
||||
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
|
||||
mask = cv2.inRange(hsv, lower, upper)
|
||||
# erode = cv2.erode(mask, None, iterations=1)
|
||||
dilate = cv2.dilate(mask, None, iterations=1)
|
||||
for i in range(rows):
|
||||
for j in range(cols):
|
||||
if dilate[i, j] == 255:
|
||||
img[i, j] = color
|
||||
cv2.imwrite(IMAGE_PATH + f'temp/{event.user_id}_{k}_ok_update.png', img)
|
||||
for i in range(index):
|
||||
result += image(f'{event.user_id}_{i}_ok_update.png', 'temp')
|
||||
if is_number(method):
|
||||
method = method_list[int(method) - 1]
|
||||
logger.info(
|
||||
f"(USER {event.user_id}, GROUP"
|
||||
f" {event.group_id if event.message_type != 'private' else 'private'}) 使用{method}")
|
||||
await update_img.finish(result, at_sender=True)
|
||||
|
||||
136
plugins/white2black_img.py
Normal file
136
plugins/white2black_img.py
Normal file
@ -0,0 +1,136 @@
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.adapters.cqhttp import Bot, MessageEvent
|
||||
from nonebot import on_command
|
||||
from util.utils import get_message_imgs, get_local_proxy, get_message_text, is_Chinese
|
||||
from util.init_result import image
|
||||
import aiohttp
|
||||
import aiofiles
|
||||
from configs.path_config import IMAGE_PATH
|
||||
from util.img_utils import CreateImg
|
||||
from util.user_agent import get_user_agent
|
||||
from services.log import logger
|
||||
|
||||
# ZH_CN2EN 中文 » 英语
|
||||
# ZH_CN2JA 中文 » 日语
|
||||
# ZH_CN2KR 中文 » 韩语
|
||||
# ZH_CN2FR 中文 » 法语
|
||||
# ZH_CN2RU 中文 » 俄语
|
||||
# ZH_CN2SP 中文 » 西语
|
||||
# EN2ZH_CN 英语 » 中文
|
||||
# JA2ZH_CN 日语 » 中文
|
||||
# KR2ZH_CN 韩语 » 中文
|
||||
# FR2ZH_CN 法语 » 中文
|
||||
# RU2ZH_CN 俄语 » 中文
|
||||
# SP2ZH_CN 西语 » 中文
|
||||
|
||||
|
||||
__plugin_usage__ = '用法: \n\t黑白图 文字 图片\n示例:黑白草图 没有人不喜欢萝莉 (图片)'
|
||||
|
||||
w2b_img = on_command('黑白草图', aliases={'黑白图'}, priority=5, block=True)
|
||||
|
||||
|
||||
@w2b_img.handle()
|
||||
async def _(bot: Bot, event: MessageEvent, state: T_State):
|
||||
if not get_message_text(event.json()) or get_message_text(event.json()) in ['帮助']:
|
||||
await w2b_img.finish(__plugin_usage__)
|
||||
# try:
|
||||
img = get_message_imgs(event.json())
|
||||
msg = get_message_text(event.json())
|
||||
if not img or not msg:
|
||||
await w2b_img.finish(f"格式错误:\n" + __plugin_usage__)
|
||||
img = img[0]
|
||||
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
|
||||
async with session.get(img, proxy=get_local_proxy()) as response:
|
||||
async with aiofiles.open(IMAGE_PATH + f'temp/{event.user_id}_w2b.png', 'wb') as f:
|
||||
await f.write(await response.read())
|
||||
msg = await get_translate(msg)
|
||||
w2b = CreateImg(0, 0, background=IMAGE_PATH + f'temp/{event.user_id}_w2b.png')
|
||||
w2b.convert('L')
|
||||
msg_sp = msg.split('<|>')
|
||||
w, h = w2b.size
|
||||
add_h, font_size = init_h_font_size(h)
|
||||
bg = CreateImg(w, h + add_h, color='black', font_size=font_size)
|
||||
bg.paste(w2b)
|
||||
chinese_msg = formalization_msg(msg)
|
||||
if not bg.check_font_size(chinese_msg):
|
||||
if len(msg_sp) == 1:
|
||||
centered_text(bg, chinese_msg, add_h)
|
||||
else:
|
||||
centered_text(bg, chinese_msg + '<|>' + msg_sp[1], add_h)
|
||||
elif not bg.check_font_size(msg_sp[0]):
|
||||
centered_text(bg, msg, add_h)
|
||||
else:
|
||||
ratio = (bg.getsize(msg_sp[0])[0] + 20) / bg.w
|
||||
add_h = add_h * ratio
|
||||
bg.resize(ratio)
|
||||
centered_text(bg, msg, add_h)
|
||||
await w2b_img.send(image(b64=bg.pic2bs4()))
|
||||
logger.info(
|
||||
f"(USER {event.user_id}, GROUP {event.group_id if event.message_type != 'private' else 'private'})"
|
||||
f" 制作黑白草图 {msg}")
|
||||
|
||||
|
||||
def centered_text(img: CreateImg, text: str, add_h: int):
|
||||
top_h = img.h - add_h + (img.h / 100)
|
||||
bottom_h = img.h - (img.h / 100)
|
||||
text_sp = text.split('<|>')
|
||||
w, h = img.getsize(text_sp[0])
|
||||
if len(text_sp) == 1:
|
||||
w = (img.w - w) / 2
|
||||
h = top_h + (bottom_h - top_h - h) / 2
|
||||
img.text((w, h), text_sp[0], (255, 255, 255))
|
||||
else:
|
||||
br_h = top_h + (bottom_h - top_h) / 2
|
||||
w = (img.w - w) / 2
|
||||
h = top_h + (br_h - top_h - h) / 2
|
||||
img.text((w, h), text_sp[0], (255, 255, 255))
|
||||
w, h = img.getsize(text_sp[1])
|
||||
w = (img.w - w) / 2
|
||||
h = br_h + (bottom_h - br_h - h) / 2
|
||||
img.text((w, h), text_sp[1], (255, 255, 255))
|
||||
|
||||
|
||||
async def get_translate(msg: str) -> str:
|
||||
url = f'http://fanyi.youdao.com/translate?smartresult=dict&smartresult=rule&smartresult=ugc&sessionFrom=null'
|
||||
data = {
|
||||
'type': "ZH_CN2JA",
|
||||
'i': msg,
|
||||
"doctype": "json",
|
||||
"version": "2.1",
|
||||
"keyfrom": "fanyi.web",
|
||||
"ue": "UTF-8",
|
||||
"action": "FY_BY_CLICKBUTTON",
|
||||
"typoResult": "true"
|
||||
}
|
||||
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
|
||||
try:
|
||||
async with session.post(url, data=data, proxy=get_local_proxy()) as res:
|
||||
data = await res.json()
|
||||
if data['errorCode'] == 0:
|
||||
translate = data['translateResult'][0][0]['tgt']
|
||||
msg += '<|>' + translate
|
||||
except Exception as e:
|
||||
logger.warning(f'黑白草图翻译出错 e:{e}')
|
||||
return msg
|
||||
|
||||
|
||||
def formalization_msg(msg: str) -> str:
|
||||
rst = ''
|
||||
for i in range(len(msg)):
|
||||
if is_Chinese(msg[i]):
|
||||
rst += msg[i] + ' '
|
||||
else:
|
||||
rst += msg[i]
|
||||
if i + 1 < len(msg) and is_Chinese(msg[i + 1]) and msg[i].isalpha():
|
||||
rst += ' '
|
||||
return rst
|
||||
|
||||
|
||||
def init_h_font_size(h):
|
||||
# 高度 字体
|
||||
if h < 400:
|
||||
return init_h_font_size(400)
|
||||
elif 400 < h < 800:
|
||||
return init_h_font_size(800)
|
||||
return h * 0.2, h * 0.05
|
||||
|
||||
Loading…
Reference in New Issue
Block a user