feat: PIX图库

This commit is contained in:
HibiKier 2024-05-20 22:03:11 +08:00
parent c2fd8661b5
commit c6afb8c1e9
15 changed files with 1738 additions and 9 deletions

View File

@ -0,0 +1,62 @@
from pathlib import Path
from typing import Tuple
import nonebot
from zhenxun.configs.config import Config
Config.add_plugin_config(
"hibiapi",
"HIBIAPI",
"https://api.obfs.dev",
help="如果没有自建或其他hibiapi请不要修改",
default_value="https://api.obfs.dev",
)
Config.add_plugin_config("pixiv", "PIXIV_NGINX_URL", "i.pximg.cf", help="Pixiv反向代理")
Config.add_plugin_config(
"pix",
"PIX_IMAGE_SIZE",
"master",
help="PIX图库下载的画质 可能的值original原图master缩略图加快发送速度",
default_value="master",
)
Config.add_plugin_config(
"pix",
"SEARCH_HIBIAPI_BOOKMARKS",
5000,
help="最低收藏PIX使用HIBIAPI搜索图片时达到最低收藏才会添加至图库",
default_value=5000,
type=int,
)
Config.add_plugin_config(
"pix",
"WITHDRAW_PIX_MESSAGE",
(0, 1),
help="自动撤回参1延迟撤回色图时间(秒)0 为关闭 | 参2监控聊天类型0(私聊) 1(群聊) 2(群聊+私聊)",
default_value=(0, 1),
type=Tuple[int, int],
)
Config.add_plugin_config(
"pix",
"PIX_OMEGA_PIXIV_RATIO",
(10, 0),
help="PIX图库 与 额外图库OmegaPixivIllusts 混合搜索的比例 参1PIX图库 参2OmegaPixivIllusts扩展图库没有此图库请设置为0",
default_value=(10, 0),
type=Tuple[int, int],
)
Config.add_plugin_config(
"pix", "TIMEOUT", 10, help="下载图片超时限制(秒)", default_value=10, type=int
)
Config.add_plugin_config(
"pix",
"SHOW_INFO",
True,
help="是否显示图片的基本信息如PID等",
default_value=True,
type=bool,
)
Config.set_name("pix", "PIX图库")
nonebot.load_plugins(str(Path(__file__).parent.resolve()))

View File

