Add files via upload

This commit is contained in:
HibiKier 2021-05-20 19:25:51 +08:00 committed by GitHub
parent 7207982a30
commit 9cac07b5ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1525 additions and 0 deletions

View File

@ -0,0 +1,53 @@
from .group_user_checkin import group_user_check_in, group_user_check, group_impression_rank
from nonebot.typing import T_State
from nonebot.adapters.cqhttp import Bot, Event, GroupMessageEvent
from nonebot.adapters.cqhttp.permission import GROUP
from util.utils import get_message_text
from nonebot.plugin import MatcherGroup
__plugin_name__ = '签到'
__plugin_usage__ = (
'用法:\n'
'对我说 “签到” 来签到\n'
'“我的签到” 来获取历史签到信息\n'
'“好感度排行” 来查看当前好感度前十的伙伴\n'
'/ 签到时有 3% 概率 * 2 /'
)
sign_match_group = MatcherGroup(priority=5, permission=GROUP, block=True)
sign = sign_match_group.on_command("签到")
@sign.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
if get_message_text(event.json()) in ['帮助']:
await sign.finish(__plugin_usage__)
await sign.send(
await group_user_check_in(event.user_id, event.group_id),
at_sender=True,
)
my_sign = sign_match_group.on_command(cmd="我的签到", aliases={'好感度'})
@my_sign.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
await my_sign.send(
await group_user_check(event.user_id, event.group_id),
at_sender=True,
)
sign_ranking = sign_match_group.on_command(cmd="积分排行", aliases={'好感度排行', '签到排行', '积分排行', '好感排行',
'好感度排名,签到排名,积分排名'})
@sign_ranking.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
await sign_ranking.send(
await group_impression_rank(event.group_id)
)

View File

@ -0,0 +1,119 @@
import random
from datetime import datetime, timedelta
from services.log import logger
from services.db_context import db
from models.sigin_group_user import SignGroupUser
from models.group_member_info import GroupInfoUser
from models.bag_user import UserBag
from configs.config import MAX_SIGN_GOLD
async def group_user_check_in(user_qq: int, group: int) -> str:
'Returns string describing the result of checking in'
present = datetime.now()
async with db.transaction():
# 取得相应用户
user = await SignGroupUser.ensure(user_qq, group, for_update=True)
# 如果同一天签到过,特殊处理
if user.checkin_time_last.date() == present.date():
return _handle_already_checked_in(user)
return await _handle_check_in(user_qq, group, present) # ok
def _handle_already_checked_in(user: SignGroupUser) -> str:
return f'已经签到过啦~ 好感度:{user.impression:.2f}'
async def _handle_check_in(user_qq: int, group: int, present: datetime) -> str:
user = await SignGroupUser.ensure(user_qq, group, for_update=True)
impression_added = random.random()
present = present + timedelta(hours=8)
critx2 = random.random()
add_probability = user.add_probability
specify_probability = user.specify_probability
if critx2 + add_probability > 0.97:
impression_added *= 2
elif critx2 < specify_probability:
impression_added *= 2
new_impression = user.impression + impression_added
message = random.choice((
'谢谢,你是个好人!',
'对了,来喝杯茶吗?',
))
await user.update(
checkin_count=user.checkin_count + 1,
checkin_time_last=present,
impression=new_impression,
add_probability=0,
specify_probability=0,
).apply()
# glod = await random_glod(user_qq, group, specify_probability)
if user.impression < 1:
impression = 1
else:
impression = user.impression
gold = random.randint(1, 100)
imgold = random.randint(1, int(impression))
if imgold > MAX_SIGN_GOLD:
imgold = MAX_SIGN_GOLD
await UserBag.add_glod(user_qq, group, gold + imgold)
if critx2 + add_probability > 0.97 or critx2 < specify_probability:
logger.info(f'(USER {user.user_qq}, GROUP {user.belonging_group})'
f' CHECKED IN successfully. score: {new_impression:.2f} (+{impression_added * 2:.2f}).获取金币:{gold+imgold}')
return f'{message} 好感度:{new_impression:.2f} (+{impression_added/2:.2f}×2)\n获取金币:{gold} \n好感度额外获得金币:{imgold}'
else:
logger.info(f'(USER {user.user_qq}, GROUP {user.belonging_group})'
f' CHECKED IN successfully. score: {new_impression:.2f} (+{impression_added:.2f}).获取金币:{gold+imgold}')
return f'{message} 好感度:{new_impression:.2f} (+{impression_added:.2f})\n获取金币:{gold} \n好感度额外获得金币:{imgold}'
async def group_user_check(user_qq: int, group: int) -> str:
# heuristic: if users find they have never checked in they are probable to check in
user = await SignGroupUser.ensure(user_qq, group)
glod = await UserBag.get_gold(user_qq, group)
return '好感度:{:.2f}\n金币:{}\n历史签到数:{}\n上次签到日期:{}'.format(
user.impression,
glod,
user.checkin_count,
user.checkin_time_last.date() if user.checkin_time_last != datetime.min else '从未',
)
async def group_impression_rank(group: int) -> str:
result = "\t好感度排行榜\t\n"
user_qq_list, impression_list = await SignGroupUser.query_impression_all(group)
_count = 11
if user_qq_list and impression_list:
for i in range(1, 100):
if len(user_qq_list) == 0 or len(impression_list) == 0 or i == _count:
break
impression = max(impression_list)
index = impression_list.index(impression)
user_qq = user_qq_list[index]
print(user_qq, group)
try:
user_name = (await GroupInfoUser.select_member_info(user_qq, group)).user_name
except Exception as e:
logger.info(f"USER {user_qq}, GROUP {group} 不在群内")
_count += 1
impression_list.remove(impression)
user_qq_list.remove(user_qq)
continue
result += f"{i - _count + 11}. {user_name}: {impression:.2f}\n"
impression_list.remove(impression)
user_qq_list.remove(user_qq)
return result[:-1]
async def random_glod(user_id, group_id, impression):
if impression < 1:
impression = 1
glod = random.randint(1, 100) + random.randint(1, int(impression))
if await UserBag.add_glod(user_id, group_id, glod):
return glod
else:
return 0

View File

@ -0,0 +1,81 @@
from .data_source import dataGet, dataProcess
from nonebot.adapters import Bot, Event
from nonebot.typing import T_State
from nonebot import on_command
dataget = dataGet()
songpicker = on_command("点歌", priority=5, block=True)
@songpicker.handle()
async def handle_first_receive(bot: Bot, event: Event, state: T_State):
args = str(event.get_message()).strip()
# if args.isdigit():
# if "songName" in state:
# state["songNum"] = int(args)
if args:
state["songName"] = args
@songpicker.got("songName", prompt="歌名是?")
async def handle_songName(bot: Bot, event: Event, state: T_State):
songName = state["songName"]
songIdList = await dataget.songIds(songName=songName)
if not songIdList:
await songpicker.reject("没有找到这首歌,请发送其它歌名!")
songInfoList = list()
for songId in songIdList:
songInfoDict = await dataget.songInfo(songId)
songInfoList.append(songInfoDict)
# songInfoMessage = await dataProcess.mergeSongInfo(songInfoList)
# await songpicker.send(songInfoMessage)
state["songIdList"] = songIdList
@songpicker.got("songName")
async def handle_songNum(bot: Bot, event: Event, state: T_State):
songIdList = state["songIdList"]
# songNum = state["songNum"]
songNum = 0
# 处理重选
# if not songNum.isdigit():
# await songpicker.finish()
# else:
# songNum = int(songNum)
#
# if songNum >= len(songIdList) or songNum < 0:
# await songpicker.reject("数字序号错误")
selectedSongId = songIdList[int(songNum)]
songContent = [
{
"type": "music",
"data": {
"type": 163,
"id": selectedSongId
}
}
]
await songpicker.send(songContent)
# songCommentsDict = await dataget.songComments(songId=selectedSongId)
# songCommentsMessage = await dataProcess.mergeSongComments(songCommentsDict)
# commentContent = [
# {
# "type": "text",
# "data": {
# "text": "下面为您播送热评:\n"
# }
# },
# {
# "type": "text",
# "data": {
# "text": songCommentsMessage
# }
# }
# ]
#
# await songpicker.send(commentContent)

