feat: Pixiv ranking / image search (p站排行/搜图)

HibiKier 2024-05-23 13:58:53 +08:00
parent c6afb8c1e9
commit 90ef1c843a
5 changed files with 410 additions and 1 deletion

View File

@ -11,10 +11,12 @@
"displayname", "displayname",
"flmt", "flmt",
"getbbox", "getbbox",
"hibiapi",
"httpx", "httpx",
"kaiheila", "kaiheila",
"nonebot", "nonebot",
"onebot", "onebot",
"pixiv",
"tobytes", "tobytes",
"unban", "unban",
"userinfo", "userinfo",

View File

@ -1,6 +1,6 @@
import copy
from pathlib import Path
from typing import Any, Callable, Dict, Set, Type

import cattrs
from pydantic import BaseModel

@ -163,6 +163,8 @@ class PluginExtraData(BaseModel):
    """Skill passives"""
    superuser_help: str | None = None
    """Superuser help text"""
    aliases: Set[str] = set()
    """Additional names (aliases)"""


class NoSuchConfig(Exception):
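
The new aliases field lets a plugin register extra names next to its primary name. A minimal sketch of declaring it (the demo plugin below is hypothetical; how the aliases are consumed is outside this diff):

from nonebot.plugin import PluginMetadata

from zhenxun.configs.utils import PluginExtraData

__plugin_meta__ = PluginMetadata(
    name="demo-plugin",  # hypothetical plugin
    description="demo",
    usage="demo",
    extra=PluginExtraData(
        author="example",
        version="0.1",
        aliases={"demo", "d-plugin"},  # extra names, new in this commit
    ).dict(),
)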

View File

@ -0,0 +1,215 @@
from asyncio.exceptions import TimeoutError

from httpx import NetworkError
from nonebot.adapters import Bot
from nonebot.plugin import PluginMetadata
from nonebot.rule import to_me
from nonebot_plugin_alconna import (
    Alconna,
    Args,
    Arparma,
    Match,
    Option,
    on_alconna,
    store_true,
)
from nonebot_plugin_saa import MessageFactory, Text
from nonebot_plugin_session import EventSession

from zhenxun.configs.config import Config
from zhenxun.configs.utils import BaseBlock, PluginExtraData, RegisterConfig
from zhenxun.services.log import logger
from zhenxun.utils.utils import is_valid_date

from .data_source import download_pixiv_imgs, get_pixiv_urls, search_pixiv_urls
__plugin_meta__ = PluginMetadata(
    name="P站排行/搜图",
    description="Pixiv rankings, with Pixiv image search to go with them",
    usage="""
    p站排行 (Pixiv ranking)
    Optional type argument:
        1. Daily ranking
        2. Weekly ranking
        3. Monthly ranking
        4. Original-work ranking
        5. Rookie ranking
        6. R18 daily ranking
        7. R18 weekly ranking
        8. R18 most-popular-with-males ranking
        9. R18G ranking (view at your own risk)
    Pass the number of the type you want (R18 types are private-chat only):
        p站排行 ?[type] ?[count] ?[date]
    Examples:
        p站排行            (no argument defaults to the daily ranking)
        p站排行 1
        p站排行 1 5
        p站排行 1 5 2018-4-25
    Mind the spaces! Online lookups can be slow.
    ---------------------------------
    搜图 (Pixiv search)
        搜图 [keyword] ?[count] ?[page=1] ?[r18]  (r18 disables the R-18 filter)
    Examples:
        搜图 樱岛麻衣
        搜图 樱岛麻衣 5
        搜图 樱岛麻衣 5 r18
        搜图 樱岛麻衣#1000users 5
        (separate multiple keywords with #)
    Results are sorted by popularity by default.
    Mind the spaces! Online search can be slow, and the returned count may fall
    short: the page may not hold enough works, or some may be filtered as R-18.
    """.strip(),
    extra=PluginExtraData(
        author="HibiKier",
        version="0.1",
        aliases={"P站排行", "搜图"},
        menu_type="来点好康的",  # menu category: "something nice to look at"
        limits=[
            BaseBlock(
                result="A Pixiv ranking/search is already running, please don't trigger the command again..."
            )
        ],
        configs=[
            RegisterConfig(
                key="TIMEOUT",
                value=10,
                help="Timeout for image downloads",
                default_value=10,
                type=int,
            ),
            RegisterConfig(
                key="MAX_PAGE_LIMIT",
                value=20,
                help="Maximum page count per work; works above the limit are skipped",
                default_value=20,
                type=int,
            ),
            RegisterConfig(
                key="ALLOW_GROUP_R18",
                value=False,
                help="Whether the r18 flag may be used in group chats",
                default_value=False,
                type=bool,
            ),
            RegisterConfig(
                module="hibiapi",
                key="HIBIAPI",
                value="https://api.obfs.dev",
                help="Do not change this unless you self-host hibiapi or use another instance",
                default_value="https://api.obfs.dev",
            ),
            RegisterConfig(
                module="pixiv",
                key="PIXIV_NGINX_URL",
                value="i.pixiv.re",
                help="Pixiv reverse proxy",
            ),
        ],
    ).dict(),
)
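
# Note: the values registered above are read back through Config, e.g.
#   Config.get_config("pixiv_rank_search", "TIMEOUT")
# which is exactly how data_source.py (below) consumes them.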
# Maps the user-supplied type number to the hibiapi ranking mode
rank_dict = {
    "1": "day",
    "2": "week",
    "3": "month",
    "4": "week_original",
    "5": "week_rookie",
    "6": "day_r18",
    "7": "week_r18",
    "8": "day_male_r18",
    "9": "week_r18g",
}

_rank_matcher = on_alconna(
    Alconna("p站排行", Args["rank_type", int, 1]["num", int, 10]["datetime?", str]),
    aliases={"p站排行榜"},
    priority=5,
    block=True,
    rule=to_me(),
)

_keyword_matcher = on_alconna(
    Alconna(
        "搜图",
        Args["keyword", str]["num", int, 10]["page", int, 1],
        Option("-r", action=store_true, help_text="include R-18 results (disable the filter)"),
    ),
    priority=5,
    block=True,
    rule=to_me(),
)
@_rank_matcher.handle()
async def _(
    bot: Bot,
    session: EventSession,
    arparma: Arparma,
    rank_type: int,
    num: int,
    datetime: Match[str],
):
    gid = session.id3 or session.id2
    if not session.id1:
        await Text("User id is empty...").finish()
    code = 0
    info_list = []
    _datetime = None
    if datetime.available:
        _datetime = datetime.result
        if not is_valid_date(_datetime):
            await Text("Invalid date, example: 2018-4-25").finish(reply=True)
    if rank_type in [6, 7, 8, 9] and gid:
        # R18 rankings are restricted to private chats
        await Text("Shame on you! Look it up in private chat!").finish(at_sender=True)
    info_list, code = await get_pixiv_urls(
        rank_dict[str(rank_type)], num, date=_datetime
    )
    if code != 200 and info_list and isinstance(info_list[0], str):
        # on failure the first entry is an error message
        await Text(info_list[0]).finish()
    if not info_list:
        await Text("Nothing found, try again in a bit~").send(at_sender=True)
    for title, author, urls in info_list:
        try:
            images = await download_pixiv_imgs(urls, session.id1)  # type: ignore
            await MessageFactory(
                [Text(f"title: {title}\n"), Text(f"author: {author}\n")] + images
            ).send()
        except (NetworkError, TimeoutError):
            await Text("The network completely blew up on this image!").send()
    logger.info(
        f"viewed the Pixiv ranking, rank_type: {rank_type}",
        arparma.header_result,
        session=session,
    )
@_keyword_matcher.handle()
async def _(
    bot: Bot, session: EventSession, arparma: Arparma, keyword: str, num: int, page: int
):
    gid = session.id3 or session.id2
    if not session.id1:
        await Text("User id is empty...").finish()
    if gid and arparma.find("r") and not Config.get_config(
        "pixiv_rank_search", "ALLOW_GROUP_R18"
    ):
        await Text("(Blushing #) You shameless baka!").finish(at_sender=True)
    # r18 == 1 means R-18 works are filtered out of the results
    r18 = 0 if arparma.find("r") else 1
    keyword = keyword.replace("#", " ")
    info_list, code = await search_pixiv_urls(keyword, num, page, r18)
    if code != 200 and info_list and isinstance(info_list[0], str):
        await Text(info_list[0]).finish()
    if not info_list:
        await Text("Nothing found, try again in a bit~").finish(at_sender=True)
    for title, author, urls in info_list:
        try:
            images = await download_pixiv_imgs(urls, session.id1)  # type: ignore
            await MessageFactory(
                [Text(f"title: {title}\n"), Text(f"author: {author}\n")] + images
            ).send()
        except (NetworkError, TimeoutError):
            await Text("The network completely blew up on this image!").send()
    logger.info(
        f"searched for {keyword}, R18: {r18}",
        arparma.header_result,
        session=session,
    )

View File

@ -0,0 +1,173 @@
from asyncio.exceptions import TimeoutError

from nonebot_plugin_saa import Image

from zhenxun.configs.config import Config
from zhenxun.configs.path_config import TEMP_PATH
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.utils import change_img_md5

# Pixiv's image CDN refuses requests without a pixiv.net Referer,
# so every download sends these headers
headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6;"
    " rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
    "Referer": "https://www.pixiv.net/",
}
async def get_pixiv_urls(
    mode: str, num: int = 10, page: int = 1, date: str | None = None
) -> tuple[list[tuple[str, str, list[str]] | str], int]:
    """Fetch ranking image urls

    Parameters:
        mode: ranking mode
        num: number of works
        page: page number
        date: ranking date

    Returns:
        tuple[list[tuple[str, str, list[str]] | str], int]: (title, author, urls)
        entries (or a single error message) and the request status code
    """
    params = {"mode": mode, "page": page}
    if date:
        params["date"] = date
    hibiapi = Config.get_config("hibiapi", "HIBIAPI")
    hibiapi = hibiapi[:-1] if hibiapi[-1] == "/" else hibiapi  # strip trailing slash
    rank_url = f"{hibiapi}/api/pixiv/rank"
    return await parser_data(rank_url, num, params, "rank")
async def search_pixiv_urls(
    keyword: str, num: int, page: int, r18: int
) -> tuple[list[tuple[str, str, list[str]] | str], int]:
    """Search for image urls

    Parameters:
        keyword: search keyword
        num: number of works
        page: page number
        r18: whether to filter R-18 works

    Returns:
        tuple[list[tuple[str, str, list[str]] | str], int]: (title, author, urls)
        entries (or a single error message) and the request status code
    """
    params = {"word": keyword, "page": page}
    hibiapi = Config.get_config("hibiapi", "HIBIAPI")
    hibiapi = hibiapi[:-1] if hibiapi[-1] == "/" else hibiapi  # strip trailing slash
    search_url = f"{hibiapi}/api/pixiv/search"
    return await parser_data(search_url, num, params, "search", r18)
async def parser_data(
    url: str, num: int, params: dict, type_: str, r18: int = 0
) -> tuple[list[tuple[str, str, list[str]] | str], int]:
    """Parse ranking/search data

    Parameters:
        url: request URL
        num: number of works
        params: request parameters
        type_: request type, "rank" or "search"
        r18: whether to filter R-18 works (1 filters them out)

    Returns:
        tuple[list[tuple[str, str, list[str]] | str], int]: (title, author, urls)
        entries (or a single error message) and the request status code
    """
    info_list = []
    for _ in range(3):  # up to three attempts
        try:
            response = await AsyncHttpx.get(
                url,
                params=params,
                timeout=Config.get_config("pixiv_rank_search", "TIMEOUT"),
            )
            if response.status_code == 200:
                data = response.json()
                if data.get("illusts"):
                    data = data["illusts"]
                    break
        except TimeoutError:
            pass
        except Exception as e:
            logger.error("P站排行/搜图 error while parsing data", e=e)
            return ["Something went a little wrong..."], 995
    else:
        # runs only when no attempt succeeded (the loop never hit `break`)
        return ["Bad network? No such page? Maybe it will work in a while..."], 998
    num = min(num, 30)  # cap a single request at 30 works
    _data = []
    for x in data:
        if x["page_count"] < Config.get_config("pixiv_rank_search", "MAX_PAGE_LIMIT"):
            if type_ == "search" and r18 == 1 and "R-18" in str(x["tags"]):
                continue  # R-18 filter is on: skip tagged works
            _data.append(x)
            if len(_data) == num:
                break
    for x in _data:
        title = x["title"]
        author = x["user"]["name"]
        urls = []
        if x["page_count"] == 1:
            urls.append(x["image_urls"]["large"])
        else:
            for j in x["meta_pages"]:
                urls.append(j["image_urls"]["large"])
        info_list.append((title, author, urls))
    return info_list, 200
async def download_pixiv_imgs(
    urls: list[str], user_id: str, forward_msg_index: int | None = None
) -> list[Image]:
    """Download images

    Parameters:
        urls: image urls
        user_id: user id
        forward_msg_index: position of the image within a forward message

    Returns:
        list[Image]: the downloaded images
    """
    result_list = []
    index = 0
    for url in urls:
        # route the download through the configured reverse proxy, since
        # Pixiv's own image hosts are not directly reachable everywhere
        ws_url = Config.get_config("pixiv", "PIXIV_NGINX_URL")
        url = url.replace("_webp", "")
        if ws_url:
            url = url.replace("i.pximg.net", ws_url).replace("i.pixiv.cat", ws_url)
        try:
            file = (
                TEMP_PATH / f"{user_id}_{forward_msg_index}_{index}_pixiv.jpg"
                if forward_msg_index is not None
                else TEMP_PATH / f"{user_id}_{index}_pixiv.jpg"
            )
            try:
                if await AsyncHttpx.download_file(
                    url,
                    file,
                    timeout=Config.get_config("pixiv_rank_search", "TIMEOUT"),
                    headers=headers,
                ):
                    # tweak the file so its md5 changes, dodging duplicate-image checks
                    change_img_md5(file)
                    result_list.append(Image(file))
                    index += 1
            except OSError:
                if file.exists():
                    file.unlink()
        except Exception as e:
            logger.error("P站排行/搜图 error while downloading an image", e=e)
    return result_list
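
parser_data's retry loop relies on Python's for/else: the else branch runs only when the loop completes without hitting break, which makes it a compact retry idiom. A minimal self-contained sketch of the same pattern (the names here are illustrative, not part of the plugin):

import random

def fetch_with_retries(attempts: int = 3) -> str:
    for _ in range(attempts):
        result = random.choice([None, "payload"])  # stand-in for a flaky network call
        if result is not None:
            break  # success: the else branch below is skipped
    else:
        # reached only when every attempt failed (no break occurred)
        return "all attempts failed"
    return result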

View File

@ -213,3 +213,20 @@ def change_img_md5(path_file: str | Path) -> bool:
    except Exception as e:
        logger.warning(f"Error changing image MD5, Path: {path_file}", e=e)
        return False


def is_valid_date(date_text: str, separator: str = "-") -> bool:
    """Check whether a date string is valid

    Parameters:
        date_text: date string
        separator: separator between year, month and day

    Returns:
        bool: whether the date is valid
    """
    try:
        # assumes `from datetime import datetime` at the top of this module
        datetime.strptime(date_text, f"%Y{separator}%m{separator}%d")
        return True
    except ValueError:
        return False
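
For example, with the function as defined above:

assert is_valid_date("2018-4-25")            # strptime accepts non-zero-padded fields
assert not is_valid_date("2018-13-01")       # month out of range
assert is_valid_date("2018/4/25", "/")       # custom separator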