diff --git a/plugins/alapi/_data_source.py b/plugins/alapi/_data_source.py index 0fd0e010..916143e2 100644 --- a/plugins/alapi/_data_source.py +++ b/plugins/alapi/_data_source.py @@ -1,7 +1,3 @@ -from nonebot.adapters.onebot.v11 import MessageSegment -from utils.image_utils import BuildImage -from utils.message_builder import image -from configs.path_config import IMAGE_PATH from typing import Optional, Tuple, Union from configs.config import Config from utils.http_utils import AsyncHttpx @@ -28,24 +24,3 @@ async def get_data(url: str, params: Optional[dict] = None) -> Tuple[Union[dict, return f'发生了错误...code:{data["code"]}', 999 except TimeoutError: return "超时了....", 998 - - -def gen_wbtop_pic(data: dict) -> MessageSegment: - """ - 生成微博热搜图片 - :param data: 微博热搜数据 - """ - bk = BuildImage(700, 32 * 50 + 280, 700, 32, color="#797979") - wbtop_bk = BuildImage(700, 280, background=f"{IMAGE_PATH}/other/webtop.png") - bk.paste(wbtop_bk) - text_bk = BuildImage(700, 32 * 50, 700, 32, color="#797979") - for i, data in enumerate(data): - title = f"{i+1}. {data['hot_word']}" - hot = data["hot_word_num"] - img = BuildImage(700, 30, font_size=20) - w, h = img.getsize(title) - img.text((10, int((30 - h) / 2)), title) - img.text((580, int((30 - h) / 2)), hot) - text_bk.paste(img) - bk.paste(text_bk, (0, 280)) - return image(b64=bk.pic2bs4()) diff --git a/plugins/alapi/wbtop.py b/plugins/wbtop/__init__.py old mode 100755 new mode 100644 similarity index 69% rename from plugins/alapi/wbtop.py rename to plugins/wbtop/__init__.py index 7e3bdfab..34eb2ba4 --- a/plugins/alapi/wbtop.py +++ b/plugins/wbtop/__init__.py @@ -1,70 +1,78 @@ -from nonebot import on_command -from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent, Message -from nonebot.params import CommandArg -from services.log import logger -from ._data_source import get_data, gen_wbtop_pic -from utils.utils import is_number -from configs.path_config import IMAGE_PATH -from utils.http_utils import AsyncPlaywright -import asyncio - -__zx_plugin_name__ = "微博热搜" -__plugin_usage__ = """ -usage: - 在QQ上吃个瓜 - 指令: - 微博热搜:发送实时热搜 - 微博热搜 [id]:截图该热搜页面 - 示例:微博热搜 5 -""".strip() -__plugin_des__ = "刚买完瓜,在吃瓜现场" -__plugin_cmd__ = ["微博热搜", "微博热搜 [id]"] -__plugin_version__ = 0.1 -__plugin_author__ = "HibiKier" -__plugin_settings__ = { - "level": 5, - "default_status": True, - "limit_superuser": False, - "cmd": ["微博热搜"], -} - -wbtop = on_command("wbtop", aliases={"微博热搜"}, priority=5, block=True) - - -wbtop_url = "https://v2.alapi.cn/api/new/wbtop" - -wbtop_data = [] - - -@wbtop.handle() -async def _(event: MessageEvent, arg: Message = CommandArg()): - global wbtop_data - msg = arg.extract_plain_text().strip() - if not wbtop_data or not msg: - data, code = await get_data(wbtop_url) - if code != 200: - await wbtop.finish(data, at_sender=True) - wbtop_data = data["data"] - if not msg: - img = await asyncio.get_event_loop().run_in_executor( - None, gen_wbtop_pic, wbtop_data - ) - await wbtop.send(img) - logger.info( - f"(USER {event.user_id}, GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'})" - f" 查询微博热搜" - ) - if is_number(msg) and 0 < int(msg) <= 50: - url = wbtop_data[int(msg) - 1]["url"] - await wbtop.send("开始截取数据...") - img = await AsyncPlaywright.screenshot( - url, - f"{IMAGE_PATH}/temp/wbtop_{event.user_id}.png", - "#pl_feed_main", - wait_time=12 - ) - if img: - await wbtop.send(img) - else: - await wbtop.send("发生了一些错误.....") - +from nonebot import on_command +from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent, Message +from nonebot.params import CommandArg +from services.log import logger +from .data_source import gen_wbtop_pic,get_wbtop +from utils.utils import is_number +from configs.path_config import IMAGE_PATH +from utils.http_utils import AsyncPlaywright +import asyncio +import datetime +__zx_plugin_name__ = "微博热搜" +__plugin_usage__ = """ +usage: + 在QQ上吃个瓜 + 指令: + 微博热搜:发送实时热搜 + 微博热搜 [id]:截图该热搜页面 + 示例:微博热搜 5 +""".strip() +__plugin_des__ = "刚买完瓜,在吃瓜现场" +__plugin_cmd__ = ["微博热搜", "微博热搜 [id]"] +__plugin_version__ = 0.2 +__plugin_author__ = "HibiKier & yajiwa" +__plugin_settings__ = { + "level": 5, + "default_status": True, + "limit_superuser": False, + "cmd": ["微博热搜"], +} + +wbtop = on_command("wbtop", aliases={"微博热搜"}, priority=5, block=True) + + +wbtop_url = "https://weibo.com/ajax/side/hotSearch" + +wbtop_data = [] + + +@wbtop.handle() +async def _(event: MessageEvent, arg: Message = CommandArg()): + global wbtop_data + msg = arg.extract_plain_text().strip() + if not wbtop_data or not msg: + if wbtop_data: + now_time = datetime.datetime.now() + if now_time > wbtop_data["time"] + datetime.timedelta(minutes=5): + data, code = await get_wbtop(wbtop_url) + if code != 200: + await wbtop.finish(data, at_sender=True) + wbtop_data = data + else: + data, code = await get_wbtop(wbtop_url) + if code != 200: + await wbtop.finish(data, at_sender=True) + wbtop_data = data + if not msg: + img = await asyncio.get_event_loop().run_in_executor( + None, gen_wbtop_pic, wbtop_data["data"] + ) + await wbtop.send(img) + logger.info( + f"(USER {event.user_id}, GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'})" + f" 查询微博热搜" + ) + if is_number(msg) and 0 < int(msg) <= 50: + url = wbtop_data["data"][int(msg) - 1]["url"] + await wbtop.send("开始截取数据...") + img = await AsyncPlaywright.screenshot( + url, + f"{IMAGE_PATH}/temp/wbtop_{event.user_id}.png", + "#pl_feed_main", + wait_time=12 + ) + if img: + await wbtop.send(img) + else: + await wbtop.send("发生了一些错误.....") + diff --git a/plugins/wbtop/data_source.py b/plugins/wbtop/data_source.py new file mode 100644 index 00000000..f8eb404d --- /dev/null +++ b/plugins/wbtop/data_source.py @@ -0,0 +1,62 @@ +from nonebot.adapters.onebot.v11 import MessageSegment +from utils.image_utils import BuildImage +from utils.message_builder import image +from configs.path_config import IMAGE_PATH +from typing import Tuple, Union +from utils.http_utils import AsyncHttpx +import datetime + + +async def get_wbtop(url: str) -> Tuple[Union[dict, str], int]: + """ + :param url: 请求链接 + """ + n = 0 + while True: + try: + data = [] + get_response = (await AsyncHttpx.get(url, timeout=20)) + if get_response.status_code == 200: + data_json = get_response.json()['data']['realtime'] + for data_item in data_json: + # 如果是广告,则不添加 + if 'is_ad' in data_item: + continue + dic = { + 'hot_word': data_item['note'], + 'hot_word_num': str(data_item['num']), + 'url': 'https://s.weibo.com/weibo?q=%23' + data_item['word'] + '%23', + } + data.append(dic) + if not data: + return "没有搜索到...", 997 + return {'data': data, 'time': datetime.datetime.now()}, 200 + else: + if n > 2: + return f'获取失败,请十分钟后再试', 999 + else: + n += 1 + continue + except TimeoutError: + return "超时了....", 998 + + +def gen_wbtop_pic(data: dict) -> MessageSegment: + """ + 生成微博热搜图片 + :param data: 微博热搜数据 + """ + bk = BuildImage(700, 32 * 50 + 280, 700, 32, color="#797979") + wbtop_bk = BuildImage(700, 280, background=f"{IMAGE_PATH}/other/webtop.png") + bk.paste(wbtop_bk) + text_bk = BuildImage(700, 32 * 50, 700, 32, color="#797979") + for i, data in enumerate(data): + title = f"{i + 1}. {data['hot_word']}" + hot = data["hot_word_num"] + img = BuildImage(700, 30, font_size=20) + w, h = img.getsize(title) + img.text((10, int((30 - h) / 2)), title) + img.text((580, int((30 - h) / 2)), hot) + text_bk.paste(img) + bk.paste(text_bk, (0, 280)) + return image(b64=bk.pic2bs4())