View File

@ -0,0 +1,156 @@
import aiohttp
import json
class dataApi():
'''
从网易云音乐接口直接获取数据实验性
'''
headers = {"referer": "http://music.163.com"}
cookies = {"appver": "2.0.2"}
async def search(self, songName: str):
'''
搜索接口用于由歌曲名查找id
'''
async with aiohttp.ClientSession(headers=self.headers, cookies=self.cookies) as session:
async with session.post(f"http://music.163.com/api/search/get/", data={"s": songName, "limit": 5, "type": 1, "offset": 0},) as r:
if r.status != 200:
return None
r = await r.text()
return json.loads(r)
# async def getHotComments(self, songId: int):
# '''
# 获取热评
# '''
# async with httpx.AsyncClient() as client:
# async with aiohttp.ClientSession(headers=self.headers, cookies=self.cookies) as session:
# async with session.post(
# f"https://music.163.com/weapi/v1/resource/hotcomments/R_SO_4_{songId}?csrf_token=",
# data={
# "params": 'D33zyir4L/58v1qGPcIPjSee79KCzxBIBy507IYDB8EL7jEnp41aDIqpHBhowfQ6iT1Xoka8jD+0p
# 44nRKNKUA0dv+n5RWPOO57dZLVrd+T1J/sNrTdzUhdHhoKRIgegVcXYjYu+CshdtCBe6WEJozBRlaHyLeJtGrA
# BfMOEb4PqgI3h/uELC82S05NtewlbLZ3TOR/TIIhNV6hVTtqHDVHjkekrvEmJzT5pk1UY6r0=',
# "encSecKey": '45c8bcb07e69c6b545d3045559bd300db897509b8720ee2b45a72bf2d3b216ddc77fb10dae
# c4ca54b466f2da1ffac1e67e245fea9d842589dc402b92b262d3495b12165a721aed880bf09a0a99ff94c959d
# 04e49085dc21c78bbbe8e3331827c0ef0035519e89f097511065643120cbc478f9c0af96400ba4649265781fc9079'
# },
# ) as r:
# if r.status != 200:
# return None
# r = await r.json()
# return r
async def getSongInfo(self, songId: int):
'''
获取歌曲信息
'''
async with aiohttp.ClientSession(headers=self.headers, cookies=self.cookies) as session:
async with session.post(f"http://music.163.com/api/song/detail/?id={songId}&ids=%5B{songId}%5D",) as r:
if r.status != 200:
return None
r = await r.text()
return json.loads(r)
class dataGet(dataApi):
'''
从dataApi获取数据并做简单处理
'''
api = dataApi()
async def songIds(self, songName: str, amount=5) -> list:
'''
根据用户输入的songName 获取候选songId列表 [默认songId数量5]
'''
songIds = list()
r = await self.api.search(songName=songName)
if r is None:
raise WrongDataError
idRange = amount if amount < len(
r["result"]["songs"]) else len(r["result"]["songs"])
for i in range(idRange):
songIds.append(r["result"]["songs"][i]["id"])
return songIds
# async def songComments(self, songId: int, amount=3) -> dict:
# '''
# 根据传递的单一songId获取songComments dict [默认评论数量上限3]
# '''
# songComments = dict()
# r = await self.api.getHotComments(songId)
# if r is None:
# raise WrongDataError
# commentsRange = amount if amount < len(
# r['hotComments']) else len(r['hotComments'])
# for i in range(commentsRange):
# songComments[r['hotComments'][i]['user']
# ['nickname']] = r['hotComments'][i]['content']
# return songComments
async def songInfo(self, songId: int) -> dict:
'''
根据传递的songId获取歌曲名歌手专辑等信息作为dict返回
'''
songInfo = dict()
r = await self.api.getSongInfo(songId)
if r is None:
raise WrongDataError
songInfo["songName"] = r["songs"][0]["name"]
songArtists = list()
for ars in r["songs"][0]["artists"]:
songArtists.append(ars["name"])
songArtistsStr = "".join(songArtists)
songInfo["songArtists"] = songArtistsStr
songInfo["songAlbum"] = r["songs"][0]["album"]["name"]
return songInfo
class dataProcess():
'''
将获取的数据处理为用户能看懂的形式
'''
@staticmethod
async def mergeSongInfo(songInfos: list) -> str:
'''
将歌曲信息list处理为字符串供用户点歌
传递进的歌曲信息list含有多个歌曲信息dict
'''
songInfoMessage = "请输入欲点播歌曲的序号:\n"
numId = 0
for songInfo in songInfos:
songInfoMessage += f"{numId}"
songInfoMessage += songInfo["songName"]
songInfoMessage += "-"
songInfoMessage += songInfo["songArtists"]
songInfoMessage += " 专辑:"
songInfoMessage += songInfo["songAlbum"]
songInfoMessage += "\n"
numId += 1
return songInfoMessage
@staticmethod
async def mergeSongComments(songComments: dict) -> str:
songCommentsMessage = '\n'.join(
['%s %s' % (key, value) for (key, value) in songComments.items()])
return songCommentsMessage
class Error(Exception):
'''
谁知道网易的接口会出什么幺蛾子
'''
pass
class WrongDataError(Error):
def __init__(self, expression, message):
self.expression = expression
self.message = message
self.message += "\n未从网易接口获取到有效的数据!"

View File

