feat: 识图 和 buff皮肤查询

This commit is contained in:
HibiKier 2024-05-23 16:26:06 +08:00
parent 1aed19035e
commit bd4106190e
5 changed files with 312 additions and 2 deletions

View File

@ -1,4 +1,3 @@
from nonebot.adapters import Bot
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args, Arparma, Match, on_alconna
from nonebot_plugin_saa import Text
@ -30,7 +29,7 @@ __plugin_meta__ = PluginMetadata(
value=20,
help="搜索动漫返回的最大数量",
default_value=20,
type=10,
type=int,
)
],
).dict(),

View File

@ -0,0 +1,101 @@
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata
from nonebot.rule import to_me
from nonebot_plugin_alconna import Alconna, Args, Arparma, Match, on_alconna
from nonebot_plugin_saa import Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import NICKNAME
from zhenxun.configs.utils import BaseBlock, PluginExtraData, RegisterConfig
from zhenxun.services.log import logger
from .data_source import get_price, update_buff_cookie
__plugin_meta__ = PluginMetadata(
name="BUFF查询皮肤",
description="BUFF皮肤底价查询",
usage="""
在线实时获取BUFF指定皮肤所有磨损底价
指令
查询皮肤 [枪械名] [皮肤名称]
示例查询皮肤 ak47 二西莫夫
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
menu_type="一些工具",
limits=[BaseBlock(result="您有皮肤正在搜索,请稍等...")],
configs=[
RegisterConfig(
key="BUFF_PROXY",
value=None,
help="BUFF代理有些厂ip可能被屏蔽",
),
RegisterConfig(
key="COOKIE",
value=None,
help="BUFF的账号cookie",
),
],
).dict(),
)
_matcher = on_alconna(
Alconna("查询皮肤", Args["name", str]["skin", str]),
aliases={"皮肤查询"},
priority=5,
block=True,
)
_cookie_matcher = on_alconna(
Alconna("设置cookie", Args["cookie", str]),
rule=to_me(),
permission=SUPERUSER,
priority=1,
)
@_matcher.handle()
async def _(name: Match[str], skin: Match[str]):
if name.available:
_matcher.set_path_arg("name", name.result)
if skin.available:
_matcher.set_path_arg("skin", skin.result)
@_matcher.got_path("name", prompt="要查询什么武器呢?")
@_matcher.got_path("skin", prompt="要查询该武器的什么皮肤呢?")
async def arg_handle(
session: EventSession,
arparma: Arparma,
name: str,
skin: str,
):
if name in ["算了", "取消"] or skin in ["算了", "取消"]:
await Text("已取消操作...").finish()
result = ""
if name in ["ak", "ak47"]:
name = "ak-47"
name = name + " | " + skin
try:
result, status_code = await get_price(name)
except FileNotFoundError:
await Text(f'请先对{NICKNAME}"设置cookie"来设置cookie').send(at_sender=True)
if status_code in [996, 997, 998]:
await Text(result).finish()
if result:
logger.info(f"查询皮肤: {name}", arparma.header_result, session=session)
await Text(result).finish()
else:
logger.info(
f" 查询皮肤:{name} 没有查询到", arparma.header_result, session=session
)
await Text("没有查询到哦,请检查格式吧").send()
@_cookie_matcher.handle()
async def _(session: EventSession, arparma: Arparma, cookie: str):
result = update_buff_cookie(cookie)
await Text(result).send(at_sender=True)
logger.info("更新BUFF COOKIE", arparma.header_result, session=session)

View File

@ -0,0 +1,58 @@
from asyncio.exceptions import TimeoutError
from configs.config import Config
from utils.http_utils import AsyncHttpx
from services.log import logger
url = "https://buff.163.com/api/market/goods"
async def get_price(d_name: str) -> "str, int":
"""
查看皮肤价格
:param d_name: 武器皮肤awp 二西莫夫
"""
cookie = {"session": Config.get_config("search_buff_skin_price", "COOKIE")}
name_list = []
price_list = []
parameter = {"game": "csgo", "page_num": "1", "search": d_name}
try:
response = await AsyncHttpx.get(
url,
proxy=Config.get_config("search_buff_skin_price", "BUFF_PROXY"),
params=parameter,
cookies=cookie,
)
if response.status_code == 200:
try:
if response.text.find("Login Required") != -1:
return "BUFF登录被重置请联系管理员重新登入", 996
data = response.json()["data"]
total_page = data["total_page"]
data = data["items"]
for _ in range(total_page):
for i in range(len(data)):
name = data[i]["name"]
price = data[i]["sell_reference_price"]
name_list.append(name)
price_list.append(price)
except Exception as e:
logger.error(f"BUFF查询皮肤发生错误 {type(e)}{e}")
return "没有查询到...", 998
else:
return "访问失败!", response.status_code
except TimeoutError:
return "访问超时! 请重试或稍后再试!", 997
result = f"皮肤: {d_name}({len(name_list)})\n"
for i in range(len(name_list)):
result += name_list[i] + ": " + price_list[i] + "\n"
return result[:-1], 999
def update_buff_cookie(cookie: str) -> str:
Config.set_config("search_buff_skin_price", "COOKIE", cookie)
return "更新cookie成功"
if __name__ == "__main__":
print(get_price("awp 二西莫夫"))

View File

@ -0,0 +1,89 @@
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args, Arparma
from nonebot_plugin_alconna import Image as alcImg
from nonebot_plugin_alconna import Match, on_alconna
from nonebot_plugin_saa import Image, Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData, RegisterConfig
from zhenxun.services.log import logger
from .saucenao import get_saucenao_image
__plugin_meta__ = PluginMetadata(
name="识图",
description="以图搜图,看破本源",
usage="""
识别图片 [二次元图片]
指令
识图 [图片]
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
menu_type="一些工具",
configs=[
RegisterConfig(
key="MAX_FIND_IMAGE_COUNT",
value=3,
help="搜索动漫返回的最大数量",
default_value=3,
type=int,
),
RegisterConfig(
key="API_KEY",
value=None,
help="Saucenao的API_KEY通过 https://saucenao.com/user.php?page=search-api 注册获取",
),
],
).dict(),
)
_matcher = on_alconna(
Alconna("识图", Args["mode?", str]["image?", alcImg]), block=True, priority=5
)
async def get_image_info(mod: str, url: str) -> str | list[Image | Text] | None:
if mod == "saucenao":
return await get_saucenao_image(url)
# def parse_image(key: str):
# async def _key_parser(state: T_State, img: Message = Arg(key)):
# if not get_message_img(img):
# await search_image.reject_arg(key, "请发送要识别的图片!")
# state[key] = img
# return _key_parser
@_matcher.handle()
async def _(mode: Match[str], img: Match[alcImg]):
if mode.available:
_matcher.set_path_arg("mode", mode.result)
else:
_matcher.set_path_arg("mode", "saucenao")
if img.available:
_matcher.set_path_arg("image", img.result)
@_matcher.got_path("image", prompt="图来!")
async def _(
session: EventSession,
arparma: Arparma,
mode: str,
image: alcImg,
):
if not image.url:
await Text("图片url为空...").finish()
await Text("开始处理图片...").send()
info_list = await get_image_info(mode, image.url)
if isinstance(info_list, str):
await Text(info_list).finish(at_sender=True)
if not info_list:
await Text("未查询到...").finish()
for info in info_list[1:]:
await info.send()
logger.info(f" 识图: {image.url}", arparma.header_result, session=session)

