diff --git a/zhenxun/builtin_plugins/sign_in/_data_source.py b/zhenxun/builtin_plugins/sign_in/_data_source.py index c5d41de8..fea99e1c 100644 --- a/zhenxun/builtin_plugins/sign_in/_data_source.py +++ b/zhenxun/builtin_plugins/sign_in/_data_source.py @@ -1,4 +1,3 @@ -import os import random import secrets from datetime import datetime @@ -103,8 +102,13 @@ class SignManage: new_log = ( await SignLog.filter(user_id=session.id1).order_by("-create_time").first() ) + log_time = None + if new_log: + log_time = new_log.create_time.astimezone( + pytz.timezone("Asia/Shanghai") + ).date() if not is_card_view: - if not new_log or (new_log and new_log.create_time.date() != now.date()): + if not new_log or (log_time and log_time != now.date()): return await cls._handle_sign_in(user, nickname, session) return await get_card( user, nickname, -1, user_console.gold, "", is_card_view=is_card_view diff --git a/zhenxun/plugins/draw_card/handles/ba_handle.py b/zhenxun/plugins/draw_card/handles/ba_handle.py index a5d50743..61b0d78f 100644 --- a/zhenxun/plugins/draw_card/handles/ba_handle.py +++ b/zhenxun/plugins/draw_card/handles/ba_handle.py @@ -110,7 +110,7 @@ class BaHandle(BaseHandle[BaChar]): async def _update_info(self): # TODO: ba获取链接失效 info = {} - url = "https://lonqie.github.io/SchaleDB/data/cn/students.min.json?v=49" + url = "https://schale.gg/data/cn/students.min.json?v=49" result = (await AsyncHttpx.get(url)).json() if not result: logger.warning(f"更新 {self.game_name_cn} 出错") @@ -119,12 +119,14 @@ class BaHandle(BaseHandle[BaChar]): for char in result: try: name = char["Name"] + id = str(char["Id"]) avatar = ( - "https://github.com/lonqie/SchaleDB/raw/main/images/student/icon/" - + char["CollectionTexture"] - + ".png" + "https://github.com/SchaleDB/SchaleDB/raw/main/images/student/icon/" + + id + + ".webp" ) star = char["StarGrade"] + star = char["StarGrade"] except IndexError: continue member_dict = { diff --git a/zhenxun/plugins/parse_bilibili/__init__.py b/zhenxun/plugins/parse_bilibili/__init__.py index 3d16396a..bcc09772 100644 --- a/zhenxun/plugins/parse_bilibili/__init__.py +++ b/zhenxun/plugins/parse_bilibili/__init__.py @@ -1,14 +1,22 @@ +import re +import time + +import ujson as json from nonebot import on_message from nonebot.plugin import PluginMetadata -from nonebot_plugin_alconna import UniMsg +from nonebot_plugin_alconna import Hyper, UniMsg +from nonebot_plugin_saa import Image, MessageFactory, Text from nonebot_plugin_session import EventSession +from zhenxun.configs.path_config import TEMP_PATH from zhenxun.configs.utils import PluginExtraData, RegisterConfig, Task from zhenxun.models.group_console import GroupConsole from zhenxun.models.task_info import TaskInfo from zhenxun.services.log import logger +from zhenxun.utils.http_utils import AsyncHttpx -from .data_source import Parser +from .information_container import InformationContainer +from .parse_url import parse_bili_url __plugin_meta__ = PluginMetadata( name="B站转发解析", @@ -48,10 +56,132 @@ async def _rule(session: EventSession) -> bool: _matcher = on_message(priority=1, block=False, rule=_rule) +_tmp = {} + @_matcher.handle() async def _(session: EventSession, message: UniMsg): + information_container = InformationContainer() + # 判断文本消息内容是否相关 + match = None + # 判断文本消息和小程序的内容是否指向一个b站链接 + get_url = None + # 判断文本消息是否包含视频相关内容 + vd_flag = False + # 设定时间阈值,阈值之下不会解析重复内容 + repet_second = 300 + # 尝试解析小程序消息 data = message[0] - if result := await Parser.parse(data, message.extract_plain_text().strip()): - await result.send() - logger.info(f"b站转发解析: {result}", "BILIBILI_PARSE", session=session) + if isinstance(data, Hyper) and data.raw: + try: + data = json.loads(data.raw) + except (IndexError, KeyError): + data = None + if data: + # 获取相关数据 + meta_data = data.get("meta", {}) + news_value = meta_data.get("news", {}) + detail_1_value = meta_data.get("detail_1", {}) + qqdocurl_value = detail_1_value.get("qqdocurl", {}) + jumpUrl_value = news_value.get("jumpUrl", {}) + get_url = (qqdocurl_value if qqdocurl_value else jumpUrl_value).split("?")[ + 0 + ] + # 解析文本消息 + elif msg := message.extract_plain_text(): + # 消息中含有视频号 + if "bv" in msg.lower() or "av" in msg.lower(): + match = re.search(r"((?=(?:bv|av))([A-Za-z0-9]+))", msg, re.IGNORECASE) + vd_flag = True + + # 消息中含有b23的链接,包括视频、专栏、动态、直播 + elif "https://b23.tv" in msg: + match = re.search(r"https://b23\.tv/[^?\s]+", msg, re.IGNORECASE) + + # 检查消息中是否含有直播、专栏、动态链接 + elif any( + keyword in msg + for keyword in [ + "https://live.bilibili.com/", + "https://www.bilibili.com/read/", + "https://www.bilibili.com/opus/", + "https://t.bilibili.com/", + ] + ): + pattern = r"https://(live|www\.bilibili\.com/read|www\.bilibili\.com/opus|t\.bilibili\.com)/[^?\s]+" + match = re.search(pattern, msg) + + # 匹配成功,则获取链接 + if match: + if vd_flag: + number = match.group(1) + get_url = f"https://www.bilibili.com/video/{number}" + else: + get_url = match.group() + + if get_url: + # 将链接统一发送给处理函数 + vd_info, live_info, vd_url, live_url, image_info, image_url = ( + await parse_bili_url(get_url, information_container) + ) + if vd_info: + # 判断一定时间内是否解析重复内容,或者是第一次解析 + if ( + vd_url in _tmp.keys() and time.time() - _tmp[vd_url] > repet_second + ) or vd_url not in _tmp.keys(): + pic = vd_info.get("pic", "") # 封面 + aid = vd_info.get("aid", "") # av号 + title = vd_info.get("title", "") # 标题 + author = vd_info.get("owner", {}).get("name", "") # UP主 + reply = vd_info.get("stat", {}).get("reply", "") # 回复 + favorite = vd_info.get("stat", {}).get("favorite", "") # 收藏 + coin = vd_info.get("stat", {}).get("coin", "") # 投币 + like = vd_info.get("stat", {}).get("like", "") # 点赞 + danmuku = vd_info.get("stat", {}).get("danmaku", "") # 弹幕 + ctime = vd_info["ctime"] + date = time.strftime("%Y-%m-%d", time.localtime(ctime)) + logger.info(f"解析bilibili转发 {vd_url}", "b站解析", session=session) + _tmp[vd_url] = time.time() + _path = TEMP_PATH / f"{aid}.jpg" + await AsyncHttpx.download_file(pic, _path) + await MessageFactory( + [ + Image(_path), + Text( + f"av{aid}\n标题:{title}\nUP:{author}\n上传日期:{date}\n回复:{reply},收藏:{favorite},投币:{coin}\n点赞:{like},弹幕:{danmuku}\n{vd_url}" + ), + ] + ).send() + + elif live_info: + if ( + live_url in _tmp.keys() and time.time() - _tmp[live_url] > repet_second + ) or live_url not in _tmp.keys(): + uid = live_info.get("uid", "") # 主播uid + title = live_info.get("title", "") # 直播间标题 + description = live_info.get("description", "") # 简介,可能会出现标签 + user_cover = live_info.get("user_cover", "") # 封面 + keyframe = live_info.get("keyframe", "") # 关键帧画面 + live_time = live_info.get("live_time", "") # 开播时间 + area_name = live_info.get("area_name", "") # 分区 + parent_area_name = live_info.get("parent_area_name", "") # 父分区 + logger.info(f"解析bilibili转发 {live_url}", "b站解析", session=session) + _tmp[live_url] = time.time() + await MessageFactory( + [ + Image(user_cover), + Text( + f"开播用户:https://space.bilibili.com/{uid}\n开播时间:{live_time}\n直播分区:{parent_area_name}——>{area_name}\n标题:{title}\n简介:{description}\n直播截图:\n" + ), + Image(keyframe), + Text(f"{live_url}"), + ] + ).send() + elif image_info: + if ( + image_url in _tmp.keys() + and time.time() - _tmp[image_url] > repet_second + ) or image_url not in _tmp.keys(): + logger.info(f"解析bilibili转发 {image_url}", "b站解析", session=session) + _tmp[image_url] = time.time() + await image_info.send() diff --git a/zhenxun/plugins/parse_bilibili/data_source.py b/zhenxun/plugins/parse_bilibili/data_source.py deleted file mode 100644 index d130b3b0..00000000 --- a/zhenxun/plugins/parse_bilibili/data_source.py +++ /dev/null @@ -1,186 +0,0 @@ -import re -import time -import uuid -from pathlib import Path -from typing import Any - -import aiohttp -import ujson as json -from bilireq import video -from nonebot_plugin_alconna import Hyper -from nonebot_plugin_saa import Image, MessageFactory, Text - -from zhenxun.configs.path_config import TEMP_PATH -from zhenxun.services.log import logger -from zhenxun.utils.http_utils import AsyncPlaywright -from zhenxun.utils.user_agent import get_user_agent - - -class Parser: - - time_watch: dict[str, float] = {} - - @classmethod - async def parse(cls, data: Any, raw: str | None = None) -> MessageFactory | None: - """解析 - - 参数: - data: data数据 - raw: 文本. - - 返回: - MessageFactory | None: 返回信息 - """ - if isinstance(data, Hyper) and data.raw: - json_data = json.loads(data.raw) - if video_info := await cls.__parse_video_share(json_data): - return await cls.__handle_video_info(video_info) - if path := await cls.__parse_news_share(json_data): - return MessageFactory([Image(path)]) - if raw: - return await cls.__search(raw) - return None - - @classmethod - async def __search(cls, message: str) -> MessageFactory | None: - """根据bv,av,链接获取视频信息 - - 参数: - message: 文本内容 - - 返回: - MessageFactory | None: 返回信息 - """ - if "BV" in message: - index = message.find("BV") - if len(message[index + 2 :]) >= 10: - msg = message[index : index + 12] - url = f"https://www.bilibili.com/video/{msg}" - return await cls.__handle_video_info( - await video.get_video_base_info(msg), url - ) - elif "av" in message: - index = message.find("av") - if len(message[index + 2 :]) >= 1: - if r := re.search(r"av(\d+)", message): - url = f"https://www.bilibili.com/video/av{r.group(1)}" - return await cls.__handle_video_info( - await video.get_video_base_info(f"av{r.group(1)}"), url - ) - elif "https://b23.tv" in message: - url = ( - "https://" - + message[message.find("b23.tv") : message.find("b23.tv") + 14] - ) - async with aiohttp.ClientSession(headers=get_user_agent()) as session: - async with session.get( - url, - timeout=7, - ) as response: - url = (str(response.url).split("?")[0]).strip("/") - bvid = url.split("/")[-1] - return await cls.__handle_video_info( - await video.get_video_base_info(bvid), url - ) - return None - - @classmethod - async def __handle_video_info( - cls, vd_info: dict, url: str = "" - ) -> MessageFactory | None: - """处理视频信息 - - 参数: - vd_info: 视频数据 - url: 视频url. - - 返回: - MessageFactory | None: 返回信息 - """ - if url: - if url in cls.time_watch.keys() and time.time() - cls.time_watch[url] < 30: - logger.debug("b站 url 解析在30秒内重复, 跳过解析...") - return None - cls.time_watch[url] = time.time() - aid = vd_info["aid"] - title = vd_info["title"] - author = vd_info["owner"]["name"] - reply = vd_info["stat"]["reply"] # 回复 - favorite = vd_info["stat"]["favorite"] # 收藏 - coin = vd_info["stat"]["coin"] # 投币 - # like = vd_info['stat']['like'] # 点赞 - # danmu = vd_info['stat']['danmaku'] # 弹幕 - date = time.strftime("%Y-%m-%d", time.localtime(vd_info["ctime"])) - return MessageFactory( - [ - Image(vd_info["pic"]), - Text( - f"\nav{aid}\n标题:{title}\nUP:{author}\n上传日期:{date}\n回复:{reply},收藏:{favorite},投币:{coin}\n{url}" - ), - ] - ) - - @classmethod - async def __parse_video_share(cls, data: dict) -> dict | None: - """解析视频转发 - - 参数: - data: data数据 - - 返回: - dict | None: 视频信息 - """ - try: - if data["meta"]["detail_1"]["title"] == "哔哩哔哩": - try: - async with aiohttp.ClientSession( - headers=get_user_agent() - ) as session: - async with session.get( - data["meta"]["detail_1"]["qqdocurl"], - timeout=7, - ) as response: - url = str(response.url).split("?")[0] - if url[-1] == "/": - url = url[:-1] - bvid = url.split("/")[-1] - return await video.get_video_base_info(bvid) - except Exception as e: - logger.warning("解析b站视频失败", e=e) - except Exception as e: - pass - return None - - @classmethod - async def __parse_news_share(cls, data: dict) -> Path | None: - """解析b站专栏 - - 参数: - data: data数据 - - 返回: - Path | None: 截图路径 - """ - try: - if data["meta"]["news"]["desc"] == "哔哩哔哩专栏": - try: - url = data["meta"]["news"]["jumpUrl"] - async with AsyncPlaywright.new_page() as page: - await page.goto(url, wait_until="networkidle", timeout=10000) - await page.set_viewport_size({"width": 2560, "height": 1080}) - try: - await page.locator("div.bili-mini-close-icon").click() - except Exception: - pass - if div := await page.query_selector("#app > div"): - path = TEMP_PATH / f"bl_share_{uuid.uuid1()}.png" - await div.screenshot( - path=path, - timeout=100000, - ) - return path - except Exception as e: - logger.warning("解析b站专栏失败", e=e) - except Exception as e: - pass - return None diff --git a/zhenxun/plugins/parse_bilibili/get_image.py b/zhenxun/plugins/parse_bilibili/get_image.py new file mode 100644 index 00000000..e2f4ddcb --- /dev/null +++ b/zhenxun/plugins/parse_bilibili/get_image.py @@ -0,0 +1,107 @@ +import os +import re + +from nonebot_plugin_saa import Image + +from zhenxun.configs.path_config import TEMP_PATH +from zhenxun.services.log import logger +from zhenxun.utils.http_utils import AsyncPlaywright +from zhenxun.utils.image_utils import BuildImage +from zhenxun.utils.user_agent import get_user_agent_str + + +async def resize(path: str): + """调整图像大小的异步函数 + + 参数: + path (str): 图像文件路径 + """ + A = BuildImage(background=path) + await A.resize(0.5) + await A.save(path) + + +async def get_image(url) -> Image | None: + """获取Bilibili链接的截图,并返回base64格式的图片 + + 参数: + url (str): Bilibili链接 + + 返回: + Image: Image + """ + cv_match = None + opus_match = None + t_opus_match = None + + cv_number = None + opus_number = None + t_opus_number = None + + # 提取cv、opus、t_opus的编号 + url = url.split("?")[0] + cv_match = re.search(r"read/cv([A-Za-z0-9]+)", url, re.IGNORECASE) + opus_match = re.search(r"opus/([A-Za-z0-9]+)", url, re.IGNORECASE) + t_opus_match = re.search(r"https://t\.bilibili\.com/(\d+)", url, re.IGNORECASE) + + if cv_match: + cv_number = cv_match.group(1) + elif opus_match: + opus_number = opus_match.group(1) + elif t_opus_match: + t_opus_number = t_opus_match.group(1) + + screenshot_path = None + + # 根据编号构建保存路径 + if cv_number: + screenshot_path = f"{TEMP_PATH}/bilibili_cv_{cv_number}.png" + elif opus_number: + screenshot_path = f"{TEMP_PATH}/bilibili_opus_{opus_number}.png" + elif t_opus_number: + screenshot_path = f"{TEMP_PATH}/bilibili_opus_{t_opus_number}.png" + # t.bilibili.com和https://www.bilibili.com/opus在内容上是一样的,为便于维护,调整url至https://www.bilibili.com/opus/ + url = f"https://www.bilibili.com/opus/{t_opus_number}" + + if screenshot_path: + try: + # 如果文件不存在,进行截图 + if not os.path.exists(screenshot_path): + # 创建页面 + # random.choice(),从列表中随机抽取一个对象 + user_agent = get_user_agent_str() + try: + async with AsyncPlaywright.new_page() as page: + await page.set_viewport_size({"width": 5120, "height": 2560}) + # 设置请求拦截器 + await page.route( + re.compile(r"(\.png$)|(\.jpg$)"), + lambda route: route.abort(), + ) + # 访问链接 + await page.goto(url, wait_until="networkidle", timeout=10000) + # 根据不同的链接结构,设置对应的CSS选择器 + if cv_number: + css = "#app > div" + elif opus_number or t_opus_number: + css = "#app > div.opus-detail > div.bili-opus-view" + # 点击对应的元素 + await page.click(css) + # 查询目标元素 + div = await page.query_selector(css) + # 对目标元素进行截图 + await div.screenshot( # type: ignore + path=screenshot_path, + timeout=100000, + animations="disabled", + type="png", + ) + # 异步执行调整截图大小的操作 + await resize(screenshot_path) + except Exception as e: + logger.warning(f"尝试解析bilibili转发失败", e=e) + return None + return Image(screenshot_path) + except Exception as e: + logger.error(f"尝试解析bilibili转发失败", e=e) + return None diff --git a/zhenxun/plugins/parse_bilibili/information_container.py b/zhenxun/plugins/parse_bilibili/information_container.py new file mode 100644 index 00000000..1cbf651f --- /dev/null +++ b/zhenxun/plugins/parse_bilibili/information_container.py @@ -0,0 +1,60 @@ +class InformationContainer: + def __init__( + self, + vd_info=None, + live_info=None, + vd_url=None, + live_url=None, + image_info=None, + image_url=None, + ): + self._vd_info = vd_info + self._live_info = live_info + self._vd_url = vd_url + self._live_url = live_url + self._image_info = image_info + self._image_url = image_url + + @property + def vd_info(self): + return self._vd_info + + @property + def live_info(self): + return self._live_info + + @property + def vd_url(self): + return self._vd_url + + @property + def live_url(self): + return self._live_url + + @property + def image_info(self): + return self._image_info + + @property + def image_url(self): + return self._image_url + + def update(self, updates): + """ + 更新多个信息的通用方法 + Args: + updates (dict): 包含信息类型和对应新值的字典 + """ + for info_type, new_value in updates.items(): + if hasattr(self, f"_{info_type}"): + setattr(self, f"_{info_type}", new_value) + + def get_information(self): + return ( + self.vd_info, + self.live_info, + self.vd_url, + self.live_url, + self.image_info, + self.image_url, + ) diff --git a/zhenxun/plugins/parse_bilibili/parse_url.py b/zhenxun/plugins/parse_bilibili/parse_url.py new file mode 100644 index 00000000..b4e2a1fe --- /dev/null +++ b/zhenxun/plugins/parse_bilibili/parse_url.py @@ -0,0 +1,65 @@ +import aiohttp +from bilireq import live, video + +from zhenxun.utils.user_agent import get_user_agent + +from .get_image import get_image +from .information_container import InformationContainer + + +async def parse_bili_url(get_url: str, information_container: InformationContainer): + """解析Bilibili链接,获取相关信息 + + 参数: + get_url (str): 待解析的Bilibili链接 + information_container (InformationContainer): 信息容器 + + 返回: + dict: 包含解析得到的信息的字典 + """ + response_url = "" + + # 去除链接末尾的斜杠 + if get_url[-1] == "/": + get_url = get_url[:-1] + + # 发起HTTP请求,获取重定向后的链接 + async with aiohttp.ClientSession(headers=get_user_agent()) as session: + async with session.get( + get_url, + timeout=7, + ) as response: + response_url = str(response.url).split("?")[0] + + # 去除重定向后链接末尾的斜杠 + if response_url[-1] == "/": + response_url = response_url[:-1] + + # 根据不同类型的链接进行处理 + if response_url.startswith( + ("https://www.bilibili.com/video", "https://m.bilibili.com/video/") + ): + vd_url = response_url + vid = vd_url.split("/")[-1] + vd_info = await video.get_video_base_info(vid) + information_container.update({"vd_info": vd_info, "vd_url": vd_url}) + + elif response_url.startswith("https://live.bilibili.com"): + live_url = response_url + liveid = live_url.split("/")[-1] + live_info = await live.get_room_info_by_id(liveid) + information_container.update({"live_info": live_info, "live_url": live_url}) + + elif response_url.startswith("https://www.bilibili.com/read"): + cv_url = response_url + image_info = await get_image(cv_url) + information_container.update({"image_info": image_info, "image_url": cv_url}) + + elif response_url.startswith( + ("https://www.bilibili.com/opus", "https://t.bilibili.com") + ): + opus_url = response_url + image_info = await get_image(opus_url) + information_container.update({"image_info": image_info, "image_url": opus_url}) + + return information_container.get_information() diff --git a/zhenxun/utils/image_utils.py b/zhenxun/utils/image_utils.py index 0c6a2b27..c193a565 100644 --- a/zhenxun/utils/image_utils.py +++ b/zhenxun/utils/image_utils.py @@ -1,6 +1,7 @@ import os import random import re +from io import BytesIO from pathlib import Path from typing import Awaitable, Callable @@ -408,3 +409,14 @@ async def get_download_image_hash(url: str, mark: str) -> str: except Exception as e: logger.warning(f"下载读取图片Hash出错", e=e) return "" + + +def pic2bytes(image) -> bytes: + """获取bytes + + 返回: + bytes: bytes + """ + buf = BytesIO() + image.save(buf, format="PNG") + return buf.getvalue()