@ -0,0 +1,158 @@
from nonebot import on_command
from nonebot.permission import SUPERUSER
from models.level_user import LevelUser
from nonebot.typing import T_State
from nonebot.adapters import Bot, Event
from nonebot.rule import to_me
from util.utils import get_message_at, get_message_text, is_number, get_bot
from services.log import logger
from .data_source import open_remind, close_remind
from models.group_info import GroupInfo
from models.friend_user import FriendUser
__plugin_name__ = '超级用户指令 [Hidden]'
__plugin_usage__ = '用法'
super_cmd = on_command("添加管理", aliases={'删除管理', '添加权限', '删除权限'}, rule=to_me(), priority=1, permission=SUPERUSER, block=True)
oc_gb = on_command('开启广播通知', aliases={'关闭广播通知'}, rule=to_me(), permission=SUPERUSER, priority=1, block=True)
cls_group = on_command("所有群组", rule=to_me(), permission=SUPERUSER, priority=1, block=True)
cls_friend = on_command("所有好友", rule=to_me(), permission=SUPERUSER, priority=1, block=True)
del_group = on_command('退群', rule=to_me(), permission=SUPERUSER, priority=1, block=True)
update_group_info = on_command('更新群信息', rule=to_me(), permission=SUPERUSER, priority=1, block=True)
update_friend_info = on_command('更新好友信息', rule=to_me(), permission=SUPERUSER, priority=1, block=True)
# update_cmd = on_command('更新命令开关', rule=to_me(), permission=SUPERUSER, priority=1, block=True)
@super_cmd.handle()
async def _(bot: Bot, event: Event, state: T_State):
try:
args = get_message_text(event.json()).strip().split(" ")
qq = int(get_message_at(event.json())[0])
if state["_prefix"]["raw_command"][:2] == "添加":
level = int(args[0])
if await LevelUser.set_level(qq, event.group_id, level, 1):
result = "添加管理成功, 权限: " + str(level)
else:
result = "管理已存在, 更新权限: " + str(level)
else:
if await LevelUser.delete_level(qq, event.group_id, True):
result = "删除管理成功!"
else:
result = "该账号无管理权限!"
await super_cmd.send(result)
except Exception as e:
await super_cmd.send("执行指令失败!")
logger.error(f'执行指令失败 e{e}')
@oc_gb.handle()
async def _(bot: Bot, event: Event, state: T_State):
group = get_message_text(event.json())
if group:
if is_number(group):
group = int(group)
for g in await bot.get_group_list():
if g['group_id'] == group:
break
else:
await oc_gb.finish('没有加入这个群...', at_sender=True)
# try:
if state["_prefix"]["raw_command"] == '开启广播通知':
logger.info(f'USER {event.user_id} 开启了 GROUP {group} 的广播')
await oc_gb.finish(await open_remind(group, 'gb'), at_sender=True)
else:
logger.info(f'USER {event.user_id} 关闭了 GROUP {group} 的广播')
await oc_gb.finish(await close_remind(group, 'gb'), at_sender=True)
# except Exception as e:
# await oc_gb.finish(f'关闭 {group} 的广播失败', at_sender=True)
else:
await oc_gb.finish('请输入正确的群号', at_sender=True)
else:
await oc_gb.finish('请输入要关闭广播的群号', at_sender=True)
@del_group.handle()
async def _(bot: Bot, event: Event, state: T_State):
group = get_message_text(event.json())
if group:
if is_number(group):
try:
await bot.set_group_leave(group_id=int(group))
logger.info(f'退出群聊 {group} 成功')
await del_group.finish(f'退出群聊 {group} 成功', at_sender=True)
except Exception as e:
logger.info(f'退出群聊 {group} 失败 e:{e}')
else:
await del_group.finish(f'请输入正确的群号', at_sender=True)
else:
await del_group.finish(f'请输入群号', at_sender=True)
@cls_group.handle()
async def _(bot: Bot, event: Event, state: T_State):
gl = await bot.get_group_list(self_id=bot.self_id)
msg = ["{group_id} {group_name}".format_map(g) for g in gl]
msg = "\n".join(msg)
msg = f"bot:{bot.self_id}\n| 群号 | 群名 | 共{len(gl)}个群\n" + msg
await bot.send_private_msg(self_id=bot.self_id, user_id=int(list(bot.config.superusers)[0]), message=msg)
@cls_friend.handle()
async def _(bot: Bot, event: Event, state: T_State):
gl = await bot.get_friend_list(self_id=bot.self_id)
msg = ["{user_id} {nickname}".format_map(g) for g in gl]
msg = "\n".join(msg)
msg = f"| QQ号 | 昵称 | 共{len(gl)}个好友\n" + msg
await bot.send_private_msg(self_id=bot.self_id, user_id=int(list(bot.config.superusers)[0]), message=msg)
@update_group_info.handle()
async def _(bot: Bot, event: Event, state: T_State):
bot = get_bot()
gl = await bot.get_group_list(self_id=bot.self_id)
gl = [g['group_id'] for g in gl]
num = 0
rst = ''
for g in gl:
group_info = await bot.get_group_info(group_id=g)
if await GroupInfo.add_group_info(group_info['group_id'], group_info['group_name'],
group_info['max_member_count'], group_info['member_count']):
num += 1
logger.info(f'自动更新群组 {g} 信息成功')
else:
logger.info(f'自动更新群组 {g} 信息失败')
rst += f'{g} 更新失败\n'
await update_group_info.send(f'成功更新了 {num} 个群的信息\n{rst[:-1]}')
@update_friend_info.handle()
async def _(bot: Bot, event: Event, state: T_State):
num = 0
rst = ''
fl = await get_bot().get_friend_list(self_id=bot.self_id)
for f in fl:
if await FriendUser.add_friend_info(f['user_id'], f['nickname']):
logger.info(f'自动更新好友 {f["user_id"]} 信息成功')
num += 1
else:
logger.warning(f'自动更新好友 {f["user_id"]} 信息失败')
rst += f'{f["user_id"]} 更新失败\n'
await update_friend_info.send(f'成功更新了 {num} 个好友的信息\n{rst[:-1]}')
# @update_cmd.handle()
# async def _(bot: Bot, event: Event, state: T_State):
# gl = await bot.get_group_list(self_id=bot.self_id)
# gl = [g['group_id'] for g in gl]
# for g in gl:
# check_group_switch_json(g)
# for file in os.listdir(DATA_PATH + 'group_help'):
# os.remove(DATA_PATH + f'group_help/{file}')
# await update_cmd.finish('更新命令完成!', at_sender=True)

View File

@ -0,0 +1,76 @@
from models.group_remind import GroupRemind
async def open_remind(group: int, name: str) -> str:
_name = ''
if name == 'zwa':
_name = '早晚安'
if name == 'dz':
_name = '地震播报'
if name == 'hy':
_name = '群欢迎'
if name == 'kxcz':
_name = '开箱重置提醒'
if name == 'gb':
_name = '广播'
if await GroupRemind.get_status(group, name):
return f'该群已经开启过 {_name} 通知,请勿重复开启!'
if await GroupRemind.set_status(group, name, True):
return f'成功开启 {_name} 通知0v0'
else:
return f'开启 {_name} 通知失败了...'
async def close_remind(group: int, name: str) -> str:
_name = ''
if name == 'zwa':
_name = '早晚安'
if name == 'dz':
_name = '地震播报'
if name == 'hy':
_name = '群欢迎'
if name == 'kxcz':
_name = '开箱重置提醒'
if name == 'gb':
_name = '广播'
if not await GroupRemind.get_status(group, name):
return f'该群已经取消过 {_name} 通知,请勿重复取消!'
if await GroupRemind.set_status(group, name, False):
return f'成功关闭 {_name} 通知0v0'
else:
return f'关闭 {_name} 通知失败了...'
# cmd_list = ['总开关', '签到', '发送图片', '色图', '黑白草图', 'coser', '鸡汤/语录', '骂我', '开箱', '鲁迅说', '假消息', '商店系统',
# '操作图片', '查询皮肤', '天气', '疫情', '识番', '搜番', '点歌', 'pixiv', 'rss', '方舟一井', '查干员', '骰子娘', '原神一井']
#
#
# def check_group_switch_json(group_id):
# if not os.path.exists(DATA_PATH + f'rule/group_switch/'):
# os.mkdir(DATA_PATH + f'rule/group_switch/')
# if not os.path.exists(DATA_PATH + f'rule/group_switch/{group_id}.json'):
# with open(DATA_PATH + f'rule/group_switch/{group_id}.json', 'w', encoding='utf8') as f:
# data = {}
# for cmd in cmd_list:
# data[cmd] = True
# f.write(json.dumps(data, ensure_ascii=False))
# else:
# with open(DATA_PATH + f'rule/group_switch/{group_id}.json', 'r', encoding='utf8') as f:
# try:
# data = json.load(f)
# except ValueError:
# data = {}
# if len(data.keys()) - 1 != len(cmd_list):
# for cmd in cmd_list:
# if cmd not in data.keys():
# data[cmd] = True
# with open(DATA_PATH + f'rule/group_switch/{group_id}.json', 'w', encoding='utf8') as wf:
# wf.write(json.dumps(data, ensure_ascii=False))
# reload(data)
# for file in os.listdir(DATA_PATH + 'group_help'):
# os.remove(DATA_PATH + f'group_help/{file}')
def reload(data):
static_group_dict = data

View File

@ -0,0 +1,25 @@
from nonebot import on_command
from nonebot.permission import SUPERUSER
from nonebot.typing import T_State
from nonebot.adapters import Bot, Event
from nonebot.rule import to_me
super_help = on_command("超级用户帮助", rule=to_me(), priority=1, permission=SUPERUSER, block=True)
@super_help.handle()
async def _(bot: Bot, event: Event, state: T_State):
result = '''超级用户帮助:
1.添加/删除管理
2.查看群组/查看好友
3.广播 --> 指令:广播-
4.更新色图
5.回复 --> 指令:/t 用户 群号
6.更新cookie --> 指令:更新cookie text
7.开启广播通知 --> 指令:开启广播通知 群号
8.退群 --> 指令:退群 群号
9.自检
10.更新好友信息
11.更新群群信息'''
await super_help.finish(result, at_sender=True)

View File