@ -0,0 +1,426 @@
import asyncio
import math
from asyncio.exceptions import TimeoutError
from asyncio.locks import Semaphore
from copy import deepcopy
from pathlib import Path
import aiofiles
from httpx import ConnectError
from zhenxun.configs.config import Config
from zhenxun.configs.path_config import TEMP_PATH
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.image_utils import BuildImage
from zhenxun.utils.utils import change_img_md5, change_pixiv_image_links
from ._model.omega_pixiv_illusts import OmegaPixivIllusts
from ._model.pixiv import Pixiv
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6;"
" rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
"Referer": "https://www.pixiv.net/",
}
HIBIAPI = Config.get_config("hibiapi", "HIBIAPI")
if not HIBIAPI:
HIBIAPI = "https://api.obfs.dev"
HIBIAPI = HIBIAPI[:-1] if HIBIAPI[-1] == "/" else HIBIAPI
async def start_update_image_url(
current_keyword: list[str], black_pid: list[str], is_pid: bool
) -> tuple[int, int]:
"""开始更新图片url
参数:
current_keyword: 关键词
black_pid: 黑名单pid
is_pid: pid强制更新不受限制
返回:
tuple[int, int]: pid数量和图片数量
"""
global HIBIAPI
pid_count = 0
pic_count = 0
tasks = []
semaphore = asyncio.Semaphore(10)
for keyword in current_keyword:
for page in range(1, 110):
if keyword.startswith("uid:"):
url = f"{HIBIAPI}/api/pixiv/member_illust"
params = {"id": keyword[4:], "page": page}
if page == 30:
break
elif keyword.startswith("pid:"):
url = f"{HIBIAPI}/api/pixiv/illust"
params = {"id": keyword[4:]}
else:
url = f"{HIBIAPI}/api/pixiv/search"
params = {"word": keyword, "page": page}
tasks.append(
asyncio.ensure_future(
search_image(
url, keyword, params, semaphore, page, black_pid, is_pid
)
)
)
if keyword.startswith("pid:"):
break
result = await asyncio.gather(*tasks)
for x in result:
pid_count += x[0]
pic_count += x[1]
return pid_count, pic_count
async def search_image(
url: str,
keyword: str,
params: dict,
semaphore: Semaphore,
page: int = 1,
black: list[str] = [],
is_pid: bool = False,
) -> tuple[int, int]:
"""搜索图片
参数:
url: 搜索url
keyword: 关键词
params: params参数
semaphore: semaphore
page: 页面
black: pid黑名单
is_pid: pid强制更新不受限制
返回:
tuple[int, int]: pid数量和图片数量
"""
tmp_pid = []
pic_count = 0
pid_count = 0
async with semaphore:
# try:
data = (await AsyncHttpx.get(url, params=params)).json()
if (
not data
or data.get("error")
or (not data.get("illusts") and not data.get("illust"))
):
return 0, 0
if url != f"{HIBIAPI}/api/pixiv/illust":
logger.info(f'{keyword}: 获取数据成功...数据总量:{len(data["illusts"])}')
data = data["illusts"]
else:
logger.info(f'获取数据成功...PID{params.get("id")}')
data = [data["illust"]]
img_data = {}
for x in data:
pid = x["id"]
title = x["title"]
width = x["width"]
height = x["height"]
view = x["total_view"]
bookmarks = x["total_bookmarks"]
uid = x["user"]["id"]
author = x["user"]["name"]
tags = []
for tag in x["tags"]:
for i in tag:
if tag[i]:
tags.append(tag[i])
img_urls = []
if x["page_count"] == 1:
img_urls.append(x["meta_single_page"]["original_image_url"])
else:
for urls in x["meta_pages"]:
img_urls.append(urls["image_urls"]["original"])
if (
(
bookmarks >= Config.get_config("pix", "SEARCH_HIBIAPI_BOOKMARKS")
or (
url == f"{HIBIAPI}/api/pixiv/member_illust"
and bookmarks >= 1500
)
or (url == f"{HIBIAPI}/api/pixiv/illust")
)
and len(img_urls) < 10
and _check_black(img_urls, black)
) or is_pid:
img_data[pid] = {
"pid": pid,
"title": title,
"width": width,
"height": height,
"view": view,
"bookmarks": bookmarks,
"img_urls": img_urls,
"uid": uid,
"author": author,
"tags": tags,
}
else:
continue
for x in img_data.keys():
data = img_data[x]
data_copy = deepcopy(data)
del data_copy["img_urls"]
for img_url in data["img_urls"]:
img_p = img_url[img_url.rfind("_") + 1 : img_url.rfind(".")]
data_copy["img_url"] = img_url
data_copy["img_p"] = img_p
data_copy["is_r18"] = "R-18" in data["tags"]
if not await Pixiv.exists(
pid=data["pid"], img_url=img_url, img_p=img_p
):
data_copy["img_url"] = img_url
await Pixiv.create(**data_copy)
if data["pid"] not in tmp_pid:
pid_count += 1
tmp_pid.append(data["pid"])
pic_count += 1
logger.info(f'存储图片PID{data["pid"]} IMG_P{img_p}')
else:
logger.warning(f'{data["pid"]} | {img_url} 已存在...')
# except Exception as e:
# logger.warning(f"PIX在线搜索图片错误已再次调用 {type(e)}{e}")
# await search_image(url, keyword, params, semaphore, page, black)
return pid_count, pic_count
async def get_image(img_url: str, user_id: str) -> str | Path | None:
"""下载图片
参数:
img_url: 图片url
user_id: 用户id
返回:
str | Path | None: 图片名称
"""
if "https://www.pixiv.net/artworks" in img_url:
pid = img_url.rsplit("/", maxsplit=1)[-1]
params = {"id": pid}
for _ in range(3):
try:
response = await AsyncHttpx.get(
f"{HIBIAPI}/api/pixiv/illust", params=params
)
if response.status_code == 200:
data = response.json()
if data.get("illust"):
if data["illust"]["page_count"] == 1:
img_url = data["illust"]["meta_single_page"][
"original_image_url"
]
else:
img_url = data["illust"]["meta_pages"][0]["image_urls"][
"original"
]
break
except TimeoutError:
pass
old_img_url = img_url
img_url = change_pixiv_image_links(
img_url,
Config.get_config("pix", "PIX_IMAGE_SIZE"),
Config.get_config("pixiv", "PIXIV_NGINX_URL"),
)
old_img_url = change_pixiv_image_links(
old_img_url, None, Config.get_config("pixiv", "PIXIV_NGINX_URL")
)
for _ in range(3):
try:
response = await AsyncHttpx.get(
img_url,
headers=headers,
timeout=Config.get_config("pix", "TIMEOUT"),
)
if response.status_code == 404:
img_url = old_img_url
continue
async with aiofiles.open(
TEMP_PATH / f"pix_{user_id}_{img_url.split('/')[-1][:-4]}.jpg", "wb"
) as f:
await f.write(response.content)
change_img_md5(
TEMP_PATH / f"pix_{user_id}_{img_url.split('/')[-1][:-4]}.jpg"
)
return TEMP_PATH / f"pix_{user_id}_{img_url.split('/')[-1][:-4]}.jpg"
except TimeoutError:
logger.warning(f"PIX{img_url} 图片下载超时...")
except ConnectError:
logger.warning(f"PIX{img_url} 图片下载连接失败...")
return None
async def uid_pid_exists(id_: str) -> bool:
"""检测 pid/uid 是否有效
参数:
id_: pid/uid
返回:
bool: 是否有效
"""
if id_.startswith("uid:"):
url = f"{HIBIAPI}/api/pixiv/member"
elif id_.startswith("pid:"):
url = f"{HIBIAPI}/api/pixiv/illust"
else:
return False
params = {"id": int(id_[4:])}
data = (await AsyncHttpx.get(url, params=params)).json()
if data.get("error"):
return False
return True
async def get_keyword_num(keyword: str) -> tuple[int, int, int, int, int]:
"""查看图片相关 tag 数量
参数:
keyword: 关键词tag
返回:
tuple[int, int, int, int, int]: 总数, r18数, Omg图库总数, Omg图库色图数, Omg图库r18数
"""
count, r18_count = await Pixiv.get_keyword_num(keyword.split())
count_, setu_count, r18_count_ = await OmegaPixivIllusts.get_keyword_num(
keyword.split()
)
return count, r18_count, count_, setu_count, r18_count_
async def remove_image(pid: int, img_p: str | None):
"""删除置顶图片
参数:
pid: pid
img_p: 图片 p p0p1
"""
if img_p:
if "p" not in img_p:
img_p = f"p{img_p}"
if img_p:
await Pixiv.filter(pid=pid, img_p=img_p).delete()
else:
await Pixiv.filter(pid=pid).delete()
async def gen_keyword_pic(
_pass_keyword: list[str], not_pass_keyword: list[str], is_superuser: bool
) -> BuildImage:
"""已通过或未通过的所有关键词/uid/pid
参数:
_pass_keyword: 通过列表
not_pass_keyword: 未通过列表
is_superuser: 是否超级用户
返回:
BuildImage: 数据图片
"""
_keyword = [
x
for x in _pass_keyword
if not x.startswith("uid:")
and not x.startswith("pid:")
and not x.startswith("black:")
]
_uid = [x for x in _pass_keyword if x.startswith("uid:")]
_pid = [x for x in _pass_keyword if x.startswith("pid:")]
_n_keyword = [
x
for x in not_pass_keyword
if not x.startswith("uid:")
and not x.startswith("pid:")
and not x.startswith("black:")
]
_n_uid = [
x
for x in not_pass_keyword
if x.startswith("uid:") and not x.startswith("black:")
]
_n_pid = [
x
for x in not_pass_keyword
if x.startswith("pid:") and not x.startswith("black:")
]
img_width = 0
img_data = {
"_keyword": {"width": 0, "data": _keyword},
"_uid": {"width": 0, "data": _uid},
"_pid": {"width": 0, "data": _pid},
"_n_keyword": {"width": 0, "data": _n_keyword},
"_n_uid": {"width": 0, "data": _n_uid},
"_n_pid": {"width": 0, "data": _n_pid},
}
for x in list(img_data.keys()):
img_data[x]["width"] = math.ceil(len(img_data[x]["data"]) / 40)
img_width += img_data[x]["width"] * 200
if not is_superuser:
img_width = (
img_width
- (
img_data["_n_keyword"]["width"]
+ img_data["_n_uid"]["width"]
+ img_data["_n_pid"]["width"]
)
* 200
)
del img_data["_n_keyword"]
del img_data["_n_pid"]
del img_data["_n_uid"]
current_width = 0
A = BuildImage(img_width, 1100)
for x in list(img_data.keys()):
if img_data[x]["data"]:
# img = BuildImage(img_data[x]["width"] * 200, 1100, 200, 1100, font_size=40)
img = BuildImage(img_data[x]["width"] * 200, 1100, font_size=40)
start_index = 0
end_index = 40
total_index = img_data[x]["width"] * 40
for _ in range(img_data[x]["width"]):
tmp = BuildImage(198, 1100, font_size=20)
text_img = BuildImage(198, 100, font_size=50)
key_str = "\n".join(
[key for key in img_data[x]["data"][start_index:end_index]]
)
await tmp.text((10, 100), key_str)
if x.find("_n") == -1:
await text_img.text((24, 24), "已收录")
else:
await text_img.text((24, 24), "待收录")
await tmp.paste(text_img, (0, 0))
start_index += 40
end_index = (
end_index + 40 if end_index + 40 <= total_index else total_index
)
background_img = BuildImage(200, 1100, color="#FFE4C4")
await background_img.paste(tmp, (1, 1))
await img.paste(background_img)
await A.paste(img, (current_width, 0))
current_width += img_data[x]["width"] * 200
return A
def _check_black(img_urls: list[str], black: list[str]) -> bool:
"""检测pid是否在黑名单中
参数:
img_urls: 图片img列表
black: 黑名单
返回:
bool: 是否在黑名单中
"""
for b in black:
for img_url in img_urls:
if b in img_url:
return False
return True

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,89 @@
from tortoise import fields
from tortoise.contrib.postgres.functions import Random
from zhenxun.services.db_context import Model
class OmegaPixivIllusts(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
pid = fields.BigIntField()
"""pid"""
uid = fields.BigIntField()
"""uid"""
title = fields.CharField(255)
"""标题"""
uname = fields.CharField(255)
"""画师名称"""
classified = fields.IntField()
"""标记标签, 0=未标记, 1=已人工标记或从可信已标记来源获取"""
nsfw_tag = fields.IntField()
"""nsfw标签,-1=未标记, 0=safe, 1=setu. 2=r18"""
width = fields.IntField()
"""宽度"""
height = fields.IntField()
"""高度"""
tags = fields.TextField()
"""tags"""
url = fields.CharField(255)
"""pixiv url链接"""
class Meta:
table = "omega_pixiv_illusts"
table_description = "omega图库数据表"
unique_together = ("pid", "url")
@classmethod
async def query_images(
cls,
keywords: list[str] | None = None,
uid: int | None = None,
pid: int | None = None,
nsfw_tag: int | None = 0,
num: int = 100,
) -> list["OmegaPixivIllusts"]:
"""查找符合条件的图片
参数:
keywords: 关键词
uid: 画师uid
pid: 图片pid
nsfw_tag: nsfw标签, 0=safe, 1=setu. 2=r18
num: 获取图片数量
"""
if not num:
return []
query = cls
if nsfw_tag is not None:
query = cls.filter(nsfw_tag=nsfw_tag)
if keywords:
for keyword in keywords:
query = query.filter(tags__contains=keyword)
elif uid:
query = query.filter(uid=uid)
elif pid:
query = query.filter(pid=pid)
query = query.annotate(rand=Random()).limit(num)
return await query.all() # type: ignore
@classmethod
async def get_keyword_num(
cls, tags: list[str] | None = None
) -> tuple[int, int, int]:
"""获取相关关键词(keyword, tag)在图库中的数量
参数:
tags: 关键词/Tag
"""
query = cls
if tags:
for tag in tags:
query = query.filter(tags__contains=tag)
else:
query = query.all()
count = await query.filter(nsfw_tag=0).count()
setu_count = await query.filter(nsfw_tag=1).count()
r18_count = await query.filter(nsfw_tag=2).count()
return count, setu_count, r18_count

View File

@ -0,0 +1,91 @@
from tortoise import fields
from tortoise.contrib.postgres.functions import Random
from zhenxun.services.db_context import Model
class Pixiv(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
pid = fields.BigIntField()
"""pid"""
uid = fields.BigIntField()
"""uid"""
author = fields.CharField(255)
"""作者"""
title = fields.CharField(255)
"""标题"""
width = fields.IntField()
"""宽度"""
height = fields.IntField()
"""高度"""
view = fields.IntField()
"""pixiv查看数"""
bookmarks = fields.IntField()
"""收藏数"""
tags = fields.TextField()
"""tags"""
img_url = fields.CharField(255)
"""pixiv url链接"""
img_p = fields.CharField(255)
"""图片pN"""
is_r18 = fields.BooleanField()
class Meta:
table = "pixiv"
table_description = "pix图库数据表"
unique_together = ("pid", "img_url", "img_p")
# 0非r18 1r18 2混合
@classmethod
async def query_images(
cls,
keywords: list[str] | None = None,
uid: int | None = None,
pid: int | None = None,
r18: int | None = 0,
num: int = 100,
) -> list["Pixiv"]:
"""查找符合条件的图片
参数:
keywords: 关键词
uid: 画师uid
pid: 图片pid
r18: 是否r180非r18 1r18 2混合
num: 查找图片的数量
"""
if not num:
return []
query = cls
if r18 == 0:
query = query.filter(is_r18=False)
elif r18 == 1:
query = query.filter(is_r18=True)
if keywords:
for keyword in keywords:
query = query.filter(tags__contains=keyword)
elif uid:
query = query.filter(uid=uid)
elif pid:
query = query.filter(pid=pid)
query = query.annotate(rand=Random()).limit(num)
return await query.all() # type: ignore
@classmethod
async def get_keyword_num(cls, tags: list[str] | None = None) -> tuple[int, int]:
"""获取相关关键词(keyword, tag)在图库中的数量
参数:
tags: 关键词/Tag
"""
query = cls
if tags:
for tag in tags:
query = query.filter(tags__contains=tag)
else:
query = query.all()
count = await query.filter(is_r18=False).count()
r18_count = await query.filter(is_r18=True).count()
return count, r18_count

View File

@ -0,0 +1,52 @@
from tortoise import fields
from zhenxun.services.db_context import Model
class PixivKeywordUser(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
user_id = fields.CharField(255)
"""用户id"""
group_id = fields.CharField(255)
"""群聊id"""
keyword = fields.CharField(255, unique=True)
"""关键词"""
is_pass = fields.BooleanField()
"""是否通过"""
class Meta:
table = "pixiv_keyword_users"
table_description = "pixiv关键词数据表"
@classmethod
async def get_current_keyword(cls) -> tuple[list[str], list[str]]:
"""获取当前通过与未通过的关键词"""
pass_keyword = []
not_pass_keyword = []
for data in await cls.all().values_list("keyword", "is_pass"):
if data[1]:
pass_keyword.append(data[0])
else:
not_pass_keyword.append(data[0])
return pass_keyword, not_pass_keyword
@classmethod
async def get_black_pid(cls) -> list[str]:
"""获取黑名单PID"""
black_pid = []
keyword_list = await cls.filter(user_id="114514").values_list(
"keyword", flat=True
)
for image in keyword_list:
black_pid.append(image[6:])
return black_pid
@classmethod
async def _run_script(cls):
return [
"ALTER TABLE pixiv_keyword_users RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id
"ALTER TABLE pixiv_keyword_users ALTER COLUMN user_id TYPE character varying(255);",
"ALTER TABLE pixiv_keyword_users ALTER COLUMN group_id TYPE character varying(255);",
]

View File

@ -0,0 +1,251 @@
import random
from nonebot.adapters import Bot
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import (
Alconna,
Args,
Arparma,
Match,
Option,
on_alconna,
store_true,
)
from nonebot_plugin_saa import Image, MessageFactory, Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import Config
from zhenxun.configs.utils import BaseBlock, PluginExtraData, RegisterConfig
from zhenxun.services.log import logger
from zhenxun.utils.platform import PlatformUtils
from zhenxun.utils.withdraw_manage import WithdrawManager
from ._data_source import get_image
from ._model.omega_pixiv_illusts import OmegaPixivIllusts
from ._model.pixiv import Pixiv
__plugin_meta__ = PluginMetadata(
name="PIX",
description="这里是PIX图库",
usage="""
指令
pix ?*[tags]: 通过 tag 获取相似图片不含tag时随机抽取
pid [uid]: 通过uid获取图片
pix pid[pid]: 查看图库中指定pid图片
示例pix 萝莉 白丝
示例pix 萝莉 白丝 10 10为数量
示例pix #02 当tag只有1个tag且为数字时使用#标记,否则将被判定为数量)
示例pix 34582394 查询指定uid图片
示例pix pid:12323423 查询指定pid图片
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
superuser_help="""
指令
pix -s ?*[tags]: 通过tag获取色图不含tag时随机
pix -r ?*[tags]: 通过tag获取r18图不含tag时随机
""",
menu_type="来点好康的",
limits=[BaseBlock(result="您有PIX图片正在处理请稍等...")],
configs=[
RegisterConfig(
key="MAX_ONCE_NUM2FORWARD",
value=None,
help="单次发送的图片数量达到指定值时转发为合并消息",
default_value=None,
type=int,
),
RegisterConfig(
key="ALLOW_GROUP_SETU",
value=False,
help="允许非超级用户使用-s参数",
default_value=False,
type=bool,
),
RegisterConfig(
key="ALLOW_GROUP_R18",
value=False,
help="允许非超级用户使用-r参数",
default_value=False,
type=bool,
),
],
).dict(),
)
# pix = on_command("pix", aliases={"PIX", "Pix"}, priority=5, block=True)
_matcher = on_alconna(
Alconna(
"pix",
Args["tags?", list[str]],
Option("-s", action=store_true, help_text="色图"),
Option("-r", action=store_true, help_text="r18"),
),
priority=5,
block=True,
)
PIX_RATIO = None
OMEGA_RATIO = None
@_matcher.handle()
async def _(bot: Bot, session: EventSession, arparma: Arparma, tags: Match[list[str]]):
global PIX_RATIO, OMEGA_RATIO
gid = session.id3 or session.id2
if not session.id1:
await Text("用户id为空...").finish()
if PIX_RATIO is None:
pix_omega_pixiv_ratio = Config.get_config("pix", "PIX_OMEGA_PIXIV_RATIO")
PIX_RATIO = pix_omega_pixiv_ratio[0] / (
pix_omega_pixiv_ratio[0] + pix_omega_pixiv_ratio[1]
)
OMEGA_RATIO = 1 - PIX_RATIO
num = 1
# keyword = arg.extract_plain_text().strip()
keyword = ""
spt = tags.result if tags.available else []
if arparma.find("s"):
nsfw_tag = 1
elif arparma.find("r"):
nsfw_tag = 2
else:
nsfw_tag = 0
if session.id1 not in bot.config.superusers:
if (nsfw_tag == 1 and not Config.get_config("pix", "ALLOW_GROUP_SETU")) or (
nsfw_tag == 2 and not Config.get_config("pix", "ALLOW_GROUP_R18")
):
await Text("你不能看这些噢,这些都是是留给管理员看的...").finish()
if (n := len(spt)) == 1:
if str(spt[0]).isdigit() and int(spt[0]) < 100:
num = int(spt[0])
keyword = ""
elif spt[0].startswith("#"):
keyword = spt[0][1:]
elif n > 1:
if str(spt[-1]).isdigit():
num = int(spt[-1])
if num > 10:
if session.id1 not in bot.config.superusers or (
session.id1 in bot.config.superusers and num > 30
):
num = random.randint(1, 10)
await Text(f"太贪心了,就给你发 {num}张 好了").send()
spt = spt[:-1]
keyword = " ".join(spt)
pix_num = int(num * PIX_RATIO) + 15 if PIX_RATIO != 0 else 0
omega_num = num - pix_num + 15
if str(keyword).isdigit():
if num == 1:
pix_num = 15
omega_num = 15
all_image = await Pixiv.query_images(
uid=int(keyword), num=pix_num, r18=1 if nsfw_tag == 2 else 0
) + await OmegaPixivIllusts.query_images(
uid=int(keyword), num=omega_num, nsfw_tag=nsfw_tag
)
elif keyword.lower().startswith("pid"):
pid = keyword.replace("pid", "").replace(":", "").replace("", "")
if not str(pid).isdigit():
await Text("PID必须是数字...").finish(reply=True)
all_image = await Pixiv.query_images(
pid=int(pid), r18=1 if nsfw_tag == 2 else 0
)
if not all_image:
all_image = await OmegaPixivIllusts.query_images(
pid=int(pid), nsfw_tag=nsfw_tag
)
num = len(all_image)
else:
tmp = await Pixiv.query_images(
spt, r18=1 if nsfw_tag == 2 else 0, num=pix_num
) + await OmegaPixivIllusts.query_images(spt, nsfw_tag=nsfw_tag, num=omega_num)
tmp_ = []
all_image = []
for x in tmp:
if x.pid not in tmp_:
all_image.append(x)
tmp_.append(x.pid)
if not all_image:
await Text(f"未在图库中找到与 {keyword} 相关Tag/UID/PID的图片...").finish(
reply=True
)
msg_list = []
for _ in range(num):
img_url = None
author = None
if not all_image:
await Text("坏了...发完了,没图了...").finish()
img = random.choice(all_image)
all_image.remove(img) # type: ignore
if isinstance(img, OmegaPixivIllusts):
img_url = img.url
author = img.uname
elif isinstance(img, Pixiv):
img_url = img.img_url
author = img.author
pid = img.pid
title = img.title
uid = img.uid
if img_url:
_img = await get_image(img_url, session.id1)
if _img:
if Config.get_config("pix", "SHOW_INFO"):
msg_list.append(
MessageFactory(
[
Text(
f"title{title}\n"
f"author{author}\n"
f"PID{pid}\nUID{uid}\n"
),
Image(_img),
]
)
)
else:
msg_list.append(Image(_img))
logger.info(
f" 查看PIX图库PID: {pid}", arparma.header_result, session=session
)
else:
msg_list.append(Text("这张图似乎下载失败了"))
logger.info(
f" 查看PIX图库PID: {pid},下载图片出错",
arparma.header_result,
session=session,
)
if (
Config.get_config("pix", "MAX_ONCE_NUM2FORWARD")
and num >= Config.get_config("pix", "MAX_ONCE_NUM2FORWARD")
and gid
):
for msg in msg_list:
# receipt = await PlatformUtils.send_message(
# bot, None, group_id=gid, message=msg
# )
receipt = await msg.send()
if receipt:
message_id = receipt.extract_message_id().message_id
await WithdrawManager.withdraw_message(
bot,
str(message_id),
Config.get_config("pix", "WITHDRAW_PIX_MESSAGE"),
session,
)
else:
for msg in msg_list:
receipt = await msg.send()
# receipt = await PlatformUtils.send_message(
# bot, session.id1, group_id=gid, message=msg
# )
if receipt:
message_id = receipt.extract_message_id().message_id
await WithdrawManager.withdraw_message(
bot,
message_id,
Config.get_config("pix", "WITHDRAW_PIX_MESSAGE"),
session,
)

View File

@ -0,0 +1,129 @@
from nonebot.adapters import Bot
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import (
Alconna,
Args,
Arparma,
Option,
on_alconna,
store_true,
)
from nonebot_plugin_saa import Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger
from ._data_source import uid_pid_exists
from ._model.pixiv import Pixiv
from ._model.pixiv_keyword_user import PixivKeywordUser
__plugin_meta__ = PluginMetadata(
name="PIX添加",
description="PIX关键词/UID/PID添加管理",
usage="""
指令
添加pix关键词 [Tag]: 添加一个pix搜索收录Tag
pix添加 uid [uid]: 添加一个pix搜索收录uid
pix添加 pid [pid]: 添加一个pix收录pid
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
).dict(),
)
_add_matcher = on_alconna(
Alconna("添加pix关键词", Args["keyword", str]), priority=5, block=True
)
_uid_matcher = on_alconna(
Alconna(
"pix添加",
Args["add_type", ["uid", "pid"]]["id", str],
Option("-f", action=store_true, help_text="强制收录不检查是否存在"),
),
priority=5,
block=True,
)
_black_matcher = on_alconna(
Alconna("添加pix黑名单", Args["pid", str]), priority=5, block=True
)
@_add_matcher.handle()
async def _(bot: Bot, session: EventSession, keyword: str, arparma: Arparma):
group_id = session.id3 or session.id2 or -1
if not await PixivKeywordUser.exists(keyword=keyword):
await PixivKeywordUser.create(
user_id=str(session.id1),
group_id=str(group_id),
keyword=keyword,
is_pass=str(session.id1) in bot.config.superusers,
)
text = f"已成功添加pixiv搜图关键词{keyword}"
if session.id1 not in bot.config.superusers:
text += ",请等待管理员通过该关键词!"
await Text(text).send(reply=True)
logger.info(
f"添加了pixiv搜图关键词: {keyword}", arparma.header_result, session=session
)
else:
await Text(f"该关键词 {keyword} 已存在...").send()
@_uid_matcher.handle()
async def _(bot: Bot, session: EventSession, arparma: Arparma, add_type: str, id: str):
group_id = session.id3 or session.id2 or -1
exists_flag = True
if arparma.find("f") and session.id1 in bot.config.superusers:
exists_flag = False
word = None
if add_type == "uid":
word = f"uid:{id}"
else:
word = f"pid:{id}"
if await Pixiv.get_or_none(pid=int(id), img_p="p0"):
await Text(f"该PID{id}已存在...").finish(reply=True)
if not await uid_pid_exists(word) and exists_flag:
await Text("画师或作品不存在或搜索正在CD请稍等...").finish(reply=True)
if not await PixivKeywordUser.exists(keyword=word):
await PixivKeywordUser.create(
user_id=session.id1,
group_id=str(group_id),
keyword=word,
is_pass=session.id1 in bot.config.superusers,
)
text = f"已成功添加pixiv搜图UID/PID{id}"
if session.id1 not in bot.config.superusers:
text += ",请等待管理员通过该关键词!"
await Text(text).send(reply=True)
else:
await Text(f"该UID/PID{id} 已存在...").send()
@_black_matcher.handle()
async def _(bot: Bot, session: EventSession, arparma: Arparma, pid: str):
img_p = ""
if "p" in pid:
img_p = pid.split("p")[-1]
pid = pid.replace("_", "")
pid = pid[: pid.find("p")]
if not pid.isdigit:
await Text("PID必须全部是数字").finish(reply=True)
if not await PixivKeywordUser.exists(
keyword=f"black:{pid}{f'_p{img_p}' if img_p else ''}"
):
await PixivKeywordUser.create(
user_id=114514,
group_id=114514,
keyword=f"black:{pid}{f'_p{img_p}' if img_p else ''}",
is_pass=session.id1 in bot.config.superusers,
)
await Text(f"已添加PID{pid} 至黑名单中...").send()
logger.info(
f" 添加了pixiv搜图黑名单 PID:{pid}", arparma.header_result, session=session
)
else:
await Text(f"PID{pid} 已添加黑名单中,添加失败...").send()

View File

@ -0,0 +1,217 @@
from nonebot.adapters import Bot
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import (
Alconna,
Args,
Arparma,
Match,
Option,
on_alconna,
store_true,
)
from nonebot_plugin_saa import Mention, MessageFactory, Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.platform import PlatformUtils
from ._data_source import remove_image
from ._model.pixiv import Pixiv
from ._model.pixiv_keyword_user import PixivKeywordUser
__plugin_meta__ = PluginMetadata(
name="PIX删除",
description="PIX关键词/UID/PID添加管理",
usage="""
指令
pix关键词 [y/n] [关键词/pid/uid]
删除pix关键词 ['pid'/'uid'/'keyword'] [关键词/pid/uid]
删除pix图片 *[pid]
示例pix关键词 y 萝莉
示例pix关键词 y 12312312 uid
示例pix关键词 n 12312312 pid
示例删除pix关键词 keyword 萝莉
示例删除pix关键词 uid 123123123
示例删除pix关键词 pid 123123
示例删除pix图片 4223442
""".strip(),
extra=PluginExtraData(
author="HibiKier", version="0.1", plugin_type=PluginType.SUPERUSER
).dict(),
)
_pass_matcher = on_alconna(
Alconna(
"pix关键词", Args["status", ["y", "n"]]["keyword", str]["type?", ["uid", "pid"]]
),
permission=SUPERUSER,
priority=1,
block=True,
)
_del_matcher = on_alconna(
Alconna("删除pix关键词", Args["type", ["pid", "uid", "keyword"]]["keyword", str]),
permission=SUPERUSER,
priority=1,
block=True,
)
_del_pic_matcher = on_alconna(
Alconna(
"删除pix图片",
Args["pid", str],
Option("-b|--black", action=store_true, help_text=""),
),
permission=SUPERUSER,
priority=1,
block=True,
)
@_pass_matcher.handle()
async def _(
bot: Bot,
session: EventSession,
arparma: Arparma,
status: str,
keyword: str,
type: Match[str],
):
tmp = {"group": {}, "private": {}}
flag = status == "y"
if type.available:
if type.result == "uid":
keyword = f"uid:{keyword}"
else:
keyword = f"pid:{keyword}"
if not keyword[4:].isdigit():
await Text(f"{keyword} 非全数字...").finish(reply=True)
data = await PixivKeywordUser.get_or_none(keyword=keyword)
user_id = 0
group_id = 0
if data:
data.is_pass = flag
await data.save(update_fields=["is_pass"])
user_id, group_id = data.user_id, data.group_id
if not user_id:
await Text(f"未找到关键词/UID{keyword},请检查关键词/UID是否存在...").finish(
reply=True
)
if flag:
if group_id == -1:
if not tmp["private"].get(user_id):
tmp["private"][user_id] = {"keyword": [keyword]}
else:
tmp["private"][user_id]["keyword"].append(keyword)
else:
if not tmp["group"].get(group_id):
tmp["group"][group_id] = {}
if not tmp["group"][group_id].get(user_id):
tmp["group"][group_id][user_id] = {"keyword": [keyword]}
else:
tmp["group"][group_id][user_id]["keyword"].append(keyword)
await Text(f"已成功{'通过' if flag else '拒绝'}搜图关键词:{keyword}...").send()
for user in tmp["private"]:
text = "".join(tmp["private"][user]["keyword"])
await PlatformUtils.send_message(
bot,
user,
None,
f"你的关键词/UID/PID {text} 已被管理员通过,将在下一次进行更新...",
)
# await bot.send_private_msg(
# user_id=user,
# message=f"你的关键词/UID/PID {x} 已被管理员通过,将在下一次进行更新...",
# )
for group in tmp["group"]:
for user in tmp["group"][group]:
text = "".join(tmp["group"][group][user]["keyword"])
await PlatformUtils.send_message(
bot,
None,
group_id=group,
message=MessageFactory(
[
Mention(user_id=user),
Text(
"你的关键词/UID/PID {x} 已被管理员通过,将在下一次进行更新..."
),
]
),
)
# await bot.send_group_msg(
# group_id=group,
# message=Message(
# f"{at(user)}你的关键词/UID/PID {x} 已被管理员通过,将在下一次进行更新..."
# ),
# )
logger.info(
f" 通过了pixiv搜图关键词/UID: {keyword}", arparma.header_result, session=session
)
@_del_matcher.handle()
async def _(bot: Bot, session: EventSession, arparma: Arparma, type: str, keyword: str):
if type != "keyword":
keyword = f"{type}:{keyword}"
if data := await PixivKeywordUser.get_or_none(keyword=keyword):
await data.delete()
await Text(f"删除搜图关键词/UID{keyword} 成功...").send()
logger.info(
f" 删除了pixiv搜图关键词: {keyword}", arparma.header_result, session=session
)
else:
await Text(f"未查询到搜索关键词/UID/PID{keyword},删除失败!").send()
@_del_pic_matcher.handle()
async def _(bot: Bot, session: EventSession, arparma: Arparma, keyword: str):
msg = ""
black_pid = ""
flag = arparma.find("black")
img_p = None
if "p" in keyword:
img_p = keyword.split("p")[-1]
keyword = keyword.replace("_", "")
keyword = keyword[: keyword.find("p")]
elif "ugoira" in keyword:
img_p = keyword.split("ugoira")[-1]
keyword = keyword.replace("_", "")
keyword = keyword[: keyword.find("ugoira")]
if keyword.isdigit():
if await Pixiv.query_images(pid=int(keyword), r18=2):
if await remove_image(int(keyword), img_p):
msg += f'{keyword}{f"_p{img_p}" if img_p else ""}'
if flag:
if await PixivKeywordUser.exists(
keyword=f"black:{keyword}{f'_p{img_p}' if img_p else ''}"
):
await PixivKeywordUser.create(
user_id="114514",
group_id="114514",
keyword=f"black:{keyword}{f'_p{img_p}' if img_p else ''}",
is_pass=False,
)
black_pid += f'{keyword}{f"_p{img_p}" if img_p else ""}'
logger.info(
f" 删除了PIX图片 PID:{keyword}{f'_p{img_p}' if img_p else ''}",
arparma.header_result,
session=session,
)
# else:
# await del_pic.send(
# f"PIX:删除pid{pid}{f'_p{img_p}' if img_p else ''} 失败.."
# )
else:
await Text(
f"PIX:图片pix{keyword}{f'_p{img_p}' if img_p else ''} 不存在...无法删除.."
).send()
else:
await Text(f"PID必须为数字pid{keyword}").send(reply=True)
await Text(f"PIX:成功删除图片:{msg[:-1]}").send()
if flag:
await Text(f"成功图片PID加入黑名单{black_pid[:-1]}").send()

View File

@ -0,0 +1,81 @@
from nonebot.adapters import Bot
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args, Arparma, Match, on_alconna
from nonebot_plugin_saa import Image, Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger
from ._data_source import gen_keyword_pic, get_keyword_num
from ._model.pixiv_keyword_user import PixivKeywordUser
__plugin_meta__ = PluginMetadata(
name="查看pix图库",
description="让我看看管理员私藏了多少货",
usage="""
指令
我的pix关键词
显示pix关键词
查看pix图库 ?[tag]: 查看指定tag图片数量为空时查看整个图库
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
).dict(),
)
_my_matcher = on_alconna(Alconna("我的pix关键词"), priority=5, block=True)
_show_matcher = on_alconna(Alconna("显示pix关键词"), priority=5, block=True)
_pix_matcher = on_alconna(
Alconna("查看pix图库", Args["keyword?", str]), priority=5, block=True
)
@_my_matcher.handle()
async def _(arparma: Arparma, session: EventSession):
data = await PixivKeywordUser.filter(user_id=session.id1).values_list(
"keyword", flat=True
)
if not data:
await Text("您目前没有提供任何Pixiv搜图关键字...").finish(reply=True)
await Text(f"您目前提供的如下关键字:\n\t" + "".join(data)).send() # type: ignore
logger.info("查看我的pix关键词", arparma.header_result, session=session)
@_show_matcher.handle()
async def _(bot: Bot, arparma: Arparma, session: EventSession):
_pass_keyword, not_pass_keyword = await PixivKeywordUser.get_current_keyword()
if _pass_keyword or not_pass_keyword:
image = await gen_keyword_pic(
_pass_keyword, not_pass_keyword, session.id1 in bot.config.superusers
)
await Image(image.pic2bytes()).send() # type: ignore
else:
if session.id1 in bot.config.superusers:
await Text(f"目前没有已收录或待收录的搜索关键词...").send()
else:
await Text(f"目前没有已收录的搜索关键词...").send()
@_pix_matcher.handle()
async def _(bot: Bot, arparma: Arparma, session: EventSession, keyword: Match[str]):
_keyword = ""
if keyword.available:
_keyword = keyword.result
count, r18_count, count_, setu_count, r18_count_ = await get_keyword_num(_keyword)
await Text(
f"PIX图库{_keyword}\n"
f"总数:{count + r18_count}\n"
f"美图:{count}\n"
f"R18{r18_count}\n"
f"---------------\n"
f"Omega图库{_keyword}\n"
f"总数:{count_ + setu_count + r18_count_}\n"
f"美图:{count_}\n"
f"色图:{setu_count}\n"
f"R18{r18_count_}"
).send()
logger.info("查看pix图库", arparma.header_result, session=session)

View File

@ -0,0 +1,221 @@
import asyncio
import os
import re
import time
from pathlib import Path
from nonebot.adapters import Bot
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import (
Alconna,
Args,
Arparma,
Match,
Option,
on_alconna,
store_true,
)
from nonebot_plugin_saa import Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from ._data_source import start_update_image_url
from ._model.omega_pixiv_illusts import OmegaPixivIllusts
from ._model.pixiv import Pixiv
from ._model.pixiv_keyword_user import PixivKeywordUser
__plugin_meta__ = PluginMetadata(
name="pix检查更新",
description="pix图库收录数据检查更新",
usage="""
指令
更新pix关键词 *[keyword/uid/pid] [num=max]: 更新仅keyword/uid/pid或全部
pix检测更新检测从未更新过的uid和pid
示例更新pix关键词keyword
示例更新pix关键词uid 10
""".strip(),
extra=PluginExtraData(
author="HibiKier", version="0.1", plugin_type=PluginType.SUPERUSER
).dict(),
)
_update_matcher = on_alconna(
Alconna("更新pix关键词", Args["type", ["uid", "pid", "keyword"]]["num?", int]),
permission=SUPERUSER,
priority=1,
block=True,
)
_check_matcher = on_alconna(
Alconna(
"pix检测更新", Option("-u|--update", action=store_true, help_text="是否更新")
),
permission=SUPERUSER,
priority=1,
block=True,
)
_omega_matcher = on_alconna(
Alconna("检测omega图库"), permission=SUPERUSER, priority=1, block=True
)
@_update_matcher.handle()
async def _(arparma: Arparma, session: EventSession, type: str, num: Match[int]):
_pass_keyword, _ = await PixivKeywordUser.get_current_keyword()
_pass_keyword.reverse()
black_pid = await PixivKeywordUser.get_black_pid()
_keyword = [
x
for x in _pass_keyword
if not x.startswith("uid:")
and not x.startswith("pid:")
and not x.startswith("black:")
]
_uid = [x for x in _pass_keyword if x.startswith("uid:")]
_pid = [x for x in _pass_keyword if x.startswith("pid:")]
_num = num.result if num.available else 9999
if _num < 10000:
keyword_str = "".join(
_keyword[: _num if _num < len(_keyword) else len(_keyword)]
)
uid_str = "".join(_uid[: _num if _num < len(_uid) else len(_uid)])
pid_str = "".join(_pid[: _num if _num < len(_pid) else len(_pid)])
if type == "pid":
update_lst = _pid
info = f"开始更新Pixiv搜图PID\n{pid_str}"
elif type == "uid":
update_lst = _uid
info = f"开始更新Pixiv搜图UID\n{uid_str}"
elif type == "keyword":
update_lst = _keyword
info = f"开始更新Pixiv搜图关键词\n{keyword_str}"
else:
update_lst = _pass_keyword
info = f"开始更新Pixiv搜图关键词\n{keyword_str}\n更新UID{uid_str}\n更新PID{pid_str}"
_num = _num if _num < len(update_lst) else len(update_lst)
else:
if type == "pid":
update_lst = [f"pid:{_num}"]
info = f"开始更新Pixiv搜图UID\npid:{_num}"
else:
update_lst = [f"uid:{_num}"]
info = f"开始更新Pixiv搜图UID\nuid:{_num}"
await Text(info).send()
start_time = time.time()
pid_count, pic_count = await start_update_image_url(update_lst[:_num], black_pid, type == 'pid')
await Text(
f"Pixiv搜图关键词搜图更新完成...\n"
f"累计更新PID {pid_count}\n"
f"累计更新图片 {pic_count}"
+ "\n耗时:{:.2f}".format((time.time() - start_time))
).send()
logger.info("更新pix关键词", arparma.header_result, session=session)
@_check_matcher.handle()
async def _(bot: Bot, arparma: Arparma, session: EventSession):
_pass_keyword, _ = await PixivKeywordUser.get_current_keyword()
x_uid = []
x_pid = []
_uid = [int(x[4:]) for x in _pass_keyword if x.startswith("uid:")]
_pid = [int(x[4:]) for x in _pass_keyword if x.startswith("pid:")]
all_images = await Pixiv.query_images(r18=2)
for img in all_images:
if img.pid not in x_pid:
x_pid.append(img.pid)
if img.uid not in x_uid:
x_uid.append(img.uid)
await Text(
"从未更新过的UID"
+ "".join([f"uid:{x}" for x in _uid if x not in x_uid])
+ "\n"
+ "从未更新过的PID"
+ "".join([f"pid:{x}" for x in _pid if x not in x_pid])
).send()
if arparma.find("update"):
await Text("开始自动自动更新PID....").send()
update_lst = [f"pid:{x}" for x in _uid if x not in x_uid]
black_pid = await PixivKeywordUser.get_black_pid()
start_time = time.time()
pid_count, pic_count = await start_update_image_url(update_lst, black_pid, False)
await Text(
f"Pixiv搜图关键词搜图更新完成...\n"
f"累计更新PID {pid_count}\n"
f"累计更新图片 {pic_count}"
+ "\n耗时:{:.2f}".format((time.time() - start_time))
).send()
logger.info(
f"pix检测更新, 是否更新: {arparma.find('update')}",
arparma.header_result,
session=session,
)
@_omega_matcher.handle()
async def _():
async def _tasks(line: str, all_pid: list[int], length: int, index: int):
data = line.split("VALUES", maxsplit=1)[-1].strip()[1:-2]
num_list = re.findall(r"(\d+)", data)
pid = int(num_list[1])
uid = int(num_list[2])
id_ = 3
while num_list[id_] not in ["0", "1"]:
id_ += 1
classified = int(num_list[id_])
nsfw_tag = int(num_list[id_ + 1])
width = int(num_list[id_ + 2])
height = int(num_list[id_ + 3])
str_list = re.findall(r"'(.*?)',", data)
title = str_list[0]
uname = str_list[1]
tags = str_list[2]
url = str_list[3]
if pid in all_pid:
logger.info(f"添加OmegaPixivIllusts图库数据已存在 ---> pid{pid}")
return
_, is_create = await OmegaPixivIllusts.get_or_create(
pid=pid,
title=title,
width=width,
height=height,
url=url,
uid=uid,
nsfw_tag=nsfw_tag,
tags=tags,
uname=uname,
classified=classified,
)
if is_create:
logger.info(
f"成功添加OmegaPixivIllusts图库数据 pid{pid} 本次预计存储 {length} 张,已更新第 {index}"
)
else:
logger.info(f"添加OmegaPixivIllusts图库数据已存在 ---> pid{pid}")
omega_pixiv_illusts = None
for file in os.listdir("."):
if "omega_pixiv_artwork" in file and ".sql" in file:
omega_pixiv_illusts = Path() / file
if omega_pixiv_illusts:
with open(omega_pixiv_illusts, "r", encoding="utf8") as f:
lines = f.readlines()
tasks = []
length = len([x for x in lines if "INSERT INTO" in x.upper()])
all_pid = await OmegaPixivIllusts.all().values_list("pid", flat=True)
index = 0
logger.info("检测到OmegaPixivIllusts数据库准备开始更新....")
for line in lines:
if "INSERT INTO" in line.upper():
index += 1
logger.info(f"line: {line} 加入更新计划")
tasks.append(
asyncio.create_task(_tasks(line, all_pid, length, index)) # type: ignore
)
await asyncio.gather(*tasks)
omega_pixiv_illusts.unlink()

View File

@ -36,3 +36,11 @@ class InsufficientGold(Exception):
"""
pass
class NotFindSuperuser(Exception):
"""
未找到超级用户
"""
pass

View File

@ -1,3 +1,4 @@
import random
from typing import Awaitable, Callable, Literal, Set
import httpx
@ -20,11 +21,13 @@ from nonebot_plugin_saa import (
TargetQQPrivate,
Text,
)
from nonebot_plugin_saa.abstract_factories import Receipt
from pydantic import BaseModel
from zhenxun.models.friend_user import FriendUser
from zhenxun.models.group_console import GroupConsole
from zhenxun.services.log import logger
from zhenxun.utils.exception import NotFindSuperuser
class UserData(BaseModel):
@ -47,6 +50,34 @@ class UserData(BaseModel):
class PlatformUtils:
@classmethod
async def send_superuser(
cls,
bot: Bot,
message: str | MessageFactory | Text | Image,
superuser_id: str | None = None,
) -> Receipt | None:
"""发送消息给超级用户
参数:
bot: Bot
message: 消息
superuser_id: 指定超级用户id.
异常:
NotFindSuperuser: 未找到超级用户id
返回:
Receipt | None: Receipt
"""
if not superuser_id:
platform = cls.get_platform(bot)
platform_superusers = bot.config.PLATFORM_SUPERUSERS.get(platform) or []
if not platform_superusers:
raise NotFindSuperuser()
superuser_id = random.choice(platform_superusers)
return await cls.send_message(bot, superuser_id, None, message)
@classmethod
async def get_group_member_list(cls, bot: Bot, group_id: str) -> list[UserData]:
"""获取群组/频道成员列表
@ -277,7 +308,7 @@ class PlatformUtils:
user_id: str | None,
group_id: str | None,
message: str | Text | MessageFactory | Image,
) -> bool:
) -> Receipt | None:
"""发送消息
参数:
@ -287,13 +318,12 @@ class PlatformUtils:
message: 消息文本
返回:
bool: 是否发送成功
Receipt | None: 是否发送成功
"""
if target := cls.get_target(bot, user_id, group_id):
send_message = Text(message) if isinstance(message, str) else message
await send_message.send_to(target, bot)
return True
return False
return await send_message.send_to(target, bot)
return None
@classmethod
async def update_group(cls, bot: Bot) -> int:
@ -581,7 +611,10 @@ async def broadcast_group(
if is_continue:
continue
target = PlatformUtils.get_target(
_bot, None, group.group_id, group.channel_id
_bot,
None,
group.group_id,
# , group.channel_id
)
if target:
_used_group.append(key)

View File

@ -9,6 +9,7 @@ import httpx
import pypinyin
import pytz
from zhenxun.configs.config import Config
from zhenxun.services.log import logger
@ -165,3 +166,50 @@ async def get_group_avatar(gid: int | str) -> bytes | None:
except Exception as e:
logger.error("获取群头像错误", "Util", target=gid)
return None
def change_pixiv_image_links(
url: str, size: str | None = None, nginx_url: str | None = None
) -> str:
"""根据配置改变图片大小和反代链接
参数:
url: 图片原图链接
size: 模式
nginx_url: 反代
返回:
str: url
"""
if size == "master":
img_sp = url.rsplit(".", maxsplit=1)
url = img_sp[0]
img_type = img_sp[1]
url = url.replace("original", "master") + f"_master1200.{img_type}"
if not nginx_url:
nginx_url = Config.get_config("pixiv", "PIXIV_NGINX_URL")
if nginx_url:
url = (
url.replace("i.pximg.net", nginx_url)
.replace("i.pixiv.cat", nginx_url)
.replace("_webp", "")
)
return url
def change_img_md5(path_file: str | Path) -> bool:
"""改变图片MD5
参数:
path_file: 图片路径
返还:
bool: 是否修改成功
"""
try:
with open(path_file, "a") as f:
f.write(str(int(time.time() * 1000)))
return True
except Exception as e:
logger.warning(f"改变图片MD5错误 Path{path_file}", e=e)
return False

View File

@ -64,7 +64,11 @@ class WithdrawManager:
@classmethod
async def withdraw_message(
cls, bot: Bot, message_id: str | int, time: int | None = None
cls,
bot: Bot,
message_id: str | int,
time: int | tuple[int, int] | None = None,
session: EventSession | None = None,
):
"""消息撤回
@ -74,8 +78,24 @@ class WithdrawManager:
time: 延迟时间
"""
if time:
logger.debug(f"将在 {time}秒 内撤回消息ID: {message_id}", "WithdrawManager")
await asyncio.sleep(time)
gid = None
_time = 1
if isinstance(time, tuple):
if time[0] == 0:
return
if session:
gid = session.id3 or session.id2
if not gid and int(time[1]) not in [0, 2]:
return
if gid and int(time[1]) not in [1, 2]:
return
_time = time[0]
else:
_time = time
logger.debug(
f"将在 {_time}秒 内撤回消息ID: {message_id}", "WithdrawManager"
)
await asyncio.sleep(_time)
if isinstance(bot, v11Bot):
logger.debug(f"v11Bot 撤回消息ID: {message_id}", "WithdrawManager")
await bot.delete_msg(message_id=int(message_id))