View File

@ -0,0 +1,63 @@
import random
from nonebot_plugin_saa import Image, Text
from zhenxun.configs.config import Config
from zhenxun.configs.path_config import TEMP_PATH
from zhenxun.services import logger
from zhenxun.utils.http_utils import AsyncHttpx
API_URL_SAUCENAO = "https://saucenao.com/search.php"
API_URL_ASCII2D = "https://ascii2d.net/search/url/"
API_URL_IQDB = "https://iqdb.org/"
async def get_saucenao_image(url: str) -> str | list[Image | Text]:
"""获取图片源
参数:
url: 图片url
返回:
str | list[Image | Text]: 识图数据
"""
api_key = Config.get_config("search_image", "API_KEY")
if not api_key:
return "Saucenao 缺失API_KEY"
params = {
"output_type": 2,
"api_key": api_key,
"testmode": 1,
"numres": 6,
"db": 999,
"url": url,
}
data = (await AsyncHttpx.post(API_URL_SAUCENAO, params=params)).json()
if data["header"]["status"] != 0:
return f"Saucenao识图失败..status{data['header']['status']}"
data = data["results"]
data = (
data
if len(data) < Config.get_config("search_image", "MAX_FIND_IMAGE_COUNT")
else data[: Config.get_config("search_image", "MAX_FIND_IMAGE_COUNT")]
)
msg_list = []
index = random.randint(0, 10000)
if await AsyncHttpx.download_file(url, TEMP_PATH / f"saucenao_search_{index}.jpg"):
msg_list.append(Image(TEMP_PATH / f"saucenao_search_{index}.jpg"))
for info in data:
try:
similarity = info["header"]["similarity"]
tmp = f"相似度:{similarity}%\n"
for x in info["data"].keys():
if x != "ext_urls":
tmp += f"{x}{info['data'][x]}\n"
try:
if "source" not in info["data"].keys():
tmp += f'source{info["data"]["ext_urls"][0]}\n'
except KeyError:
tmp += f'source{info["header"]["thumbnail"]}\n'
msg_list.append(Text(tmp[:-1]))
except Exception as e:
logger.warning(f"识图获取图片信息发生错误", e=e)
return msg_list