Delete member_activity_handle.py

This commit is contained in:
HibiKier 2021-06-30 19:54:14 +08:00 committed by GitHub
parent f0774368b8
commit c609b5db07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,214 +0,0 @@
from nonebot import on_message, on_command
import time
from utils.utils import scheduler, get_bot, get_message_text, is_number, get_message_at
from models.group_member_info import GroupInfoUser
from services.log import logger
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent, GROUP
from nonebot.typing import T_State
from pathlib import Path
from configs.path_config import DATA_PATH
try:
import ujson as json
except ModuleNotFoundError:
import json
__plugin_name__ = '群员发言检测 [Hidden]'
check_activity = on_message(priority=1, permission=GROUP, block=False)
show_setting = on_command('群员活跃检测设置', permission=GROUP, priority=1, block=True)
set_check_time = on_command('设置群员活跃检测时长', permission=GROUP, priority=1, block=True)
set_white_list = on_command('添加群员活跃检测白名单', aliases={'移除群员活跃检测白名单'}, permission=GROUP, priority=1, block=True)
show_white_list = on_command('查看群员活跃检测白名单', permission=GROUP, priority=1, block=True)
_file = Path(f'{DATA_PATH}/member_activity_check.json')
try:
data = json.load(open(_file, 'r', encoding='utf8'))
except (FileNotFoundError, ValueError, TypeError):
data = {
'check_time': time.time()
}
@check_activity.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
group = str(event.group_id)
user_id = str(event.user_id)
now = time.time()
if not data.get(group):
await _init_group_member_info(bot, group)
data[group]['data'][user_id] = now
if now - data['check_time'] > 10 * 60:
data['check_time'] = now
with open(_file, 'w', encoding='utf8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
logger.info('群员发言时间检测存储数据成功....')
@set_check_time.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
msg = get_message_text(event.json())
if not is_number(msg):
await set_check_time.finish('请检查输入是否是数字....', at_sender=True)
group = str(event.group_id)
if not data.get(group):
await _init_group_member_info(bot, group)
data[group]['check_time_day'] = int(msg)
await set_check_time.send(f'设置群员活跃检测时长成功:{msg}\n【设置为 0 即为关闭】')
with open(_file, 'w', encoding='utf8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
logger.info(f'USER {event.user_id} GROUP {event.group_id} 设置群员活跃检测时长:{msg}')
@set_white_list.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
group = str(event.group_id)
at = get_message_at(event.json())
if not data.get(group):
await _init_group_member_info(bot, group)
if at:
rst = ''
if state["_prefix"]["raw_command"] == '添加群员活跃检测白名单':
for user in at:
try:
user_name = (await GroupInfoUser.select_member_info(int(user), event.group_id)).user_name
except AttributeError:
user_name = str(user)
if str(user) not in data[group]['white_list']:
rst += f'{user_name} '
data[group]['white_list'].append(str(user))
else:
await set_white_list.send(f'{user_name} 已在群员活跃检测白名单中...')
rst = f'已将\n{rst}\n等添加入群员活跃检测白名单'
else:
for user in at:
try:
user_name = (await GroupInfoUser.select_member_info(int(user), event.group_id)).user_name
except AttributeError:
user_name = str(user)
if str(user) in data[group]['white_list']:
rst += f'{user_name} '
data[group]['white_list'].remove(str(user))
else:
await set_white_list.send(f'{user_name} 未在群员活跃检测白名单中...')
rst = f'已将 \n{rst}\n等添移除群员活跃检测白名单'
await set_white_list.send(rst, sender=True)
logger.info(f'群员活跃检测白名单变动 USER {event.user_id}{rst}')
with open(_file, 'w', encoding='utf8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
else:
await set_white_list.finish('添加群员活跃检测白名单时请艾特对象!', at_sender=True)
@show_white_list.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
group = str(event.group_id)
if not data.get(group):
await _init_group_member_info(bot, group)
if data[group]['white_list']:
user_name_list = []
for user in data[group]['white_list']:
try:
user_name = (await GroupInfoUser.select_member_info(int(user), event.group_id)).user_name
except AttributeError:
user_name = str(user)
user_name_list.append(user_name)
rst = ''
for i in range(len(user_name_list)):
rst += f'{user_name_list[i]}({data[group]["white_list"][i]})\n'
await show_white_list.send(rst)
else:
await show_white_list.finish('群员活跃检测白名单中没有任何用户...')
@show_setting.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
group = str(event.group_id)
user_name_list = []
if not data.get(group):
await _init_group_member_info(bot, group)
if data[group]['white_list']:
for user in data[group]['white_list']:
try:
user_name = (await GroupInfoUser.select_member_info(int(user), event.group_id)).user_name
except AttributeError:
user_name = str(user)
user_name_list.append(user_name)
await show_setting.send(f'群员活跃检测:\n'
f'检测天数:{data[group]["check_time_day"]}\n'
f'白名单:{user_name_list}\n'
f'【检测天数=0时为关闭】')
@scheduler.scheduled_job(
'cron',
hour=7,
minute=1,
)
async def _():
bot = get_bot()
now = time.time()
rst = '群员发言时间检测:\n'
for group in data.keys():
if group != 'check_time':
member_list = await bot.get_group_member_list(group_id=int(group), self_id=int(bot.self_id))
member_list = [x['user_id'] for x in member_list]
for user in member_list:
if user not in data[group]['data']:
data[group][str(user)] = now
logger.info('群员活跃检测:自动更新群员完成...')
if group != 'check_time' and data[group]['check_time_day'] > 0:
for user in list(data[group]['data'].keys()):
user = str(user)
if user not in data[group]['white_list'] and \
now - data[group]['data'][user] > data[group]['check_time_day'] * 24 * 60 * 60:
try:
# await bot.set_group_kick(group_id=int(group), user_id=int(user))
t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(data[group]['data'][user]))
try:
user_name = (await GroupInfoUser.select_member_info(int(user), int(group))).user_name
except AttributeError:
user_name = user
rst += f'检测 {user_name}({user}) 上次发言时间过久:\n时间:{t}\n'
# await bot.send_group_msg(group_id=int(group),
# message=f'群员发言时间检测:检测 {user_name}({user}) 上次'
# f'发言时间过久:..\n时间{t}..')
del data[group]['data'][user]
except Exception as e:
logger.warning(f'群员活跃检测:{user} 踢出失败,疑真寻不是管理或对方是管理或不在群内...e{e}')
if rst:
await bot.send_group_msg(group_id=int(group),
message=rst[:-1])
with open(_file, 'w', encoding='utf8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
async def _init_group_member_info(bot: Bot, group: str):
global data
now = time.time()
data[group] = {}
data[group]['data'] = {}
data[group]['white_list'] = []
data[group]['check_time_day'] = 0
member_list = await bot.get_group_member_list(group_id=int(group), self_id=int(bot.self_id))
member_list = [x['user_id'] for x in member_list]
for user in member_list:
data[group]['data'][str(user)] = now
with open(_file, 'w', encoding='utf8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)