@ -0,0 +1,37 @@
from nonebot import on_command
from util.utils import get_message_text
from services.log import logger
from nonebot.adapters.cqhttp import Bot, MessageEvent
from nonebot.typing import T_State
from .data_source import translate_msg
__plugin_name__ = '翻译'
translate = on_command("translate", aliases={'英翻', '翻英',
'日翻', '翻日',
'韩翻', '翻韩'
}, priority=5, block=True)
@translate.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
msg = get_message_text(event.json())
if msg:
state['msg'] = msg
@translate.got('msg', prompt='你要翻译的消息是啥?')
async def _(bot: Bot, event: MessageEvent, state: T_State):
msg = state['msg']
if len(msg) > 150:
await translate.finish('翻译过长请不要超过150字', at_sender=True)
await translate.send(await translate_msg(state["_prefix"]["raw_command"], msg))
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if event.message_type != 'private' else 'private'}) 使用翻译:{msg}")

View File

@ -0,0 +1,69 @@
import aiohttp
from util.utils import get_local_proxy
from util.user_agent import get_user_agent
url = f'http://fanyi.youdao.com/translate?smartresult=dict&smartresult=rule&smartresult=ugc&sessionFrom=null'
async def translate_msg(language_type, msg):
data = {
'type': parse_language(language_type),
'i': msg,
"doctype": "json",
"version": "2.1",
"keyfrom": "fanyi.web",
"ue": "UTF-8",
"action": "FY_BY_CLICKBUTTON",
"typoResult": "true"
}
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
async with session.post(url, data=data, proxy=get_local_proxy()) as res:
data = await res.json()
if data['errorCode'] == 0:
return data['translateResult'][0][0]['tgt']
return '翻译惜败..'
# ZH_CN2EN 中文 » 英语
# ZH_CN2JA 中文 » 日语
# ZH_CN2KR 中文 » 韩语
# ZH_CN2FR 中文 » 法语
# ZH_CN2RU 中文 » 俄语
# ZH_CN2SP 中文 » 西语
# EN2ZH_CN 英语 » 中文
# JA2ZH_CN 日语 » 中文
# KR2ZH_CN 韩语 » 中文
# FR2ZH_CN 法语 » 中文
# RU2ZH_CN 俄语 » 中文
# SP2ZH_CN 西语 » 中文
def parse_language(language_type):
if language_type == '英翻':
return 'EN2ZH_CN'
if language_type == '日翻':
return 'JA2ZH_CN'
if language_type == '韩翻':
return 'KR2ZH_CN'
# if language_type == '法翻':
# return 'FR2ZH_CN'
# if language_type == '俄翻':
# return 'RU2ZH_CN'
if language_type == '翻英':
return 'ZH_CN2EN'
if language_type == '翻日':
return 'ZH_CN2JA'
if language_type == '翻韩':
return 'ZH_CN2KR'
# if language_type == '翻法':
# return 'ZH_CN2FR'
# if language_type == '翻俄':
# return 'ZH_CN2RU'

View File

@ -0,0 +1,61 @@
from nonebot import on_command
from nonebot.typing import T_State
from nonebot.adapters.cqhttp import Bot, GroupMessageEvent
from .data_source import download_gocq_lasted, upload_gocq_lasted
import os
from nonebot.adapters.cqhttp.permission import GROUP
from services.log import logger
from util.utils import scheduler, get_bot, UserExistLimiter
from configs.config import UPDATE_GOCQ_GROUP
from pathlib import Path
path = str(Path('/resources/gocqhttp_file/').absolute()) + '/'
lasted_gocqhttp = on_command("更新gocq", permission=GROUP, priority=5, block=True)
_ulmt = UserExistLimiter()
@lasted_gocqhttp.handle()
async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
# try:
if event.group_id in UPDATE_GOCQ_GROUP:
await lasted_gocqhttp.send('检测中...')
info = await download_gocq_lasted()
if info == 'gocqhttp没有更新':
await lasted_gocqhttp.finish('gocqhttp没有更新')
if _ulmt.check(event.group_id):
await lasted_gocqhttp.finish('gocqhttp正在更新请勿重复使用该命令', at_sender=True)
_ulmt.set_True(event.group_id)
try:
for file in os.listdir(path):
await upload_gocq_lasted(file, event.group_id)
logger.info(f'更新了cqhttp...{file}')
await lasted_gocqhttp.send(f'gocqhttp更新了已上传成功\n更新内容:\n{info}')
except Exception as e:
logger.error(f'更新gocq错误 e{e}')
_ulmt.set_False(event.group_id)
# 更新gocq
@scheduler.scheduled_job(
'cron',
hour=3,
minute=1,
)
async def _():
if UPDATE_GOCQ_GROUP:
bot = get_bot()
try:
info = await download_gocq_lasted()
if info == 'gocqhttp没有更新':
logger.info('gocqhttp没有更新')
return
for group in UPDATE_GOCQ_GROUP:
for file in os.listdir(path):
await upload_gocq_lasted(file, group)
await bot.send_group_msg(group_id=group, message=f"gocqhttp更新了已上传成功\n更新内容:\n{info}")
except Exception as e:
logger.error(f'自动更新gocq出错 e:{e}')

View File

@ -0,0 +1,76 @@
import aiohttp
from util.utils import get_local_proxy, get_bot
from util.user_agent import get_user_agent
import asyncio
import platform
import aiofiles
from bs4 import BeautifulSoup
import os
if platform.system() == 'Windows':
path = os.getcwd() + '/resources/gocqhttp_file/'
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
else:
path = r'/home/hibiki/hibikibot/resources/gocqhttp_file/'
url = 'https://github.com/Mrs4s/go-cqhttp/releases'
async def download_gocq_lasted():
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
async with session.get(url, proxy=get_local_proxy()) as response:
soup = BeautifulSoup(await response.text(), 'lxml')
a = soup.find('div', {'class': 'release-header'}).find('a')
title = a.text
_url = a.get('href')
for file in os.listdir(path):
if file.endswith('.zip'):
if file == title + '-windows-amd64.zip' or file == title + '_windows_amd64.zip':
return 'gocqhttp没有更新'
for file in os.listdir(path):
os.remove(path + file)
async with session.get("https://github.com" + _url, proxy=get_local_proxy()) as res:
update_info = ''
soup = BeautifulSoup(await res.text(), 'lxml')
info_div = soup.find('div', {'class': 'markdown-body'})
for p in info_div.find_all('p'):
update_info += p.text.replace('<br>', '\n') + '\n'
div_all = soup.select('div.d-flex.flex-justify-between.flex-items-center.py-1.py-md-2.Box-body.px-2')
for div in div_all:
if div.find('a').find('span').text == title + '-windows-amd64.zip' or\
div.find('a').find('span').text == title + '-linux-arm64.tar.gz' or\
div.find('a').find('span').text == 'go-cqhttp_windows_amd64.zip' or \
div.find('a').find('span').text == 'go-cqhttp_linux_arm64.tar.gz':
file_url = div.find('a').get('href')
if div.find('a').find('span').text.find('windows') == -1:
tag = '-linux-arm64.tar.gz'
else:
tag = '-windows-amd64.zip'
async with session.get("https://github.com" + file_url, proxy=get_local_proxy()) as res_file:
async with aiofiles.open(path + title + tag, 'wb') as f:
await f.write(await res_file.read())
return update_info
async def upload_gocq_lasted(name, group_id):
bot = get_bot()
folder_id = 0
for folder in (await bot.get_group_root_files(group_id=group_id))['folders']:
if folder['folder_name'] == 'gocq':
folder_id = folder['folder_id']
if not folder_id:
await bot.send_group_msg(group_id=group_id,
message=f'请创建gocq文件夹后重试')
for file in os.listdir(path):
os.remove(path+file)
else:
await bot.upload_group_file(
group_id=group_id,
folder=folder_id,
file=path + name,
name=name
)
# asyncio.get_event_loop().run_until_complete(download_gocq_lasted())

View File

