mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 06:12:53 +08:00
feat✨: 优化b站解析
This commit is contained in:
parent
d4a49a47e5
commit
c219264968
@ -1,4 +1,3 @@
|
||||
import os
|
||||
import random
|
||||
import secrets
|
||||
from datetime import datetime
|
||||
@ -103,8 +102,13 @@ class SignManage:
|
||||
new_log = (
|
||||
await SignLog.filter(user_id=session.id1).order_by("-create_time").first()
|
||||
)
|
||||
log_time = None
|
||||
if new_log:
|
||||
log_time = new_log.create_time.astimezone(
|
||||
pytz.timezone("Asia/Shanghai")
|
||||
).date()
|
||||
if not is_card_view:
|
||||
if not new_log or (new_log and new_log.create_time.date() != now.date()):
|
||||
if not new_log or (log_time and log_time != now.date()):
|
||||
return await cls._handle_sign_in(user, nickname, session)
|
||||
return await get_card(
|
||||
user, nickname, -1, user_console.gold, "", is_card_view=is_card_view
|
||||
|
||||
@ -110,7 +110,7 @@ class BaHandle(BaseHandle[BaChar]):
|
||||
async def _update_info(self):
|
||||
# TODO: ba获取链接失效
|
||||
info = {}
|
||||
url = "https://lonqie.github.io/SchaleDB/data/cn/students.min.json?v=49"
|
||||
url = "https://schale.gg/data/cn/students.min.json?v=49"
|
||||
result = (await AsyncHttpx.get(url)).json()
|
||||
if not result:
|
||||
logger.warning(f"更新 {self.game_name_cn} 出错")
|
||||
@ -119,12 +119,14 @@ class BaHandle(BaseHandle[BaChar]):
|
||||
for char in result:
|
||||
try:
|
||||
name = char["Name"]
|
||||
id = str(char["Id"])
|
||||
avatar = (
|
||||
"https://github.com/lonqie/SchaleDB/raw/main/images/student/icon/"
|
||||
+ char["CollectionTexture"]
|
||||
+ ".png"
|
||||
"https://github.com/SchaleDB/SchaleDB/raw/main/images/student/icon/"
|
||||
+ id
|
||||
+ ".webp"
|
||||
)
|
||||
star = char["StarGrade"]
|
||||
star = char["StarGrade"]
|
||||
except IndexError:
|
||||
continue
|
||||
member_dict = {
|
||||
|
||||
@ -1,14 +1,22 @@
|
||||
import re
|
||||
import time
|
||||
|
||||
import ujson as json
|
||||
from nonebot import on_message
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from nonebot_plugin_alconna import UniMsg
|
||||
from nonebot_plugin_alconna import Hyper, UniMsg
|
||||
from nonebot_plugin_saa import Image, MessageFactory, Text
|
||||
from nonebot_plugin_session import EventSession
|
||||
|
||||
from zhenxun.configs.path_config import TEMP_PATH
|
||||
from zhenxun.configs.utils import PluginExtraData, RegisterConfig, Task
|
||||
from zhenxun.models.group_console import GroupConsole
|
||||
from zhenxun.models.task_info import TaskInfo
|
||||
from zhenxun.services.log import logger
|
||||
from zhenxun.utils.http_utils import AsyncHttpx
|
||||
|
||||
from .data_source import Parser
|
||||
from .information_container import InformationContainer
|
||||
from .parse_url import parse_bili_url
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name="B站转发解析",
|
||||
@ -48,10 +56,132 @@ async def _rule(session: EventSession) -> bool:
|
||||
|
||||
_matcher = on_message(priority=1, block=False, rule=_rule)
|
||||
|
||||
_tmp = {}
|
||||
|
||||
|
||||
@_matcher.handle()
async def _(session: EventSession, message: UniMsg):
    """Parse a bilibili share card or link in the message and send a summary.

    Handles mini-app (Hyper) share cards, raw av/BV ids, b23.tv short links,
    and full links to live rooms, articles (read/cv) and dynamics (opus / t.*).
    """
    information_container = InformationContainer()
    # Regex match over the plain-text message, if any.
    match = None
    # The bilibili URL extracted from either the share card or the text.
    get_url = None
    # True when the text carried a bare av/BV id instead of a full URL.
    vd_flag = False
    # Within this many seconds the same URL is not parsed again.
    repet_second = 300
    # First segment of the message; a Hyper segment means a mini-app card.
    data = message[0]
    if result := await Parser.parse(data, message.extract_plain_text().strip()):
        await result.send()
        logger.info(f"b站转发解析: {result}", "BILIBILI_PARSE", session=session)
        # NOTE(review): there is deliberately no return here in the original,
        # so the legacy path below may parse the same content again — confirm
        # this fallthrough is intended.
    # Decode the mini-app card JSON, if the first segment is a card.
    card = None
    if isinstance(data, Hyper) and data.raw:
        try:
            card = json.loads(data.raw)
        except (ValueError, IndexError, KeyError):
            # ujson raises ValueError on malformed JSON; the original code
            # only caught IndexError/KeyError and would have crashed here.
            card = None
    if card:
        # Prefer the mini-app jump URL (detail_1.qqdocurl), falling back to
        # the news-card URL (news.jumpUrl); strip the query string.
        meta_data = card.get("meta", {})
        qqdocurl_value = meta_data.get("detail_1", {}).get("qqdocurl", "")
        jumpUrl_value = meta_data.get("news", {}).get("jumpUrl", "")
        if raw_url := (qqdocurl_value or jumpUrl_value):
            get_url = raw_url.split("?")[0]
    # Otherwise look for links / ids in the plain text.
    elif msg := message.extract_plain_text():
        # Bare av/BV video id somewhere in the text.
        if "bv" in msg.lower() or "av" in msg.lower():
            match = re.search(r"((?=(?:bv|av))([A-Za-z0-9]+))", msg, re.IGNORECASE)
            vd_flag = True

        # b23.tv short link (video, article, dynamic or live room).
        elif "https://b23.tv" in msg:
            match = re.search(r"https://b23\.tv/[^?\s]+", msg, re.IGNORECASE)

        # Full links to live rooms, articles and dynamics.
        elif any(
            keyword in msg
            for keyword in [
                "https://live.bilibili.com/",
                "https://www.bilibili.com/read/",
                "https://www.bilibili.com/opus/",
                "https://t.bilibili.com/",
            ]
        ):
            # The original pattern used a bare "live" alternative, which can
            # never match "live.bilibili.com" (a "." follows "live", not "/").
            pattern = (
                r"https://(live\.bilibili\.com|www\.bilibili\.com/read"
                r"|www\.bilibili\.com/opus|t\.bilibili\.com)/[^?\s]+"
            )
            match = re.search(pattern, msg)

        # A match yields either a bare video id or a complete URL.
        if match:
            if vd_flag:
                number = match.group(1)
                get_url = f"https://www.bilibili.com/video/{number}"
            else:
                get_url = match.group()

    if get_url:
        # Resolve the URL and collect whichever info it points at.
        vd_info, live_info, vd_url, live_url, image_info, image_url = (
            await parse_bili_url(get_url, information_container)
        )
        if vd_info:
            # Skip a video parsed less than repet_second seconds ago.
            if vd_url not in _tmp or time.time() - _tmp[vd_url] > repet_second:
                pic = vd_info.get("pic", "")  # cover image
                aid = vd_info.get("aid", "")  # av id
                title = vd_info.get("title", "")  # title
                author = vd_info.get("owner", {}).get("name", "")  # uploader
                reply = vd_info.get("stat", {}).get("reply", "")  # replies
                favorite = vd_info.get("stat", {}).get("favorite", "")  # favorites
                coin = vd_info.get("stat", {}).get("coin", "")  # coins
                like = vd_info.get("stat", {}).get("like", "")  # likes
                danmuku = vd_info.get("stat", {}).get("danmaku", "")  # danmaku
                ctime = vd_info["ctime"]
                date = time.strftime("%Y-%m-%d", time.localtime(ctime))
                logger.info(f"解析bilibili转发 {vd_url}", "b站解析", session=session)
                _tmp[vd_url] = time.time()
                _path = TEMP_PATH / f"{aid}.jpg"
                await AsyncHttpx.download_file(pic, _path)
                await MessageFactory(
                    [
                        Image(_path),
                        Text(
                            f"av{aid}\n标题:{title}\nUP:{author}\n上传日期:{date}\n回复:{reply},收藏:{favorite},投币:{coin}\n点赞:{like},弹幕:{danmuku}\n{vd_url}"
                        ),
                    ]
                ).send()

        elif live_info:
            # Skip a live room parsed less than repet_second seconds ago.
            if live_url not in _tmp or time.time() - _tmp[live_url] > repet_second:
                uid = live_info.get("uid", "")  # streamer uid
                title = live_info.get("title", "")  # room title
                description = live_info.get("description", "")  # may contain tags
                user_cover = live_info.get("user_cover", "")  # cover image
                keyframe = live_info.get("keyframe", "")  # current keyframe
                live_time = live_info.get("live_time", "")  # stream start time
                area_name = live_info.get("area_name", "")  # sub-category
                parent_area_name = live_info.get("parent_area_name", "")  # category
                logger.info(f"解析bilibili转发 {live_url}", "b站解析", session=session)
                _tmp[live_url] = time.time()
                await MessageFactory(
                    [
                        Image(user_cover),
                        Text(
                            f"开播用户:https://space.bilibili.com/{uid}\n开播时间:{live_time}\n直播分区:{parent_area_name}——>{area_name}\n标题:{title}\n简介:{description}\n直播截图:\n"
                        ),
                        Image(keyframe),
                        Text(f"{live_url}"),
                    ]
                ).send()
        elif image_info:
            # Articles / dynamics are replied to as a single screenshot.
            if (
                image_url not in _tmp
                or time.time() - _tmp[image_url] > repet_second
            ):
                logger.info(f"解析bilibili转发 {image_url}", "b站解析", session=session)
                _tmp[image_url] = time.time()
                await image_info.send()
|
||||
|
||||
@ -1,186 +0,0 @@
|
||||
import re
|
||||
import time
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
import ujson as json
|
||||
from bilireq import video
|
||||
from nonebot_plugin_alconna import Hyper
|
||||
from nonebot_plugin_saa import Image, MessageFactory, Text
|
||||
|
||||
from zhenxun.configs.path_config import TEMP_PATH
|
||||
from zhenxun.services.log import logger
|
||||
from zhenxun.utils.http_utils import AsyncPlaywright
|
||||
from zhenxun.utils.user_agent import get_user_agent
|
||||
|
||||
|
||||
class Parser:
    """Parses bilibili share cards and plain-text av/BV ids / b23.tv links."""

    # Timestamp of the last parse per URL; used by __handle_video_info to
    # throttle repeated parsing of the same URL within 30 seconds.
    time_watch: dict[str, float] = {}

    @classmethod
    async def parse(cls, data: Any, raw: str | None = None) -> MessageFactory | None:
        """Parse a message segment and/or its plain text.

        Args:
            data: first message segment; a Hyper card is inspected for JSON
            raw: plain text of the message, if any

        Returns:
            MessageFactory | None: the reply to send, or None if nothing matched
        """
        if isinstance(data, Hyper) and data.raw:
            json_data = json.loads(data.raw)
            if video_info := await cls.__parse_video_share(json_data):
                return await cls.__handle_video_info(video_info)
            if path := await cls.__parse_news_share(json_data):
                return MessageFactory([Image(path)])
        if raw:
            return await cls.__search(raw)
        return None

    @classmethod
    async def __search(cls, message: str) -> MessageFactory | None:
        """Resolve a BV id, av id or b23.tv short link found in the text.

        Args:
            message: plain-text content

        Returns:
            MessageFactory | None: reply message, or None when nothing matched
        """
        if "BV" in message:
            index = message.find("BV")
            if len(message[index + 2 :]) >= 10:
                # BV ids are 12 characters long ("BV" + 10 chars).
                msg = message[index : index + 12]
                url = f"https://www.bilibili.com/video/{msg}"
                return await cls.__handle_video_info(
                    await video.get_video_base_info(msg), url
                )
        elif "av" in message:
            index = message.find("av")
            if len(message[index + 2 :]) >= 1:
                if r := re.search(r"av(\d+)", message):
                    url = f"https://www.bilibili.com/video/av{r.group(1)}"
                    return await cls.__handle_video_info(
                        await video.get_video_base_info(f"av{r.group(1)}"), url
                    )
        elif "https://b23.tv" in message:
            # Rebuild the short link, then follow its redirect to the real URL.
            url = (
                "https://"
                + message[message.find("b23.tv") : message.find("b23.tv") + 14]
            )
            async with aiohttp.ClientSession(headers=get_user_agent()) as session:
                async with session.get(
                    url,
                    timeout=7,
                ) as response:
                    url = (str(response.url).split("?")[0]).strip("/")
                    bvid = url.split("/")[-1]
                    return await cls.__handle_video_info(
                        await video.get_video_base_info(bvid), url
                    )
        return None

    @classmethod
    async def __handle_video_info(
        cls, vd_info: dict, url: str = ""
    ) -> MessageFactory | None:
        """Build the reply message for a video info dict.

        Args:
            vd_info: raw video info as returned by bilireq
            url: video URL; when given, used for the 30s repeat-throttle

        Returns:
            MessageFactory | None: reply, or None when throttled
        """
        if url:
            # Throttle: skip a URL parsed less than 30 seconds ago.
            if url in cls.time_watch.keys() and time.time() - cls.time_watch[url] < 30:
                logger.debug("b站 url 解析在30秒内重复, 跳过解析...")
                return None
            cls.time_watch[url] = time.time()
        aid = vd_info["aid"]
        title = vd_info["title"]
        author = vd_info["owner"]["name"]
        reply = vd_info["stat"]["reply"]  # reply count
        favorite = vd_info["stat"]["favorite"]  # favorite count
        coin = vd_info["stat"]["coin"]  # coin count
        # like = vd_info['stat']['like']  # like count
        # danmu = vd_info['stat']['danmaku']  # danmaku count
        date = time.strftime("%Y-%m-%d", time.localtime(vd_info["ctime"]))
        return MessageFactory(
            [
                Image(vd_info["pic"]),
                Text(
                    f"\nav{aid}\n标题:{title}\nUP:{author}\n上传日期:{date}\n回复:{reply},收藏:{favorite},投币:{coin}\n{url}"
                ),
            ]
        )

    @classmethod
    async def __parse_video_share(cls, data: dict) -> dict | None:
        """Parse a mini-app video share card.

        Args:
            data: decoded card JSON

        Returns:
            dict | None: video info, or None if the card is not a bilibili video
        """
        try:
            if data["meta"]["detail_1"]["title"] == "哔哩哔哩":
                try:
                    # Follow the share-link redirect to the canonical video URL.
                    async with aiohttp.ClientSession(
                        headers=get_user_agent()
                    ) as session:
                        async with session.get(
                            data["meta"]["detail_1"]["qqdocurl"],
                            timeout=7,
                        ) as response:
                            url = str(response.url).split("?")[0]
                            if url[-1] == "/":
                                url = url[:-1]
                            bvid = url.split("/")[-1]
                            return await video.get_video_base_info(bvid)
                except Exception as e:
                    logger.warning("解析b站视频失败", e=e)
        except Exception as e:
            # Card JSON lacks the expected keys: not a bilibili video share.
            pass
        return None

    @classmethod
    async def __parse_news_share(cls, data: dict) -> Path | None:
        """Parse a bilibili article (专栏) share card and screenshot the page.

        Args:
            data: decoded card JSON

        Returns:
            Path | None: path to the screenshot, or None on failure
        """
        try:
            if data["meta"]["news"]["desc"] == "哔哩哔哩专栏":
                try:
                    url = data["meta"]["news"]["jumpUrl"]
                    async with AsyncPlaywright.new_page() as page:
                        await page.goto(url, wait_until="networkidle", timeout=10000)
                        await page.set_viewport_size({"width": 2560, "height": 1080})
                        try:
                            # Dismiss the login pop-up if it appears.
                            await page.locator("div.bili-mini-close-icon").click()
                        except Exception:
                            pass
                        if div := await page.query_selector("#app > div"):
                            path = TEMP_PATH / f"bl_share_{uuid.uuid1()}.png"
                            await div.screenshot(
                                path=path,
                                timeout=100000,
                            )
                            return path
                except Exception as e:
                    logger.warning("解析b站专栏失败", e=e)
        except Exception as e:
            # Card JSON lacks the expected keys: not an article share.
            pass
        return None
|
||||
107
zhenxun/plugins/parse_bilibili/get_image.py
Normal file
107
zhenxun/plugins/parse_bilibili/get_image.py
Normal file
@ -0,0 +1,107 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
from nonebot_plugin_saa import Image
|
||||
|
||||
from zhenxun.configs.path_config import TEMP_PATH
|
||||
from zhenxun.services.log import logger
|
||||
from zhenxun.utils.http_utils import AsyncPlaywright
|
||||
from zhenxun.utils.image_utils import BuildImage
|
||||
from zhenxun.utils.user_agent import get_user_agent_str
|
||||
|
||||
|
||||
async def resize(path: str):
    """Shrink the image at *path* to half its size, overwriting it in place.

    Args:
        path (str): image file path
    """
    A = BuildImage(background=path)
    await A.resize(0.5)
    await A.save(path)
|
||||
|
||||
|
||||
async def get_image(url) -> Image | None:
    """Screenshot a bilibili article (read/cv) or dynamic (opus / t.bilibili.com) page.

    Args:
        url (str): bilibili article/dynamic link

    Returns:
        Image | None: the screenshot wrapped as an Image, or None when the
        link is not recognized or the screenshot fails
    """
    # Drop the query string before matching the content id.
    url = url.split("?")[0]
    cv_match = re.search(r"read/cv([A-Za-z0-9]+)", url, re.IGNORECASE)
    opus_match = re.search(r"opus/([A-Za-z0-9]+)", url, re.IGNORECASE)
    t_opus_match = re.search(r"https://t\.bilibili\.com/(\d+)", url, re.IGNORECASE)

    cv_number = None
    opus_number = None
    t_opus_number = None
    if cv_match:
        cv_number = cv_match.group(1)
    elif opus_match:
        opus_number = opus_match.group(1)
    elif t_opus_match:
        t_opus_number = t_opus_match.group(1)

    # Screenshot cache path, keyed by the content id.
    screenshot_path = None
    if cv_number:
        screenshot_path = f"{TEMP_PATH}/bilibili_cv_{cv_number}.png"
    elif opus_number:
        screenshot_path = f"{TEMP_PATH}/bilibili_opus_{opus_number}.png"
    elif t_opus_number:
        screenshot_path = f"{TEMP_PATH}/bilibili_opus_{t_opus_number}.png"
        # t.bilibili.com and www.bilibili.com/opus serve the same content;
        # normalize the URL so only one page layout is handled below.
        url = f"https://www.bilibili.com/opus/{t_opus_number}"

    if screenshot_path:
        try:
            # Take the screenshot only when it is not cached yet.
            if not os.path.exists(screenshot_path):
                try:
                    async with AsyncPlaywright.new_page() as page:
                        await page.set_viewport_size({"width": 5120, "height": 2560})
                        # Abort image requests to speed up page loading.
                        await page.route(
                            re.compile(r"(\.png$)|(\.jpg$)"),
                            lambda route: route.abort(),
                        )
                        await page.goto(url, wait_until="networkidle", timeout=10000)
                        # CSS selector for the content depends on page layout.
                        # One of the branches above is guaranteed to have set
                        # an id, so css is always bound (the original left it
                        # potentially unbound).
                        if cv_number:
                            css = "#app > div"
                        else:
                            css = "#app > div.opus-detail > div.bili-opus-view"
                        # Click to dismiss overlays, then grab the element.
                        await page.click(css)
                        div = await page.query_selector(css)
                        await div.screenshot(  # type: ignore
                            path=screenshot_path,
                            timeout=100000,
                            animations="disabled",
                            type="png",
                        )
                        # Halve the screenshot size before sending.
                        await resize(screenshot_path)
                except Exception as e:
                    logger.warning("尝试解析bilibili转发失败", e=e)
                    return None
            return Image(screenshot_path)
        except Exception as e:
            logger.error("尝试解析bilibili转发失败", e=e)
            return None
    return None
|
||||
60
zhenxun/plugins/parse_bilibili/information_container.py
Normal file
60
zhenxun/plugins/parse_bilibili/information_container.py
Normal file
@ -0,0 +1,60 @@
|
||||
class InformationContainer:
    """Mutable container for the data gathered while resolving a bilibili link.

    Depending on whether the link pointed at a video, a live room, or an
    article/dynamic, the matching (info, url) pair of slots gets filled.
    """

    def __init__(
        self,
        vd_info=None,
        live_info=None,
        vd_url=None,
        live_url=None,
        image_info=None,
        image_url=None,
    ):
        # Backing fields; ``update`` locates them via the "_<name>" convention.
        self._vd_info = vd_info
        self._live_info = live_info
        self._vd_url = vd_url
        self._live_url = live_url
        self._image_info = image_info
        self._image_url = image_url

    # Read-only views over the backing fields.
    vd_info = property(lambda self: self._vd_info)
    live_info = property(lambda self: self._live_info)
    vd_url = property(lambda self: self._vd_url)
    live_url = property(lambda self: self._live_url)
    image_info = property(lambda self: self._image_info)
    image_url = property(lambda self: self._image_url)

    def update(self, updates):
        """Set several fields at once.

        Args:
            updates (dict): maps field name (without leading underscore) to
                its new value; names with no matching field are ignored.
        """
        for name, value in updates.items():
            backing = f"_{name}"
            if hasattr(self, backing):
                setattr(self, backing, value)

    def get_information(self):
        """Return all six fields as a tuple, in a fixed order."""
        return (
            self.vd_info,
            self.live_info,
            self.vd_url,
            self.live_url,
            self.image_info,
            self.image_url,
        )
|
||||
65
zhenxun/plugins/parse_bilibili/parse_url.py
Normal file
65
zhenxun/plugins/parse_bilibili/parse_url.py
Normal file
@ -0,0 +1,65 @@
|
||||
import aiohttp
|
||||
from bilireq import live, video
|
||||
|
||||
from zhenxun.utils.user_agent import get_user_agent
|
||||
|
||||
from .get_image import get_image
|
||||
from .information_container import InformationContainer
|
||||
|
||||
|
||||
async def parse_bili_url(get_url: str, information_container: InformationContainer):
    """Resolve a bilibili URL and fill the information container accordingly.

    Args:
        get_url (str): bilibili URL to resolve
        information_container (InformationContainer): container to fill

    Returns:
        tuple: (vd_info, live_info, vd_url, live_url, image_info, image_url) —
        the container's six fields; only the pair matching the URL type is set
    """
    response_url = ""

    # Strip a trailing slash from the input URL.
    if get_url[-1] == "/":
        get_url = get_url[:-1]

    # Follow redirects (e.g. b23.tv short links) to the canonical URL.
    async with aiohttp.ClientSession(headers=get_user_agent()) as session:
        async with session.get(
            get_url,
            timeout=7,
        ) as response:
            response_url = str(response.url).split("?")[0]

    # Strip a trailing slash from the resolved URL as well.
    if response_url[-1] == "/":
        response_url = response_url[:-1]

    # Dispatch on the resolved URL type.
    if response_url.startswith(
        ("https://www.bilibili.com/video", "https://m.bilibili.com/video/")
    ):
        # Video page: fetch metadata via bilireq.
        vd_url = response_url
        vid = vd_url.split("/")[-1]
        vd_info = await video.get_video_base_info(vid)
        information_container.update({"vd_info": vd_info, "vd_url": vd_url})

    elif response_url.startswith("https://live.bilibili.com"):
        # Live room: fetch room info via bilireq.
        live_url = response_url
        liveid = live_url.split("/")[-1]
        live_info = await live.get_room_info_by_id(liveid)
        information_container.update({"live_info": live_info, "live_url": live_url})

    elif response_url.startswith("https://www.bilibili.com/read"):
        # Article (read/cv): render the page to a screenshot.
        cv_url = response_url
        image_info = await get_image(cv_url)
        information_container.update({"image_info": image_info, "image_url": cv_url})

    elif response_url.startswith(
        ("https://www.bilibili.com/opus", "https://t.bilibili.com")
    ):
        # Dynamic (opus / t.bilibili.com): render the page to a screenshot.
        opus_url = response_url
        image_info = await get_image(opus_url)
        information_container.update({"image_info": image_info, "image_url": opus_url})

    return information_container.get_information()
|
||||
@ -1,6 +1,7 @@
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from typing import Awaitable, Callable
|
||||
|
||||
@ -408,3 +409,14 @@ async def get_download_image_hash(url: str, mark: str) -> str:
|
||||
except Exception as e:
|
||||
logger.warning(f"下载读取图片Hash出错", e=e)
|
||||
return ""
|
||||
|
||||
|
||||
def pic2bytes(image) -> bytes:
    """Serialize *image* to PNG and return the raw bytes.

    Args:
        image: any object exposing a PIL-style ``save(fp, format=...)`` method

    Returns:
        bytes: the PNG-encoded image content
    """
    with BytesIO() as buffer:
        image.save(buffer, format="PNG")
        return buffer.getvalue()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user