This commit is contained in:
hibiki 2021-06-23 15:57:03 +08:00
parent 862776b011
commit f84896213a
17 changed files with 400 additions and 149 deletions

View File

@ -1,5 +1,6 @@
from .utils.util import get_config_data
from typing import List
from services.service_config import TL_M_KEY, SYSTEM_PROXY
try:
import ujson as json
except ModuleNotFoundError:
@ -11,20 +12,20 @@ USE_CONFIG_FILE = False
# API KEY必要
LOLICON_KEY: str = "" # lolicon(可不填lolicon已放开api限制)
LOLICON_KEY: str = "" # lolicon
RSSHUBAPP: str = "https://rsshub.app/" # rsshub
# 图灵
TL_KEY: List[str] = []
# 数据库(必要)
# 示例:"bind": "postgresql://user:password@127.0.0.1:5432/database"
bind: str = '' # 数据库连接url
sql_name: str = 'postgresql'
user: str = '' # 数据库用户名
password: str = '' # 数据库密码
address: str = '' # 数据库地址
port: str = '' # 数据库端口
database: str = '' # 数据库名称
# 如果填写了bind就不需要再填写后面的字段了#
bind: str = ''
sql_name: str = ''
user: str = ''
password: str = ''
address: str = ''
port: str = ''
database: str = ''
# 公开图库列表
@ -54,7 +55,7 @@ FGO_FLAG = True # 命运-冠位指定FGO
ONMYOJI_FLAG = True # 阴阳师
PCR_TAI = True # pcr是否开启台服卡池
SEMAPHORE = 5 # 限制更新碧蓝航线和FGO数据并发数
SEMAPHORE = 5 # 限制碧蓝航线和FGO并发数
ADMIN_DEFAULT_AUTH: int = 5 # 默认群管理员权限
@ -114,7 +115,6 @@ plugins2name_dict = {
'my_gold': ['我的金币'],
'my_props': ['我的道具'],
'shop_handle': ['商店'],
'nonebot_plugin_cocdicer': ['骰子娘'],
'update_pic': ['图片', '操作图片', '修改图片'],
'search_buff_skin_price': ['查询皮肤'],
'weather': ['天气', '查询天气', '天气查询'],
@ -138,6 +138,46 @@ plugins2name_dict = {
'gold_redbag': ['塞红包', '红包', '抢红包']
}
# 功能所需的群权限
plugins2level_dict = {
'sign_in': 5,
'send_img': 5,
'send_setu': 9,
'white2black': 5,
'coser': 9,
'quotations': 5,
'jitang': 5,
'send_dinggong_voice': 5,
'open_cases': 5,
'luxun': 5,
'fake_msg': 5,
'buy': 5,
'my_gold': 5,
'my_props': 5,
'shop_handle': 5,
'update_pic': 5,
'search_buff_skin_price': 5,
'weather': 5,
'yiqing': 5,
'what_anime': 5,
'search_anime': 5,
'songpicker2': 5,
'epic': 5,
'pixiv': 9,
'poke': 5,
'draw_card': 5,
'ai': 5,
'one_friend': 5,
'translate': 5,
'nonebot_plugin_picsearcher': 5,
'almanac': 5,
'material_remind': 5,
'qiu_qiu_translation': 5,
'query_resource_points': 5,
'russian': 5,
'gold_redbag': 5
}
# 群管理员功能 与 对应权限
admin_plugins_auth = {
'admin_bot_manage': OC_LEVEL,
@ -149,6 +189,12 @@ admin_plugins_auth = {
'mute': MUTE_LEVEL,
}
if TL_M_KEY:
TL_KEY = TL_M_KEY
if SYSTEM_PROXY:
system_proxy = SYSTEM_PROXY
# 配置文件应用
if USE_CONFIG_FILE:
config = get_config_data()

View File

@ -92,6 +92,7 @@ async def _(bot: Bot, event: MessageEvent, state: T_State):
if group_id:
if user_id:
await bot.send_group_msg(group_id=group_id, message=at(user_id) + "\n管理员回复\n=======\n" + text)
await reply.finish("发送成功", at_sender=True)
else:
await bot.send_group_msg(group_id=group_id, message=text)
await reply.finish("发送成功", at_sender=True)

View File

@ -16,10 +16,12 @@ headers = {'User-Agent': '"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Te
prts_up_char = Path(DRAW_PATH + "/draw_card_up/prts_up_char.json")
genshin_up_char = Path(DRAW_PATH + "/draw_card_up/genshin_up_char.json")
pretty_up_char = Path(DRAW_PATH + "/draw_card_up/pretty_up_char.json")
guardian_up_char = Path(DRAW_PATH + "/draw_card_up/guardian_up_char.json")
prts_url = "https://wiki.biligame.com/arknights/%E6%96%B0%E9%97%BB%E5%85%AC%E5%91%8A"
genshin_url = "https://wiki.biligame.com/ys/%E7%A5%88%E6%84%BF"
pretty_url = "https://wiki.biligame.com/umamusume/%E5%85%AC%E5%91%8A"
guardian_url = "https://wiki.biligame.com/gt/%E9%A6%96%E9%A1%B5"
# 是否过时
@ -36,10 +38,10 @@ def is_expired(data: dict):
# 检查写入
def check_write(data: dict, up_char_file, game_name: str = ''):
tmp = data
if game_name in ['genshin', 'pretty']:
if game_name in ['genshin', 'pretty', 'guardian']:
tmp = data['char']
if not is_expired(tmp):
if game_name in ['genshin']:
if game_name in ['genshin', 'guardian']:
data['char']['title'] = ''
data['arms']['title'] = ''
elif game_name in ['pretty']:
@ -57,7 +59,7 @@ def check_write(data: dict, up_char_file, game_name: str = ''):
with open(up_char_file, 'r', encoding='utf8') as f:
old_data = json.load(f)
tmp = old_data
if game_name in ['genshin', 'pretty']:
if game_name in ['genshin', 'pretty', 'guardian']:
tmp = old_data['char']
if is_expired(tmp):
return old_data
@ -256,28 +258,30 @@ class PrettyAnnouncement:
for msg in x:
if msg.find('') != -1:
msg = msg.replace('<br/>', '')
msg = msg.split(' ')
if (star := len(msg[0].strip())) == 3:
data['char']['up_char']['3'][msg[1]] = '70'
char_name = msg[msg.find('['):].strip()
if (star := len(msg[:msg.find('[')].strip())) == 3:
data['char']['up_char']['3'][char_name] = '70'
elif star == 2:
data['char']['up_char']['2'][msg[1]] = '70'
data['char']['up_char']['2'][char_name] = '70'
elif star == 1:
data['char']['up_char']['1'][msg[1]] = '70'
data['char']['up_char']['1'][char_name] = '70'
if str(p).find('当期UP对象') != -1 and str(p).find('赛马娘') == -1:
data['card']['pool_img'] = p.find('img')['src']
r = re.search(r'■全?新?支援卡当期UP对象([\s\S]*)</p>', str(p))
if r:
rmsg = r.group(1)
rmsg = r.group(1).strip()
rmsg = rmsg.split('<br/>')
for x in rmsg[1:]:
rmsg = [x for x in rmsg if x]
for x in rmsg:
x = x.replace('\n', '').replace('', '')
x = x.split(' ')
if x[0] == 'SSR':
data['card']['up_char']['3'][x[1]] = '70'
if x[0] == 'SR':
data['card']['up_char']['2'][x[1]] = '70'
if x[0] == 'R':
data['card']['up_char']['1'][x[1]] = '70'
star = x[:x.find('[')].strip()
char_name = x[x.find('['):].strip()
if star == 'SSR':
data['card']['up_char']['3'][char_name] = '70'
if star == 'SR':
data['card']['up_char']['2'][char_name] = '70'
if star == 'R':
data['card']['up_char']['1'][char_name] = '70'
# 日文->中文
with open(DRAW_PATH + 'pretty_card.json', 'r', encoding='utf8') as f:
all_data = json.load(f)
@ -291,8 +295,67 @@ class PrettyAnnouncement:
data['card']['up_char'][star][all_data[x]['中文名']] = '70'
except TimeoutError:
logger.warning(f'更新赛马娘UP池信息超时...')
with open(pretty_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
if pretty_up_char.exists():
with open(pretty_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
except Exception as e:
logger.error(f'赛马娘up更新失败 {type(e)}{e}')
return check_write(data, pretty_up_char, 'pretty')
class GuardianAnnouncement:
@staticmethod
async def get_announcement_text():
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get(guardian_url, timeout=7) as res:
return await res.text()
@staticmethod
async def update_up_char():
data = {
'char': {'up_char': {'3': {}}, 'title': '', 'time': '', 'pool_img': ''},
'arms': {'up_char': {'5': {}}, 'title': '', 'time': '', 'pool_img': ''}
}
try:
text = await GuardianAnnouncement.get_announcement_text()
soup = BeautifulSoup(text, 'lxml')
context = soup.select('div.col-sm-3:nth-child(3) > div:nth-child(2) > div:nth-child(1) '
'> div:nth-child(2) > div:nth-child(3) > font:nth-child(1)')[0]
title = context.find('p').find('b').text
tmp = title.split('')
time = ''
for msg in tmp:
r = re.search(r'[从|至](.*)(开始|结束)', msg)
if r:
time += r.group(1).strip() + ' - '
time = time[:-3]
title = time.split(' - ')[0] + 'UP卡池'
data['char']['title'] = title
data['arms']['title'] = title
data['char']['time'] = time
data['arms']['time'] = time
start_idx = -1
end_idx = -1
index = 0
divs = context.find_all('div')
for x in divs:
if x.text == '角色':
start_idx = index
if x.text == '武器':
end_idx = index
break
index += 1
for x in divs[start_idx+1: end_idx]:
name = x.find('p').find_all('a')[-1].text
data['char']['up_char']['3'][name] = '0'
for x in divs[end_idx+1:]:
name = x.find('p').find_all('a')[-1].text
data['arms']['up_char']['5'][name] = '0'
except TimeoutError:
print(f'更新坎公骑冠剑UP池信息超时...')
with open(pretty_up_char, 'r', encoding='utf8') as f:
data = json.load(f)
except Exception as e:
print(f'坎公骑冠剑up更新失败 {type(e)}{e}')
return check_write(data, guardian_up_char, 'guardian')

View File

@ -3,8 +3,10 @@ import os
import nonebot
from nonebot.adapters.cqhttp import MessageSegment
from .update_game_info import update_info
from services.log import logger
from .announcement import GuardianAnnouncement
from .util import init_star_rst, generate_img, max_card, BaseData,\
set_list, get_star, format_card_information
set_list, get_star, format_card_information, UpEvent
import random
from .config import DRAW_PATH, GUARDIAN_ONE_CHAR_P, GUARDIAN_TWO_CHAR_P, GUARDIAN_THREE_CHAR_P, \
GUARDIAN_THREE_CHAR_UP_P, GUARDIAN_TWO_ARMS_P, GUARDIAN_FIVE_ARMS_P, GUARDIAN_THREE_CHAR_OTHER_P, \
@ -12,7 +14,6 @@ from .config import DRAW_PATH, GUARDIAN_ONE_CHAR_P, GUARDIAN_TWO_CHAR_P, GUARDIA
GUARDIAN_EXCLUSIVE_ARMS_OTHER_P, GUARDIAN_FLAG
from dataclasses import dataclass
from .init_card_pool import init_game_pool
import asyncio
try:
import ujson as json
except ModuleNotFoundError:
@ -23,6 +24,12 @@ driver: nonebot.Driver = nonebot.get_driver()
ALL_CHAR = []
ALL_ARMS = []
_CURRENT_CHAR_POOL_TITLE = ''
_CURRENT_ARMS_POOL_TITLE = ''
UP_CHAR = []
UP_ARMS = []
POOL_IMG = ''
@dataclass
class GuardianChar(BaseData):
@ -41,12 +48,33 @@ async def guardian_draw(count: int, pool_name):
else:
cnlist = ['★★★', '★★', '']
star_list = [0, 0, 0]
title = ''
up_type = []
up_list = []
if pool_name == 'char' and _CURRENT_CHAR_POOL_TITLE:
up_type = UP_CHAR
title = _CURRENT_CHAR_POOL_TITLE
elif pool_name == 'arms' and _CURRENT_ARMS_POOL_TITLE:
up_type = UP_ARMS
title = _CURRENT_ARMS_POOL_TITLE
tmp = ''
if up_type:
for x in up_type:
for operator in x.operators:
up_list.append(operator)
if pool_name == 'char':
if x.star == 3:
tmp += f'三星UP{" ".join(x.operators)} \n'
else:
if x.star == 5:
tmp += f'五星UP{" ".join(x.operators)}'
obj_list, obj_dict, max_list, star_list, max_index_list = format_card_information(count, star_list,
_get_guardian_card, pool_name)
rst = init_star_rst(star_list, cnlist, max_list, max_index_list)
rst = init_star_rst(star_list, cnlist, max_list, max_index_list, up_list)
pool_info = f'当前up池{title}\n{tmp}' if title else ''
if count > 90:
obj_list = set_list(obj_list)
return MessageSegment.image(
return pool_info + '\n' + MessageSegment.image(
"base64://" + await generate_img(obj_list, 'guardian', star_list)) \
+ '\n' + rst[:-1] + '\n' + max_card(obj_dict)
@ -78,41 +106,66 @@ async def init_guardian_data():
guardian_arms_dict = json.load(f)
ALL_CHAR = init_game_pool('guardian', guardian_char_dict, GuardianChar)
ALL_ARMS = init_game_pool('guardian_arms', guardian_arms_dict, GuardianArms)
await _init_up_char()
# 抽取卡池
def _get_guardian_card(pool_name: str):
global ALL_CHAR, ALL_ARMS
if pool_name != 'arms':
global ALL_ARMS, ALL_CHAR, UP_ARMS, UP_CHAR, _CURRENT_ARMS_POOL_TITLE, _CURRENT_CHAR_POOL_TITLE
if pool_name == 'char':
star = get_star([3, 2, 1], [GUARDIAN_THREE_CHAR_P, GUARDIAN_TWO_CHAR_P, GUARDIAN_ONE_CHAR_P])
chars = [x for x in ALL_CHAR if x.star == star]
return random.choice(chars), 3 - star
up_lst = UP_CHAR
flag = _CURRENT_CHAR_POOL_TITLE
_max_star = 3
all_data = ALL_CHAR
else:
star = get_star([5, 4, 3, 2], [GUARDIAN_FIVE_ARMS_P, GUARDIAN_FOUR_ARMS_P,
GUARDIAN_THREE_ARMS_P, GUARDIAN_TWO_ARMS_P])
arms = [x for x in ALL_ARMS if x.star == star]
return random.choice(arms), 5 - star
# 整理数据
def _format_card_information(count: int, pool_name: str):
max_star_lst = []
max_index_lst = []
obj_list = []
obj_dict = {}
if pool_name == 'arms':
star_list = [0, 0, 0, 0]
up_lst = UP_ARMS
flag = _CURRENT_ARMS_POOL_TITLE
_max_star = 5
all_data = ALL_ARMS
# 是否UP
if flag and star == _max_star and pool_name:
# 获取up角色列表
up_char_lst = [x.operators for x in up_lst if x.star == star][0]
# 成功获取up角色
if random.random() < 0.5:
up_char_name = random.choice(up_char_lst)
acquire_char = [x for x in all_data if x.name == up_char_name][0]
else:
# 无up
all_char_lst = [x for x in all_data if x.star == star and x.name not in up_char_lst and not x.limited]
acquire_char = random.choice(all_char_lst)
else:
star_list = [0, 0, 0]
for i in range(count):
obj, code = _get_guardian_card(pool_name)
star_list[code] += 1
if code == 0:
max_star_lst.append(obj.name)
max_index_lst.append(i)
try:
obj_dict[obj.name] += 1
except KeyError:
obj_dict[obj.name] = 1
obj_list.append(obj)
return obj_list, obj_dict, max_star_lst, star_list, max_index_lst
chars = [x for x in all_data if x.star == star and not x.limited]
acquire_char = random.choice(chars)
return acquire_char, _max_star - star
# 获取up和概率
async def _init_up_char():
global _CURRENT_CHAR_POOL_TITLE, _CURRENT_ARMS_POOL_TITLE, UP_CHAR, UP_ARMS, POOL_IMG
UP_CHAR = []
UP_ARMS = []
up_char_dict = await GuardianAnnouncement.update_up_char()
_CURRENT_CHAR_POOL_TITLE = up_char_dict['char']['title']
_CURRENT_ARMS_POOL_TITLE = up_char_dict['arms']['title']
if _CURRENT_CHAR_POOL_TITLE and _CURRENT_ARMS_POOL_TITLE:
POOL_IMG = MessageSegment.image(up_char_dict['char']['pool_img']) + \
MessageSegment.image(up_char_dict['arms']['pool_img'])
logger.info(f'成功获取坎公骑冠剑当前up信息...当前up池: {_CURRENT_CHAR_POOL_TITLE} & {_CURRENT_ARMS_POOL_TITLE}')
for key in up_char_dict.keys():
for star in up_char_dict[key]['up_char'].keys():
up_char_lst = []
for char in up_char_dict[key]['up_char'][star].keys():
up_char_lst.append(char)
if key == 'char':
UP_CHAR.append(UpEvent(star=int(star), operators=up_char_lst, zoom=0))
else:
UP_ARMS.append(UpEvent(star=int(star), operators=up_char_lst, zoom=0))
async def reload_guardian_pool():
await _init_up_char()
return f'当前UP池子{_CURRENT_CHAR_POOL_TITLE} & {_CURRENT_ARMS_POOL_TITLE} {POOL_IMG}'

View File

@ -42,7 +42,12 @@ def init_game_pool(game: str, data: dict, Operator: Any):
tmp_lst.append(Operator(name=key, star=data[key]['初始星级'], limited=False))
if game == 'pretty_card':
for key in data.keys():
tmp_lst.append(Operator(name=data[key]['中文名'], star=len(data[key]['稀有度']), limited=False))
limited = False
if '卡池' not in data[key]['获取方式']:
limited = True
if not data[key]['获取方式']:
limited = False
tmp_lst.append(Operator(name=data[key]['中文名'], star=len(data[key]['稀有度']), limited=limited))
if game in ['guardian', 'guardian_arms']:
for key in data.keys():
tmp_lst.append(Operator(name=data[key]['名称'], star=int(data[key]['星级']), limited=False))
@ -55,9 +60,10 @@ def init_game_pool(game: str, data: dict, Operator: Any):
if game == 'azur':
for key in data.keys():
limited = False
if int(data[key]['星级']) > 4 or key.find('兵装') != -1 or key[-1] == '' or key.find('布里') != -1:
if '可以建造' not in data[key]['获取途径']:
limited = True
tmp_lst.append(Operator(name=data[key]['名称'], star=int(data[key]['星级']), limited=limited, itype=data[key]['类型']))
tmp_lst.append(Operator(name=data[key]['名称'], star=int(data[key]['星级']),
limited=limited, itype=data[key]['类型']))
if game in ['fgo', 'fgo_card']:
for key in data.keys():
limited = False
@ -74,5 +80,6 @@ def init_game_pool(game: str, data: dict, Operator: Any):
'灶门祢豆子', '灶门炭治郎']:
limited = True
tmp_lst.append(Operator(name=data[key]['名称'], star=data[key]['星级'], limited=limited))
# print(tmp_lst)
return tmp_lst

View File

@ -125,11 +125,17 @@ async def retrieve_char_data(char: bs4.element.Tag, game_name: str, data: dict,
if game_name == 'azur':
char = char.find('td').find('div')
avatar_img = char.find('a').find('img')
try:
member_dict['名称'] = remove_prohibited_str(str(avatar_img['alt'])[: str(avatar_img['alt']).find('头像')])
except TypeError:
member_dict['名称'] = char.find('a')['title'][:-4]
try:
member_dict['头像'] = unquote(str(avatar_img['srcset']).split(' ')[-2])
except KeyError:
member_dict['头像'] = unquote(str(avatar_img['src']).split(' ')[-2])
member_dict['名称'] = remove_prohibited_str(str(avatar_img['alt'])[: str(avatar_img['alt']).find('头像')])
except TypeError:
member_dict['头像'] = "img link not find..."
logger(f'{member_dict["名称"]} 图片缺失....')
star = char.find('div').find('img')['alt']
if star == '舰娘头像外框普通.png':
star = 1
@ -159,15 +165,19 @@ async def _async_update_azur_extra_info(key: str, session: aiohttp.ClientSession
try:
async with session.get(f'https://wiki.biligame.com/blhx/{key}', timeout=7) as res:
soup = BeautifulSoup(await res.text(), 'lxml')
construction_time = str(soup.find('table', {'class': 'wikitable sv-general'}).find('tbody'))
x = {key: {'获取途径': []}}
if construction_time.find('无法建造') != -1:
x[key]['获取途径'].append('无法建造')
elif construction_time.find('活动已关闭') != -1:
x[key]['获取途径'].append('活动限定')
else:
x[key]['获取途径'].append('可以建造')
logger.info(f'碧蓝航线获取额外信息 {key}...{x[key]["获取途径"]}')
try:
construction_time = str(soup.find('table', {'class': 'wikitable sv-general'}).find('tbody'))
x = {key: {'获取途径': []}}
if construction_time.find('无法建造') != -1:
x[key]['获取途径'].append('无法建造')
elif construction_time.find('活动已关闭') != -1:
x[key]['获取途径'].append('活动限定')
else:
x[key]['获取途径'].append('可以建造')
logger.info(f'碧蓝航线获取额外信息 {key}...{x[key]["获取途径"]}')
except AttributeError:
x = {key: {'获取途径': []}}
logger.warning(f'碧蓝航线获取额外信息错误 {key}...{[]}')
return x
except TimeoutError:
logger.warning(f'访问 https://wiki.biligame.com/blhx/{key}{i}次 超时...已再次访问')

View File

@ -15,7 +15,7 @@ url = 'https://rsshub.app/epicgames/freegames'
async def get_epic_game() -> str:
result = ''
result = '今天没有游戏可以白嫖了!'
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
async with session.get(url, proxy=get_local_proxy(), timeout=7) as response:
data = feedparser.parse(await response.text())['entries']
@ -35,7 +35,6 @@ async def get_epic_game() -> str:
result = 'epic限免游戏速速白嫖\n' + result
else:
result = '今天没有游戏可以白嫖了!'
print(result)
return result

View File

@ -0,0 +1,89 @@
from nonebot import on_command, on_regex
from util.utils import get_message_text, is_number
from nonebot.rule import to_me
from services.log import logger
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent, MessageEvent, GROUP
from nonebot.typing import T_State
from nonebot.matcher import Matcher
from nonebot.permission import SUPERUSER
from pathlib import Path
from configs.config import plugins2level_dict, plugins2name_dict
from nonebot.message import run_preprocessor, IgnoredException
try:
import ujson as json
except ModuleNotFoundError:
import json
__plugin_name__ = '群权限'
__plugin_usage__ = '区分权限功能'
group_level_data = Path() / 'data'/ 'manager' / 'group_level.json'
group_level_data.parent.mkdir(exist_ok=True, parents=True)
group_data = {}
if group_level_data.exists():
group_data = json.load(open(group_level_data, 'r', encoding='utf8'))
@run_preprocessor
async def _(matcher: Matcher, bot: Bot, event: GroupMessageEvent, state: T_State):
if matcher.type == 'message' and matcher.priority not in [1, 9]:
if isinstance(event, GroupMessageEvent):
if not group_data.get(str(event.group_id)):
group_data[str(event.group_id)] = 5
if plugins2level_dict.get(matcher.module):
if plugins2level_dict[matcher.module] > group_data[str(event.group_id)]:
await bot.send_group_msg(group_id=event.group_id, message='群权限不足...')
raise IgnoredException('群权限不足')
add_group_level = on_command('修改群权限', priority=1, permission=SUPERUSER, block=True)
my_group_level = on_command('查看群权限', aliases={'群权限'}, priority=5, permission=GROUP, block=True)
what_up_group_level = on_regex('.*?(提高|提升|升高|增加|加上)(.*?)群权限.*?', rule=to_me(), priority=5, permission=GROUP, block=True)
@add_group_level.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
msg = get_message_text(event.json())
if not msg:
await add_group_level.finish('用法:修改群权限 [group] [level]')
msg = msg.split(' ')
if len(msg) < 2:
await add_group_level.finish('参数不完全..[group] [level]')
if is_number(msg[0]) and is_number(msg[1]):
group_id = msg[0]
level = int(msg[1])
else:
await add_group_level.finish('参数错误...group和level必须是数字..')
if not group_data.get(group_id):
group_data[group_id] = 5
await add_group_level.send('修改成功...', at_sender=True)
await bot.send_group_msg(group_id=int(group_id), message=f'管理员修改了此群权限:{group_data[group_id]} -> {level}')
group_data[group_id] = level
with open(group_level_data, 'w', encoding='utf8') as f:
json.dump(group_data, f, ensure_ascii=False, indent=4)
logger.info(f'{event.user_id} 修改了 {group_id} 的权限:{level}')
@my_group_level.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
if group_data.get(str(event.group_id)):
level = group_data[str(event.group_id)]
tmp = ''
for plugin in plugins2level_dict:
if plugins2level_dict[plugin] > level:
plugin_name = plugins2name_dict[plugin][0]
if plugin_name == 'pixiv':
plugin_name = '搜图 p站排行'
tmp += f'{plugin_name}\n'
if tmp:
tmp = '\n目前无法使用的功能:\n' + tmp
await my_group_level.finish(f'当前群权限:{level}{tmp}')
@what_up_group_level.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
await what_up_group_level.finish(f'[此功能用于防止内鬼,如果引起不便那真是抱歉了]\n'
f'目前提高群权限的方法:\n'
f'\t1.管理员修改权限')

View File

@ -39,7 +39,7 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
async def _(bot: Bot, event: PrivateMessageEvent, state: T_State):
msg = get_message_text(event.json())
if not msg:
await _help.finish(image(abspath=''))
await _help.finish(image('help.png'))
else:
await _help.finish(get_plugin_help(msg))

View File

@ -55,4 +55,5 @@ other_help = [
'有人记得你是什么时候加入我们的 --> 指令:我的信息',
'让我看看更新了什么 --> 指令:更新信息',
'真寻给我把话收回去! --> 指令:撤回 [id](默认0)',
'群拥有的权限 --> 指令:查看群权限 '
]

View File

@ -2,6 +2,8 @@ from nonebot import on_command
from util.user_agent import get_user_agent
from services.log import logger
from nonebot.adapters.cqhttp import Bot, Event
from nonebot.matcher import Matcher
from nonebot.message import run_preprocessor, IgnoredException
from nonebot.typing import T_State
import aiohttp
from asyncio.exceptions import TimeoutError

View File

@ -29,11 +29,12 @@ send_img = on_command("img", aliases=cmd, priority=5, block=True)
async def _(bot: Bot, event: Event, state: T_State):
img_id = str(event.get_message())
path = cn2py(state["_prefix"]["raw_command"]) + '/'
if not os.path.exists(IMAGE_PATH + path):
logger.warning(f'未找到 {path} 文件夹,调用取消!')
return
if path in IMAGE_DIR_LIST:
if not os.path.exists(f'{IMAGE_PATH}/{path}/'):
os.mkdir(f'{IMAGE_PATH}/{path}/')
length = len(os.listdir(IMAGE_PATH + path)) - 1
if length == 0:
if length < 1:
await send_img.finish('该图库中没有图片噢')
logger.warning(f'图库 {path} 为空,调用取消!')
return
index = img_id if img_id else str(random.randint(0, length))

View File

@ -30,15 +30,18 @@ except (FileNotFoundError, ValueError):
'month_statistics': {
'total': {},
},
'start_time': str(datetime.now().date())
'start_time': str(datetime.now().date()),
'day_index': 0
}
# 添加命令次数
@run_postprocessor
async def _(matcher: Matcher, exception: Optional[Exception], bot: Bot, event: GroupMessageEvent, state: T_State):
global _prefix_count_dict
if matcher.type == 'message' and matcher.priority not in [1, 9]:
model = matcher.module
day_index = _prefix_count_dict['day_index']
# print(f'model --> {model}')
for plugin in plugins2name_dict:
if plugin == model:
@ -53,17 +56,19 @@ async def _(matcher: Matcher, exception: Optional[Exception], bot: Bot, event: G
_prefix_count_dict['day_statistics']['total'][plugin_name] += 1
_prefix_count_dict['week_statistics']['total'][plugin_name] += 1
_prefix_count_dict['month_statistics']['total'][plugin_name] += 1
# print(_prefix_count_dict)
if group_id != 'total':
_prefix_count_dict['total_statistics'][group_id][plugin_name] += 1
_prefix_count_dict['day_statistics'][group_id][plugin_name] += 1
_prefix_count_dict['week_statistics'][group_id][plugin_name] += 1
_prefix_count_dict['month_statistics'][group_id][plugin_name] += 1
_prefix_count_dict['week_statistics'][group_id][str(day_index % 7)][plugin_name] += 1
_prefix_count_dict['month_statistics'][group_id][str(day_index % 30)][plugin_name] += 1
with open(DATA_PATH + '_prefix_count.json', 'w', encoding='utf8') as f:
json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)
break
def check_exists_key(group_id, plugin_name):
def check_exists_key(group_id: str, plugin_name: str):
global _prefix_count_dict
if not _prefix_count_dict['total_statistics']['total'].get(plugin_name):
_prefix_count_dict['total_statistics']['total'][plugin_name] = 0
if not _prefix_count_dict['day_statistics']['total'].get(plugin_name):
@ -79,29 +84,26 @@ def check_exists_key(group_id, plugin_name):
elif not _prefix_count_dict['total_statistics'][group_id].get(plugin_name):
_prefix_count_dict['total_statistics'][group_id][plugin_name] = 0
if not _prefix_count_dict['day_statistics'].get(group_id):
_prefix_count_dict['day_statistics'][group_id] = {
'1': {plugin_name: 0},
'2': {plugin_name: 0},
'3': {plugin_name: 0},
'4': {plugin_name: 0},
'5': {plugin_name: 0},
'6': {plugin_name: 0},
'7': {plugin_name: 0},
}
_prefix_count_dict['day_statistics'][group_id] = {}
_prefix_count_dict['day_statistics'][group_id][plugin_name] = 0
elif not _prefix_count_dict['day_statistics'][group_id].get(plugin_name):
_prefix_count_dict['day_statistics'][group_id][plugin_name] = 0
if not _prefix_count_dict['week_statistics'].get(group_id):
_prefix_count_dict['week_statistics'][group_id] = {
plugin_name: 0,
}
elif not _prefix_count_dict['week_statistics'][group_id].get(plugin_name):
_prefix_count_dict['week_statistics'][group_id][plugin_name] = 0
_prefix_count_dict['week_statistics'][group_id] = {}
for i in range(7):
_prefix_count_dict['week_statistics'][group_id][str(i)] = {}
_prefix_count_dict['week_statistics'][group_id][str(i)][plugin_name] = 0
elif not _prefix_count_dict['week_statistics'][group_id]['0'].get(plugin_name):
for i in range(7):
_prefix_count_dict['week_statistics'][group_id][str(i)][plugin_name] = 0
if not _prefix_count_dict['month_statistics'].get(group_id):
_prefix_count_dict['month_statistics'][group_id] = {
plugin_name: 0,
}
elif not _prefix_count_dict['month_statistics'][group_id].get(plugin_name):
_prefix_count_dict['month_statistics'][group_id][plugin_name] = 0
_prefix_count_dict['month_statistics'][group_id] = {}
for i in range(30):
_prefix_count_dict['month_statistics'][group_id][str(i)] = {}
_prefix_count_dict['month_statistics'][group_id][str(i)][plugin_name] = 0
elif not _prefix_count_dict['month_statistics'][group_id]['0'].get(plugin_name):
for i in range(30):
_prefix_count_dict['month_statistics'][group_id][str(i)][plugin_name] = 0
# 天
@ -114,36 +116,8 @@ async def _():
for group_id in _prefix_count_dict['day_statistics'].keys():
for key in _prefix_count_dict['day_statistics'][group_id].keys():
_prefix_count_dict['day_statistics'][group_id][key] = 0
_prefix_count_dict['day_index'] += 1
with open(DATA_PATH + '_prefix_count.json', 'w', encoding='utf8') as f:
json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)
# 早上好
@scheduler.scheduled_job(
'cron',
day_of_week="mon",
hour=0,
minute=1,
)
async def _():
for group_id in _prefix_count_dict['week_statistics'].keys():
for key in _prefix_count_dict['week_statistics'][group_id].keys():
_prefix_count_dict['week_statistics'][group_id][key] = 0
with open(DATA_PATH + '_prefix_count.json', 'w', encoding='utf8') as f:
json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)
# 早上好
@scheduler.scheduled_job(
'cron',
day=1,
hour=0,
minute=1,
)
async def _():
for group_id in _prefix_count_dict['month_statistics'].keys():
for key in _prefix_count_dict['month_statistics'][group_id].keys():
_prefix_count_dict['month_statistics'][group_id][key] = 0
with open(DATA_PATH + '_prefix_count.json', 'w', encoding='utf8') as f:
json.dump(_prefix_count_dict, f, indent=4, ensure_ascii=False)

View File

@ -1,7 +1,7 @@
from nonebot.rule import to_me
from nonebot.typing import T_State
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent, MessageEvent
from nonebot import on_command, on_keyword
from nonebot import on_command, on_keyword, on_metaevent
from nonebot.plugin import MatcherGroup
from nonebot.adapters.cqhttp.event import GroupRequestEvent
import nonebot
@ -20,7 +20,7 @@ from models.group_member_info import GroupInfoUser
# erm = on_command('异世相遇,尽享美味', aliases={'异世相遇 尽享美味', '异世相遇,尽享美味'}, priority=5, block=True)
# matcher = on_keyword({"test"})
matcher = on_keyword({"test"})
#
#
# @matcher.handle()
@ -41,3 +41,12 @@ from models.group_member_info import GroupInfoUser
# async def first_receive(bot: Bot, event: Event, state: T_State):
# print(record('erm'))
# await matcher.send(record('erm'))
@matcher.args_parser
async def _(bot: Bot, event: MessageEvent, state: T_State):
msg = str(event.get_message())
if len(msg) > 5:
return
await matcher.reject()

View File

@ -12,7 +12,7 @@ db = Gino()
async def init():
i_bind = DATABASE_URI if DATABASE_URI else bind
if not bind:
if not i_bind:
i_bind = f"{sql_name}://{user}:{password}@{address}:{port}/{database}"
# print(i_bind)
try:

View File

@ -1,7 +1,7 @@
import logging
from datetime import datetime
from configs.path_config import LOG_PATH
import sys
# CRITICAL 50
# ERROR 40
# WARNING 30
@ -17,8 +17,8 @@ logger = logging.getLogger('hibiki')
logger.setLevel(level=logging.DEBUG)
formatter = logging.Formatter('[%(asctime)s] - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
# print(LOG_PATH)
file_handler = logging.FileHandler(LOG_PATH + str((datetime.now()).date()) + '.log', mode='a', encoding='utf-8')
file_handler = logging.FileHandler(LOG_PATH + str(datetime.now().date()) + '.log', mode='a', encoding='utf-8')
file_handler.setLevel(level=logging.INFO)
file_handler.setFormatter(formatter)

View File

@ -1,13 +1,9 @@
import os
DATABASE_URI = os.environ.get('DATABASE_URI')
try:
DATABASE_URI = os.environ['DATABASE_URI']
except KeyError:
DATABASE_URI = ''
TL_M_KEY = os.environ.get('TL_KEY')
SYSTEM_PROXY = os.environ.get('SYSTEM_PROXY')
try:
TL_M_KEY = os.environ['TL_KEY']
except KeyError:
TL_M_KEY = []