update wbtop

This commit is contained in:
yajiwa 2022-05-24 06:01:23 +08:00
parent 41836bde26
commit 570b5b1766
3 changed files with 140 additions and 95 deletions

View File

@ -1,7 +1,3 @@
from nonebot.adapters.onebot.v11 import MessageSegment
from utils.image_utils import BuildImage
from utils.message_builder import image
from configs.path_config import IMAGE_PATH
from typing import Optional, Tuple, Union from typing import Optional, Tuple, Union
from configs.config import Config from configs.config import Config
from utils.http_utils import AsyncHttpx from utils.http_utils import AsyncHttpx
@ -28,24 +24,3 @@ async def get_data(url: str, params: Optional[dict] = None) -> Tuple[Union[dict,
return f'发生了错误...code{data["code"]}', 999 return f'发生了错误...code{data["code"]}', 999
except TimeoutError: except TimeoutError:
return "超时了....", 998 return "超时了....", 998
def gen_wbtop_pic(data: dict) -> MessageSegment:
"""
生成微博热搜图片
:param data: 微博热搜数据
"""
bk = BuildImage(700, 32 * 50 + 280, 700, 32, color="#797979")
wbtop_bk = BuildImage(700, 280, background=f"{IMAGE_PATH}/other/webtop.png")
bk.paste(wbtop_bk)
text_bk = BuildImage(700, 32 * 50, 700, 32, color="#797979")
for i, data in enumerate(data):
title = f"{i+1}. {data['hot_word']}"
hot = data["hot_word_num"]
img = BuildImage(700, 30, font_size=20)
w, h = img.getsize(title)
img.text((10, int((30 - h) / 2)), title)
img.text((580, int((30 - h) / 2)), hot)
text_bk.paste(img)
bk.paste(text_bk, (0, 280))
return image(b64=bk.pic2bs4())

148
plugins/alapi/wbtop.py → plugins/wbtop/__init__.py Executable file → Normal file
View File

@ -1,70 +1,78 @@
from nonebot import on_command from nonebot import on_command
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent, Message from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent, Message
from nonebot.params import CommandArg from nonebot.params import CommandArg
from services.log import logger from services.log import logger
from ._data_source import get_data, gen_wbtop_pic from .data_source import gen_wbtop_pic,get_wbtop
from utils.utils import is_number from utils.utils import is_number
from configs.path_config import IMAGE_PATH from configs.path_config import IMAGE_PATH
from utils.http_utils import AsyncPlaywright from utils.http_utils import AsyncPlaywright
import asyncio import asyncio
import datetime
__zx_plugin_name__ = "微博热搜" __zx_plugin_name__ = "微博热搜"
__plugin_usage__ = """ __plugin_usage__ = """
usage usage
在QQ上吃个瓜 在QQ上吃个瓜
指令 指令
微博热搜发送实时热搜 微博热搜发送实时热搜
微博热搜 [id]截图该热搜页面 微博热搜 [id]截图该热搜页面
示例微博热搜 5 示例微博热搜 5
""".strip() """.strip()
__plugin_des__ = "刚买完瓜,在吃瓜现场" __plugin_des__ = "刚买完瓜,在吃瓜现场"
__plugin_cmd__ = ["微博热搜", "微博热搜 [id]"] __plugin_cmd__ = ["微博热搜", "微博热搜 [id]"]
__plugin_version__ = 0.1 __plugin_version__ = 0.2
__plugin_author__ = "HibiKier" __plugin_author__ = "HibiKier & yajiwa"
__plugin_settings__ = { __plugin_settings__ = {
"level": 5, "level": 5,
"default_status": True, "default_status": True,
"limit_superuser": False, "limit_superuser": False,
"cmd": ["微博热搜"], "cmd": ["微博热搜"],
} }
wbtop = on_command("wbtop", aliases={"微博热搜"}, priority=5, block=True) wbtop = on_command("wbtop", aliases={"微博热搜"}, priority=5, block=True)
wbtop_url = "https://v2.alapi.cn/api/new/wbtop" wbtop_url = "https://weibo.com/ajax/side/hotSearch"
wbtop_data = [] wbtop_data = []
@wbtop.handle() @wbtop.handle()
async def _(event: MessageEvent, arg: Message = CommandArg()): async def _(event: MessageEvent, arg: Message = CommandArg()):
global wbtop_data global wbtop_data
msg = arg.extract_plain_text().strip() msg = arg.extract_plain_text().strip()
if not wbtop_data or not msg: if not wbtop_data or not msg:
data, code = await get_data(wbtop_url) if wbtop_data:
if code != 200: now_time = datetime.datetime.now()
await wbtop.finish(data, at_sender=True) if now_time > wbtop_data["time"] + datetime.timedelta(minutes=5):
wbtop_data = data["data"] data, code = await get_wbtop(wbtop_url)
if not msg: if code != 200:
img = await asyncio.get_event_loop().run_in_executor( await wbtop.finish(data, at_sender=True)
None, gen_wbtop_pic, wbtop_data wbtop_data = data
) else:
await wbtop.send(img) data, code = await get_wbtop(wbtop_url)
logger.info( if code != 200:
f"(USER {event.user_id}, GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'})" await wbtop.finish(data, at_sender=True)
f" 查询微博热搜" wbtop_data = data
) if not msg:
if is_number(msg) and 0 < int(msg) <= 50: img = await asyncio.get_event_loop().run_in_executor(
url = wbtop_data[int(msg) - 1]["url"] None, gen_wbtop_pic, wbtop_data["data"]
await wbtop.send("开始截取数据...") )
img = await AsyncPlaywright.screenshot( await wbtop.send(img)
url, logger.info(
f"{IMAGE_PATH}/temp/wbtop_{event.user_id}.png", f"(USER {event.user_id}, GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
"#pl_feed_main", f" 查询微博热搜"
wait_time=12 )
) if is_number(msg) and 0 < int(msg) <= 50:
if img: url = wbtop_data["data"][int(msg) - 1]["url"]
await wbtop.send(img) await wbtop.send("开始截取数据...")
else: img = await AsyncPlaywright.screenshot(
await wbtop.send("发生了一些错误.....") url,
f"{IMAGE_PATH}/temp/wbtop_{event.user_id}.png",
"#pl_feed_main",
wait_time=12
)
if img:
await wbtop.send(img)
else:
await wbtop.send("发生了一些错误.....")

View File

@ -0,0 +1,62 @@
from nonebot.adapters.onebot.v11 import MessageSegment
from utils.image_utils import BuildImage
from utils.message_builder import image
from configs.path_config import IMAGE_PATH
from typing import Tuple, Union
from utils.http_utils import AsyncHttpx
import datetime
async def get_wbtop(url: str) -> Tuple[Union[dict, str], int]:
"""
:param url: 请求链接
"""
n = 0
while True:
try:
data = []
get_response = (await AsyncHttpx.get(url, timeout=20))
if get_response.status_code == 200:
data_json = get_response.json()['data']['realtime']
for data_item in data_json:
# 如果是广告,则不添加
if 'is_ad' in data_item:
continue
dic = {
'hot_word': data_item['note'],
'hot_word_num': str(data_item['num']),
'url': 'https://s.weibo.com/weibo?q=%23' + data_item['word'] + '%23',
}
data.append(dic)
if not data:
return "没有搜索到...", 997
return {'data': data, 'time': datetime.datetime.now()}, 200
else:
if n > 2:
return f'获取失败,请十分钟后再试', 999
else:
n += 1
continue
except TimeoutError:
return "超时了....", 998
def gen_wbtop_pic(data: dict) -> MessageSegment:
"""
生成微博热搜图片
:param data: 微博热搜数据
"""
bk = BuildImage(700, 32 * 50 + 280, 700, 32, color="#797979")
wbtop_bk = BuildImage(700, 280, background=f"{IMAGE_PATH}/other/webtop.png")
bk.paste(wbtop_bk)
text_bk = BuildImage(700, 32 * 50, 700, 32, color="#797979")
for i, data in enumerate(data):
title = f"{i + 1}. {data['hot_word']}"
hot = data["hot_word_num"]
img = BuildImage(700, 30, font_size=20)
w, h = img.getsize(title)
img.text((10, int((30 - h) / 2)), title)
img.text((580, int((30 - h) / 2)), hot)
text_bk.paste(img)
bk.paste(text_bk, (0, 280))
return image(b64=bk.pic2bs4())