mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 06:12:53 +08:00
79 lines
2.2 KiB
Python
79 lines
2.2 KiB
Python
from asyncio.exceptions import TimeoutError
|
||
|
||
from httpx import ConnectTimeout
|
||
from nonebot.plugin import PluginMetadata
|
||
from nonebot_plugin_alconna import Alconna, Args, Arparma, Match, on_alconna
|
||
from nonebot_plugin_saa import Text
|
||
from nonebot_plugin_session import EventSession
|
||
|
||
from zhenxun.configs.utils import PluginExtraData, RegisterConfig
|
||
from zhenxun.services.log import logger
|
||
from zhenxun.utils.rules import ensure_private
|
||
|
||
from .data_source import get_bt_info
|
||
|
||
__plugin_meta__ = PluginMetadata(
|
||
name="磁力搜索",
|
||
description="bt(磁力搜索)[仅支持私聊,懂的都懂]",
|
||
usage="""
|
||
* 拒绝反冲斗士! *
|
||
指令:
|
||
bt [关键词] ?[页数]
|
||
示例:bt 钢铁侠
|
||
示例:bt 钢铁侠 3
|
||
""".strip(),
|
||
extra=PluginExtraData(
|
||
author="HibiKier",
|
||
version="0.1",
|
||
configs=[
|
||
RegisterConfig(
|
||
key="BT_MAX_NUM",
|
||
value=10,
|
||
help="单次BT搜索返回最大消息数量",
|
||
default_value=10,
|
||
type=int,
|
||
),
|
||
],
|
||
).dict(),
|
||
)
|
||
|
||
|
||
_matcher = on_alconna(
|
||
Alconna("bt", Args["keyword", str]["page?", int]),
|
||
rule=ensure_private,
|
||
priority=5,
|
||
block=True,
|
||
)
|
||
|
||
|
||
@_matcher.handle()
|
||
async def _(
|
||
session: EventSession,
|
||
arparma: Arparma,
|
||
keyword: str,
|
||
page: Match[int],
|
||
):
|
||
send_flag = False
|
||
try:
|
||
async for title, type_, create_time, file_size, link in get_bt_info(
|
||
keyword, page.result if page.available else 1
|
||
):
|
||
await Text(
|
||
f"标题:{title}\n"
|
||
f"类型:{type_}\n"
|
||
f"创建时间:{create_time}\n"
|
||
f"文件大小:{file_size}\n"
|
||
f"种子:{link}"
|
||
).send()
|
||
send_flag = True
|
||
except (TimeoutError, ConnectTimeout):
|
||
await Text(f"搜索 {keyword} 超时...").finish()
|
||
except Exception as e:
|
||
logger.error(f"bt 错误", arparma.header_result, session=session, e=e)
|
||
await Text(f"bt 其他未知错误..").finish()
|
||
if not send_flag:
|
||
await Text(f"{keyword} 未搜索到...").send()
|
||
logger.info(
|
||
f"BT搜索 {keyword} 第 {page} 页", arparma.header_result, session=session
|
||
)
|