@ -0,0 +1,43 @@
from util.utils import scheduler
from nonebot import on_command
from nonebot.permission import SUPERUSER
from nonebot.typing import T_State
from nonebot.adapters import Bot, Event
from nonebot.rule import to_me
from .data_source import update_setu_img
from configs.config import DOWNLOAD_SETU
__name__ = "更新色图 [Hidden]"
update_setu = on_command("更新色图", rule=to_me(), permission=SUPERUSER, priority=1, block=True)
@update_setu.handle()
async def _(bot: Bot, event: Event, state: T_State):
if DOWNLOAD_SETU:
await update_setu.send("开始更新色图...", at_sender=True)
await update_setu.finish(await update_setu_img(), at_sender=True)
else:
await update_setu.finish('更新色图配置未开启')
# 更新色图
@scheduler.scheduled_job(
'cron',
# year=None,
# month=None,
# day=None,
# week=None,
# day_of_week="mon,tue,wed,thu,fri",
hour=4,
minute=30,
# second=None,
# start_date=None,
# end_date=None,
# timezone=None,
)
async def _():
if DOWNLOAD_SETU:
await update_setu_img()

View File

@ -0,0 +1,86 @@
from configs.path_config import IMAGE_PATH, TXT_PATH
import os
from util.user_agent import get_user_agent
from services.log import logger
from datetime import datetime
from util.img_utils import rar_imgs, get_img_hash
from util.utils import get_bot, get_local_proxy
from asyncio.exceptions import TimeoutError
import aiofiles
import aiohttp
try:
import ujson as json
except ModuleNotFoundError:
import json
async def update_setu_img():
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
for file_name in ['setu_url.txt', 'setu_r18_url.txt']:
if file_name == 'setu_url.txt':
json_name = 'setu_img_hash.json'
path = 'setu/'
else:
json_name = 'r18_setu_img_hash.json'
path = 'r18/'
try:
data = json.load(open(TXT_PATH + json_name))
if not data:
continue
except (FileNotFoundError, TypeError):
continue
_success = 0
_similar = 0
try:
with open(TXT_PATH + file_name, 'r') as f:
txt_data = f.read()
if not txt_data:
continue
except FileNotFoundError:
continue
urls = list(set(txt_data[:-1].split(',')))
total = len(urls)
for url in urls:
index = str(len(os.listdir(IMAGE_PATH + path)))
logger.info(f'开始更新 index:{index} --> {url}')
for _ in range(3):
try:
async with session.get(url, proxy=get_local_proxy(), timeout=15) as response:
if response.status == 200:
async with aiofiles.open(IMAGE_PATH + 'rar/' + index + ".jpg", 'wb') as f:
await f.write(await response.read())
_success += 1
else:
logger.info(f'{url} 不存在,跳过更新')
break
if os.path.getsize(IMAGE_PATH + 'rar/' + str(index) + ".jpg") > 1024 * 1024 * 1.5:
rar_imgs(
'rar/',
path,
in_file_name=index,
out_file_name=index
)
else:
logger.info('不需要压缩,移动图片 ' + IMAGE_PATH + 'rar/' + index + ".jpg --> "
+ IMAGE_PATH + path + index + ".jpg")
os.rename(IMAGE_PATH + 'rar/' + index + ".jpg",
IMAGE_PATH + path + index + ".jpg")
img_hash = str(get_img_hash(f'{IMAGE_PATH}{path}{index}.jpg'))
if img_hash in data.values():
logger.info(f'index:{index}'
f'{list(data.keys())[list(data.values()).index(img_hash)]} 存在重复,删除')
os.remove(IMAGE_PATH + path + index + ".jpg")
_similar += 1
data[index] = img_hash
break
except TimeoutError:
continue
with open(TXT_PATH + json_name, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4)
open(TXT_PATH + file_name, 'w')
logger.info(
f'{str(datetime.now()).split(".")[0]} 更新 {file_name.split(".")[0]}完成,预计更新 {total} 张,实际更新 {_success} 张,相似 {_similar} 张,实际存入 {_success - _similar}')
await get_bot().send_private_msg(
user_id=775757368,
message=f'{str(datetime.now()).split(".")[0]} 更新{file_name.split(".")[0]}完成,预计更新 {total} 张,实际更新 {_success} 张,相似 {_similar} 张,实际存入 {_success - _similar}'
)

View File

@ -0,0 +1,87 @@
from nonebot import on_command
from configs.path_config import IMAGE_PATH
from services.log import logger
import os
from nonebot.rule import to_me
from nonebot.typing import T_State
from nonebot.adapters import Bot, Event
from util.utils import get_message_imgs, get_message_text
import aiohttp
import aiofiles
from util.utils import cn2py
from configs.config import IMAGE_DIR_LIST
__plugin_name__ = '上传图片'
__plugin_usage__ = '上传图片帮助:\n\t' \
'1.查看列表 --> 指令: 上传图片 列表/目录\n\t' \
'2.上传图片 序号 图片(在文字后跟图片即可), 即在相应目录下添加图片\n\t\t示例: 上传图片 1 图片(在文字后跟图片即可)'
upload_img = on_command("上传图片", rule=to_me(), priority=5, block=True)
@upload_img.args_parser
async def parse(bot: Bot, event: Event, state: T_State):
if str(event.get_message()) in ['取消', '算了']:
await upload_img.finish("已取消操作..", at_sender=True)
if state["_current_key"] in ['path']:
if str(event.get_message()) not in IMAGE_DIR_LIST:
await upload_img.reject("此目录不正确,请重新输入目录!")
state[state["_current_key"]] = str(event.get_message())
if state["_current_key"] in ['imgs']:
if not get_message_imgs(event.json()):
await upload_img.reject("图呢图呢图呢图呢GKD")
state[state['_current_key']] = get_message_imgs(event.json())
@upload_img.handle()
async def _(bot: Bot, event: Event, state: T_State):
raw_arg = get_message_text(event.json())
img_list = get_message_imgs(event.json())
if raw_arg:
if str(event.get_message()) in ['帮助']:
await upload_img.finish(__plugin_usage__)
if raw_arg.split("[")[0] in IMAGE_DIR_LIST:
state['path'] = raw_arg.split("[")[0]
if img_list:
state['imgs'] = img_list
@upload_img.got("path", prompt="要将图片上传至什么图库呢?")
@upload_img.got("imgs", prompt="图呢图呢图呢图呢GKD")
async def _(bot: Bot, event: Event, state: T_State):
path = IMAGE_PATH + cn2py(state['path'])
img_list = state['imgs']
img_id = len(os.listdir(path))
failed_list = []
success_id = ""
async with aiohttp.ClientSession() as session:
for img_url in img_list:
try:
async with session.get(img_url, timeout=7) as response:
if response.status == 200:
async with aiofiles.open(path + str(img_id) + ".jpg", 'wb') as f:
await f.write(await response.read())
success_id += str(img_id) + ""
img_id += 1
else:
failed_list.append(img_url)
logger.warning(f"图片:{img_url} 下载失败....")
except TimeoutError as e:
logger.warning(f"图片:{img_url} 下载超时....e:{e}")
if img_url not in failed_list:
failed_list.append(img_url)
failed_result = ""
for img in failed_list:
failed_result += str(img) + "\n"
logger.info(f"USER {event.user_id} GROUP {event.group_id if event.message_type != 'private' else 'private'}"
f" 上传图片至 {state['path']}{len(img_list)} 张,失败 {len(failed_list)}id={success_id[:-1]}")
if failed_result:
await upload_img.finish(f"这次一共为 {state['path']}库 添加了 {len(img_list) - len(failed_list)} 张图片\n"
f"依次的Id为{success_id[:-1]}\n"
f"上传失败:{failed_result[:-1]}\n"
f"小真寻感谢您对图库的扩充!WW", at_sender=True)
else:
await upload_img.finish(f"这次一共为 {state['path']}库 添加了 {len(img_list)} 张图片\n"
f"依次的Id为{success_id[:-1]}\n"
f"小真寻感谢您对图库的扩充!WW", at_sender=True)

View File

@ -0,0 +1,95 @@
from nonebot import on_keyword, on_regex
from .data_source import get_weather_of_city
from nonebot.adapters.cqhttp import Bot, Event
from jieba import posseg
from services.log import logger
from nonebot.typing import T_State
from .config import city_list
import re
from util.utils import get_message_text
__plugin_name__ = '天气查询'
__plugin_usage__ = "普普通通的查天气吧\n示例:北京天气"
weather = on_regex(r".*?(.*)市?的?天气.*?", priority=5, block=True)
@weather.handle()
async def _(bot: Bot, event: Event, state: T_State):
if str(event.get_message()) in ['帮助']:
await weather.finish(__plugin_usage__)
msg = get_message_text(event.json())
msg = re.search(r'.*?(.*)市?的?天气.*?', msg)
msg = msg.group(1)
if msg[-1] == '':
msg = msg[:-1]
if msg[-1] != '':
msg += ''
city = ''
if msg:
for word in posseg.lcut(msg):
if word.word in city_list.keys():
await weather.finish("不要查一个省的天气啊,这么大查起来很累人的..", at_sender=True)
if word.flag == 'ns':
city = str(word.word).strip()
break
if word.word == '火星':
await weather.finish('没想到你个小呆子还真的想看火星天气!\n火星大气中含有95的二氧化碳,气压低,加之极度的干燥,'
'就阻止了水的形成积聚。这意味着火星几乎没有云,冰层覆盖了火星的两极,它们的融化和冻结受到火星与太'
'阳远近距离的影响,它产生了强大的尘埃云,阻挡了太阳光,使冰层的融化慢下来。\n所以说火星天气太恶劣了,'
'去过一次就不想再去第二次了')
if city:
city_weather = await get_weather_of_city(city)
logger.info(f'(USER {event.user_id}, GROUP {event.group_id if event.message_type != "private" else "private"} ) '
f'查询天气:' + city)
await weather.finish(city_weather)
# if str(event.get_message()).strip() == '天气':
# state['city'] = '-1'
# return
# msg = str(event.get_message()).strip()
# print(msg)
# flag = True
# if msg.find('开启') != -1 or msg.find('关闭') != -1:
# state['city'] = '-1'
# return
# if msg.startswith('天气'):
# if not msg.endswith('市'):
# state['city'] = msg[2:] + '市'
# else:
# state['city'] = msg[2:]
# flag = False
# elif msg.find('的天气') != -1:
# i = msg.find('的天气')
# msg = msg[:i] + '市' + msg[i:]
# elif msg.find('天气') != -1:
# i = msg.find('天气')
# msg = msg[:i] + '市' + msg[i:]
# if msg != "天气" and flag:
# print(posseg.lcut(msg))
# for word in posseg.lcut(msg):
# if word.word in city_list.keys():# and word.word in ['北京', '上海', '天津', '重庆', '台湾']:
# await weather.finish("不要查一个省的天气啊,这么大查起来很累人的..", at_sender=True)
# if word.flag == 'ns':
# state["city"] = word.word
# break
# if word.word == '火星':
# await weather.finish('没想到你个小呆子还真的想看火星天气!\n火星大气中含有95的二氧化碳,气压低,加之极度的干燥,'
# '就阻止了水的形成积聚。这意味着火星几乎没有云,冰层覆盖了火星的两极,它们的融化和冻结受到火星与太'
# '阳远近距离的影响,它产生了强大的尘埃云,阻挡了太阳光,使冰层的融化慢下来。\n所以说火星天气太恶劣了'
# '去过一次就不想再去第二次了')
# @weather.got("city", prompt="你想查询哪个城市的天气呢?")
# async def _(bot: Bot, event: Event, state: dict):
# if state['city'] == '-1' or not state['city'].strip():
# return
# if state['city'] in ['取消', '算了']:
# await weather.finish('已取消此次调用..')
# if state['city'][-1] != '市':
# state['city'] += '市'
# city_weather = await get_weather_of_city(state['city'].strip())
# logger.info(f'(USER {event.user_id}, GROUP {event.group_id if event.message_type != "private" else "private"} ) '
# f'查询天气:' + state['city'])
# await weather.finish(city_weather)

34
plugins/weather/config.py Normal file
View File

@ -0,0 +1,34 @@
city_list = {
"北京": ["北京"],
"天津": ["天津"],
"山西": ["太原", "阳泉", "晋城", "长治", "临汾", "运城", "忻州", "吕梁", "晋中", "大同", "朔州"],
"河北": ["沧州", "石家庄", "唐山", "保定", "廊坊", "衡水", "邯郸", "邢台", "张家口", "辛集", "秦皇岛", "定州", "承德", "涿州"],
"山东": ["济南", "淄博", "聊城", "德州", "滨州", "济宁", "菏泽", "枣庄", "烟台", "威海", "泰安", "青岛", "临沂", "莱芜", "东营", "潍坊", "日照"],
"河南": ["郑州", "新乡", "鹤壁", "安阳", "焦作", "濮阳", "开封", "驻马店", "商丘", "三门峡", "南阳", "洛阳", "周口", "许昌", "信阳", "漯河", "平顶山", "济源"],
"广东": ["珠海", "中山", "肇庆", "深圳", "清远", "揭阳", "江门", "惠州", "河源", "广州", "佛山", "东莞", "潮州", "汕尾", "梅州", "阳江", "云浮", "韶关", "湛江", "汕头", "茂名"],
"浙江": ["舟山", "温州", "台州", "绍兴", "衢州", "宁波", "丽水", "金华", "嘉兴", "湖州", "杭州"],
"宁夏": ["中卫", "银川", "吴忠", "石嘴山", "固原"],
"江苏": ["镇江", "扬州", "盐城", "徐州", "宿迁", "无锡", "苏州", "南通", "南京", "连云港", "淮安", "常州", "泰州"],
"湖南": ["长沙", "邵阳", "怀化", "株洲", "张家界", "永州", "益阳", "湘西", "娄底", "衡阳", "郴州", "岳阳", "常德", "湘潭"],
"吉林": ["长春", "长春", "通化", "松原", "四平", "辽源", "吉林", "延边", "白山", "白城"],
"福建": ["漳州", "厦门", "福州", "三明", "莆田", "宁德", "南平", "龙岩", "泉州"],
"甘肃": ["张掖", "陇南", "兰州", "嘉峪关", "白银", "武威", "天水", "庆阳", "平凉", "临夏", "酒泉", "金昌", "甘南", "定西"],
"陕西": ["榆林", "西安", "延安", "咸阳", "渭南", "铜川", "商洛", "汉中", "宝鸡", "安康"],
"辽宁": ["营口", "铁岭", "沈阳", "盘锦", "辽阳", "锦州", "葫芦岛", "阜新", "抚顺", "丹东", "大连", "朝阳", "本溪", "鞍山"],
"江西": ["鹰潭", "宜春", "上饶", "萍乡", "南昌", "景德镇", "吉安", "抚州", "新余", "九江", "赣州"],
"黑龙江": ["伊春", "七台河", "牡丹江", "鸡西", "黑河", "鹤岗", "哈尔滨", "大兴安岭", "绥化", "双鸭山", "齐齐哈尔", "佳木斯", "大庆"],
"安徽": ["宣城", "铜陵", "六安", "黄山", "淮南", "合肥", "阜阳", "亳州", "安庆", "池州", "宿州", "芜湖", "马鞍山", "淮北", "滁州", "蚌埠"],
"湖北": ["孝感", "武汉", "十堰", "荆门", "黄冈", "襄阳", "咸宁", "随州", "黄石", "恩施", "鄂州", "荆州", "宜昌", "潜江", "天门", "神农架", "仙桃"],
"青海": ["西宁", "海西", "海东", "玉树", "黄南", "海南", "海北", "果洛"],
"新疆": ["乌鲁木齐", "克州", "阿勒泰", "五家渠", "石河子", "伊犁", "吐鲁番", "塔城", "克拉玛依", "喀什", "和田", "哈密", "昌吉", "博尔塔拉", "阿克苏", "巴音郭楞", "阿拉尔", "图木舒克", "铁门关"],
"贵州": ["铜仁", "黔东南", "贵阳", "安顺", "遵义", "黔西南", "黔南", "六盘水", "毕节"],
"四川": ["遂宁", "攀枝花", "眉山", "凉山", "成都", "巴中", "广安", "自贡", "甘孜", "资阳", "宜宾", "雅安", "内江", "南充", "绵阳", "泸州", "凉山", "乐山", "广元", "甘孜", "德阳", "达州", "阿坝"],
"上海": ["上海"],
"广西": ["南宁", "贵港", "玉林", "梧州", "钦州", "柳州", "来宾", "贺州", "河池", "桂林", "防城港", "崇左", "北海", "百色"],
"西藏": ["拉萨", "山南", "日喀则", "那曲", "林芝", "昌都", "阿里"],
"云南": ["昆明", "红河", "大理", "玉溪", "昭通", "西双版纳", "文山", "曲靖", "普洱", "怒江", "临沧", "丽江", "红河", "迪庆", "德宏", "大理", "楚雄", "保山"],
"内蒙古": ["呼和浩特", "乌兰察布", "兴安", "赤峰", "呼伦贝尔", "锡林郭勒", "乌海", "通辽", "巴彦淖尔", "阿拉善", "鄂尔多斯", "包头"],
"海南": ["海口", "三沙", "三亚", "临高", "五指山", "陵水", "文昌", "万宁", "白沙", "乐东", "澄迈", "屯昌", "定安", "东方", "保亭", "琼中", "琼海", "儋州", "昌江"],
"重庆": ["重庆"],
"台湾": ["台北", "高雄", "基隆", "台中", "台南", "新竹", "嘉义", "新北", "桃园"]
}

View File

@ -0,0 +1,21 @@
import requests
from util.init_result import image
async def get_weather_of_city(city) -> str:
url = 'http://wthrcdn.etouch.cn/weather_mini?city=' + city
data_json = requests.get(url).json()
if 'desc' in data_json:
if data_json['desc'] == "invilad-citykey":
return "你为啥不查火星的天气呢?小真寻只支持国内天气查询!!" + image("shengqi", "zhenxun")
elif data_json['desc'] == "OK":
w_type = data_json['data']['forecast'][0]['type']
w_max = data_json['data']['forecast'][0]['high'][3:]
w_min = data_json['data']['forecast'][0]['low'][3:]
fengli = data_json['data']['forecast'][0]['fengli'][9:-3]
ganmao = data_json['data']["ganmao"]
fengxiang = data_json['data']['forecast'][0]['fengxiang']
repass = f'{city}的天气是 {w_type}\n最高温度: {w_max}\n最低温度: {w_min}\n风力: {fengli} {fengxiang}\n{ganmao}'
return repass
else:
return '好像出错了?'

View File

@ -0,0 +1,76 @@
from .data_source import get_anime
from nonebot import on_command
from nonebot.typing import T_State
from nonebot.adapters import Bot, Event
from util.utils import get_message_imgs
from services.log import logger
from util.utils import UserExistLimiter
__plugin_name__ = '识番'
__plugin_usage__ = r"""
以图识番
识番 [图片]
""".strip()
_ulmt = UserExistLimiter()
what_anime = on_command('识番', priority=5, block=True)
@what_anime.args_parser
async def _(bot: Bot, event: Event, state: T_State):
if str(event.get_message()) in ['取消', '算了']:
await what_anime.finish("已取消操作..", at_sender=True)
img_url = get_message_imgs(event.json())
if not img_url:
await what_anime.reject(prompt='图呢图呢图呢图呢GKD', at_sender=True)
state['img_url'] = img_url
@what_anime.handle()
async def _(bot: Bot, event: Event, state: T_State):
if str(event.get_message()) in ['帮助']:
await what_anime.finish(__plugin_usage__)
if _ulmt.check(event.user_id):
await what_anime.finish('您有识番任务正在进行,请稍等...', at_sender=True)
img_url = get_message_imgs(event.json())
if img_url:
state['img_url'] = img_url
@what_anime.got('img_url', prompt='虚空识番来图来图GKD')
async def _(bot: Bot, event: Event, state: T_State):
img_url = state['img_url'][0]
_ulmt.set_True(event.user_id)
await what_anime.send('开始识别.....')
anime_data_report = await get_anime(img_url)
if anime_data_report:
await what_anime.send(anime_data_report, at_sender=True)
logger.info(f"USER {event.user_id} GROUP "f"{event.group_id if event.message_type != 'private' else 'private'}"
f" 识番 {img_url} --> {anime_data_report}")
else:
logger.info(f"USER {event.user_id} GROUP "
f"{event.group_id if event.message_type != 'private' else 'private'} 识番 {img_url} 未找到!!")
await what_anime.send(f"没有寻找到该番剧,果咩..", at_sender=True)
_ulmt.set_False(event.user_id)
# @whatanime.args_parser
# async def _(session: CommandSession):
# image_arg = session.current_arg_images
#
# if session.is_first_run:
# if image_arg:
# session.state['whatanime'] = image_arg[0]
# return
#
# if not image_arg:
# session.pause('没图说个J*GKD!')
#
# session.state[session.current_key] = image_arg
#
# @on_natural_language(keywords={'whatanime', '识番', '識番'}, permission=get_bot().level)
# async def _(session: NLPSession):
# msg = session.msg
# return IntentCommand(90.0, 'whatanime', current_arg=msg or '')

View File

@ -0,0 +1,35 @@
import time
from services.log import logger
from util.langconv import *
import aiohttp
from util.user_agent import get_user_agent
async def get_anime(anime: str) -> str:
s_time = time.time()
url = 'https://trace.moe/api/search?url={}'.format(anime)
logger.debug("[info]Now starting get the {}".format(url))
try:
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
async with session.get(url, timeout=45) as response:
if response.status == 200:
anime_json = await response.json()
if anime_json == 'Error reading imagenull':
return "图像源错误,注意必须是静态图片哦"
repass = ""
for anime in anime_json["docs"][:5]:
anime_name = anime["anime"]
episode = anime["episode"]
at = int(anime["at"])
m, s = divmod(at, 60)
similarity = anime["similarity"]
putline = "[ {} ][{}][{}:{}] 相似度:{:.2%}". \
format(Converter("zh-hans").convert(anime_name),
episode if episode else '?', m, s, similarity)
repass += putline + '\n'
return f'耗时 {int(time.time() - s_time)}\n' + repass[:-1]
else:
return f'访问失败,请再试一次吧, status: {response.status}'
except Exception:
return '直接超时,那就没办法了,再试一次?'

View File

@ -0,0 +1,53 @@
from nonebot import on_command
from .data_source import get_yiqing_data, clear_data
from services.log import logger
from nonebot.adapters.cqhttp import Bot, Event
from nonebot.typing import T_State
from .config import city_list
from util.utils import scheduler
__plugin_name__ = '疫情查询'
__plugin_usage__ = '查询疫情帮助:\n\t对我说 查询疫情 省份/城市,我会回复疫情的实时数据\n\t示例: 查询疫情 温州'
yiqing = on_command("疫情", aliases={"查询疫情", "疫情查询"}, priority=5, block=True)
@yiqing.handle()
async def _(bot: Bot, event: Event, state: T_State):
if str(event.get_message()).strip() in ['帮助']:
await yiqing.finish(__plugin_usage__)
msg = str(event.get_message()).strip()
if msg:
if msg in city_list.keys():
province = msg
city = ''
else:
for key in city_list.keys():
if msg in city_list.get(key):
province = key
city = msg
break
else:
await yiqing.finish(__plugin_usage__)
try:
result = await get_yiqing_data(province, city)
if result:
await yiqing.send(result)
logger.info(
f"(USER {event.user_id}, GROUP {event.group_id if event.message_type != 'private' else 'private'}) 查询疫情:" + result)
else:
await yiqing.send("查询失败!!!!", at_sender=True)
logger.info(
f"(USER {event.user_id}, GROUP {event.group_id if event.message_type != 'private' else 'private'}) 查询疫情失败")
except UnboundLocalError:
await yiqing.finish('参数正确吗?只要一个参数啊', at_sender=True)
@scheduler.scheduled_job(
'cron',
hour=0,
minute=1,
)
async def _():
clear_data()

34
plugins/yiqing/config.py Normal file
View File

@ -0,0 +1,34 @@
city_list = {
"北京": ["北京"],
"天津": ["天津"],
"山西": ["太原", "阳泉", "晋城", "长治", "临汾", "运城", "忻州", "吕梁", "晋中", "大同", "朔州"],
"河北": ["沧州", "石家庄", "唐山", "保定", "廊坊", "衡水", "邯郸", "邢台", "张家口", "辛集", "秦皇岛", "定州", "承德", "涿州"],
"山东": ["济南", "淄博", "聊城", "德州", "滨州", "济宁", "菏泽", "枣庄", "烟台", "威海", "泰安", "青岛", "临沂", "莱芜", "东营", "潍坊", "日照"],
"河南": ["郑州", "新乡", "鹤壁", "安阳", "焦作", "濮阳", "开封", "驻马店", "商丘", "三门峡", "南阳", "洛阳", "周口", "许昌", "信阳", "漯河", "平顶山", "济源"],
"广东": ["珠海", "中山", "肇庆", "深圳", "清远", "揭阳", "江门", "惠州", "河源", "广州", "佛山", "东莞", "潮州", "汕尾", "梅州", "阳江", "云浮", "韶关", "湛江", "汕头", "茂名"],
"浙江": ["舟山", "温州", "台州", "绍兴", "衢州", "宁波", "丽水", "金华", "嘉兴", "湖州", "杭州"],
"宁夏": ["中卫", "银川", "吴忠", "石嘴山", "固原"],
"江苏": ["镇江", "扬州", "盐城", "徐州", "宿迁", "无锡", "苏州", "南通", "南京", "连云港", "淮安", "常州", "泰州"],
"湖南": ["长沙", "邵阳", "怀化", "株洲", "张家界", "永州", "益阳", "湘西", "娄底", "衡阳", "郴州", "岳阳", "常德", "湘潭"],
"吉林": ["长春", "长春", "通化", "松原", "四平", "辽源", "吉林", "延边", "白山", "白城"],
"福建": ["漳州", "厦门", "福州", "三明", "莆田", "宁德", "南平", "龙岩", "泉州"],
"甘肃": ["张掖", "陇南", "兰州", "嘉峪关", "白银", "武威", "天水", "庆阳", "平凉", "临夏", "酒泉", "金昌", "甘南", "定西"],
"陕西": ["榆林", "西安", "延安", "咸阳", "渭南", "铜川", "商洛", "汉中", "宝鸡", "安康"],
"辽宁": ["营口", "铁岭", "沈阳", "盘锦", "辽阳", "锦州", "葫芦岛", "阜新", "抚顺", "丹东", "大连", "朝阳", "本溪", "鞍山"],
"江西": ["鹰潭", "宜春", "上饶", "萍乡", "南昌", "景德镇", "吉安", "抚州", "新余", "九江", "赣州"],
"黑龙江": ["伊春", "七台河", "牡丹江", "鸡西", "黑河", "鹤岗", "哈尔滨", "大兴安岭", "绥化", "双鸭山", "齐齐哈尔", "佳木斯", "大庆"],
"安徽": ["宣城", "铜陵", "六安", "黄山", "淮南", "合肥", "阜阳", "亳州", "安庆", "池州", "宿州", "芜湖", "马鞍山", "淮北", "滁州", "蚌埠"],
"湖北": ["孝感", "武汉", "十堰", "荆门", "黄冈", "襄阳", "咸宁", "随州", "黄石", "恩施", "鄂州", "荆州", "宜昌", "潜江", "天门", "神农架", "仙桃"],
"青海": ["西宁", "海西", "海东", "玉树", "黄南", "海南", "海北", "果洛"],
"新疆": ["乌鲁木齐", "克州", "阿勒泰", "五家渠", "石河子", "伊犁", "吐鲁番", "塔城", "克拉玛依", "喀什", "和田", "哈密", "昌吉", "博尔塔拉", "阿克苏", "巴音郭楞", "阿拉尔", "图木舒克", "铁门关"],
"贵州": ["铜仁", "黔东南", "贵阳", "安顺", "遵义", "黔西南", "黔南", "六盘水", "毕节"],
"四川": ["遂宁", "攀枝花", "眉山", "凉山", "成都", "巴中", "广安", "自贡", "甘孜", "资阳", "宜宾", "雅安", "内江", "南充", "绵阳", "泸州", "凉山", "乐山", "广元", "甘孜", "德阳", "达州", "阿坝"],
"上海": ["上海"],
"广西": ["南宁", "贵港", "玉林", "梧州", "钦州", "柳州", "来宾", "贺州", "河池", "桂林", "防城港", "崇左", "北海", "百色"],
"西藏": ["拉萨", "山南", "日喀则", "那曲", "林芝", "昌都", "阿里"],
"云南": ["昆明", "红河", "大理", "玉溪", "昭通", "西双版纳", "文山", "曲靖", "普洱", "怒江", "临沧", "丽江", "红河", "迪庆", "德宏", "大理", "楚雄", "保山"],
"内蒙古": ["呼和浩特", "乌兰察布", "兴安", "赤峰", "呼伦贝尔", "锡林郭勒", "乌海", "通辽", "巴彦淖尔", "阿拉善", "鄂尔多斯", "包头"],
"海南": ["海口", "三沙", "三亚", "临高", "五指山", "陵水", "文昌", "万宁", "白沙", "乐东", "澄迈", "屯昌", "定安", "东方", "保亭", "琼中", "琼海", "儋州", "昌江"],
"重庆": ["重庆"],
"台湾": ["台北", "高雄", "基隆", "台中", "台南", "新竹", "嘉义", "新北", "桃园"]
}

View File

@ -0,0 +1,50 @@
from datetime import datetime
import aiohttp
from util.user_agent import get_user_agent
import json
import os
from configs.path_config import TXT_PATH
from util.utils import get_local_proxy
url = "https://api.yimian.xyz/coro/"
async def get_yiqing_data(province, city_=''):
if not os.path.exists(TXT_PATH + "yiqing/"):
os.mkdir(TXT_PATH + "yiqing/")
if not os.path.exists(TXT_PATH + "yiqing/" + str(datetime.now().date()) + ".json"):
async with aiohttp.ClientSession(headers=get_user_agent()) as session:
async with session.get(url, proxy=get_local_proxy(), timeout=7) as response:
datalist = await response.json()
with open(TXT_PATH + "yiqing/" + str(datetime.now().date()) + ".json", 'w') as f:
json.dump(datalist, f)
datalist = json.load(open(TXT_PATH + "yiqing/" + str(datetime.now().date()) + ".json", 'r'))
result = ''
for data in datalist:
if data['provinceShortName'] == province:
if city_ == '':
result = province + "疫情数据:\n现存确诊: " + \
str(data['currentConfirmedCount']) + "\n累计确诊: " + \
str(data['confirmedCount']) + "\n治愈: " + \
str(data['curedCount']) + "\n死亡: " + \
str(data['deadCount'])
break
else:
for city in data['cities']:
if city['cityName'] == city_:
result = city_ + "疫情数据:\n现存确诊: " + \
str(city['currentConfirmedCount']) + "\n累计确诊: " + str(city['confirmedCount']) +\
"\n治愈: " + str(city['curedCount']) + "\n死亡: " + str(city['deadCount'])
break
return result
def clear_data():
for file in os.listdir(TXT_PATH + "yiqing/"):
os.remove(TXT_PATH + "yiqing/" + file)
if __name__ == '__main__':
print(get_yiqing_data("浙江", city_=''))